feat(experimental): Add schema change support for BigQuery#499

Merged
iambriccardo merged 198 commits into main from riccardo/feat/ddl-support-3 on Apr 20, 2026
Conversation

@iambriccardo
Contributor

@iambriccardo iambriccardo commented Dec 11, 2025

Summary

This PR adds experimental schema-change support for BigQuery pipelines.

At a high level, the pipeline can now observe Postgres schema changes, persist versioned table schemas in its own state store, and use that information to decide what schema should exist in the destination at a given point in the replication stream.

How It Works

Schema changes are captured transactionally from Postgres with a DDL event trigger. When an ALTER TABLE happens, the trigger emits a logical replication message in the same transaction, so schema changes stay ordered with the row-level changes that follow.

The pipeline stores table schemas as versioned records keyed by snapshot_id. The initial schema is stored at 0/0, and later schema versions use the LSN associated with the DDL change. This gives the system deterministic schema indexing: every DDL message identifies the schema version by the LSN carried in the stream. On startup or recovery, the pipeline loads the latest schema version whose snapshot_id is less than or equal to the current flush position.
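The LSN-keyed lookup described above can be sketched as follows (a minimal sketch; the names and types are illustrative assumptions, not the PR's actual API):

```rust
use std::collections::BTreeMap;

/// Versioned table schema, keyed externally by an LSN-backed snapshot_id.
/// The initial schema lives at snapshot_id 0; each DDL change stores a new
/// version at the LSN of its replication message.
#[derive(Debug, Clone, PartialEq)]
struct TableSchema {
    columns: Vec<String>,
}

/// Latest schema version whose snapshot_id is <= the current flush position,
/// i.e. the schema that should exist in the destination at that point.
fn schema_at(versions: &BTreeMap<u64, TableSchema>, flush_lsn: u64) -> Option<&TableSchema> {
    versions.range(..=flush_lsn).next_back().map(|(_, schema)| schema)
}
```

Because the map is ordered by snapshot_id, startup and recovery reduce to one range query: any flush position between two DDL events deterministically resolves to the older version.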

To avoid relying on the destination catalog as the source of truth, this PR replaces legacy table mappings with destination table metadata. That metadata stores the last applied schema version, the previous version, and a replication mask describing which columns are actively replicated. Destinations receive replicated schema information with events, so they can diff and apply schema changes directly instead of reloading schema ad hoc.
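The diff step a destination performs might look roughly like this (a hedged sketch with hypothetical names; a real schema diff would compare types, nullability, and ordinals, not just column names):

```rust
/// Given the previously applied column set and the column set carried with an
/// event, compute what the destination would need to add and drop. This is an
/// illustration of the diff-and-apply idea, not the PR's actual API.
fn diff_columns(old: &[String], new: &[String]) -> (Vec<String>, Vec<String>) {
    let added: Vec<String> = new.iter().filter(|c| !old.contains(*c)).cloned().collect();
    let dropped: Vec<String> = old.iter().filter(|c| !new.contains(*c)).cloned().collect();
    (added, dropped)
}
```

The point of carrying schema information with events is that this diff runs against persisted metadata, never against a reload of the destination catalog.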

Key Design Points

Consistent Schema Loading

This PR makes schema loading consistent by using the same schema-description path for both the initial load and later schema updates. That means the schema shape the pipeline stores and the schema shape it later uses for diffing come from the same source of truth, which reduces drift between bootstrap and steady-state replication.

Replication Masks

A key structure in the design is the replication mask. Stored schemas can contain the full table definition, but not every column is necessarily replicated because publications can apply column-level filtering. The replication mask tells the system which columns in a schema version are actually active for replication.

That separation is important because it lets us:

  • keep a stable, complete schema history for each table
  • apply publication-level column filtering without mutating stored schema versions
  • diff and apply destination schema changes against the columns that are actually replicated
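A minimal sketch of how a replication mask selects active columns (illustrative names; in the PR the mask lives in the destination table metadata):

```rust
/// The stored schema keeps every column; the mask marks which ones are
/// actively replicated. Publication-level column filtering thus selects a
/// subset without mutating the stored schema version itself.
fn replicated_columns(schema: &[&str], mask: &[bool]) -> Vec<String> {
    schema
        .iter()
        .zip(mask.iter())
        .filter_map(|(col, &active)| if active { Some(col.to_string()) } else { None })
        .collect()
}
```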

Migration Assumption

The storage migrations in this PR are intentionally designed as a one-way upgrade to keep the rollout simple and fast. In practice, that means multiple pipelines pointing at the same source database cannot safely run with mixed old and new state-store code.

If a source database has more than one pipeline using these ETL tables, they need to be upgraded together. Otherwise, an older pipeline can stop working after the migrated schema is used by a newer pipeline, for example on the next schema-store write, state-store write, or schema change.

Crash-Safe Properties

The crash-safety model is based on the assumption that we always know which schema the destination should have by looking at persisted metadata, not by inspecting the destination itself.

The metadata stores which schema version is currently applied, which version was applied before it, and a status that signals whether a schema transition completed. Because schema versions are indexed deterministically by LSN-backed snapshot_id, the system can recover by reloading the schema version for the current replication position and comparing it with the metadata for the destination.

Today, the status field is intentionally simple. In practice, it mainly signals whether a schema transition finished cleanly or whether the destination should be treated as corrupted. If a destination table is left in the applying state, the current interpretation is that the schema change may have failed partway through, and the destination is no longer trustworthy without cleanup or a restart.

This is a pragmatic first step rather than a full repair model. Different destinations can later implement more sophisticated repair or reconciliation flows, but for now the engine mainly uses the status to detect potentially corrupted state and stop pretending the destination is healthy.

BigQuery still does not provide fully atomic multi-statement DDL, so schema application can fail partway through. This PR makes that state explicit through metadata such as applied and applying, but the exact recovery and repair semantics should be defined more clearly in follow-up work.
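The recovery rule described in this section can be sketched as a pure decision function (all names here are illustrative assumptions, not the PR's actual types):

```rust
/// Whether the last schema transition completed cleanly.
#[derive(Debug, PartialEq)]
enum SchemaStatus {
    Applied,
    Applying,
}

#[derive(Debug, PartialEq)]
enum RecoveryAction {
    /// Metadata matches the schema version for the replication position.
    Continue,
    /// A newer schema version exists for this position; apply the diff.
    ApplySchemaChange,
    /// A transition was interrupted; the destination may be corrupted.
    TreatAsCorrupted,
}

struct TableMetadata {
    /// snapshot_id of the schema version last applied in the destination.
    applied_version: u64,
    status: SchemaStatus,
}

/// Decide what to do on startup, trusting only persisted metadata and the
/// expected schema version for the current replication position.
fn recovery_action(meta: &TableMetadata, expected_version: u64) -> RecoveryAction {
    match meta.status {
        // A table stuck in `applying` means the DDL may have failed partway
        // through; without repair the destination state cannot be trusted.
        SchemaStatus::Applying => RecoveryAction::TreatAsCorrupted,
        SchemaStatus::Applied if meta.applied_version == expected_version => {
            RecoveryAction::Continue
        }
        SchemaStatus::Applied => RecoveryAction::ApplySchemaChange,
    }
}
```

Because the decision never inspects the destination catalog, it stays deterministic across restarts.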

What Changed

  • capture ALTER TABLE changes in the replication stream with a DDL event trigger
  • store multiple schema versions per table using snapshot_id
  • replace legacy table mappings with destination table metadata, including schema state and replication masks
  • pass replicated schema information with events so destinations can diff and apply changes directly
  • add support for composite primary-key ordinals in schema storage
  • update integration tests to match the migrated etl.table_columns layout, including primary_key_ordinal_position

Validation

  • cargo test -p etl-api --test main --all-features pipelines:: -- --nocapture

Next Steps

  • clean up outdated schema snapshots once the retention condition is met; this can land in a follow-up PR because customers can start using the feature safely before cleanup exists
  • better define schema-change semantics, especially around column deletion and whether operations should behave as hard deletes, soft deletes, or other destination-specific policies
  • better define recovery semantics around applying state and partial schema application, including what cleanup or repair guarantees the engine should provide
  • extend the schema-change model to other destinations in follow-up work; this PR is intentionally focused on BigQuery first

Contributor

@farazdagi farazdagi left a comment

Absolutely amazing work. I've done a first pass and the architecture looks solid.

Most of the comments are just nitpicks, so feel free to ignore when resolving.

@iambriccardo iambriccardo enabled auto-merge (squash) April 20, 2026 06:54
@iambriccardo iambriccardo merged commit 08a59b7 into main Apr 20, 2026
12 checks passed
@iambriccardo iambriccardo deleted the riccardo/feat/ddl-support-3 branch April 20, 2026 06:57
