diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java index 1f19187e9e..35cdc4ddc6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java @@ -24,17 +24,25 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.exception.PersistenceException; +import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.TableProcessMeta; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.server.utils.SnowflakeIdGenerator; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -48,6 +56,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain { protected final Logger logger = LoggerFactory.getLogger(getClass()); private static final long START_DELAY = 10 * 1000L; + private static final String CLEANUP_EXECUTION_ENGINE = "AMORO"; + private static final String CLEANUP_PROCESS_STAGE = "CLEANUP"; + private static final String 
EXTERNAL_PROCESS_IDENTIFIER = ""; + private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator(); + + private final PersistenceHelper persistenceHelper = new PersistenceHelper(); protected final Set scheduledTables = Collections.synchronizedSet(new HashSet<>()); @@ -123,16 +137,31 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) { } private void executeTask(TableRuntime tableRuntime) { + TableProcessMeta cleanupProcessMeta = null; + CleanupOperation cleanupOperation = null; + Exception executionError = null; + long cleanupEndTime = 0L; + try { if (isExecutable(tableRuntime)) { + cleanupOperation = getCleanupOperation(); + // create and persist cleanup process info + cleanupProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation); + execute(tableRuntime); + // Different tables take different amounts of time to execute the end of execute(), // so you need to perform the update operation separately for each table. - persistUpdatingCleanupTime(tableRuntime); + cleanupEndTime = System.currentTimeMillis(); + persistUpdatingCleanupTime(tableRuntime, cleanupEndTime); } } catch (Exception e) { logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e); + executionError = e; } finally { + // persist cleanup result info. 
+ persistCleanupResult( + tableRuntime, cleanupOperation, cleanupProcessMeta, cleanupEndTime, executionError); scheduledTables.remove(tableRuntime.getTableIdentifier()); scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime)); } @@ -156,14 +185,13 @@ protected boolean shouldExecute(Long lastCleanupEndTime) { return true; } - private void persistUpdatingCleanupTime(TableRuntime tableRuntime) { + private void persistUpdatingCleanupTime(TableRuntime tableRuntime, long currentTime) { CleanupOperation cleanupOperation = getCleanupOperation(); if (shouldSkipOperation(tableRuntime, cleanupOperation)) { return; } try { - long currentTime = System.currentTimeMillis(); ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime); logger.debug( @@ -178,6 +206,137 @@ private void persistUpdatingCleanupTime(TableRuntime tableRuntime) { } } + @VisibleForTesting + public TableProcessMeta createCleanupProcessInfo( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + if (shouldSkipOperation(tableRuntime, cleanupOperation)) { + return null; + } + + TableProcessMeta cleanupProcessMeta = buildCleanupProcessMeta(tableRuntime, cleanupOperation); + persistenceHelper.beginAndPersistCleanupProcess(cleanupProcessMeta); + logger.debug( + "Successfully persist cleanup process [processId={}, tableId={}, processType={}]", + cleanupProcessMeta.getProcessId(), + cleanupProcessMeta.getTableId(), + cleanupProcessMeta.getProcessType()); + + return cleanupProcessMeta; + } + + private TableProcessMeta buildCleanupProcessMeta( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + TableProcessMeta cleanupProcessMeta = new TableProcessMeta(); + + cleanupProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId()); + cleanupProcessMeta.setProcessId(ID_GENERATOR.generateId()); + cleanupProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER); + cleanupProcessMeta.setStatus(ProcessStatus.RUNNING); + 
cleanupProcessMeta.setProcessType(cleanupOperation.name()); + cleanupProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE); + cleanupProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE); + cleanupProcessMeta.setRetryNumber(0); + cleanupProcessMeta.setFinishTime(0); + cleanupProcessMeta.setFailMessage(""); + cleanupProcessMeta.setCreateTime(System.currentTimeMillis()); + cleanupProcessMeta.setProcessParameters(new HashMap<>()); + cleanupProcessMeta.setSummary(new HashMap<>()); + + return cleanupProcessMeta; + } + + /** + * Stamps the outcome of a cleanup run onto its process record and persists the update. + * + * @param tableRuntime runtime of the table the cleanup ran for + * @param cleanupOperation operation that was run; a null value makes this call a no-op + * @param cleanupProcessMeta record created before the run; a null value makes this call a no-op + * @param cleanupEndTime end time in epoch millis; non-positive means the run never recorded one + * @param executionError exception thrown by the run, or null if it completed normally + */ + @VisibleForTesting + public void persistCleanupResult( + TableRuntime tableRuntime, + CleanupOperation cleanupOperation, + TableProcessMeta cleanupProcessMeta, + long cleanupEndTime, + Exception executionError) { + + if (cleanupOperation == null + || cleanupProcessMeta == null + || shouldSkipOperation(tableRuntime, cleanupOperation)) { + return; + } + + // executeTask only records an end time after execute() returns, so a failed run arrives here + // with 0; fall back to the current time instead of persisting a finish time of 0. 
+ long finishTime = cleanupEndTime > 0 ? cleanupEndTime : System.currentTimeMillis(); + cleanupProcessMeta.setFinishTime(finishTime); + if (executionError != null) { + cleanupProcessMeta.setStatus(ProcessStatus.FAILED); + cleanupProcessMeta.setFailMessage(executionError.getMessage()); + } else { + cleanupProcessMeta.setStatus(ProcessStatus.SUCCESS); + } + + try { + persistenceHelper.updateAndPersistCleanupProcess(cleanupProcessMeta); + // Log success only after the update went through, never after a persistence failure. + logger.debug( + "Successfully updated lastCleanTime and cleanupProcess for table {} with processId={}, cleanup operation {}", + tableRuntime.getTableIdentifier().getTableName(), + cleanupProcessMeta.getProcessId(), + cleanupOperation); + } catch (PersistenceException e) { + logger.error( + "Failed to persist cleanup process result [processId={}, tableId={}, processType={}]", + cleanupProcessMeta.getProcessId(), + cleanupProcessMeta.getTableId(), + cleanupProcessMeta.getProcessType(), + e); + } + } + + private static class PersistenceHelper extends PersistentBase { + + public PersistenceHelper() {} + + private void beginAndPersistCleanupProcess(TableProcessMeta meta) { + doAs( + TableProcessMapper.class, + mapper -> + mapper.insertProcess( + 
meta.getTableId(), + meta.getProcessId(), + meta.getExternalProcessIdentifier(), + meta.getStatus(), + meta.getProcessType(), + meta.getProcessStage(), + meta.getExecutionEngine(), + meta.getRetryNumber(), + meta.getCreateTime(), + meta.getProcessParameters(), + meta.getSummary())); + } + + private void updateAndPersistCleanupProcess(TableProcessMeta meta) { + doAs( + TableProcessMapper.class, + mapper -> + mapper.updateProcess( + meta.getTableId(), + meta.getProcessId(), + meta.getExternalProcessIdentifier(), + meta.getStatus(), + meta.getProcessStage(), + meta.getRetryNumber(), + meta.getFinishTime(), + meta.getFailMessage(), + meta.getProcessParameters(), + meta.getSummary())); + } + } + /** * Get cleanup operation. Default is NONE, subclasses should override this method to provide * specific operation. diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java index de25eac21d..e6e339eae6 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -20,15 +20,15 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; +import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper; +import org.apache.amoro.server.process.TableProcessMeta; import org.apache.amoro.server.table.DefaultTableRuntime; import 
org.apache.amoro.server.table.DefaultTableRuntimeStore; -import org.apache.amoro.server.table.TableRuntimeHandler; import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.apache.amoro.table.TableRuntimeStore; import org.apache.amoro.table.TableSummary; @@ -48,6 +48,12 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { private static final String TEST_CATALOG = "test_catalog"; private static final String TEST_DB = "test_db"; private static final String TEST_TABLE = "test_table"; + private static final List CLEANUP_OPERATIONS = + Arrays.asList( + CleanupOperation.ORPHAN_FILES_CLEANING, + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, + CleanupOperation.DATA_EXPIRING, + CleanupOperation.SNAPSHOTS_EXPIRING); static { try { @@ -57,18 +63,6 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { } } - private static final TableRuntimeHandler TEST_HANDLER = - new TableRuntimeHandler() { - @Override - public void handleTableChanged( - TableRuntime tableRuntime, - org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {} - - @Override - public void handleTableChanged( - TableRuntime tableRuntime, TableConfiguration originalConfig) {} - }; - /** * Create a test server table identifier with the given ID * @@ -103,7 +97,7 @@ private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier iden return new DefaultTableRuntime(store, () -> null); } - private void cleanUpTableRuntimeData(List tableIds) { + private void cleanupTableRuntimeData(List tableIds) { doAs( TableRuntimeMapper.class, mapper -> { @@ -135,7 +129,7 @@ private void cleanUpTableRuntimeData(List tableIds) { * @param testTableIds list of table IDs to clean up */ private void prepareTestEnvironment(List testTableIds) { - cleanUpTableRuntimeData(testTableIds); + cleanupTableRuntimeData(testTableIds); } /** @@ -165,16 +159,8 @@ private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation clean */ @Test public void 
testShouldExecuteTaskWithNoPreviousCleanup() { - List operations = - Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); - - for (CleanupOperation operation : operations) { - List testTableIds = Collections.singletonList(1L); - prepareTestEnvironment(testTableIds); + for (CleanupOperation operation : CLEANUP_OPERATIONS) { + prepareTestEnvironment(Collections.singletonList(1L)); PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); ServerTableIdentifier identifier = createTableIdentifier(1L); @@ -190,16 +176,8 @@ public void testShouldExecuteTaskWithNoPreviousCleanup() { /** Test should not execute task with recent cleanup */ @Test public void testShouldNotExecuteTaskWithRecentCleanup() { - List operations = - Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); - - for (CleanupOperation operation : operations) { - List testTableIds = Collections.singletonList(1L); - cleanUpTableRuntimeData(testTableIds); + for (CleanupOperation operation : CLEANUP_OPERATIONS) { + cleanupTableRuntimeData(Collections.singletonList(1L)); PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); @@ -220,16 +198,8 @@ public void testShouldNotExecuteTaskWithRecentCleanup() { /** Test should execute task with old cleanup */ @Test public void testShouldExecuteTaskWithOldCleanup() { - List operations = - Arrays.asList( - CleanupOperation.ORPHAN_FILES_CLEANING, - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); - - for (CleanupOperation operation : operations) { - List testTableIds = Collections.singletonList(1L); - cleanUpTableRuntimeData(testTableIds); + for (CleanupOperation operation : CLEANUP_OPERATIONS) { + 
cleanupTableRuntimeData(Collections.singletonList(1L)); PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); @@ -262,4 +232,51 @@ public void testShouldExecuteTaskWithNoneOperation() { boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE); Assert.assertTrue("Should always execute with NONE operation", shouldExecute); } + + /** Test cleanup process info is persisted with SUCCESS status for each cleanup operation */ + @Test + public void testCleanupProcessPersistedOnSuccess() { + for (CleanupOperation operation : CLEANUP_OPERATIONS) { + assertCleanupProcessPersisted(operation, null, ProcessStatus.SUCCESS, null); + } + } + + /** Test cleanup process info is persisted with FAILED status when execution fails */ + @Test + public void testCleanupProcessPersistedOnFailure() { + for (CleanupOperation operation : CLEANUP_OPERATIONS) { + Exception error = new RuntimeException("Simulated cleanup failure for " + operation); + assertCleanupProcessPersisted(operation, error, ProcessStatus.FAILED, error.getMessage()); + } + } + + /** Assert cleanup process is persisted with expected status and failMessage */ + private void assertCleanupProcessPersisted( + CleanupOperation operation, + Exception executionError, + ProcessStatus expectedStatus, + String expectedFailMessage) { + prepareTestEnvironment(Collections.singletonList(1L)); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); + DefaultTableRuntime tableRuntime = createDefaultTableRuntime(createTableIdentifier(1L)); + + // 1. Create and persist the cleanup process record + TableProcessMeta meta = executor.createCleanupProcessInfo(tableRuntime, operation); + + // 2. Update cleanup result with execution outcome + long cleanupEndTime = System.currentTimeMillis(); + executor.persistCleanupResult(tableRuntime, operation, meta, cleanupEndTime, executionError); + + // 3. Verify persisted process info in database + TableProcessMeta persisted = + 
getAs(TableProcessMapper.class, mapper -> mapper.getProcessMeta(meta.getProcessId())); + + Assert.assertEquals(expectedStatus, persisted.getStatus()); + Assert.assertEquals(operation.name(), persisted.getProcessType()); + Assert.assertEquals(cleanupEndTime, persisted.getFinishTime()); + if (expectedFailMessage != null) { + Assert.assertEquals(expectedFailMessage, persisted.getFailMessage()); + } + } }