diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java index ca47e22522..dc855edf9d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java @@ -18,12 +18,12 @@ package org.apache.amoro.server.optimizing.maintainer; +import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.maintainer.OptimizingInfo; import org.apache.amoro.maintainer.TableMaintainerContext; import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; import org.apache.amoro.server.utils.HiveLocationUtil; import org.apache.amoro.table.MixedTable; @@ -31,20 +31,20 @@ import java.util.Set; /** - * Default implementation of TableMaintainerContext for AMS. Adapts DefaultTableRuntime to + * Default implementation of TableMaintainerContext for AMS. Adapts TableRuntime to * TableMaintainerContext interface. 
*/ public class DefaultTableMaintainerContext implements TableMaintainerContext { - private final DefaultTableRuntime tableRuntime; + private final TableRuntime tableRuntime; private final MixedTable mixedTable; - public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) { + public DefaultTableMaintainerContext(TableRuntime tableRuntime) { this.tableRuntime = tableRuntime; this.mixedTable = null; } - public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime, MixedTable mixedTable) { + public DefaultTableMaintainerContext(TableRuntime tableRuntime, MixedTable mixedTable) { this.tableRuntime = tableRuntime; this.mixedTable = mixedTable; } @@ -56,23 +56,21 @@ public TableConfiguration getTableConfiguration() { @Override public MaintainerMetrics getMetrics() { - TableOrphanFilesCleaningMetrics metrics = tableRuntime.getOrphanFilesCleaningMetrics(); - return new MaintainerMetrics() { - @Override - public void recordOrphanDataFilesCleaned(int expected, int cleaned) { - metrics.completeOrphanDataFiles(expected, cleaned); - } - - @Override - public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { - metrics.completeOrphanMetadataFiles(expected, cleaned); - } - }; + // Return the MaintainerMetrics exposed by the table runtime directly. + // This provides access to all maintainer metrics including orphan files cleaning, + // dangling delete files cleaning, snapshot expiration, data expiration, tag creation, + // and partition expiration. + return tableRuntime.getMaintainerMetrics(); } @Override public OptimizingInfo getOptimizingInfo() { - return new DefaultOptimizingInfo(tableRuntime); + // For AMS DefaultTableRuntime, provide full optimizing info. + // For other TableRuntime implementations, return empty info. 
+ if (tableRuntime instanceof DefaultTableRuntime) { + return new DefaultOptimizingInfo((DefaultTableRuntime) tableRuntime); + } + return OptimizingInfo.EMPTY; } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java index ec1583fa1d..9c3acd9ef9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java @@ -24,8 +24,6 @@ import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer; import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; import org.apache.iceberg.Table; @@ -36,11 +34,11 @@ public class TableMaintainerFactory { * Create an Iceberg table maintainer with AMS context. 
* * @param table the Iceberg table - * @param tableRuntime the AMS table runtime + * @param tableRuntime the table runtime * @return IcebergTableMaintainer instance */ public static IcebergTableMaintainer createIcebergMaintainer( - Table table, DefaultTableRuntime tableRuntime) { + Table table, TableRuntime tableRuntime) { return new IcebergTableMaintainer( table, tableRuntime.getTableIdentifier().getIdentifier(), @@ -55,19 +53,17 @@ public static IcebergTableMaintainer createIcebergMaintainer( * @return TableMaintainer instance */ public static TableMaintainer create(AmoroTable amoroTable, TableRuntime tableRuntime) { - Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime); - DefaultTableRuntime runtime = (DefaultTableRuntime) tableRuntime; TableFormat format = amoroTable.format(); if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) { MixedTable mixedTable = (MixedTable) amoroTable.originalTable(); return new MixedTableMaintainer( - mixedTable, new DefaultTableMaintainerContext(runtime, mixedTable)); + mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable)); } else if (TableFormat.ICEBERG.equals(format)) { return new IcebergTableMaintainer( (Table) amoroTable.originalTable(), amoroTable.id(), - new DefaultTableMaintainerContext(runtime)); + new DefaultTableMaintainerContext(tableRuntime)); } else { throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java index eb152bb096..29ef43d457 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java @@ -19,13 +19,8 @@ package org.apache.amoro.server.optimizing.maintainer; import 
org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.table.MixedTable; -import org.apache.iceberg.Table; /** Factory for creating {@link TableMaintainer}. */ @Deprecated @@ -40,24 +35,4 @@ public class TableMaintainers { public static TableMaintainer create(AmoroTable amoroTable, TableRuntime tableRuntime) { return TableMaintainerFactory.create(amoroTable, tableRuntime); } - - /** - * Create a {@link TableMaintainer} for the given table with DefaultTableRuntime. - * - * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link - * TableMaintainerFactory#createIcebergMaintainer(Table, DefaultTableRuntime)} instead. - */ - public static TableMaintainer create(AmoroTable amoroTable, DefaultTableRuntime tableRuntime) { - TableFormat format = amoroTable.format(); - if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) { - MixedTable mixedTable = (MixedTable) amoroTable.originalTable(); - return new MixedTableMaintainer( - mixedTable, new DefaultTableMaintainerContext(tableRuntime, mixedTable)); - } else if (TableFormat.ICEBERG.equals(format)) { - return TableMaintainerFactory.createIcebergMaintainer( - (Table) amoroTable.originalTable(), tableRuntime); - } else { - throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass()); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java index 74e52f0b7e..c1a14adb08 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java @@ -37,6 +37,15 @@ protected 
AbstractTableMetrics(ServerTableIdentifier identifier) { this.identifier = identifier; } + /** + * Get the table identifier. + * + * @return ServerTableIdentifier + */ + public ServerTableIdentifier getIdentifier() { + return identifier; + } + protected void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) { MetricKey key = registry.register( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 9b4b56a95c..365e9c0497 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -23,6 +23,7 @@ import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.iceberg.Constants; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.TableRuntimeOptimizingState; @@ -86,7 +87,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime { Lists.newArrayList( OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY); private final TableOptimizingMetrics optimizingMetrics; - private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; + private final TableMaintainerMetrics maintainerMetrics; private final TableSummaryMetrics tableSummaryMetrics; private volatile long lastPlanTime; private volatile long latestRefreshInterval = AmoroServiceConstants.INVALID_TIME; @@ -100,8 +101,7 @@ public DefaultTableRuntime(TableRuntimeStore store, Supplier> load super(store); this.optimizingMetrics = new TableOptimizingMetrics(store.getTableIdentifier(), store.getGroupName()); - this.orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics(store.getTableIdentifier()); + 
this.maintainerMetrics = new TableMaintainerMetrics(store.getTableIdentifier()); this.tableSummaryMetrics = new TableSummaryMetrics(store.getTableIdentifier()); this.loader = loader; } @@ -120,12 +120,18 @@ public void recover(OptimizingProcess optimizingProcess) { public void registerMetric(MetricRegistry metricRegistry) { // TODO: extract method to interface. this.optimizingMetrics.register(metricRegistry); - this.orphanFilesCleaningMetrics.register(metricRegistry); + this.maintainerMetrics.register(metricRegistry); this.tableSummaryMetrics.register(metricRegistry); } - public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() { - return orphanFilesCleaningMetrics; + /** + * Get the maintainer metrics implementation. + * + * @return MaintainerMetrics instance + */ + @Override + public MaintainerMetrics getMaintainerMetrics() { + return maintainerMetrics; } public long getCurrentSnapshotId() { @@ -458,7 +464,7 @@ public void beginCommitting() { @Override public void unregisterMetric() { tableSummaryMetrics.unregister(); - orphanFilesCleaningMetrics.unregister(); + maintainerMetrics.unregister(); optimizingMetrics.unregister(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java new file mode 100644 index 0000000000..d6f95d7925 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMaintainerMetrics.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import static org.apache.amoro.metrics.MetricDefine.defineCounter; +import static org.apache.amoro.metrics.MetricDefine.defineGauge; + +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationType; +import org.apache.amoro.metrics.Counter; +import org.apache.amoro.metrics.Gauge; +import org.apache.amoro.metrics.Metric; +import org.apache.amoro.metrics.MetricDefine; +import org.apache.amoro.metrics.MetricKey; +import org.apache.amoro.metrics.MetricRegistry; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Table maintenance operation metrics implementation. + * + *

This class records metrics for table maintenance operations: orphan file cleaning, dangling + * delete file cleaning, snapshot, data and partition expiration, and tag creation. + * + *

Design notes: + * + *

+ */ +public class TableMaintainerMetrics implements MaintainerMetrics { + + // ========== Orphan Files Related MetricDefine ========== + + /** + * Count of orphan content files cleaned. + * + *

Note: This metric name is retained for backward compatibility. The "content" terminology + * refers to data files (as opposed to metadata files). + */ + public static final MetricDefine TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT = + defineCounter("table_orphan_content_file_cleaning_count") + .withDescription("Count of orphan content files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** + * Expected count of orphan content files to clean. + * + *

Note: This metric name is retained for backward compatibility. The "content" terminology + * refers to data files (as opposed to metadata files). + */ + public static final MetricDefine TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT = + defineCounter("table_expected_orphan_content_file_cleaning_count") + .withDescription("Expected count of orphan content files to clean") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of orphan metadata files cleaned */ + public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT = + // Keep historical metric name for backward compatibility. + defineCounter("table_orphan_metadata_file_cleaning_count") + .withDescription("Count of orphan metadata files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Expected count of orphan metadata files to clean */ + public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT = + // Keep historical metric name for backward compatibility. 
+ defineCounter("table_expected_orphan_metadata_file_cleaning_count") + .withDescription("Expected count of orphan metadata files to clean") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of orphan files cleaning operation (milliseconds) */ + public static final MetricDefine TABLE_ORPHAN_FILES_CLEANING_DURATION = + defineGauge("table_orphan_files_cleaning_duration_millis") + .withDescription("Duration of orphan files cleaning operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Dangling Delete Files Related MetricDefine (Iceberg) ========== + + /** Count of dangling delete files cleaned */ + public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT = + defineCounter("table_dangling_delete_files_cleaned_count") + .withDescription("Count of dangling delete files cleaned") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of dangling delete files cleaning operation (milliseconds) */ + public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION = + defineGauge("table_dangling_delete_files_cleaning_duration_millis") + .withDescription("Duration of dangling delete files cleaning operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Snapshot Expiration Related MetricDefine ========== + + /** Count of snapshots expired */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_COUNT = + defineCounter("table_snapshots_expired_count") + .withDescription("Count of snapshots expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of data files deleted during snapshot expiration */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED = + defineCounter("table_snapshots_expired_data_files_deleted") + .withDescription("Count of data files deleted during snapshot 
expiration") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of snapshot expiration operation (milliseconds) */ + public static final MetricDefine TABLE_SNAPSHOTS_EXPIRATION_DURATION = + defineGauge("table_snapshots_expiration_duration_millis") + .withDescription("Duration of snapshot expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Data Expiration Related MetricDefine (Iceberg) ========== + + /** Count of data files expired */ + public static final MetricDefine TABLE_DATA_EXPIRED_DATA_FILES_COUNT = + defineCounter("table_data_expired_data_files_count") + .withDescription("Count of data files expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of delete files expired */ + public static final MetricDefine TABLE_DATA_EXPIRED_DELETE_FILES_COUNT = + defineCounter("table_data_expired_delete_files_count") + .withDescription("Count of delete files expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of data expiration operation (milliseconds) */ + public static final MetricDefine TABLE_DATA_EXPIRATION_DURATION = + defineGauge("table_data_expiration_duration_millis") + .withDescription("Duration of data expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Tag Creation Related MetricDefine (Iceberg) ========== + + /** Count of tags created */ + public static final MetricDefine TABLE_TAGS_CREATED_COUNT = + defineCounter("table_tags_created_count") + .withDescription("Count of tags created") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of tag creation operation (milliseconds) */ + public static final MetricDefine TABLE_TAG_CREATION_DURATION = + defineGauge("table_tag_creation_duration_millis") + .withDescription("Duration of tag creation operation in 
milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== Partition Expiration Related MetricDefine (Paimon) ========== + + /** Count of partitions expired */ + public static final MetricDefine TABLE_PARTITIONS_EXPIRED_COUNT = + defineCounter("table_partitions_expired_count") + .withDescription("Count of partitions expired") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Count of files expired during partition expiration */ + public static final MetricDefine TABLE_PARTITIONS_EXPIRED_FILES_COUNT = + defineCounter("table_partitions_expired_files_count") + .withDescription("Count of files expired during partition expiration") + .withTags("catalog", "database", "table", "table_format") + .build(); + + /** Duration of partition expiration operation (milliseconds) */ + public static final MetricDefine TABLE_PARTITION_EXPIRATION_DURATION = + defineGauge("table_partition_expiration_duration_millis") + .withDescription("Duration of partition expiration operation in milliseconds") + .withTags("catalog", "database", "table", "table_format") + .build(); + + // ========== General Operation Status Related MetricDefine ========== + + /** Count of successful maintainer operations */ + public static final MetricDefine TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT = + defineCounter("table_maintainer_operation_success_count") + .withDescription("Count of successful maintainer operations") + .withTags("catalog", "database", "table", "table_format", "operation_type") + .build(); + + /** Count of failed maintainer operations */ + public static final MetricDefine TABLE_MAINTAINER_OPERATION_FAILURE_COUNT = + defineCounter("table_maintainer_operation_failure_count") + .withDescription("Count of failed maintainer operations") + .withTags("catalog", "database", "table", "table_format", "operation_type") + .build(); + + /** Duration of maintainer operation (milliseconds) */ + public static final MetricDefine 
TABLE_MAINTAINER_OPERATION_DURATION = + defineGauge("table_maintainer_operation_duration_millis") + .withDescription("Duration of maintainer operation in milliseconds") + .withTags("catalog", "database", "table", "table_format", "operation_type") + .build(); + + // ========== Instance Fields ========== + + private final ServerTableIdentifier identifier; + private final String tableFormat; + private final List registeredMetricKeys = Lists.newArrayList(); + private MetricRegistry globalRegistry; + + // ========== Orphan Files Metrics ========== + private final Counter orphanContentFileCleaningCount = new Counter(); + private final Counter expectedOrphanContentFileCleaningCount = new Counter(); + private final Counter orphanMetadataFilesCount = new Counter(); + private final Counter orphanMetadataFilesExpectedCount = new Counter(); + private final LastOperationDurationGauge orphanFilesCleaningDuration = + new LastOperationDurationGauge(); + + // ========== Dangling Delete Files Metrics ========== + private final Counter danglingDeleteFilesCount = new Counter(); + private final LastOperationDurationGauge danglingDeleteFilesCleaningDuration = + new LastOperationDurationGauge(); + + // ========== Snapshot Expiration Metrics ========== + private final Counter snapshotsExpiredCount = new Counter(); + private final Counter snapshotsExpiredDataFilesDeleted = new Counter(); + private final LastOperationDurationGauge snapshotsExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Data Expiration Metrics ========== + private final Counter dataExpiredDataFilesCount = new Counter(); + private final Counter dataExpiredDeleteFilesCount = new Counter(); + private final LastOperationDurationGauge dataExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Tag Creation Metrics ========== + private final Counter tagsCreatedCount = new Counter(); + private final LastOperationDurationGauge tagCreationDuration = new LastOperationDurationGauge(); + + 
// ========== Partition Expiration Metrics ========== + private final Counter partitionsExpiredCount = new Counter(); + private final Counter partitionsExpiredFilesCount = new Counter(); + private final LastOperationDurationGauge partitionExpirationDuration = + new LastOperationDurationGauge(); + + // ========== Operation Status Metrics ========== + private final ConcurrentHashMap<MaintainerOperationType, Counter> successCounters = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<MaintainerOperationType, Counter> failureCounters = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<MaintainerOperationType, OperationDurationGauge> durationGauges = + new ConcurrentHashMap<>(); + + /** + * Creates per-table maintainer metrics with one counter pair and gauge per operation type. + * + * @param identifier table identifier; its format name, lower-cased, becomes the table_format tag + */ + public TableMaintainerMetrics(ServerTableIdentifier identifier) { + this.identifier = identifier; + this.tableFormat = identifier.getFormat().name().toLowerCase(java.util.Locale.ROOT); + // Initialize operation type counters and gauges + for (MaintainerOperationType type : MaintainerOperationType.values()) { + successCounters.put(type, new Counter()); + failureCounters.put(type, new Counter()); + durationGauges.put(type, new OperationDurationGauge()); + } + } + + /** + * Get the table identifier. 
+ * + * @return ServerTableIdentifier + */ + public ServerTableIdentifier getIdentifier() { + return identifier; + } + + public void register(MetricRegistry registry) { + if (globalRegistry != null) { + return; + } + registerMetrics(registry); + globalRegistry = registry; + } + + public void unregister() { + if (globalRegistry != null) { + registeredMetricKeys.forEach(globalRegistry::unregister); + registeredMetricKeys.clear(); + globalRegistry = null; + } + } + + private void registerMetrics(MetricRegistry registry) { + // Build base tags (including table_format) + Map baseTags = + ImmutableMap.of( + "catalog", + identifier.getCatalog(), + "database", + identifier.getDatabase(), + "table", + identifier.getTableName(), + "table_format", + tableFormat); + + // Orphan files + registerMetricWithTags( + registry, + TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT, + orphanContentFileCleaningCount, + baseTags); + registerMetricWithTags( + registry, + TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT, + expectedOrphanContentFileCleaningCount, + baseTags); + registerMetricWithTags( + registry, TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT, orphanMetadataFilesCount, baseTags); + registerMetricWithTags( + registry, + TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT, + orphanMetadataFilesExpectedCount, + baseTags); + registerMetricWithTags( + registry, TABLE_ORPHAN_FILES_CLEANING_DURATION, orphanFilesCleaningDuration, baseTags); + + // Dangling delete files + registerMetricWithTags( + registry, TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT, danglingDeleteFilesCount, baseTags); + registerMetricWithTags( + registry, + TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION, + danglingDeleteFilesCleaningDuration, + baseTags); + + // Snapshot expiration + registerMetricWithTags( + registry, TABLE_SNAPSHOTS_EXPIRED_COUNT, snapshotsExpiredCount, baseTags); + registerMetricWithTags( + registry, + TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED, + snapshotsExpiredDataFilesDeleted, + baseTags); + 
registerMetricWithTags( + registry, TABLE_SNAPSHOTS_EXPIRATION_DURATION, snapshotsExpirationDuration, baseTags); + + // Data expiration + registerMetricWithTags( + registry, TABLE_DATA_EXPIRED_DATA_FILES_COUNT, dataExpiredDataFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_DATA_EXPIRED_DELETE_FILES_COUNT, dataExpiredDeleteFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_DATA_EXPIRATION_DURATION, dataExpirationDuration, baseTags); + + // Tag creation + registerMetricWithTags(registry, TABLE_TAGS_CREATED_COUNT, tagsCreatedCount, baseTags); + registerMetricWithTags(registry, TABLE_TAG_CREATION_DURATION, tagCreationDuration, baseTags); + + // Partition expiration + registerMetricWithTags( + registry, TABLE_PARTITIONS_EXPIRED_COUNT, partitionsExpiredCount, baseTags); + registerMetricWithTags( + registry, TABLE_PARTITIONS_EXPIRED_FILES_COUNT, partitionsExpiredFilesCount, baseTags); + registerMetricWithTags( + registry, TABLE_PARTITION_EXPIRATION_DURATION, partitionExpirationDuration, baseTags); + + // Operation status (needs to include operation_type tag) + for (MaintainerOperationType type : MaintainerOperationType.values()) { + Map operationTags = + ImmutableMap.builder() + .putAll(baseTags) + .put("operation_type", type.getMetricName()) + .build(); + registerMetricWithTags( + registry, + TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT, + successCounters.get(type), + operationTags); + registerMetricWithTags( + registry, + TABLE_MAINTAINER_OPERATION_FAILURE_COUNT, + failureCounters.get(type), + operationTags); + registerMetricWithTags( + registry, TABLE_MAINTAINER_OPERATION_DURATION, durationGauges.get(type), operationTags); + } + } + + /** + * Register metric with specified tags + * + * @param registry MetricRegistry + * @param define MetricDefine + * @param metric Metric instance + * @param tags Tags + */ + private void registerMetricWithTags( + MetricRegistry registry, MetricDefine define, Metric metric, Map tags) { + MetricKey key = 
registry.register(define, tags, metric); + registeredMetricKeys.add(key); + } + + // ========== MaintainerMetrics Interface Implementation ========== + + @Override + public void recordOrphanDataFilesCleaned(int expected, int cleaned) { + expectedOrphanContentFileCleaningCount.inc(expected); + orphanContentFileCleaningCount.inc(cleaned); + } + + @Override + public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { + orphanMetadataFilesExpectedCount.inc(expected); + orphanMetadataFilesCount.inc(cleaned); + } + + @Override + public void recordDanglingDeleteFilesCleaned(int cleaned) { + danglingDeleteFilesCount.inc(cleaned); + } + + @Override + public void recordSnapshotsExpired(int snapshotCount, int dataFilesDeleted) { + snapshotsExpiredCount.inc(snapshotCount); + snapshotsExpiredDataFilesDeleted.inc(dataFilesDeleted); + } + + @Override + public void recordDataExpired(int dataFilesExpired, int deleteFilesExpired) { + dataExpiredDataFilesCount.inc(dataFilesExpired); + dataExpiredDeleteFilesCount.inc(deleteFilesExpired); + } + + @Override + public void recordTagsCreated(int tagsCreated) { + tagsCreatedCount.inc(tagsCreated); + } + + @Override + public void recordPartitionsExpired(int partitionsExpired, int filesExpired) { + partitionsExpiredCount.inc(partitionsExpired); + partitionsExpiredFilesCount.inc(filesExpired); + } + + @Override + public void recordOperationSuccess(MaintainerOperationType operationType, long durationMillis) { + successCounters.get(operationType).inc(); + durationGauges.get(operationType).setValue(durationMillis); + updateSpecificDuration(operationType, durationMillis); + } + + @Override + public void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable) { + failureCounters.get(operationType).inc(); + durationGauges.get(operationType).setValue(durationMillis); + updateSpecificDuration(operationType, durationMillis); + } + + private void 
updateSpecificDuration(MaintainerOperationType operationType, long durationMillis) { + switch (operationType) { + case ORPHAN_FILES_CLEANING: + orphanFilesCleaningDuration.setValue(durationMillis); + break; + case DANGLING_DELETE_FILES_CLEANING: + danglingDeleteFilesCleaningDuration.setValue(durationMillis); + break; + case SNAPSHOT_EXPIRATION: + snapshotsExpirationDuration.setValue(durationMillis); + break; + case DATA_EXPIRATION: + dataExpirationDuration.setValue(durationMillis); + break; + case TAG_CREATION: + tagCreationDuration.setValue(durationMillis); + break; + case PARTITION_EXPIRATION: + partitionExpirationDuration.setValue(durationMillis); + break; + default: + break; + } + } + + // ========== Internal Helper Classes ========== + + /** Gauge implementation for recording last operation duration */ + private static class LastOperationDurationGauge implements Gauge { + private volatile long value = 0L; + + public void setValue(long value) { + this.value = value; + } + + @Override + public Long getValue() { + return value; + } + } + + /** Gauge implementation for recording operation duration */ + private static class OperationDurationGauge implements Gauge { + private volatile long value = 0L; + + public void setValue(long value) { + this.value = value; + } + + @Override + public Long getValue() { + return value; + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java deleted file mode 100644 index 481eb175b3..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.table; - -import static org.apache.amoro.metrics.MetricDefine.defineCounter; - -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.maintainer.MaintainerMetrics; -import org.apache.amoro.metrics.Counter; -import org.apache.amoro.metrics.MetricDefine; -import org.apache.amoro.metrics.MetricRegistry; - -/** Table Orphan Files Cleaning metrics. */ -public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics - implements MaintainerMetrics { - private final Counter orphanDataFilesCount = new Counter(); - private final Counter expectedOrphanDataFilesCount = new Counter(); - - private final Counter orphanMetadataFilesCount = new Counter(); - private final Counter expectedOrphanMetadataFilesCount = new Counter(); - - public TableOrphanFilesCleaningMetrics(ServerTableIdentifier identifier) { - super(identifier); - } - - public static final MetricDefine TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT = - defineCounter("table_orphan_content_file_cleaning_count") - .withDescription("Count of orphan content files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_ORPHAN_METADATA_FILE_CLEANING_COUNT = - defineCounter("table_orphan_metadata_file_cleaning_count") - .withDescription("Count of orphan metadata files cleaned in the table since ams 
started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT = - defineCounter("table_expected_orphan_content_file_cleaning_count") - .withDescription( - "Expected count of orphan content files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - public static final MetricDefine TABLE_EXPECTED_ORPHAN_METADATA_FILE_CLEANING_COUNT = - defineCounter("table_expected_orphan_metadata_file_cleaning_count") - .withDescription( - "Expected count of orphan metadata files cleaned in the table since ams started") - .withTags("catalog", "database", "table") - .build(); - - @Override - public void registerMetrics(MetricRegistry registry) { - if (globalRegistry == null) { - registerMetric(registry, TABLE_ORPHAN_CONTENT_FILE_CLEANING_COUNT, orphanDataFilesCount); - registerMetric(registry, TABLE_ORPHAN_METADATA_FILE_CLEANING_COUNT, orphanMetadataFilesCount); - registerMetric( - registry, - TABLE_EXPECTED_ORPHAN_CONTENT_FILE_CLEANING_COUNT, - expectedOrphanDataFilesCount); - registerMetric( - registry, - TABLE_EXPECTED_ORPHAN_METADATA_FILE_CLEANING_COUNT, - expectedOrphanMetadataFilesCount); - globalRegistry = registry; - } - } - - public void completeOrphanDataFiles(int expected, int cleaned) { - expectedOrphanDataFilesCount.inc(expected); - orphanDataFilesCount.inc(cleaned); - } - - public void completeOrphanMetadataFiles(int expected, int cleaned) { - expectedOrphanMetadataFilesCount.inc(expected); - orphanMetadataFilesCount.inc(cleaned); - } - - @Override - public void recordOrphanDataFilesCleaned(int expected, int cleaned) { - completeOrphanDataFiles(expected, cleaned); - } - - @Override - public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) { - completeOrphanMetadataFiles(expected, cleaned); - } -} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java index 1326bacdd7..f25d955f67 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java @@ -22,14 +22,13 @@ import static org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_JOB_ID; import org.apache.amoro.BasicTableTestHelper; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.server.scheduler.inline.ExecutorTestBase; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.UnkeyedTable; @@ -110,23 +109,17 @@ public void orphanDataFileClean() throws IOException { Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath)); } TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); maintainer.cleanContentFiles( System.currentTimeMillis() - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 1000, - orphanFilesCleaningMetrics); + metrics); maintainer.cleanMetadata( System.currentTimeMillis() - 
TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 * 1000, - orphanFilesCleaningMetrics); + metrics); Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir)); Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath)); @@ -135,8 +128,8 @@ public void orphanDataFileClean() throws IOException { Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath)); } - maintainer.cleanContentFiles(System.currentTimeMillis(), orphanFilesCleaningMetrics); - maintainer.cleanMetadata(System.currentTimeMillis(), orphanFilesCleaningMetrics); + maintainer.cleanContentFiles(System.currentTimeMillis(), metrics); + maintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir)); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); @@ -204,15 +197,8 @@ public void orphanMetadataFileClean() throws IOException { MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - maintainer.cleanMetadata(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + maintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); if (isKeyedTable()) { @@ -298,16 +284,8 @@ public void notDeleteFlinkTemporaryFile() throws IOException { MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new 
TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - - tableMaintainer.cleanMetadata(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + tableMaintainer.cleanMetadata(System.currentTimeMillis(), metrics); Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath)); if (isKeyedTable()) { // files whose file name starts with flink.job-id should not be deleted @@ -334,22 +312,15 @@ public void notDeleteStatisticsFile() { StatisticsFile file3 = commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/puffin/test3.puffin"); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; Assert.assertTrue(unkeyedTable.io().exists(file1.path())); Assert.assertTrue(unkeyedTable.io().exists(file2.path())); Assert.assertTrue(unkeyedTable.io().exists(file3.path())); new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())) - .cleanContentFiles(System.currentTimeMillis() + 1, orphanFilesCleaningMetrics); + .cleanContentFiles(System.currentTimeMillis() + 1, metrics); new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())) - .cleanMetadata(System.currentTimeMillis() + 1, orphanFilesCleaningMetrics); + .cleanMetadata(System.currentTimeMillis() + 1, metrics); Assert.assertTrue(unkeyedTable.io().exists(file1.path())); Assert.assertTrue(unkeyedTable.io().exists(file2.path())); Assert.assertTrue(unkeyedTable.io().exists(file3.path())); diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java index 729999525c..50c238318f 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java @@ -20,7 +20,6 @@ import static org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; @@ -29,8 +28,7 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.hive.table.SupportHive; -import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics; -import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.iceberg.io.OutputFile; import org.junit.Assert; import org.junit.ClassRule; @@ -87,15 +85,8 @@ public void hiveLocationOrphanDataFileClean() throws IOException { MixedTableMaintainer maintainer = new MixedTableMaintainer(getMixedTable(), TestTableMaintainerContext.of(getMixedTable())); - TableIdentifier tableIdentifier = getMixedTable().id(); - TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics = - new TableOrphanFilesCleaningMetrics( - ServerTableIdentifier.of( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - getTestFormat())); - maintainer.cleanContentFiles(System.currentTimeMillis(), orphanFilesCleaningMetrics); + MaintainerMetrics metrics = MaintainerMetrics.NOOP; + maintainer.cleanContentFiles(System.currentTimeMillis(), metrics); Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath)); } } 
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableMaintainerMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableMaintainerMetrics.java new file mode 100644 index 0000000000..2375a6c824 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableMaintainerMetrics.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.table; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.metrics.MetricRegistry; +import org.junit.Test; + +public class TestTableMaintainerMetrics { + + @Test + public void testOrphanMetadataMetricNameCompatibility() { + ServerTableIdentifier identifier = + ServerTableIdentifier.of(1L, "test_catalog", "test_db", "test_table", TableFormat.ICEBERG); + TableMaintainerMetrics tableMaintainerMetrics = new TableMaintainerMetrics(identifier); + MetricRegistry registry = new MetricRegistry(); + + tableMaintainerMetrics.register(registry); + + assertTrue(containsMetric(registry, "table_orphan_metadata_file_cleaning_count")); + assertTrue(containsMetric(registry, "table_expected_orphan_metadata_file_cleaning_count")); + assertFalse(containsMetric(registry, "table_orphan_metadata_files_cleaned_count")); + assertFalse(containsMetric(registry, "table_orphan_metadata_files_cleaned_expected_count")); + } + + private boolean containsMetric(MetricRegistry registry, String metricName) { + return registry.getMetrics().keySet().stream() + .anyMatch(metricKey -> metricName.equals(metricKey.getDefine().getName())); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 0059593e85..0821ae98ed 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -19,6 +19,7 @@ package org.apache.amoro; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.maintainer.MaintainerMetrics; import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.TableProcessStore; @@ -115,6 +116,15 @@ default TableFormat getFormat() { return 
getTableIdentifier().getFormat(); } + /** + * Get the maintainer metrics collector. + * + * @return maintainer metrics collector + */ + default MaintainerMetrics getMaintainerMetrics() { + return MaintainerMetrics.NOOP; + } + /** Dispose the table runtime. */ default void dispose() {} } diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java index 420c61e3b0..b883e118d2 100644 --- a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java @@ -40,6 +40,62 @@ public interface MaintainerMetrics { */ void recordOrphanMetadataFilesCleaned(int expected, int cleaned); + /** + * Record dangling delete files cleaning result. + * + * @param cleaned number of files cleaned + */ + void recordDanglingDeleteFilesCleaned(int cleaned); + + /** + * Record snapshot expiration operation result. + * + * @param snapshotCount number of snapshots expired + * @param dataFilesDeleted number of data files deleted + */ + void recordSnapshotsExpired(int snapshotCount, int dataFilesDeleted); + + /** + * Record data expiration operation result. + * + * @param dataFilesExpired number of data files expired + * @param deleteFilesExpired number of delete files expired + */ + void recordDataExpired(int dataFilesExpired, int deleteFilesExpired); + + /** + * Record tag creation operation result. + * + * @param tagsCreated number of tags created + */ + void recordTagsCreated(int tagsCreated); + + /** + * Record partition expiration operation result. + * + * @param partitionsExpired number of partitions expired + * @param filesExpired number of files expired + */ + void recordPartitionsExpired(int partitionsExpired, int filesExpired); + + /** + * Record operation success completion. 
+ * + * @param operationType operation type + * @param durationMillis operation duration in milliseconds + */ + void recordOperationSuccess(MaintainerOperationType operationType, long durationMillis); + + /** + * Record operation failure. + * + * @param operationType operation type + * @param durationMillis operation duration in milliseconds + * @param throwable exception information + */ + void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable); + /** No-op implementation that does nothing. */ MaintainerMetrics NOOP = new MaintainerMetrics() { @@ -48,5 +104,28 @@ public void recordOrphanDataFilesCleaned(int expected, int cleaned) {} @Override public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {} + + @Override + public void recordDanglingDeleteFilesCleaned(int cleaned) {} + + @Override + public void recordSnapshotsExpired(int snapshotCount, int dataFilesDeleted) {} + + @Override + public void recordDataExpired(int dataFilesExpired, int deleteFilesExpired) {} + + @Override + public void recordTagsCreated(int tagsCreated) {} + + @Override + public void recordPartitionsExpired(int partitionsExpired, int filesExpired) {} + + @Override + public void recordOperationSuccess( + MaintainerOperationType operationType, long durationMillis) {} + + @Override + public void recordOperationFailure( + MaintainerOperationType operationType, long durationMillis, Throwable throwable) {} }; } diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java new file mode 100644 index 0000000000..a244605729 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationExecutor.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.util.function.Supplier; + +/** + * Executor for running maintainer operations with consistent metrics recording. + * + *

This executor ensures that all maintainer operations record metrics in a consistent way: + * + *

+ * + *

Usage example: + * + *

{@code
+ * MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics);
+ * executor.execute(
+ *     MaintainerOperationType.ORPHAN_FILES_CLEANING,
+ *     () -> {
+ *       // Operation logic here
+ *       cleanOrphanFiles();
+ *     }
+ * );
+ * }
+ */ +public class MaintainerOperationExecutor { + + private final MaintainerMetrics metrics; + + /** + * Creates a new operation executor with the given metrics collector. + * + * @param metrics the metrics collector (can be null, will use NOOP in that case) + */ + public MaintainerOperationExecutor(MaintainerMetrics metrics) { + this.metrics = metrics != null ? metrics : MaintainerMetrics.NOOP; + } + + /** + * Executes a maintainer operation with metrics recording. + * + *

This method will: + * + *

    + *
  1. Execute the provided operation + *
  2. On success: record operation success with duration via {@link + * MaintainerMetrics#recordOperationSuccess} + *
  3. On failure: record operation failure with duration via {@link + * MaintainerMetrics#recordOperationFailure} and rethrow the exception + *
+ * + * @param operationType the type of operation being executed + * @param operation the operation to execute + */ + public void execute(MaintainerOperationType operationType, Runnable operation) { + long startNanos = System.nanoTime(); + try { + operation.run(); + long duration = (System.nanoTime() - startNanos) / 1_000_000L; + metrics.recordOperationSuccess(operationType, duration); + } catch (Throwable t) { + long duration = (System.nanoTime() - startNanos) / 1_000_000L; + metrics.recordOperationFailure(operationType, duration, t); + throw t; + } + } + + /** + * Executes a maintainer operation with metrics recording and return result. + * + *

This method will: + * + *

    + *
  1. Execute the provided operation + *
  2. On success: record operation success with duration via {@link + * MaintainerMetrics#recordOperationSuccess} + *
  3. On failure: record operation failure with duration via {@link + * MaintainerMetrics#recordOperationFailure} and rethrow the exception + *
+ * + * @param operationType the type of operation being executed + * @param operation the operation to execute + * @param <T> the result type + * @return the operation result + */ + public <T> T executeAndReturn(MaintainerOperationType operationType, Supplier<T> operation) { + long startNanos = System.nanoTime(); + try { + T result = operation.get(); + long duration = (System.nanoTime() - startNanos) / 1_000_000L; + metrics.recordOperationSuccess(operationType, duration); + return result; + } catch (Throwable t) { + long duration = (System.nanoTime() - startNanos) / 1_000_000L; + metrics.recordOperationFailure(operationType, duration, t); + throw t; + } + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java new file mode 100644 index 0000000000..41baafe3a0 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOperationType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +/** + * Enumeration of maintenance operation types + * + *
<p>
Corresponds to various maintenance operations in TableMaintainer, used for metrics recording + * and classification + */ +public enum MaintainerOperationType { + /** Orphan files cleaning (including data files and metadata files) */ + ORPHAN_FILES_CLEANING("orphan_files_cleaning"), + + /** Dangling delete files cleaning (Iceberg specific) */ + DANGLING_DELETE_FILES_CLEANING("dangling_delete_files_cleaning"), + + /** Snapshot expiration */ + SNAPSHOT_EXPIRATION("snapshot_expiration"), + + /** Data expiration */ + DATA_EXPIRATION("data_expiration"), + + /** Tag creation */ + TAG_CREATION("tag_creation"), + + /** Partition expiration (Paimon specific) */ + PARTITION_EXPIRATION("partition_expiration"); + + private final String metricName; + + MaintainerOperationType(String metricName) { + this.metricName = metricName; + } + + /** + * Get metric name + * + * @return Metric name + */ + public String getMetricName() { + return metricName; + } +} diff --git a/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java b/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java new file mode 100644 index 0000000000..c61bc2678c --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/maintainer/TestMaintainerOperationExecutor.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestMaintainerOperationExecutor { + + @Mock private MaintainerMetrics mockMetrics; + + private MaintainerOperationExecutor executor; + + @BeforeEach + public void setUp() { + executor = new MaintainerOperationExecutor(mockMetrics); + } + + @Test + public void testSuccessfulOperation() { + // Execute operation + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + // Operation logic here + }); + + // Verify metrics recorded + verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong()); + verify(mockMetrics, never()) + .recordOperationFailure( + eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong(), any()); + } + + @Test + public void testFailedOperation() { + RuntimeException exception = new 
RuntimeException("Test error"); + + // Execute operation and expect exception + assertThrows( + RuntimeException.class, + () -> + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> { + throw exception; + })); + + // Verify metrics recorded + verify(mockMetrics) + .recordOperationFailure( + eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong(), eq(exception)); + verify(mockMetrics, never()) + .recordOperationSuccess(eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong()); + } + + @Test + public void testOperationWithResult() { + Integer expected = 42; + + // Execute operation with result + Integer result = + executor.executeAndReturn( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + return expected; + }); + + // Verify result + assertEquals(expected, result); + + // Verify metrics recorded + verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong()); + } + + @Test + public void testOperationWithResultFailure() { + IllegalStateException exception = new IllegalStateException("Test state error"); + + // Execute operation and expect exception + assertThrows( + IllegalStateException.class, + () -> + executor.executeAndReturn( + MaintainerOperationType.TAG_CREATION, + () -> { + throw exception; + })); + + // Verify metrics recorded + verify(mockMetrics) + .recordOperationFailure(eq(MaintainerOperationType.TAG_CREATION), anyLong(), eq(exception)); + } + + @Test + public void testNullMetricsUsesNoop() { + // Create executor with null metrics + MaintainerOperationExecutor noopExecutor = new MaintainerOperationExecutor(null); + + // Should not throw exception + assertDoesNotThrow( + () -> + noopExecutor.execute( + MaintainerOperationType.DANGLING_DELETE_FILES_CLEANING, + () -> { + // Operation logic here + })); + + // Execute with result + Integer result = + assertDoesNotThrow( + () -> + noopExecutor.executeAndReturn( + MaintainerOperationType.PARTITION_EXPIRATION, () -> 123)); + assertEquals(123, 
result); + } + + @Test + public void testMultipleOperations() { + // Execute multiple operations + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + // First operation + }); + + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> { + // Second operation + }); + + // Verify both operations were recorded + verify(mockMetrics, times(1)) + .recordOperationSuccess(eq(MaintainerOperationType.ORPHAN_FILES_CLEANING), anyLong()); + verify(mockMetrics, times(1)) + .recordOperationSuccess(eq(MaintainerOperationType.SNAPSHOT_EXPIRATION), anyLong()); + } + + @Test + public void testDurationIsRecorded() { + // Add a small delay to ensure duration > 0 + long startTime = System.currentTimeMillis(); + executor.execute( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + long endTime = System.currentTimeMillis(); + + // Verify duration is recorded and is reasonable + verify(mockMetrics) + .recordOperationSuccess(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong()); + } + + @Test + public void testAllOperationTypes() { + // Test all operation types + MaintainerOperationType[] operationTypes = MaintainerOperationType.values(); + + for (MaintainerOperationType operationType : operationTypes) { + executor.execute( + operationType, + () -> { + // Operation logic + }); + } + + // Verify all operation types were recorded + for (MaintainerOperationType operationType : operationTypes) { + verify(mockMetrics, times(1)).recordOperationSuccess(eq(operationType), anyLong()); + } + } +} diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java index 0253b87910..f42aaa6f0d 100644 --- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java @@ -51,20 +51,27 @@ public AutoCreateIcebergTagAction( this.tagName = tagConfig.getTriggerPeriod().generateTagName(tagTime, tagConfig.getTagFormat()); } - public void execute() { + /** + * Execute the tag creation action. + * + * @return the number of tags created (0 or 1) + */ + public int execute() { if (!tagConfig.isAutoCreateTag()) { - return; + return 0; } LOG.debug("Start checking the automatic creation of tags for {}", table.name()); if (tagExist()) { LOG.debug("Found the expected tag on {}, skip", table.name()); - return; + return 0; } boolean success = createTag(); if (success) { LOG.info("Created a tag successfully on {}", table.name()); + return 1; } else { LOG.info("Skipped tag creation on {}", table.name()); + return 0; } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java index eb89c07c09..73506bf109 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java @@ -31,6 +31,8 @@ import org.apache.amoro.io.PathInfo; import org.apache.amoro.io.SupportsFileSystemOperations; import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationExecutor; +import org.apache.amoro.maintainer.MaintainerOperationType; import org.apache.amoro.maintainer.OptimizingInfo; import org.apache.amoro.maintainer.TableMaintainer; import org.apache.amoro.maintainer.TableMaintainerContext; @@ -139,38 +141,63 @@ public void cleanOrphanFiles() { return; } - long 
keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000; + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); - cleanContentFiles(System.currentTimeMillis() - keepTime, metrics); + executor.execute( + MaintainerOperationType.ORPHAN_FILES_CLEANING, + () -> { + long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000; - // refresh - table.refresh(); + cleanContentFiles(System.currentTimeMillis() - keepTime, metrics); + + // refresh + table.refresh(); - // clear metadata files - cleanMetadata(System.currentTimeMillis() - keepTime, metrics); + // clear metadata files + cleanMetadata(System.currentTimeMillis() - keepTime, metrics); + }); } @Override public void cleanDanglingDeleteFiles() { TableConfiguration tableConfiguration = context.getTableConfiguration(); + MaintainerMetrics metrics = context.getMetrics(); + if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) { return; } - Snapshot currentSnapshot = table.currentSnapshot(); - if (currentSnapshot == null) { - return; - } - Optional totalDeleteFiles = - Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP)); - if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) { - // clear dangling delete files - doCleanDanglingDeleteFiles(); - } else { - LOG.debug( - "There are no delete files here, so there is no need to clean dangling delete file for table {}", - table.name()); - } + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + + executor.execute( + MaintainerOperationType.DANGLING_DELETE_FILES_CLEANING, + () -> { + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + return; + } + Optional totalDeleteFiles = + Optional.ofNullable( + currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP)); + if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) { + // clear dangling delete files + 
LOG.info("Starting cleaning dangling delete files for table {}", table.name()); + int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles(); + runWithCondition( + danglingDeleteFilesCnt > 0, + () -> { + LOG.info( + "Deleted {} dangling delete files for table {}", + danglingDeleteFilesCnt, + table.name()); + metrics.recordDanglingDeleteFilesCleaned(danglingDeleteFilesCnt); + }); + } else { + LOG.debug( + "There are no delete files here, so there is no need to clean dangling delete file for table {}", + table.name()); + } + }); } @Override @@ -178,68 +205,89 @@ public void expireSnapshots() { if (!expireSnapshotEnabled()) { return; } - expireSnapshots( - mustOlderThan(System.currentTimeMillis()), - context.getTableConfiguration().getSnapshotMinCount()); - } - - public boolean expireSnapshotEnabled() { - TableConfiguration tableConfiguration = context.getTableConfiguration(); - return tableConfiguration.isExpireSnapshotEnabled(); - } + MaintainerMetrics metrics = context.getMetrics(); + long mustOlderThan = mustOlderThan(System.currentTimeMillis()); + int minCount = context.getTableConfiguration().getSnapshotMinCount(); - @VisibleForTesting - public void expireSnapshots(long mustOlderThan, int minCount) { - expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles()); + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.SNAPSHOT_EXPIRATION, + () -> expireSnapshotsInternal(mustOlderThan, minCount, metrics)); } - private void expireSnapshots(long olderThan, int minCount, Set exclude) { + private void expireSnapshotsInternal( + long mustOlderThan, int minCount, MaintainerMetrics metrics) { + Set exclude = expireSnapshotNeedToExcludeFiles(); + Set beforeSnapshotIds = snapshotIds(table.snapshots()); LOG.debug( "Starting snapshots expiration for table {}, expiring snapshots older than {} and retain last {} snapshots, excluding {}", table.name(), - olderThan, + 
mustOlderThan, minCount, exclude); RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), exclude); table .expireSnapshots() .retainLast(Math.max(minCount, 1)) - .expireOlderThan(olderThan) + .expireOlderThan(mustOlderThan) .deleteWith(expiredFileCleaner::addFile) .cleanExpiredFiles( true) /* enable clean only for collecting the expired files, will delete them later */ .commit(); - int collectedFiles = expiredFileCleaner.fileCount(); expiredFileCleaner.clear(); - if (collectedFiles > 0) { + int dataFilesDeleted = expiredFileCleaner.cleanedFileCount(); + table.refresh(); + Set afterSnapshotIds = snapshotIds(table.snapshots()); + beforeSnapshotIds.removeAll(afterSnapshotIds); + int expiredSnapshotCount = beforeSnapshotIds.size(); + + if (expiredSnapshotCount > 0) { LOG.info( - "Expired {}/{} files for table {} order than {}", - collectedFiles, - expiredFileCleaner.cleanedFileCount(), + "Expired {} snapshots and deleted {} files for table {} older than {}", + expiredSnapshotCount, + dataFilesDeleted, table.name(), - DateTimeUtil.formatTimestampMillis(olderThan)); + DateTimeUtil.formatTimestampMillis(mustOlderThan)); + metrics.recordSnapshotsExpired(expiredSnapshotCount, dataFilesDeleted); } else { LOG.debug( - "No expired files found for table {} order than {}", + "No expired files found for table {} older than {}", table.name(), - DateTimeUtil.formatTimestampMillis(olderThan)); + DateTimeUtil.formatTimestampMillis(mustOlderThan)); } } + public boolean expireSnapshotEnabled() { + TableConfiguration tableConfiguration = context.getTableConfiguration(); + return tableConfiguration.isExpireSnapshotEnabled(); + } + + @VisibleForTesting + public void expireSnapshots(long mustOlderThan, int minCount) { + MaintainerMetrics metrics = context.getMetrics(); + expireSnapshotsInternal(mustOlderThan, minCount, metrics); + } + @Override public void expireData() { DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig(); - try 
{ - Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); - if (!isValidDataExpirationField(expirationConfig, field, table.name())) { - return; - } + MaintainerMetrics metrics = context.getMetrics(); - expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field)); - } catch (Throwable t) { - LOG.error("Unexpected purge error for table {} ", tableIdentifier, t); - } + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.DATA_EXPIRATION, + () -> { + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + if (!isValidDataExpirationField(expirationConfig, field, table.name())) { + return; + } + + Instant expireInstant = expireBaseOnRule(expirationConfig, field); + if (!expireInstant.equals(Instant.MIN)) { + doExpireData(expirationConfig, expireInstant, metrics); + } + }); } public Instant expireBaseOnRule(DataExpirationConfig expirationConfig, Types.NestedField field) { @@ -273,26 +321,59 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan if (instant.equals(Instant.MIN)) { return; } + MaintainerMetrics metrics = context.getMetrics(); + doExpireData(expirationConfig, instant, metrics); + } + private void doExpireData( + DataExpirationConfig expirationConfig, Instant instant, MaintainerMetrics metrics) { + if (instant.equals(Instant.MIN)) { + return; + } long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli(); + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); LOG.info( "Expiring data older than {} in table {} ", - Instant.ofEpochMilli(expireTimestamp) - .atZone( - getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField()))) - .toLocalDateTime(), + Instant.ofEpochMilli(expireTimestamp).atZone(getDefaultZoneId(field)).toLocalDateTime(), table.name()); Expression dataFilter = 
getDataExpression(table.schema(), expirationConfig, expireTimestamp); ExpireFiles expiredFiles = expiredFileScan(expirationConfig, dataFilter, expireTimestamp); - expireFiles(expiredFiles, expireTimestamp); + + int dataFilesCount = expiredFiles.dataFiles.size(); + int deleteFilesCount = expiredFiles.deleteFiles.size(); + + if (dataFilesCount > 0 || deleteFilesCount > 0) { + expireFiles(expiredFiles, expireTimestamp); + LOG.info( + "Data expiration completed for table {}, {} data files and {} delete files expired", + table.name(), + dataFilesCount, + deleteFilesCount); + metrics.recordDataExpired(dataFilesCount, deleteFilesCount); + } } @Override public void autoCreateTags() { TagConfiguration tagConfiguration = context.getTableConfiguration().getTagConfiguration(); - new AutoCreateIcebergTagAction(table, tagConfiguration, LocalDateTime.now()).execute(); + MaintainerMetrics metrics = context.getMetrics(); + + MaintainerOperationExecutor executor = new MaintainerOperationExecutor(metrics); + executor.execute( + MaintainerOperationType.TAG_CREATION, + () -> { + if (!tagConfiguration.isAutoCreateTag()) { + return; + } + int tagsCreated = + new AutoCreateIcebergTagAction(table, tagConfiguration, LocalDateTime.now()) + .execute(); + if (tagsCreated > 0) { + metrics.recordTagsCreated(tagsCreated); + } + }); } public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) { @@ -926,6 +1007,14 @@ public static ZoneId getDefaultZoneId(Types.NestedField expireField) { return ZoneOffset.UTC; } + private static Set snapshotIds(Iterable snapshots) { + Set snapshotIds = new HashSet<>(); + for (Snapshot snapshot : snapshots) { + snapshotIds.add(snapshot.snapshotId()); + } + return snapshotIds; + } + private void runWithCondition(boolean condition, Runnable fun) { if (condition) { fun.run(); diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java new file mode 100644 index 0000000000..a3eb8d0a45 --- /dev/null +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestIcebergTableMaintainerMetrics.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.formats.iceberg.maintainer; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.maintainer.MaintainerMetrics; +import org.apache.amoro.maintainer.MaintainerOperationType; +import org.apache.amoro.maintainer.OptimizingInfo; +import org.apache.amoro.maintainer.TableMaintainerContext; +import org.apache.amoro.table.TableIdentifier; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Arrays; +import java.util.Collections; + +/** + * Unit tests for IcebergTableMaintainer metrics recording. + * + *
<p>
These tests verify that IcebergTableMaintainer correctly records metrics through the + * MaintainerMetrics interface for all maintainer operations. + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class TestIcebergTableMaintainerMetrics { + + @Mock private TableMaintainerContext mockContext; + + @Mock private MaintainerMetrics mockMetrics; + + @Mock private TableConfiguration mockTableConfiguration; + + @Mock private OptimizingInfo mockOptimizingInfo; + + @Mock private org.apache.amoro.config.TagConfiguration mockTagConfiguration; + + @Mock private org.apache.amoro.config.DataExpirationConfig mockDataExpirationConfig; + + private IcebergTableMaintainer maintainer; + + @BeforeEach + public void setUp() { + TableIdentifier tableIdentifier = TableIdentifier.of("test_catalog", "test_db", "test_table"); + + when(mockContext.getMetrics()).thenReturn(mockMetrics); + when(mockContext.getTableConfiguration()).thenReturn(mockTableConfiguration); + when(mockContext.getOptimizingInfo()).thenReturn(mockOptimizingInfo); + when(mockOptimizingInfo.isProcessing()).thenReturn(false); + + // Feature disabled by default to avoid complex setup + when(mockTableConfiguration.isCleanOrphanEnabled()).thenReturn(false); + when(mockTableConfiguration.isDeleteDanglingDeleteFilesEnabled()).thenReturn(false); + when(mockTableConfiguration.isExpireSnapshotEnabled()).thenReturn(false); + + // Setup Tag and DataExpiration configs to avoid NPE + when(mockTableConfiguration.getTagConfiguration()).thenReturn(mockTagConfiguration); + when(mockTagConfiguration.isAutoCreateTag()).thenReturn(false); + when(mockTableConfiguration.getExpiringDataConfig()).thenReturn(mockDataExpirationConfig); + // Data expiration disabled by default + when(mockDataExpirationConfig.isEnabled()).thenReturn(false); + // Use a valid field name ("ts" matches the mock table schema) to avoid issues + when(mockDataExpirationConfig.getExpirationField()).thenReturn("ts"); + + // 
Create a minimal Table mock to avoid complex setup + org.apache.iceberg.Table table = createMinimalTableMock(); + + maintainer = new IcebergTableMaintainer(table, tableIdentifier, mockContext); + } + + @Test + public void testExpireSnapshotsDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + when(mockTableConfiguration.isExpireSnapshotEnabled()).thenReturn(false); + + // Execute + maintainer.expireSnapshots(); + + // Verify no metrics are recorded when feature is disabled + verify(mockMetrics, never()) + .recordOperationSuccess(any(MaintainerOperationType.class), anyLong()); + } + + @Test + public void testAutoCreateTagsDisabledDoesNotRecordDetailedMetrics() { + // Setup - feature disabled + when(mockTagConfiguration.isAutoCreateTag()).thenReturn(false); + + // Execute + maintainer.autoCreateTags(); + + // Verify operation-level success is still recorded even when no tags are created + verify(mockMetrics).recordOperationSuccess(eq(MaintainerOperationType.TAG_CREATION), anyLong()); + } + + @Test + public void testExpireDataInvalidFieldDoesNotRecordDetailedMetrics() { + // Setup - empty expiration field causes IllegalArgumentException + when(mockDataExpirationConfig.isEnabled()).thenReturn(true); + when(mockDataExpirationConfig.getRetentionTime()).thenReturn(1L); + when(mockDataExpirationConfig.getExpirationField()).thenReturn(""); + when(mockDataExpirationConfig.getBaseOnRule()) + .thenReturn(org.apache.amoro.config.DataExpirationConfig.BaseOnRule.CURRENT_TIME); + + // Execute - this will throw exception for empty field name + assertThrows(IllegalArgumentException.class, () -> maintainer.expireData()); + + // When exception occurs, recordOperationFailure is called with duration + verify(mockMetrics) + .recordOperationFailure(eq(MaintainerOperationType.DATA_EXPIRATION), anyLong(), any()); + } + + @Test + public void testCleanOrphanFilesDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + 
when(mockTableConfiguration.isCleanOrphanEnabled()).thenReturn(false); + + // Execute + maintainer.cleanOrphanFiles(); + + // Verify no metrics are recorded when feature is disabled + verify(mockMetrics, never()) + .recordOperationSuccess(any(MaintainerOperationType.class), anyLong()); + } + + @Test + public void testCleanDanglingDeleteFilesDisabledDoesNotRecordMetrics() { + // Feature disabled by default in setUp() + when(mockTableConfiguration.isDeleteDanglingDeleteFilesEnabled()).thenReturn(false); + + // Execute + maintainer.cleanDanglingDeleteFiles(); + + // Verify no dangling delete metrics are recorded when feature is disabled + verify(mockMetrics, never()).recordDanglingDeleteFilesCleaned(anyInt()); + } + + @Test + public void testExpireSnapshotsRecordByExpiredSnapshotCount() { + org.apache.iceberg.Table table = createMinimalTableMock(); + org.apache.iceberg.ExpireSnapshots expireSnapshots = + org.mockito.Mockito.mock(org.apache.iceberg.ExpireSnapshots.class); + when(table.expireSnapshots()).thenReturn(expireSnapshots); + when(expireSnapshots.retainLast(anyInt())).thenReturn(expireSnapshots); + when(expireSnapshots.expireOlderThan(anyLong())).thenReturn(expireSnapshots); + when(expireSnapshots.deleteWith(any())).thenReturn(expireSnapshots); + when(expireSnapshots.cleanExpiredFiles(anyBoolean())).thenReturn(expireSnapshots); + + org.apache.iceberg.Snapshot snapshot1 = + org.mockito.Mockito.mock(org.apache.iceberg.Snapshot.class); + org.apache.iceberg.Snapshot snapshot2 = + org.mockito.Mockito.mock(org.apache.iceberg.Snapshot.class); + when(snapshot1.snapshotId()).thenReturn(1L); + when(snapshot2.snapshotId()).thenReturn(2L); + when(table.snapshots()) + .thenReturn(Arrays.asList(snapshot1, snapshot2), Collections.singletonList(snapshot2)); + + IcebergTableMaintainer snapshotMaintainer = + new IcebergTableMaintainer( + table, TableIdentifier.of("test_catalog", "test_db", "test_table"), mockContext); + + 
snapshotMaintainer.expireSnapshots(System.currentTimeMillis(), 1); + + verify(mockMetrics).recordSnapshotsExpired(eq(1), eq(0)); + } + + @Test + public void testAllMaintainerOperationsExist() { + // Verify that IcebergTableMaintainer implements methods for all relevant operation types + + // IcebergTableMaintainer should cover 5 out of 6 operation types: + // ORPHAN_FILES_CLEANING - cleanOrphanFiles() + // DANGLING_DELETE_FILES_CLEANING - cleanDanglingDeleteFiles() + // SNAPSHOT_EXPIRATION - expireSnapshots() + // DATA_EXPIRATION - expireData() + // TAG_CREATION - autoCreateTags() + // PARTITION_EXPIRATION - not in IcebergTableMaintainer (Paimon-specific) + + // Verify the methods exist and can be called + assertDoesNotThrow(() -> maintainer.cleanOrphanFiles()); + assertDoesNotThrow(() -> maintainer.cleanDanglingDeleteFiles()); + assertDoesNotThrow(() -> maintainer.expireSnapshots()); + assertDoesNotThrow(() -> maintainer.expireData()); + assertDoesNotThrow(() -> maintainer.autoCreateTags()); + } + + @Test + public void testMaintainerMetricsInterfaceHasAllOperationTypes() { + // Verify MaintainerOperationType enum has all expected operation types + MaintainerOperationType[] operationTypes = MaintainerOperationType.values(); + + // Should have 6 operation types + assertEquals(6, operationTypes.length); + + // Verify specific operation types exist + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("ORPHAN_FILES_CLEANING")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("DANGLING_DELETE_FILES_CLEANING")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("SNAPSHOT_EXPIRATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("DATA_EXPIRATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("TAG_CREATION")); + assertDoesNotThrow(() -> MaintainerOperationType.valueOf("PARTITION_EXPIRATION")); + } + + /** + * Creates a minimal Table mock that avoids complex setup. 
The mock is configured to handle basic + * method calls without throwing exceptions. + */ + private org.apache.iceberg.Table createMinimalTableMock() { + org.apache.iceberg.Table table = org.mockito.Mockito.mock(org.apache.iceberg.Table.class); + + // Configure minimal behavior to avoid NullPointerExceptions + when(table.name()).thenReturn("test_catalog.test_db.test_table"); + when(table.schema()) + .thenReturn( + new org.apache.iceberg.Schema( + org.apache.iceberg.types.Types.NestedField.optional( + 1, "ts", org.apache.iceberg.types.Types.TimestampType.withoutZone()))); + when(table.currentSnapshot()).thenReturn(null); + + return table; + } +}