Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,20 @@ public interface OptimizingProcess {
void commit();

MetricsSummary getSummary();

/**
* Whether all tasks in this process have reached {@code SUCCESS} status and are ready to be
* committed.
*
* <p>This is primarily used by {@link
* org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor} to detect a stuck {@code
* RUNNING} process whose tasks all succeeded but never transitioned to {@link
* OptimizingStatus#COMMITTING} (see issue #4172: a transient failure of {@code
* beginCommitting()}, such as a DB lock wait timeout, otherwise leaves the table permanently
* stuck in {@code *_OPTIMIZING} until AMS is restarted).
*
* @return {@code true} if the process has at least one task and all of them are in {@code
* SUCCESS} status
*/
boolean allTasksPrepared();
}
Original file line number Diff line number Diff line change
Expand Up @@ -764,15 +764,27 @@ private Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> getTaskMap() {
}

/**
* if all tasks are Prepared
* Whether all tasks in this process are in {@code SUCCESS} status.
*
* @return true if tasks is not empty and all Prepared
* <p>Lock-protected so that callers from other threads (e.g. {@link
* org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor}) observe a consistent
* view of {@code taskMap}. See issue #4172.
*
* @return {@code true} if there is at least one task and every task is in {@code SUCCESS}
* status
*/
private boolean allTasksPrepared() {
if (!taskMap.isEmpty()) {
return taskMap.values().stream().allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS);
@Override
public boolean allTasksPrepared() {
lock.lock();
try {
if (!taskMap.isEmpty()) {
return taskMap.values().stream()
.allMatch(t -> t.getStatus() == TaskRuntime.Status.SUCCESS);
}
return false;
} finally {
lock.unlock();
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,58 @@ public void execute(TableRuntime tableRuntime) {
long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
defaultTableRuntime.setLatestRefreshInterval(newInterval);
}

// issue #4172: a RUNNING process whose tasks all succeeded may remain stuck in
// *_OPTIMIZING when a previous beginCommitting() failed transiently (e.g. DB lock
// wait timeout). Detect it here and re-drive the transition to COMMITTING so that
// OptimizingCommitExecutor can take over normally.
tryHealStuckCommitting(defaultTableRuntime);
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
}
}

/**
* Detects and heals a {@code RUNNING} optimizing process whose tasks have all succeeded but the
* table's status never transitioned to {@link OptimizingStatus#COMMITTING} (e.g. because the
* previous {@code beginCommitting()} DB update failed transiently). Without this self-heal, the
* table stays in {@code *_OPTIMIZING} forever until AMS is restarted - see issue #4172.
*
* <p>The recovery path in {@code OptimizingQueue#initTableRuntime} already handles this case on
* AMS restart; this method handles it while AMS is still running, by re-invoking {@link
* DefaultTableRuntime#beginCommitting()} from the periodic refresh loop. On success, the status
* transition will fire {@code handleTableChanged} and the normal {@code OptimizingCommitExecutor}
* pipeline will resume. On failure (e.g. the DB is still unhealthy), this method logs and
* returns; the next refresh cycle will try again.
*/
private void tryHealStuckCommitting(DefaultTableRuntime tableRuntime) {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
if (process == null
|| process.getStatus() != ProcessStatus.RUNNING
|| tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING
|| !tableRuntime.getOptimizingStatus().isProcessing()) {
return;
}
if (!process.allTasksPrepared()) {
return;
}
logger.warn(
"{} detected stuck RUNNING optimizing process (processId={}, status={}): all tasks have "
+ "succeeded but the table never transitioned to COMMITTING. Self-healing by "
+ "re-driving beginCommitting() (issue #4172).",
tableRuntime.getTableIdentifier(),
process.getProcessId(),
tableRuntime.getOptimizingStatus());
try {
tableRuntime.beginCommitting();
} catch (Exception e) {
logger.warn(
"{} self-heal beginCommitting() failed, will retry on the next refresh cycle.",
tableRuntime.getTableIdentifier(),
e);
}
}

/**
* Calculate adaptive execution interval based on table optimization status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,58 @@ public void testReloadAllTasksCompletedNotYetCommitting() {
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
}

/**
* Regression test for issue #4172: a {@code RUNNING} optimizing process whose tasks have all
* succeeded must not stay stuck in {@code *_OPTIMIZING} forever when a previous {@code
* beginCommitting()} transition failed transiently and AMS keeps running (i.e. no restart).
* {@link TableRuntimeRefreshExecutor#execute} should detect the stuck state on the next refresh
* cycle and re-drive the transition to {@code COMMITTING}.
*
* <p>This complements {@link #testReloadAllTasksCompletedNotYetCommitting}, which only exercises
* the restart-recovery path in {@code OptimizingQueue#initTableRuntime}.
*/
@Test
public void testRefreshExecutorHealsStuckRunningProcess() {
// 1. Poll / ack / complete - the table should be in COMMITTING with a RUNNING process.
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Assertions.assertNotNull(task);
optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId()));

DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId());
Assertions.assertEquals(OptimizingStatus.COMMITTING, runtime.getOptimizingStatus());
Assertions.assertNotNull(runtime.getOptimizingProcess());
Assertions.assertEquals(ProcessStatus.RUNNING, runtime.getOptimizingProcess().getStatus());
Assertions.assertTrue(
runtime.getOptimizingProcess().allTasksPrepared(),
"All tasks should be prepared after successful completeTask()");

// 2. Simulate the stuck state described in issue #4172: pretend beginCommitting() never
// transitioned the status out of *_OPTIMIZING. Mutate the in-memory status directly via the
// state store (no reload, so we model "AMS is still running but beginCommitting() silently
// failed / was rolled back").
runtime
.store()
.begin()
.updateStatusCode(code -> OptimizingStatus.MINOR_OPTIMIZING.getCode())
.commit();
Assertions.assertEquals(OptimizingStatus.MINOR_OPTIMIZING, runtime.getOptimizingStatus());

// 3. Trigger a refresh cycle — the executor should detect the stuck state and self-heal
// by re-invoking beginCommitting().
TableRuntimeRefresher refresher = new TableRuntimeRefresher();
try {
refresher.refreshPending();
} finally {
refresher.dispose();
}

Assertions.assertEquals(
OptimizingStatus.COMMITTING,
getDefaultTableRuntime(serverTableIdentifier().getId()).getOptimizingStatus(),
"TableRuntimeRefreshExecutor must re-drive beginCommitting() for a stuck RUNNING process");
}

@Test
public void testReloadPlanningWithOrphanedProcess() {
// 1. Poll and ack a task - table is now in optimizing state with an active process
Expand Down
Loading