Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ private EndpointGroup apiGroup() {
"/catalogs/{catalog}/dbs/{db}/tables/{table}/operations",
tableController::getTableOperations);
get("/catalogs/{catalog}/dbs/{db}/tables/{table}/tags", tableController::getTableTags);
post(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/tags", tableController::createTag);
delete(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/tags/{tagName}",
tableController::deleteTag);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/branches",
tableController::getTableBranches);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -653,6 +654,52 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
return Collections.emptyList();
}

@Override
public void createTag(
AmoroTable<?> amoroTable, String tagName, long snapshotId, Long maxRefAgeMs) {
MixedTable mixedTable = getTable(amoroTable);
Preconditions.checkArgument(
!mixedTable.isKeyedTable(), "Creating tags on KeyedTable is not supported yet");
Table icebergTable = mixedTable.asUnkeyedTable();
TableOperations ops = ((HasTableOperations) icebergTable).operations();
TableMetadata base = ops.refresh();
Preconditions.checkNotNull(base, "Table metadata is null");
Preconditions.checkNotNull(
base.snapshot(snapshotId), "Snapshot %s not found in table", snapshotId);
// Align with Iceberg createTag semantics (create-only, fail on existing ref)
// to avoid silently overwriting an existing tag or branch with the same name.
Preconditions.checkArgument(
base.ref(tagName) == null, "Ref %s already exists in table", tagName);

TableMetadata.Builder builder = TableMetadata.buildFrom(base);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] createTag currently uses setRef without guarding name conflicts.\n\nReason summary: builder.setRef(tagName, ...) can overwrite an existing ref with the same name, which is riskier than Iceberg createTag semantics (create-only, fail on existing ref).\n\nSuggested fix: add a pre-check such as Preconditions.checkArgument(base.ref(tagName) == null, ...) before setting the ref, or switch to an Iceberg create-tag path that enforces non-overwrite behavior.

SnapshotRef.Builder refBuilder = SnapshotRef.tagBuilder(snapshotId);
if (maxRefAgeMs != null && maxRefAgeMs > 0) {
refBuilder.maxRefAgeMs(maxRefAgeMs);
}
builder.setRef(tagName, refBuilder.build());
TableMetadata updated = builder.build();
ops.commit(base, updated);
}

@Override
public void deleteTag(AmoroTable<?> amoroTable, String tagName) {
MixedTable mixedTable = getTable(amoroTable);
Preconditions.checkArgument(
!mixedTable.isKeyedTable(), "Deleting tags on KeyedTable is not supported yet");
Table icebergTable = mixedTable.asUnkeyedTable();
TableOperations ops = ((HasTableOperations) icebergTable).operations();
TableMetadata base = ops.refresh();
Preconditions.checkNotNull(base, "Table metadata is null");
SnapshotRef existingRef = base.ref(tagName);
Preconditions.checkArgument(
existingRef != null && existingRef.isTag(), "Tag %s not found in table", tagName);

TableMetadata.Builder builder = TableMetadata.buildFrom(base);
builder.removeRef(tagName);
TableMetadata updated = builder.build();
ops.commit(base, updated);
}

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P0] Broken method signature causes compilation failure.\n\nReason summary: right after deleteTag, the method declaration for getOptimizingProcessesInfo(...) is missing, so this file becomes syntactically invalid (; expected) and amoro-ams cannot build.\n\nSuggested fix: restore the full method signature (@Override, return type, method name, params) and rerun ./mvnw -pl amoro-ams -am -DskipTests -Pskip-dashboard-build compile to verify.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this 090c167

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier
return formatTableDescriptor.getTableConsumerInfos(amoroTable);
}

public void createTag(
TableIdentifier tableIdentifier, String tagName, long snapshotId, Long maxRefAgeMs) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
formatTableDescriptor.createTag(amoroTable, tagName, snapshotId, maxRefAgeMs);
}

public void deleteTag(TableIdentifier tableIdentifier, String tagName) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
formatTableDescriptor.deleteTag(amoroTable, tagName);
}

public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,60 @@ public void getTableTags(Context ctx) {
ctx.json(OkResponse.of(amsPageResult));
}

/**
* Create a tag manually for an Iceberg table.
*
* @param ctx - context for handling the request and response
*/
public void createTag(Context ctx) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Missing test coverage for the new tag management API/flow.\n\nReason summary: this PR adds POST/DELETE tags, descriptor logic, and UI changes, but there are no accompanying tests in backend or frontend for success/error paths.\n\nSuggested fix: add at least controller/descriptor tests (create success, duplicate name, invalid snapshot, delete missing tag) and UI interaction tests (entry visibility, create/delete success/failure feedback).

String catalog = ctx.pathParam("catalog");
String database = ctx.pathParam("db");
String tableName = ctx.pathParam("table");
Preconditions.checkArgument(
StringUtils.isNotBlank(catalog)
&& StringUtils.isNotBlank(database)
&& StringUtils.isNotBlank(tableName),
"catalog.database.tableName can not be empty in any element");

CreateTagRequest request = ctx.bodyAsClass(CreateTagRequest.class);
Preconditions.checkArgument(
StringUtils.isNotBlank(request.getTagName()), "tagName can not be empty");
Preconditions.checkArgument(
StringUtils.isNotBlank(request.getSnapshotId()), "snapshotId can not be null");
long snapshotId = Long.parseLong(request.getSnapshotId().trim());

Long maxRefAgeMs = request.getMaxRefAgeMs();

tableDescriptor.createTag(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(),
request.getTagName(),
snapshotId,
maxRefAgeMs);
ctx.json(OkResponse.ok());
}

/**
* Delete a tag from an Iceberg table.
*
* @param ctx - context for handling the request and response
*/
public void deleteTag(Context ctx) {
String catalog = ctx.pathParam("catalog");
String database = ctx.pathParam("db");
String tableName = ctx.pathParam("table");
String tagName = ctx.pathParam("tagName");
Preconditions.checkArgument(
StringUtils.isNotBlank(catalog)
&& StringUtils.isNotBlank(database)
&& StringUtils.isNotBlank(tableName)
&& StringUtils.isNotBlank(tagName),
"catalog.database.tableName.tagName can not be empty in any element");

tableDescriptor.deleteTag(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(), tagName);
ctx.json(OkResponse.ok());
}

public void getTableBranches(Context ctx) {
String catalog = ctx.pathParam("catalog");
String database = ctx.pathParam("db");
Expand Down Expand Up @@ -736,4 +790,35 @@ private List<AMSColumnInfo> transformHiveSchemaToAMSColumnInfo(List<FieldSchema>
})
.collect(Collectors.toList());
}

/** Request body for creating a tag on an Iceberg table. */
public static class CreateTagRequest {
private String tagName;
private String snapshotId;
private Long maxRefAgeMs;

public String getTagName() {
return tagName;
}

public void setTagName(String tagName) {
this.tagName = tagName;
}

public String getSnapshotId() {
return snapshotId;
}

public void setSnapshotId(String snapshotId) {
this.snapshotId = snapshotId;
}

public Long getMaxRefAgeMs() {
return maxRefAgeMs;
}

public void setMaxRefAgeMs(Long maxRefAgeMs) {
this.maxRefAgeMs = maxRefAgeMs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,22 @@ Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(

/** Get the consumer information of the {@link AmoroTable}. */
List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable);

/**
* Create a tag manually for the table.
*
* @param amoroTable target table
* @param tagName tag name
* @param snapshotId snapshot ID to tag
* @param maxRefAgeMs max retention time in milliseconds, null means keep forever
*/
void createTag(AmoroTable<?> amoroTable, String tagName, long snapshotId, Long maxRefAgeMs);

/**
* Delete a tag from the table.
*
* @param amoroTable target table
* @param tagName tag name to delete
*/
void deleteTag(AmoroTable<?> amoroTable, String tagName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,19 @@ public List<TagOrBranchInfo> getTableBranches(AmoroTable<?> amoroTable) {
new TagOrBranchInfo("hoodie-timeline", -1, -1, 0L, 0L, TagOrBranchInfo.BRANCH));
}

@Override
public void createTag(
AmoroTable<?> amoroTable, String tagName, long snapshotId, Long maxRefAgeMs) {
throw new UnsupportedOperationException(
"Creating tags via dashboard is not supported for Hudi tables");
}

@Override
public void deleteTag(AmoroTable<?> amoroTable, String tagName) {
throw new UnsupportedOperationException(
"Deleting tags via dashboard is not supported for Hudi tables");
}

@Override
public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,19 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
return consumerInfos;
}

@Override
public void createTag(
AmoroTable<?> amoroTable, String tagName, long snapshotId, Long maxRefAgeMs) {
throw new UnsupportedOperationException(
"Creating tags via dashboard is not supported for Paimon tables");
}

@Override
public void deleteTag(AmoroTable<?> amoroTable, String tagName) {
throw new UnsupportedOperationException(
"Deleting tags via dashboard is not supported for Paimon tables");
}

private AmoroSnapshotsOfTable manifestListInfo(
FileStore<?> store,
Snapshot snapshot,
Expand Down
10 changes: 10 additions & 0 deletions amoro-web/src/language/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ export default {
nothingToShow: 'Nothing to show',
filterBranchesOrTagsOrConsumers: 'Filter branches/tags/consumers',
findATag: 'Find a tag',
createTag: 'Create Tag',
deleteTag: 'Delete Tag',
tagName: 'Tag Name',
snapshotIdLabel: 'Snapshot ID',
maxRefAgeMs: 'Max Retention Time (ms)',
createTagSuccess: 'Tag created successfully',
deleteTagSuccess: 'Tag deleted successfully',
deleteTagConfirm: 'Are you sure to delete tag "{name}"?',
tagNameRequired: 'Tag name is required',
snapshotIdRequired: 'Snapshot ID is required',
fileSearchPlaceholder: 'Filter partitions',
noResourceGroupsTitle: 'No resource groups available.',
noResourceGroupsContent: 'Please create an optimizer group first.',
Expand Down
10 changes: 10 additions & 0 deletions amoro-web/src/language/zh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ export default {
nothingToShow: '无内容可展示',
filterBranchesOrTagsOrConsumers: '过滤分支/标签/消费者',
findATag: '查找标签',
createTag: '创建标签',
deleteTag: '删除标签',
tagName: '标签名称',
snapshotIdLabel: '快照 ID',
maxRefAgeMs: '最大保留时间(毫秒)',
createTagSuccess: '标签创建成功',
deleteTagSuccess: '标签删除成功',
deleteTagConfirm: '确定要删除标签 "{name}" 吗?',
tagNameRequired: '标签名称不能为空',
snapshotIdRequired: '快照 ID 不能为空',
fileSearchPlaceholder: '过滤分区',
noResourceGroupsTitle: '没有任何优化组',
noResourceGroupsContent: '需要首先创建一个默认优化组',
Expand Down
28 changes: 28 additions & 0 deletions amoro-web/src/services/table.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,31 @@ export function getConsumers(params: { catalog: string, db: string, table: strin
const { catalog, db, table } = params
return request.get(`/api/ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/consumers`)
}

// Create a tag manually
export function createTag(params: {
catalog: string
db: string
table: string
tagName: string
snapshotId: string
maxRefAgeMs?: number
}) {
const { catalog, db, table, tagName, snapshotId, maxRefAgeMs } = params
return request.post(`/api/ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/tags`, {
tagName,
snapshotId,
maxRefAgeMs,
})
}

// Delete a tag
export function deleteTag(params: {
catalog: string
db: string
table: string
tagName: string
}) {
const { catalog, db, table, tagName } = params
return request.delete(`/api/ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/tags/${tagName}`)
}
Loading
Loading