diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java index b088f6f27c..10b4174578 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java @@ -49,4 +49,20 @@ public interface OptimizingProcess { void commit(); MetricsSummary getSummary(); + + /** + * Whether all tasks in this process have reached {@code SUCCESS} status and are ready to be + * committed. + * + *
This is primarily used by {@link
+ * org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor} to detect a stuck {@code
+ * RUNNING} process whose tasks all succeeded but never transitioned to {@link
+ * OptimizingStatus#COMMITTING} (see issue #4172: a transient failure of {@code
+ * beginCommitting()}, such as a DB lock wait timeout, otherwise leaves the table permanently
+ * stuck in {@code *_OPTIMIZING} until AMS is restarted).
+ *
+ * @return {@code true} if the process has at least one task and all of them are in {@code
+ * SUCCESS} status
+ */
+ boolean allTasksPrepared();
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 52eb46b4cc..eac983a620 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -764,15 +764,27 @@ private Map Lock-protected so that callers from other threads (e.g. {@link
+ * org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor}) observe a consistent
+ * view of {@code taskMap}. See issue #4172.
+ *
+ * @return {@code true} if there is at least one task and every task is in {@code SUCCESS}
+ * status
*/
- private boolean allTasksPrepared() {
- if (!taskMap.isEmpty()) {
- return taskMap.values().stream().allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS);
+ @Override
+ public boolean allTasksPrepared() {
+ lock.lock();
+ try {
+ if (!taskMap.isEmpty()) {
+ return taskMap.values().stream()
+ .allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS);
+ }
+ return false;
+ } finally {
+ lock.unlock();
}
- return false;
}
/**
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index ae9a1ea1d9..7dc7f93dc7 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -181,11 +181,58 @@ public void execute(TableRuntime tableRuntime) {
long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
defaultTableRuntime.setLatestRefreshInterval(newInterval);
}
+
+ // issue #4172: a RUNNING process whose tasks all succeeded may remain stuck in
+ // *_OPTIMIZING when a previous beginCommitting() failed transiently (e.g. DB lock
+ // wait timeout). Detect it here and re-drive the transition to COMMITTING so that
+ // OptimizingCommitExecutor can take over normally.
+ tryHealStuckCommitting(defaultTableRuntime);
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
}
}
+ /**
+ * Detects and heals a {@code RUNNING} optimizing process whose tasks have all succeeded but the
+ * table's status never transitioned to {@link OptimizingStatus#COMMITTING} (e.g. because the
+ * previous {@code beginCommitting()} DB update failed transiently). Without this self-heal, the
+ * table stays in {@code *_OPTIMIZING} forever until AMS is restarted - see issue #4172.
+ *
+ * The recovery path in {@code OptimizingQueue#initTableRuntime} already handles this case on
+ * AMS restart; this method handles it while AMS is still running, by re-invoking {@link
+ * DefaultTableRuntime#beginCommitting()} from the periodic refresh loop. On success, the status
+ * transition will fire {@code handleTableChanged} and the normal {@code OptimizingCommitExecutor}
+ * pipeline will resume. On failure (e.g. the DB is still unhealthy), this method logs and
+ * returns; the next refresh cycle will try again.
+ */
+ private void tryHealStuckCommitting(DefaultTableRuntime tableRuntime) {
+ OptimizingProcess process = tableRuntime.getOptimizingProcess();
+ if (process == null
+ || process.getStatus() != ProcessStatus.RUNNING
+ || tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING
+ || !tableRuntime.getOptimizingStatus().isProcessing()) {
+ return;
+ }
+ if (!process.allTasksPrepared()) {
+ return;
+ }
+ logger.warn(
+ "{} detected stuck RUNNING optimizing process (processId={}, status={}): all tasks have "
+ + "succeeded but the table never transitioned to COMMITTING. Self-healing by "
+ + "re-driving beginCommitting() (issue #4172).",
+ tableRuntime.getTableIdentifier(),
+ process.getProcessId(),
+ tableRuntime.getOptimizingStatus());
+ try {
+ tableRuntime.beginCommitting();
+ } catch (Exception e) {
+ logger.warn(
+ "{} self-heal beginCommitting() failed, will retry on the next refresh cycle.",
+ tableRuntime.getTableIdentifier(),
+ e);
+ }
+ }
+
/**
* Calculate adaptive execution interval based on table optimization status.
*
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 417150b390..edeb499560 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -456,6 +456,58 @@ public void testReloadAllTasksCompletedNotYetCommitting() {
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
}
+ /**
+ * Regression test for issue #4172: a {@code RUNNING} optimizing process whose tasks have all
+ * succeeded must not stay stuck in {@code *_OPTIMIZING} forever when a previous {@code
+ * beginCommitting()} transition failed transiently and AMS keeps running (i.e. no restart).
+ * {@link TableRuntimeRefreshExecutor#execute} should detect the stuck state on the next refresh
+ * cycle and re-drive the transition to {@code COMMITTING}.
+ *
+ * This complements {@link #testReloadAllTasksCompletedNotYetCommitting}, which only exercises
+ * the restart-recovery path in {@code OptimizingQueue#initTableRuntime}.
+ */
+ @Test
+ public void testRefreshExecutorHealsStuckRunningProcess() {
+ // 1. Poll / ack / complete - the table should be in COMMITTING with a RUNNING process.
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+ optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId()));
+
+ DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId());
+ Assertions.assertEquals(OptimizingStatus.COMMITTING, runtime.getOptimizingStatus());
+ Assertions.assertNotNull(runtime.getOptimizingProcess());
+ Assertions.assertEquals(ProcessStatus.RUNNING, runtime.getOptimizingProcess().getStatus());
+ Assertions.assertTrue(
+ runtime.getOptimizingProcess().allTasksPrepared(),
+ "All tasks should be prepared after successful completeTask()");
+
+ // 2. Simulate the stuck state described in issue #4172: pretend beginCommitting() never
+ // transitioned the status out of *_OPTIMIZING. Mutate the in-memory status directly via the
+ // state store (no reload, so we model "AMS is still running but beginCommitting() silently
+ // failed / was rolled back").
+ runtime
+ .store()
+ .begin()
+ .updateStatusCode(code -> OptimizingStatus.MINOR_OPTIMIZING.getCode())
+ .commit();
+ Assertions.assertEquals(OptimizingStatus.MINOR_OPTIMIZING, runtime.getOptimizingStatus());
+
+ // 3. Trigger a refresh cycle — the executor should detect the stuck state and self-heal
+ // by re-invoking beginCommitting().
+ TableRuntimeRefresher refresher = new TableRuntimeRefresher();
+ try {
+ refresher.refreshPending();
+ } finally {
+ refresher.dispose();
+ }
+
+ Assertions.assertEquals(
+ OptimizingStatus.COMMITTING,
+ getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus(),
+ "TableRuntimeRefreshExecutor must re-drive beginCommitting() for a stuck RUNNING process");
+ }
+
@Test
public void testReloadPlanningWithOrphanedProcess() {
// 1. Poll and ack a task - table is now in optimizing state with an active process