
Conversation

@Will-Lo (Collaborator) commented Dec 4, 2025

Summary


When committing table metadata after multiple schema evolutions have been performed within a single commit, it can be difficult to ensure that the schema IDs remain consistent. This is most commonly seen in replica tables, as shown below:

---
title: Source Table
---
flowchart TD;
   A[Schema A - id 1] --> B[Schema B - id 2]
   B --> C[Schema C - id 3]
   C --> D[Schema D - id 4]
---
title: Replica Table
---
flowchart TD;
   A[Schema A - id 1] --> D[Schema D - id 2]

This is a problem because replication reuses table snapshots generated directly on the source table, and each snapshot references its corresponding schema ID. When that schema ID does not exist on the replica, the replica table becomes unreadable for that snapshot, which affects certain compute engines such as Trino as well as time travel queries.

To resolve this issue, we add support for multiple schema updates within a single commit on the server side. This introduces an extra field that carries the delta of schemas, keyed by schema ID, when performing a replica table commit (this can be generalized beyond replica tables). Each schema needs to be serialized in order to ensure that the column IDs remain consistent.
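As a rough illustration, the schema delta could be collected on the client along these lines (a sketch assuming Iceberg's TableMetadata and SchemaParser APIs; the helper name is hypothetical, and the loop mirrors the diff discussed later in this review):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.iceberg.SchemaParser;
  import org.apache.iceberg.TableMetadata;

  // Sketch: collect every schema added after the base metadata's current schema,
  // keyed by schema ID, so the server can replay them in order before committing.
  // The loop stops before the current schema ID, presumably because the latest
  // schema is already carried by the main request body.
  static Map<String, String> collectIntermediateSchemas(TableMetadata base, TableMetadata metadata) {
    Map<String, String> intermediateSchemas = new HashMap<>();
    int startSchemaId = base == null ? 0 : base.currentSchemaId() + 1;
    for (int i = startSchemaId; i < metadata.currentSchemaId(); i++) {
      intermediateSchemas.put(
          String.valueOf(i), SchemaParser.toJson(metadata.schemasById().get(i), false));
    }
    return intermediateSchemas;
  }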

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include the commands run, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

@Will-Lo force-pushed the add-multi-schema-update-support-tables-api branch from d8d7dad to d366bce on December 4, 2025 20:58
@Will-Lo marked this pull request as ready for review on December 4, 2025 21:50
@cbb330 (Collaborator) commented Dec 4, 2025

this is a similar requirement to snapshots.

in the snapshots API, we pass List<snapshots> and List<snapshotRefs> via the tableDto and deserialize them on the server. the server also has access to the existing baseMetadata object. so, once on the server we:

  1. truncate snapshots that are on baseMetadata but not present in the new serialized version.
  2. add snapshots given in the new serialized version.
  3. update ref pointers

i think it would be useful for code reuse, and logically simpler, to implement the same API but for schemas. so, pass List<schemas> which would be all schemas, and do a truncate (i don't think schemas can be truncated) and an append (only add new schemas),

which would preserve ordering.
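A minimal sketch of what that append-only merge could look like for schemas, assuming Iceberg's TableMetadata accessors and a hypothetical helper name (no truncation step, since schemas are never removed):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.TableMetadata;

  // Hypothetical sketch: merge a client-provided schema list into the existing base
  // metadata the way the snapshots API merges snapshots, minus truncation (schemas
  // are never expired, so there is nothing to remove). Client ordering is preserved
  // for the appended entries.
  static List<Schema> appendNewSchemas(TableMetadata baseMetadata, List<Schema> clientSchemas) {
    List<Schema> merged = new ArrayList<>(baseMetadata.schemas());
    for (Schema schema : clientSchemas) {
      if (!baseMetadata.schemasById().containsKey(schema.schemaId())) {
        merged.add(schema);
      }
    }
    return merged;
  }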

@cbb330 (Collaborator) commented Dec 4, 2025

does the schema require a pointer per branch, or is it global?

is the last schema in the list always the current schema?

@cbb330 (Collaborator) commented Dec 4, 2025

the table metadata object that we must serialize has these attributes

  private final String metadataFileLocation;
  private final int formatVersion;
  private final String uuid;
  private final String location;
  private final long lastSequenceNumber;
  private final long lastUpdatedMillis;
  private final int lastColumnId;
  private final int currentSchemaId;
  private final List<Schema> schemas;
  private final int defaultSpecId;
  private final List<PartitionSpec> specs;
  private final int lastAssignedPartitionId;
  private final int defaultSortOrderId;
  private final List<SortOrder> sortOrders;
  private final Map<String, String> properties;
  private final long currentSnapshotId;
  private final Map<Integer, Schema> schemasById;
  private final Map<Integer, PartitionSpec> specsById;
  private final Map<Integer, SortOrder> sortOrdersById;
  private final List<HistoryEntry> snapshotLog;
  private final List<MetadataLogEntry> previousFiles;
  private final List<StatisticsFile> statisticsFiles;
  private final List<PartitionStatisticsFile> partitionStatisticsFiles;
  private final List<MetadataUpdate> changes;
  private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
  private volatile List<Snapshot> snapshots;
  private volatile Map<Long, Snapshot> snapshotsById;
  private volatile Map<String, SnapshotRef> refs;
  private volatile boolean snapshotsLoaded;

we know that the schemas list has to be serialized

but what about these?

  private final int currentSchemaId;
  private final Map<Integer, Schema> schemasById;

one idea for why currentSchemaId is required: if we ever support schema rollback, the api will be ready.

schemasById <- i'm not sure what this is, but maybe it would be helpful

@Will-Lo (Collaborator, Author) commented Dec 5, 2025

@cbb330 good questions, I'll go over them one at a time.
Schemas are similar to snapshots but not exactly 1-1, because snapshots are the SoT for determining which schema to use in Iceberg. When performing queries over branches or with time travel, Iceberg references the snapshot's schema ID, so schemas just need to be present in the schemasById map. Also, schemas cannot be expired, unlike snapshots, so a table's metadata stores its schemas from creation time.
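For reference, schemasById is essentially the schemas list indexed by schema ID and can be rebuilt from the serialized list; a minimal sketch of that relationship (helper name is hypothetical):

  import java.util.List;
  import java.util.Map;
  import java.util.function.Function;
  import java.util.stream.Collectors;
  import org.apache.iceberg.Schema;

  // Sketch: how schemasById relates to the schemas list -- an ID -> Schema index
  // that snapshots resolve their schema through when reading a branch or tag.
  static Map<Integer, Schema> indexSchemasById(List<Schema> schemas) {
    return schemas.stream().collect(Collectors.toMap(Schema::schemaId, Function.identity()));
  }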

I checked through the docs here based on your questions: https://iceberg.apache.org/docs/nightly/branching/#usage

It is important to understand that the schema tracked for a table is valid across all branches. When working with branches, the table's schema is used, as that's the schema being validated when writing data to a branch. On the other hand, querying a tag uses the snapshot's schema, which is the schema ID that the snapshot pointed to when it was created.

I didn't want to always send all schemas in the request, as we do for snapshots, because like you said, schemas don't go through truncation. Also, schemas never get expired in Iceberg, even when the snapshots referencing a schema are expired, so copying over all the schemas from the metadata in each request can blow up the payload size. That's why the code here only looks at deltas.

There is a concern, though, that some tables already have a source <-> destination schema mismatch, in which case the approach you proposed, which ensures equality, would be better. It becomes a tradeoff between handling very large schemas that have been evolved a lot and fixing existing tables that are already corrupted. I think it is safer to optimize for only sending the schema deltas, especially since schemas can't be expired unlike snapshots. Let me know your thoughts.

SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(tableDto.getSnapshotRefs()));
}
}
if (!MapUtils.isEmpty(tableDto.getIntermediateSchemas())) {
Member:

How would this field be populated on the tableDto?

+ "This is used to preserve schema history when multiple schema updates occur in a single commit.",
example =
"{\"1\": \"{\\\"type\\\": \\\"struct\\\", \\\"fields\\\": [...]}\", \"2\": \"{...}\"}")
private Map<String, String> intermediateSchemas;
Member:

Seems like we have changed the API model to pass and capture intermediate schemas. How would the intermediate schemas be determined in the context of replication?

intermediateSchemas.put(
String.valueOf(i), SchemaParser.toJson(metadata.schemasById().get(i), false));
}
createUpdateTableRequestBody.setIntermediateSchemas(intermediateSchemas);
Member:

So we are determining the intermediate schemas here, and this method is called as part of both put snapshots and create/update table.

// TODO: consider allowing this for any table type, not just replica tables
if (isMultiSchemaUpdateCommit(base, metadata)
&& getTableType(base, metadata)
== CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) {
Member:

Yes, makes sense to enable this for replication only.

== CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) {
Map<String, String> intermediateSchemas = new HashMap<>();
int startSchemaId = base == null ? 0 : base.currentSchemaId() + 1;
for (int i = startSchemaId; i < metadata.currentSchemaId(); i++) {
Member:

So the intermediate schemas exclude the current/latest schema based on this condition?

tableIdentifier,
e.getMessage(),
e);
throw new RuntimeException(
@abhisheknath2011 (Member) commented Dec 9, 2025:

This could be the table corruption case where one of the intermediate schemas is invalid or bad, and we are throwing an exception for it. But what happens if the next schema is valid? Shall we emit metrics for this case?

@abhisheknath2011 (Member) commented Dec 9, 2025:

Based on this PR, the intermediate schema update is applicable only to replica tables, which is good. Can we process all the schemas to see whether the recent or latest schemas are good, and then throw an exception for the failed one if needed? I'm also not sure how much of a guarantee the intermediate schema copy provides; it should not end up corrupting the replica table.

@cbb330 (Collaborator) left a comment:

strategy-wise, the intermediate schemas approach seems like an OK tradeoff vs sending all schemas, given that the schemas list is append-only and the content per schema is huge. i have some concerns on rollout.

  1. what will be the impact on destination tables which are already in a "bad" state?
  2. are there any source tables which are in a bad state? in theory it's possible if a client evolved the schema twice before committing.

the answer to these two questions may dictate that we ideate on the rollout strategy, because we will be "correcting" the old behavior, which will change the contract with existing tables.

starting the conversation on 1): assume the destination table has schemas [0,1] but the source table has [0,1,2,3,4]. this works now because we don't calculate "newSchemas", and destination table snapshots which refer to schema 4 are somehow fine in spark (but trino doesn't work). the only way to fix this is a true merge, because newSchemas can't be calculated.


// Sort by schema ID to process in order
List<Map.Entry<String, String>> sortedSchemas =
intermediateSchemas.entrySet().stream()
Collaborator:

these are being lexicographically sorted by string value instead of by their integer representation, e.g. 1, 2, 10 will be sorted to 1, 10, 2
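For example, parsing the keys as integers before sorting avoids the lexicographic ordering issue; a minimal sketch (not the PR's code, helper name hypothetical):

  import java.util.Comparator;
  import java.util.List;
  import java.util.Map;
  import java.util.stream.Collectors;

  // Sketch: sort "schema ID -> schema JSON" entries numerically rather than
  // lexicographically, so "10" sorts after "2" instead of between "1" and "2".
  static List<Map.Entry<String, String>> sortBySchemaId(Map<String, String> intermediateSchemas) {
    return intermediateSchemas.entrySet().stream()
        .sorted(Comparator.comparingInt((Map.Entry<String, String> e) -> Integer.parseInt(e.getKey())))
        .collect(Collectors.toList());
  }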

Collaborator:

we should trust the client ordering here, similarly to how we trust it for the snapshot list, for consistency

private TableMetadata rebuildTblMetaWithSchema(
TableMetadata newMetadata, String schemaKey, boolean reuseMetadata) {
Schema writerSchema = SchemaParser.fromJson(newMetadata.properties().get(schemaKey));
TableMetadata newMetadata, String schemaJson, boolean reuseMetadata) {
Collaborator:

let's evaluate whether TableMetadata.java has better setters for this behavior, so that we can delete this method and replace it with a loop inside the schema-processing method; rebuilding the metadata from scratch per iteration is complex, error-prone, and costly
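One possible shape for that loop, assuming Iceberg's TableMetadata.Builder (via TableMetadata.buildFrom) exposes addSchema/setCurrentSchema as in recent 1.x releases; a sketch under those assumptions, not the PR's implementation:

  import java.util.List;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.TableMetadata;

  // Sketch: append the new schemas in ID order and set the current schema in a
  // single rebuild, instead of reconstructing the metadata once per schema.
  // Passing one lastColumnId to every addSchema call is a simplification.
  static TableMetadata appendSchemas(
      TableMetadata base, List<Schema> newSchemasInIdOrder, Schema currentSchema, int lastColumnId) {
    TableMetadata.Builder builder = TableMetadata.buildFrom(base);
    for (Schema schema : newSchemasInIdOrder) {
      builder.addSchema(schema, lastColumnId);
    }
    builder.setCurrentSchema(currentSchema, lastColumnId);
    return builder.build();
  }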

public final class CatalogConstants {
public static final String SNAPSHOTS_JSON_KEY = "snapshotsJsonToBePut";
public static final String SNAPSHOTS_REFS_KEY = "snapshotsRefs";
public static final String INTERMEDIATE_SCHEMAS_KEY = "intermediateSchemas";
Collaborator:

can we rename intermediateSchemas to newSchemas everywhere, to better semantically represent that this object has append-only behavior? ("intermediate" implies that the schemas are between two states)

}
}

private Catalog getOpenHouseCatalog(SparkSession spark) {
Collaborator:

this test is in the "apps/spark" directory, which is primarily for maintenance jobs.

can we move it to

integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/

should be easy since it looks like you're not relying on any apps/spark dependencies.

beware that the catalogtest/e2e gradle test structure is a little off due to copying between spark3.1/spark3.5, and gradle test commands may be harder to write.

}
}

private Catalog getOpenHouseCatalog(SparkSession spark) {
Collaborator:

this function can be replaced with

CatalogPlugin plugin = spark.sessionState().catalogManager().catalog("openhouse");
      Catalog catalog = ((HasIcebergCatalog) plugin).icebergCatalog();

and used like

      Table table = catalog.loadTable(TableIdentifier.parse(name));

example is in integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/BranchJavaTest.java
