
Add ducklake destination #627

Merged
bnjjj merged 47 commits into main from bnjjj/ducklake on Mar 31, 2026

Conversation

@bnjjj (Contributor) commented on Mar 9, 2026

Add the ducklake destination. duckdb doesn't implement an async-compliant API, so to get good performance without stalling the entire async runtime I had to combine a semaphore with blocking tasks via spawn_blocking. The most important thing is to spend as little time as possible inside the spawn_blocking task, and that's the main reason I use a Semaphore: without it we could end up waiting on pool.get() from r2d2 inside the blocking task, whereas a Semaphore with the same size as the pool means we avoid waiting long on pool.get(). Another point is that the retry mechanism sits outside spawn_blocking, so retries can sleep asynchronously instead of blocking inside spawn_blocking for too long.

Beyond these considerations, this destination uses the duckdb-rs crate and also relies on the ducklake extension.
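
The concurrency pattern described above, as a minimal self-contained sketch — not the actual destination code: `do_blocking_write`, `MAX_RETRIES`, and the batch type are illustrative stand-ins for the real r2d2/duckdb work:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

const MAX_RETRIES: u32 = 3;

// Stand-in for the real work: `pool.get()` from r2d2 plus the DuckDB SQL.
// The whole point is to keep this blocking section as short as possible.
fn do_blocking_write(batch: &[String]) -> Result<(), String> {
    println!("writing {} rows on a blocking thread", batch.len());
    Ok(())
}

async fn write_with_retry(semaphore: Arc<Semaphore>, batch: Vec<String>) -> Result<(), String> {
    for attempt in 0u32.. {
        let result = {
            // The semaphore has the same size as the r2d2 pool, so once a
            // permit is held, `pool.get()` inside the blocking task is
            // nearly free; the permit is only held around the blocking call.
            let _permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
            let b = batch.clone();
            tokio::task::spawn_blocking(move || do_blocking_write(&b))
                .await
                .expect("blocking task panicked")
        };
        match result {
            Ok(()) => return Ok(()),
            Err(_) if attempt < MAX_RETRIES => {
                // Retries sleep *outside* spawn_blocking: an async sleep,
                // not a blocked thread-pool thread.
                tokio::time::sleep(Duration::from_millis(100 * u64::from(attempt + 1))).await;
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!()
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(4)); // same size as the pool
    write_with_retry(semaphore, vec!["row".into()]).await.unwrap();
}
```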

TODO:

  • Clarify extension loading; maybe vendor the needed extensions to avoid relying on an external repository, and pin their versions

bnjjj and others added 4 commits February 20, 2026 17:14
Co-authored-by: Tyler Hillery <tyhillery@gmail.com>
@bnjjj requested a review from a team as a code owner on March 9, 2026 15:48
…n code

@coderabbitai (bot) commented Mar 9, 2026

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added a feature-gated DuckLake destination for streaming Postgres CDC with optional S3-compatible storage and table lifecycle handling.
    • New example CLI to run replication to DuckLake.
  • Documentation

    • Expanded examples and README with DuckLake prerequisites, run commands, flags, and query guidance.
  • Tests

    • Added end-to-end integration tests covering writes, truncation, concurrency, and type mappings.
  • Chores

    • Minor .gitignore cleanup and readability fixes.

Walkthrough

This PR introduces a new DuckLake destination feature: workspace Cargo dependencies and feature gates; a ducklake module with config, core, and schema implementations; a DuckLakeDestination type with constructor and Destination impl; S3/catalog URL parsing and libpq conninfo generation; DDL/type-mapping utilities; connection pooling with per-connection setup SQL and retried batched writes; CDC event handling; integration tests; an example binary and README updates; plus minor project file changes (.gitignore, workspace Cargo.toml).

Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Pipeline
    participant DuckLakeDestination
    participant Pool as DuckDB Pool
    participant DuckLake as DuckLake Storage

    Client->>Pipeline: submit CDC events / row batch
    Pipeline->>DuckLakeDestination: write_table_rows(rows) / write_events(events)
    DuckLakeDestination->>Pool: acquire connection
    Pool->>DuckDB: open connection & run setup SQL (attach catalog, load extensions)
    DuckLakeDestination->>Pool: CREATE TABLE IF NOT EXISTS (if needed)
    DuckLakeDestination->>Pool: CREATE TEMP staging table
    DuckLakeDestination->>Pool: INSERT rows into staging
    DuckLakeDestination->>Pool: INSERT INTO ducklake.table SELECT ... FROM staging
    Pool->>DuckLake: persist Parquet snapshot / apply changes
    DuckLakeDestination->>Pipeline: respond success / error
```
```mermaid
sequenceDiagram
    participant App as Application
    participant Config as DuckLake Config
    participant Pool as Connection Pool
    participant DuckDB as DuckDB Engine
    participant Catalog as DuckLake Catalog

    App->>Config: provide catalog_url + data_path (+ optional S3)
    Config->>Config: catalog_conninfo_from_url / validate_data_path / build_setup_sql
    Config-->>App: setup SQL (attach catalog, create secret if S3)
    App->>Pool: create pool with setup SQL
    Pool->>DuckDB: get connection and execute setup SQL
    DuckDB->>Catalog: INSTALL/LOAD ducklake (and postgres/httpfs if needed)
    alt S3 used
        DuckDB->>Catalog: CREATE SECRET for httpfs (endpoint, creds)
    end
    Catalog-->>DuckDB: catalog attached and ready
    Pool-->>App: pool initialized and ready for use
```
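
Reading the two diagrams together: a hypothetical sketch of what a `build_setup_sql` step like this could emit, following DuckLake's documented `ATTACH 'ducklake:postgres:...'` syntax. The statement list, the `my_ducklake` alias, and the `S3Config` fields are assumptions, not the PR's actual code (which would also need to escape the interpolated values):

```rust
struct S3Config {
    key_id: String,
    secret: String,
    endpoint: String,
}

// Hypothetical per-connection setup SQL: install/load extensions, create an
// S3 secret if object storage is used, then attach the DuckLake catalog.
fn build_setup_sql(catalog_conninfo: &str, data_path: &str, s3: Option<&S3Config>) -> String {
    let mut sql = String::from("INSTALL ducklake; LOAD ducklake;\nINSTALL postgres; LOAD postgres;\n");
    if let Some(s3) = s3 {
        sql.push_str("INSTALL httpfs; LOAD httpfs;\n");
        // Values are interpolated raw here; real code must quote them safely.
        sql.push_str(&format!(
            "CREATE OR REPLACE SECRET ducklake_s3 (TYPE S3, KEY_ID '{}', SECRET '{}', ENDPOINT '{}');\n",
            s3.key_id, s3.secret, s3.endpoint
        ));
    }
    sql.push_str(&format!(
        "ATTACH 'ducklake:postgres:{catalog_conninfo}' AS my_ducklake (DATA_PATH '{data_path}');\n"
    ));
    sql
}
```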


@coderabbitai (bot) left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/ducklake/config.rs`:
- Around line 142-147: The code pushes tcp_user_timeout using seconds (calling
as_secs()), but libpq requires milliseconds; in the block that checks
config.get_tcp_user_timeout() (the parts.push call that formats
"tcp_user_timeout={}" in config.rs) change the conversion to use as_millis() so
the URL encodes milliseconds rather than seconds and ensure the formatted value
is an integer millisecond value compatible with libpq.
- Around line 314-316: validate_data_path currently accepts the "az" scheme but
Azure is not initialized; either remove "az" from the allowed schemes in
validate_data_path or fully implement Azure support in the initialization code:
when handling az:// URIs ensure the DuckDB "azure" extension is installed (not
just httpfs), and create a DuckDB secret with TYPE azure and the Azure
credentials (instead of creating an S3 secret). Also update the setup logic that
installs httpfs and creates S3 secrets to branch on the scheme and
install/create the azure extension/secret for "az" URIs.
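
The first config.rs fix above is a one-line unit change; a minimal sketch (`parts` mirrors the conninfo assembly the comment describes):

```rust
use std::time::Duration;

// libpq's tcp_user_timeout parameter is expressed in milliseconds, so the
// conninfo must use as_millis(), not as_secs().
fn push_tcp_user_timeout(parts: &mut Vec<String>, timeout: Duration) {
    // Before (buggy): parts.push(format!("tcp_user_timeout={}", timeout.as_secs()));
    parts.push(format!("tcp_user_timeout={}", timeout.as_millis()));
}
```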

In `@etl-destinations/src/ducklake/schema.rs`:
- Around line 73-85: Replace the debug formatting usage that produces invalid
SQL identifiers: call pg_escape::quote_identifier on the table name and on each
column name instead of using {:?} in the CREATE TABLE assembly (the code that
builds col_defs and the final format! call). Specifically, in the iterator
mapping that builds col_defs (referencing col.name and
postgres_column_type_to_ducklake_sql) wrap the column identifier with
quote_identifier and use that escaped identifier in the format string, and
likewise escape table_name with quote_identifier when constructing the final
CREATE TABLE string so both table and column identifiers are valid SQL
identifiers.
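
A minimal sketch of that escaping fix, using the `pg_escape::quote_identifier` call the comment names (the `(name, sql_type)` pairs are an illustrative stand-in for the real column type):

```rust
use pg_escape::quote_identifier;

// Quote identifiers instead of formatting them with {:?}, so names that need
// quoting (spaces, capitals, embedded quotes) stay valid SQL.
fn create_table_sql(table_name: &str, columns: &[(String, String)]) -> String {
    let col_defs: Vec<String> = columns
        .iter()
        .map(|(name, sql_type)| format!("{} {}", quote_identifier(name), sql_type))
        .collect();
    format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        quote_identifier(table_name),
        col_defs.join(", ")
    )
}
```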

In `@etl-examples/src/bin/ducklake.rs`:
- Around line 150-165: Remove the unsafe std::env::set_var use in set_log_level
and instead set the default log level via the EnvFilter fallback in
init_tracing: delete the unsafe block in set_log_level (or make set_log_level a
no-op) and change init_tracing to call
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_|
tracing_subscriber::EnvFilter::new("ducklake=info")) so the default
"ducklake=info" is provided without mutating process environment; reference
init_tracing and set_log_level to locate the code.
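
And the suggested `init_tracing` shape for the example binary, roughly (assuming `tracing_subscriber` with its `env-filter` feature enabled):

```rust
use tracing_subscriber::EnvFilter;

// Default to "ducklake=info" through the EnvFilter fallback rather than
// mutating the process environment with an unsafe set_var call.
fn init_tracing() {
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("ducklake=info"));
    tracing_subscriber::fmt().with_env_filter(filter).init();
}
```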

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 18b38e58-5cb2-46c6-9c8b-1a1851055534

📥 Commits

Reviewing files that changed from the base of the PR and between 01337b9 and 1afa7ef.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • replication_bis.db is excluded by !**/*.db
📒 Files selected for processing (12)
  • .gitignore
  • Cargo.toml
  • etl-destinations/Cargo.toml
  • etl-destinations/src/ducklake/config.rs
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/src/ducklake/mod.rs
  • etl-destinations/src/ducklake/schema.rs
  • etl-destinations/src/lib.rs
  • etl-destinations/tests/ducklake_destination.rs
  • etl-examples/Cargo.toml
  • etl-examples/README.md
  • etl-examples/src/bin/ducklake.rs

bnjjj added 2 commits March 9, 2026 17:16
@coderabbitai (bot) left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 267-293: The current write_events_inner treats Event::Update and
Event::Delete the same as inserts by pushing rows into table_id_to_rows and
calling insert_rows_with_retry; change this to honor the Destination trait
contract by handling each event type appropriately: keep Event::Insert behavior
using insert_rows_with_retry, for Event::Delete construct and route delete
keys/rows to a delete_rows_with_retry (or issue DELETE statements) instead of
inserting old_row, and for Event::Update either emit a delete for the old_row
and an insert for the new table_row or call an upsert/merge API (depending on
DuckLake capabilities) so the old version is removed/merged; update
write_events_inner to track separate collections (e.g., insert_rows_by_table,
delete_rows_by_table, upsert_rows_by_table) and call the corresponding helper
functions (insert_rows_with_retry, delete_rows_with_retry, or
upsert_rows_with_retry), referencing Event::Update, Event::Delete,
write_events_inner, insert_rows_with_retry, table_id_to_rows, and cell_to_value
to locate and modify the code paths.
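
A hypothetical sketch of the routing this asks for; `Event`, `TableId`, and `Row` here are simplified stand-ins for the crate's real types:

```rust
use std::collections::HashMap;

type TableId = u32;
type Row = Vec<String>;

// Simplified stand-in for the crate's CDC event type.
enum Event {
    Insert { table_id: TableId, row: Row },
    Update { table_id: TableId, old_row: Row, new_row: Row },
    Delete { table_id: TableId, old_row: Row },
}

// Split events into per-table insert and delete batches instead of funneling
// everything through the insert path.
fn route_events(events: Vec<Event>) -> (HashMap<TableId, Vec<Row>>, HashMap<TableId, Vec<Row>>) {
    let mut inserts: HashMap<TableId, Vec<Row>> = HashMap::new();
    let mut deletes: HashMap<TableId, Vec<Row>> = HashMap::new();
    for event in events {
        match event {
            Event::Insert { table_id, row } => inserts.entry(table_id).or_default().push(row),
            Event::Update { table_id, old_row, new_row } => {
                // Remove the old version, then write the new one, so the
                // destination doesn't keep both copies of the row.
                deletes.entry(table_id).or_default().push(old_row);
                inserts.entry(table_id).or_default().push(new_row);
            }
            Event::Delete { table_id, old_row } => {
                // Deletes go to a delete batch; re-inserting old_row would
                // resurrect the row.
                deletes.entry(table_id).or_default().push(old_row);
            }
        }
    }
    (inserts, deletes)
}
```

The insert batch can then go through `insert_rows_with_retry` as before, while the delete batch feeds a `delete_rows_with_retry`-style helper (or DELETE statements keyed on the primary key).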

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: d25f47ea-50c9-4add-94df-edb6868728ad

📥 Commits

Reviewing files that changed from the base of the PR and between 1afa7ef and e2d7bbe.

📒 Files selected for processing (4)
  • etl-destinations/Cargo.toml
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/tests/ducklake_destination.rs
  • etl-examples/src/bin/ducklake.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • etl-destinations/tests/ducklake_destination.rs

@coderabbitai (bot) left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
etl-destinations/src/ducklake/core.rs (2)

445-452: ⚠️ Potential issue | 🔴 Critical

The table-name encoding is still lossy and not safe for downstream SQL use.

This mapping is not injective (a_.b and a._b both collapse to a___b), so two source tables can end up sharing one DuckLake table. It also preserves characters like " that later break the raw SQL interpolation paths used for truncate/insert statements.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@etl-destinations/src/ducklake/core.rs` around lines 445 - 452, The current
table_name_to_ducklake_table_name implementation is lossy and unsafe; replace it
with an injective, SQL-safe encoding: percent-encode (or otherwise escape) the
entire schema and name components (including the escape char, delimiter, and
quote characters like '"') and then join them with TABLE_NAME_DELIMITER so the
mapping is reversible and unique. Concretely, update
table_name_to_ducklake_table_name (and any helpers/constants if needed) to first
percent-encode each TableName.schema and TableName.name (ensuring '%' itself is
encoded), then return
format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions
like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks;
keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and
DuckLakeTableName to locate the change.
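
A minimal sketch of such an injective encoding, assuming `_` as the delimiter (the real `TABLE_NAME_DELIMITER` constant and `TableName` type live in core.rs):

```rust
// Percent-encode '%' itself plus every character that could collide with the
// delimiter or break quoting, so the mapping is reversible and collision-free.
fn encode_component(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '%' | '_' | '"' => out.push_str(&format!("%{:02X}", c as u32)),
            _ => out.push(c),
        }
    }
    out
}

fn table_name_to_ducklake_table_name(schema: &str, name: &str) -> String {
    // `a_.b` -> "a%5F.b" and `a._b` -> "a.%5Fb": no collision.
    format!("{}_{}", encode_component(schema), encode_component(name))
}
```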

256-293: ⚠️ Potential issue | 🔴 Critical

Update/Delete events are still being written as inserts.

Lines 268-293 push both Event::Update and Event::Delete rows into table_id_to_rows, and that batch is later sent through insert_rows_with_retry. That means deletes re-insert old rows and updates leave the old version behind, so the destination diverges from the replication stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@etl-destinations/src/ducklake/core.rs` around lines 256 - 293, The current
Event::Update and Event::Delete branches are accumulating rows into
table_id_to_rows and later calling insert_rows_with_retry, causing updates to
re-insert old data and deletes to resurrect deleted rows. Change the logic so
inserts and upserts go to the existing table_id_to_rows (used with
insert_rows_with_retry or an upsert variant), but collect deletes separately
(e.g., table_id_to_deletes) using the primary-key values from
delete.old_table_row and call the appropriate delete_rows_with_retry/delete_rows
function; for updates, ensure you send the new row as an upsert (or perform
delete by pk + insert of new row) rather than treating it as a blind insert.
Update the Event::Update and Event::Delete branches to populate the correct
collections and invoke the matching insert/upsert/delete helper functions
instead of always calling insert_rows_with_retry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-examples/README.md`:
- Around line 46-55: The README overclaims support for GCS/Azure and autoloading
of the "ducklake" and "httpfs" extensions while the CLI binary (ducklake.rs)
currently only implements S3-specific cloud flags; update the documentation to
accurately reflect the shipped surface by restricting storage examples to the
supported S3/local flows and removing references to GCS/Azure and autoloaded
extensions, or alternatively implement the missing cloud backends and
extension-loading behavior in the ducklake CLI (the code that parses cloud flags
and loads extensions in ducklake.rs) before merging so the README and runtime
behavior match.

---

Duplicate comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 445-452: The current table_name_to_ducklake_table_name
implementation is lossy and unsafe; replace it with an injective, SQL-safe
encoding: percent-encode (or otherwise escape) the entire schema and name
components (including the escape char, delimiter, and quote characters like '"')
and then join them with TABLE_NAME_DELIMITER so the mapping is reversible and
unique. Concretely, update table_name_to_ducklake_table_name (and any
helpers/constants if needed) to first percent-encode each TableName.schema and
TableName.name (ensuring '%' itself is encoded), then return
format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions
like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks;
keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and
DuckLakeTableName to locate the change.
- Around line 256-293: The current Event::Update and Event::Delete branches are
accumulating rows into table_id_to_rows and later calling
insert_rows_with_retry, causing updates to re-insert old data and deletes to
resurrect deleted rows. Change the logic so inserts and upserts go to the
existing table_id_to_rows (used with insert_rows_with_retry or an upsert
variant), but collect deletes separately (e.g., table_id_to_deletes) using the
primary-key values from delete.old_table_row and call the appropriate
delete_rows_with_retry/delete_rows function; for updates, ensure you send the
new row as an upsert (or perform delete by pk + insert of new row) rather than
treating it as a blind insert. Update the Event::Update and Event::Delete
branches to populate the correct collections and invoke the matching
insert/upsert/delete helper functions instead of always calling
insert_rows_with_retry.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 600105be-7205-4915-a9f7-87d2d80d2cbe

📥 Commits

Reviewing files that changed from the base of the PR and between e2d7bbe and 881ce2d.

📒 Files selected for processing (4)
  • etl-destinations/src/ducklake/config.rs
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/src/ducklake/schema.rs
  • etl-examples/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • etl-destinations/src/ducklake/schema.rs

@iambriccardo (Contributor) left a comment


Great work, left a few minor comments!

bnjjj added 15 commits March 11, 2026 11:27
bnjjj added 4 commits March 26, 2026 21:25
bnjjj added 11 commits March 27, 2026 21:02
…nations

@bnjjj requested a review from iambriccardo on March 30, 2026 14:16
bnjjj added 3 commits March 30, 2026 17:10
@iambriccardo (Contributor) left a comment


LGTM, left some minor comments!

bnjjj added 6 commits March 31, 2026 14:20
@coveralls

Coverage Status

coverage: 78.225% (+0.5%) from 77.743% when pulling 47f7c3c on bnjjj/ducklake into 33aba35 on main.

@bnjjj merged commit 0ae3e60 into main on Mar 31, 2026
13 checks passed
@bnjjj deleted the bnjjj/ducklake branch on March 31, 2026 15:07