-
Notifications
You must be signed in to change notification settings - Fork 63
Add multi schema update support for tables during replica table commits #407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add multi schema update support for tables during replica table commits #407
Conversation
d8d7dad to
d366bce
Compare
|
this is a similar requirement to snapshots in snapshots API, we pass List<snapshots> and List<snapshotrefs> via tabledto and deserialize them on server. server also has access to the existing baseMetadata object. so, once on server we:
i think it would be useful for code reuse and logically simpler to implement the same API, but for schemas. so, pass List<schemas> which would be all schemas, and do a truncate (i don't think schemas can be truncated) and a append (only add new schemas) which would preserve ordering. |
|
does schema require a pointer per branch or is it global? is last schema in the list always the current schema? |
|
the table metadata object that we must serialize has these attributes private final String metadataFileLocation;
private final int formatVersion;
private final String uuid;
private final String location;
private final long lastSequenceNumber;
private final long lastUpdatedMillis;
private final int lastColumnId;
private final int currentSchemaId;
private final List<Schema> schemas;
private final int defaultSpecId;
private final List<PartitionSpec> specs;
private final int lastAssignedPartitionId;
private final int defaultSortOrderId;
private final List<SortOrder> sortOrders;
private final Map<String, String> properties;
private final long currentSnapshotId;
private final Map<Integer, Schema> schemasById;
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, SortOrder> sortOrdersById;
private final List<HistoryEntry> snapshotLog;
private final List<MetadataLogEntry> previousFiles;
private final List<StatisticsFile> statisticsFiles;
private final List<PartitionStatisticsFile> partitionStatisticsFiles;
private final List<MetadataUpdate> changes;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;we know that list has to be serialized but what about these? private final int currentSchemaId;
private final Map<Integer, Schema> schemasById;one idea for why curretnschemaid is required is for if we ever support schema rollback, the api will be ready. schemasById <- i'm not sure what this is but maybe it would be helpful |
|
@cbb330 good questions so I'll go over it one at a time. I checked through the docs here based on your questions: https://iceberg.apache.org/docs/nightly/branching/#usage
I didn't want to always send in all schemas in the request similarly to snapshots because like you said, schemas don't go through truncation. Also schemas never get expired on Iceberg, even when the snapshots referencing the schema is expired, so copying over all the schemas from the metadata in each request can blow up the payload size. Hence why the code here only looks at deltas. There is a concern though that there are already some tables that currently have a mismatch from source <-> dest schemas already, in which case the proposed approach to ensure equality is better. It becomes a tradeoff of handling very large schemas that have been evolved a lot vs fixing existing tables that are already corrupted. I think it is safer to optimize for only sending in the schema deltas though, especially since schemas can't be expired unlike snapshots. Let me know your thoughts. |
| SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(tableDto.getSnapshotRefs())); | ||
| } | ||
| } | ||
| if (!MapUtils.isEmpty(tableDto.getIntermediateSchemas())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How this field would be populated on the tableDto?
| + "This is used to preserve schema history when multiple schema updates occur in a single commit.", | ||
| example = | ||
| "{\"1\": \"{\\\"type\\\": \\\"struct\\\", \\\"fields\\\": [...]}\", \"2\": \"{...}\"}") | ||
| private Map<String, String> intermediateSchemas; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we have changed the api model to pass and capture intermediate schemas. How the intermediate schemas would be determined in the context of replication?
| intermediateSchemas.put( | ||
| String.valueOf(i), SchemaParser.toJson(metadata.schemasById().get(i), false)); | ||
| } | ||
| createUpdateTableRequestBody.setIntermediateSchemas(intermediateSchemas); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we are determining intermediate schema here and this method is called as part of put snapshots and create/update table.
| // TODO: consider allowing this for any table type, not just replica tables | ||
| if (isMultiSchemaUpdateCommit(base, metadata) | ||
| && getTableType(base, metadata) | ||
| == CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yesh makes sense to enable this for replication only.
| == CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) { | ||
| Map<String, String> intermediateSchemas = new HashMap<>(); | ||
| int startSchemaId = base == null ? 0 : base.currentSchemaId() + 1; | ||
| for (int i = startSchemaId; i < metadata.currentSchemaId(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the intermediate schema excludes the current/latest schema based on this condition?
| tableIdentifier, | ||
| e.getMessage(), | ||
| e); | ||
| throw new RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be the table corruption case where one of the intermediate schema is invalid or bad and we are throwing exception for this. But what happens if the next schema is valid? Shall we emit metrics for this use case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this PR, the intermediate schema update is applicable only to the replica table which is good. So can we process all the schemas to see if recent or latest schemas are good and then throw exception for the failed one if needed. But not sure how much guarantee the intermediate schema copy would provide and should not end up corrupting the replica table.
cbb330
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strategy wise, intermediate schemas approach seems like an ok tradeoff vs sending all schemas, given that schemas list is append only and content per schema are huge. i have some concerns on rollout.
- what will be the impact to destination tables which are already in a "bad" state?
- are there any source tables which are in a bad state? because in theory its possible if client evolved the schema twice before committing.
the answer to these two questions may dictate us to ideate on the rollout strategy because we will be "correcting" the old behavior which will change the contract with existing tables.
starting the conversation on the 1) assume destination table has schemas [0,1] but source table has [0,1,2,3,4]. this works now because we don't calculate "newschemas", and destinatino table snapshots which refer to 4 are somehow fine in spark ( but trino doesn't work) the only way to fix this is a true merge because newsnapshots can't be calculated.
|
|
||
| // Sort by schema ID to process in order | ||
| List<Map.Entry<String, String>> sortedSchemas = | ||
| intermediateSchemas.entrySet().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these are being lexicograhically sorted by string value instead of integer representation e.g. 1, 2, 10 will be sorted to 1,10,2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should trust the client ordering similarly to how we trust in snapshot list for consistency
| private TableMetadata rebuildTblMetaWithSchema( | ||
| TableMetadata newMetadata, String schemaKey, boolean reuseMetadata) { | ||
| Schema writerSchema = SchemaParser.fromJson(newMetadata.properties().get(schemaKey)); | ||
| TableMetadata newMetadata, String schemaJson, boolean reuseMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets evaluate if TableMetadata.java has better setters for this behavior so that we can delete this method and replace with a loop inside process schema because rebuilding the metadata from scratch per iteration is complex error prone and costly
| public final class CatalogConstants { | ||
| public static final String SNAPSHOTS_JSON_KEY = "snapshotsJsonToBePut"; | ||
| public static final String SNAPSHOTS_REFS_KEY = "snapshotsRefs"; | ||
| public static final String INTERMEDIATE_SCHEMAS_KEY = "intermediateSchemas"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we rename intermediateSchemas to newSchemas everywhere to better semantically represent that this object is append only behavior (since intermediate implies that the schemas are between two states)
| } | ||
| } | ||
|
|
||
| private Catalog getOpenHouseCatalog(SparkSession spark) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test is in the "apps/spark" directory which is primarily for maintenance jobs.
can we move to
integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/
should be easy since it looks like youre not relying on any apps/spark dependencies.
beware that the catalogtest/e2e gradle test structure is a little wrong due to copying between spark3.1/spark3.5 and gradle test commands may be harder to write.
| } | ||
| } | ||
|
|
||
| private Catalog getOpenHouseCatalog(SparkSession spark) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function can be replaced with
CatalogPlugin plugin = spark.sessionState().catalogManager().catalog("openhouse");
Catalog catalog = ((HasIcebergCatalog) plugin).icebergCatalog();and used like
Table table = catalog.loadTable(TableIdentifier.parse(name));example is in integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/BranchJavaTest.java
Summary
Issue Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.
When committing table metadatas when multiple schema evolutions have been performed within a single commit, there can be issues ensuring that the schema IDs remain consistent. This is more commonly seen in replication tables as shown below:
This causes issues because replication uses table snapshots generated directly on the source table, where each snapshot contains a reference to its corresponding schema ID. This causes the replica table to be unreadable for that specific snapshot. This affects certain compute engines such as Trino, as well as time travel queries.
To resolve this issue, we want to support multiple schema updates within a single commit on the server side. This resolves an extra field to send the delta of schemas, identified by schema ID, when performing a replicated table commit (this can be generalized to not only affect replica tables). Each schema needs to be serialized in order to ensure that the column IDs are consistent.
Changes
For all the boxes checked, please include additional details of the changes made in this pull request.
Testing Done
For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
Additional Information
For all the boxes checked, include additional details of the changes made in this pull request.