diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java index b088f6f27c..10b4174578 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java @@ -49,4 +49,20 @@ public interface OptimizingProcess { void commit(); MetricsSummary getSummary(); + + /** + * Whether all tasks in this process have reached {@code SUCCESS} status and are ready to be + * committed. + * + *

This is primarily used by {@link + * org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor} to detect a stuck {@code + * RUNNING} process whose tasks all succeeded but never transitioned to {@link + * OptimizingStatus#COMMITTING} (see issue #4172: a transient failure of {@code + * beginCommitting()}, such as a DB lock wait timeout, otherwise leaves the table permanently + * stuck in {@code *_OPTIMIZING} until AMS is restarted). + * + * @return {@code true} if the process has at least one task and all of them are in {@code + * SUCCESS} status + */ + boolean allTasksPrepared(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 52eb46b4cc..eac983a620 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -764,15 +764,27 @@ private Map> getTaskMap() { } /** - * if all tasks are Prepared + * Whether all tasks in this process are in {@code SUCCESS} status. * - * @return true if tasks is not empty and all Prepared + *

Lock-protected so that callers from other threads (e.g. {@link + * org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor}) observe a consistent + * view of {@code taskMap}. See issue #4172. + * + * @return {@code true} if there is at least one task and every task is in {@code SUCCESS} + * status */ - private boolean allTasksPrepared() { - if (!taskMap.isEmpty()) { - return taskMap.values().stream().allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS); + @Override + public boolean allTasksPrepared() { + lock.lock(); + try { + if (!taskMap.isEmpty()) { + return taskMap.values().stream() + .allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS); + } + return false; + } finally { + lock.unlock(); } - return false; } /** diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index ae9a1ea1d9..7dc7f93dc7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -181,11 +181,58 @@ public void execute(TableRuntime tableRuntime) { long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); defaultTableRuntime.setLatestRefreshInterval(newInterval); } + + // issue #4172: a RUNNING process whose tasks all succeeded may remain stuck in + // *_OPTIMIZING when a previous beginCommitting() failed transiently (e.g. DB lock + // wait timeout). Detect it here and re-drive the transition to COMMITTING so that + // OptimizingCommitExecutor can take over normally. + tryHealStuckCommitting(defaultTableRuntime); } catch (Throwable throwable) { logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable); } } + /** + * Detects and heals a {@code RUNNING} optimizing process whose tasks have all succeeded but the + * table's status never transitioned to {@link OptimizingStatus#COMMITTING} (e.g. because the + * previous {@code beginCommitting()} DB update failed transiently). Without this self-heal, the + * table stays in {@code *_OPTIMIZING} forever until AMS is restarted - see issue #4172. + * + *

The recovery path in {@code OptimizingQueue#initTableRuntime} already handles this case on + * AMS restart; this method handles it while AMS is still running, by re-invoking {@link + * DefaultTableRuntime#beginCommitting()} from the periodic refresh loop. On success, the status + * transition will fire {@code handleTableChanged} and the normal {@code OptimizingCommitExecutor} + * pipeline will resume. On failure (e.g. the DB is still unhealthy), this method logs and + * returns; the next refresh cycle will try again. + */ + private void tryHealStuckCommitting(DefaultTableRuntime tableRuntime) { + OptimizingProcess process = tableRuntime.getOptimizingProcess(); + if (process == null + || process.getStatus() != ProcessStatus.RUNNING + || tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING + || !tableRuntime.getOptimizingStatus().isProcessing()) { + return; + } + if (!process.allTasksPrepared()) { + return; + } + logger.warn( + "{} detected stuck RUNNING optimizing process (processId={}, status={}): all tasks have " + + "succeeded but the table never transitioned to COMMITTING. Self-healing by " + + "re-driving beginCommitting() (issue #4172).", + tableRuntime.getTableIdentifier(), + process.getProcessId(), + tableRuntime.getOptimizingStatus()); + try { + tableRuntime.beginCommitting(); + } catch (Exception e) { + logger.warn( + "{} self-heal beginCommitting() failed, will retry on the next refresh cycle.", + tableRuntime.getTableIdentifier(), + e); + } + } + /** * Calculate adaptive execution interval based on table optimization status. * diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 417150b390..edeb499560 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -456,6 +456,58 @@ public void testReloadAllTasksCompletedNotYetCommitting() { getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } + /** + * Regression test for issue #4172: a {@code RUNNING} optimizing process whose tasks have all + * succeeded must not stay stuck in {@code *_OPTIMIZING} forever when a previous {@code + * beginCommitting()} transition failed transiently and AMS keeps running (i.e. no restart). + * {@link TableRuntimeRefreshExecutor#execute} should detect the stuck state on the next refresh + * cycle and re-drive the transition to {@code COMMITTING}. + * + *

This complements {@link #testReloadAllTasksCompletedNotYetCommitting}, which only exercises + * the restart-recovery path in {@code OptimizingQueue#initTableRuntime}. + */ + @Test + public void testRefreshExecutorHealsStuckRunningProcess() { + // 1. Poll / ack / complete - the table should be in COMMITTING with a RUNNING process. + OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task); + optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + Assertions.assertEquals(OptimizingStatus.COMMITTING, runtime.getOptimizingStatus()); + Assertions.assertNotNull(runtime.getOptimizingProcess()); + Assertions.assertEquals(ProcessStatus.RUNNING, runtime.getOptimizingProcess().getStatus()); + Assertions.assertTrue( + runtime.getOptimizingProcess().allTasksPrepared(), + "All tasks should be prepared after successful completeTask()"); + + // 2. Simulate the stuck state described in issue #4172: pretend beginCommitting() never + // transitioned the status out of *_OPTIMIZING. Mutate the in-memory status directly via the + // state store (no reload, so we model "AMS is still running but beginCommitting() silently + // failed / was rolled back"). + runtime + .store() + .begin() + .updateStatusCode(code -> OptimizingStatus.MINOR_OPTIMIZING.getCode()) + .commit(); + Assertions.assertEquals(OptimizingStatus.MINOR_OPTIMIZING, runtime.getOptimizingStatus()); + + // 3. Trigger a refresh cycle — the executor should detect the stuck state and self-heal + // by re-invoking beginCommitting(). + TableRuntimeRefresher refresher = new TableRuntimeRefresher(); + try { + refresher.refreshPending(); + } finally { + refresher.dispose(); + } + + Assertions.assertEquals( + OptimizingStatus.COMMITTING, + getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus(), + "TableRuntimeRefreshExecutor must re-drive beginCommitting() for a stuck RUNNING process"); + } + @Test public void testReloadPlanningWithOrphanedProcess() { // 1. Poll and ack a task - table is now in optimizing state with an active process