-
Notifications
You must be signed in to change notification settings - Fork 382
feat: add Iceberg table tag management UI and API #4164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
07a2317
c2cc203
090c167
2a327c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,6 +88,7 @@ | |
| import org.apache.iceberg.SnapshotSummary; | ||
| import org.apache.iceberg.StructLike; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableOperations; | ||
| import org.apache.iceberg.TableScan; | ||
| import org.apache.iceberg.data.GenericRecord; | ||
|
|
@@ -653,6 +654,52 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) { | |
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| @Override | ||
| public void createTag( | ||
| AmoroTable<?> amoroTable, String tagName, long snapshotId, Long maxRefAgeMs) { | ||
| MixedTable mixedTable = getTable(amoroTable); | ||
| Preconditions.checkArgument( | ||
| !mixedTable.isKeyedTable(), "Creating tags on KeyedTable is not supported yet"); | ||
| Table icebergTable = mixedTable.asUnkeyedTable(); | ||
| TableOperations ops = ((HasTableOperations) icebergTable).operations(); | ||
| TableMetadata base = ops.refresh(); | ||
| Preconditions.checkNotNull(base, "Table metadata is null"); | ||
| Preconditions.checkNotNull( | ||
| base.snapshot(snapshotId), "Snapshot %s not found in table", snapshotId); | ||
| // Align with Iceberg createTag semantics (create-only, fail on existing ref) | ||
| // to avoid silently overwriting an existing tag or branch with the same name. | ||
| Preconditions.checkArgument( | ||
| base.ref(tagName) == null, "Ref %s already exists in table", tagName); | ||
|
|
||
| TableMetadata.Builder builder = TableMetadata.buildFrom(base); | ||
| SnapshotRef.Builder refBuilder = SnapshotRef.tagBuilder(snapshotId); | ||
| if (maxRefAgeMs != null && maxRefAgeMs > 0) { | ||
| refBuilder.maxRefAgeMs(maxRefAgeMs); | ||
| } | ||
| builder.setRef(tagName, refBuilder.build()); | ||
| TableMetadata updated = builder.build(); | ||
| ops.commit(base, updated); | ||
| } | ||
|
|
||
| @Override | ||
| public void deleteTag(AmoroTable<?> amoroTable, String tagName) { | ||
| MixedTable mixedTable = getTable(amoroTable); | ||
| Preconditions.checkArgument( | ||
| !mixedTable.isKeyedTable(), "Deleting tags on KeyedTable is not supported yet"); | ||
| Table icebergTable = mixedTable.asUnkeyedTable(); | ||
| TableOperations ops = ((HasTableOperations) icebergTable).operations(); | ||
| TableMetadata base = ops.refresh(); | ||
| Preconditions.checkNotNull(base, "Table metadata is null"); | ||
| SnapshotRef existingRef = base.ref(tagName); | ||
| Preconditions.checkArgument( | ||
| existingRef != null && existingRef.isTag(), "Tag %s not found in table", tagName); | ||
|
|
||
| TableMetadata.Builder builder = TableMetadata.buildFrom(base); | ||
| builder.removeRef(tagName); | ||
| TableMetadata updated = builder.build(); | ||
| ops.commit(base, updated); | ||
| } | ||
|
|
||
| @Override | ||
| public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo( | ||
| AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [P0] Broken method signature causes compilation failure.\n\nReason summary: right after
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this 090c167 |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -641,6 +641,60 @@ public void getTableTags(Context ctx) { | |
| ctx.json(OkResponse.of(amsPageResult)); | ||
| } | ||
|
|
||
| /** | ||
| * Create a tag manually for an Iceberg table. | ||
| * | ||
| * @param ctx - context for handling the request and response | ||
| */ | ||
| public void createTag(Context ctx) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [P2] Missing test coverage for the new tag management API/flow.\n\nReason summary: this PR adds |
||
| String catalog = ctx.pathParam("catalog"); | ||
| String database = ctx.pathParam("db"); | ||
| String tableName = ctx.pathParam("table"); | ||
| Preconditions.checkArgument( | ||
| StringUtils.isNotBlank(catalog) | ||
| && StringUtils.isNotBlank(database) | ||
| && StringUtils.isNotBlank(tableName), | ||
| "catalog.database.tableName can not be empty in any element"); | ||
|
|
||
| CreateTagRequest request = ctx.bodyAsClass(CreateTagRequest.class); | ||
| Preconditions.checkArgument( | ||
| StringUtils.isNotBlank(request.getTagName()), "tagName can not be empty"); | ||
| Preconditions.checkArgument( | ||
| StringUtils.isNotBlank(request.getSnapshotId()), "snapshotId can not be null"); | ||
| long snapshotId = Long.parseLong(request.getSnapshotId().trim()); | ||
|
|
||
| Long maxRefAgeMs = request.getMaxRefAgeMs(); | ||
|
|
||
| tableDescriptor.createTag( | ||
| TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(), | ||
| request.getTagName(), | ||
| snapshotId, | ||
| maxRefAgeMs); | ||
| ctx.json(OkResponse.ok()); | ||
| } | ||
|
|
||
| /** | ||
| * Delete a tag from an Iceberg table. | ||
| * | ||
| * @param ctx - context for handling the request and response | ||
| */ | ||
| public void deleteTag(Context ctx) { | ||
| String catalog = ctx.pathParam("catalog"); | ||
| String database = ctx.pathParam("db"); | ||
| String tableName = ctx.pathParam("table"); | ||
| String tagName = ctx.pathParam("tagName"); | ||
| Preconditions.checkArgument( | ||
| StringUtils.isNotBlank(catalog) | ||
| && StringUtils.isNotBlank(database) | ||
| && StringUtils.isNotBlank(tableName) | ||
| && StringUtils.isNotBlank(tagName), | ||
| "catalog.database.tableName.tagName can not be empty in any element"); | ||
|
|
||
| tableDescriptor.deleteTag( | ||
| TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(), tagName); | ||
| ctx.json(OkResponse.ok()); | ||
| } | ||
|
|
||
| public void getTableBranches(Context ctx) { | ||
| String catalog = ctx.pathParam("catalog"); | ||
| String database = ctx.pathParam("db"); | ||
|
|
@@ -736,4 +790,35 @@ private List<AMSColumnInfo> transformHiveSchemaToAMSColumnInfo(List<FieldSchema> | |
| }) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** Request body for creating a tag on an Iceberg table. */ | ||
| public static class CreateTagRequest { | ||
| private String tagName; | ||
| private String snapshotId; | ||
| private Long maxRefAgeMs; | ||
|
|
||
| public String getTagName() { | ||
| return tagName; | ||
| } | ||
|
|
||
| public void setTagName(String tagName) { | ||
| this.tagName = tagName; | ||
| } | ||
|
|
||
| public String getSnapshotId() { | ||
| return snapshotId; | ||
| } | ||
|
|
||
| public void setSnapshotId(String snapshotId) { | ||
| this.snapshotId = snapshotId; | ||
| } | ||
|
|
||
| public Long getMaxRefAgeMs() { | ||
| return maxRefAgeMs; | ||
| } | ||
|
|
||
| public void setMaxRefAgeMs(Long maxRefAgeMs) { | ||
| this.maxRefAgeMs = maxRefAgeMs; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1]
createTagcurrently usessetRefwithout guarding name conflicts.\n\nReason summary:builder.setRef(tagName, ...)can overwrite an existing ref with the same name, which is riskier than IcebergcreateTagsemantics (create-only, fail on existing ref).\n\nSuggested fix: add a pre-check such asPreconditions.checkArgument(base.ref(tagName) == null, ...)before setting the ref, or switch to an Iceberg create-tag path that enforces non-overwrite behavior.