Add ducklake destination #627
Conversation
Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
Co-authored-by: Tyler Hillery <tyhillery@gmail.com>
…n code Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
📝 Walkthrough — Summary by CodeRabbit

This PR introduces a new DuckLake destination feature: workspace Cargo dependencies and feature gates; a …

Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Pipeline
    participant DuckLakeDestination
    participant Pool as DuckDB Pool
    participant DuckLake as DuckLake Storage
    Client->>Pipeline: submit CDC events / row batch
    Pipeline->>DuckLakeDestination: write_table_rows(rows) / write_events(events)
    DuckLakeDestination->>Pool: acquire connection
    Pool->>DuckDB: open connection & run setup SQL (attach catalog, load extensions)
    DuckLakeDestination->>Pool: CREATE TABLE IF NOT EXISTS (if needed)
    DuckLakeDestination->>Pool: CREATE TEMP staging table
    DuckLakeDestination->>Pool: INSERT rows into staging
    DuckLakeDestination->>Pool: INSERT INTO ducklake.table SELECT ... FROM staging
    Pool->>DuckLake: persist Parquet snapshot / apply changes
    DuckLakeDestination->>Pipeline: respond success / error
```
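The staged write path in the diagram above could look like the following SQL, assembled on the Rust side (the table name, column list, and exact statement text are illustrative assumptions, not the PR's actual code):

```rust
// Hypothetical sketch of the staged write path from the diagram: rows land in
// a TEMP table first, then move into the DuckLake table in one
// INSERT ... SELECT so the change is applied as a single snapshot.
fn staged_insert_sql(table: &str) -> Vec<String> {
    vec![
        format!("CREATE TABLE IF NOT EXISTS ducklake.{table} (id BIGINT, payload VARCHAR);"),
        format!("CREATE TEMP TABLE staging AS SELECT * FROM ducklake.{table} LIMIT 0;"),
        // rows would be bulk-inserted into `staging` here (appender / prepared INSERTs)
        format!("INSERT INTO ducklake.{table} SELECT * FROM staging;"),
        "DROP TABLE staging;".to_string(),
    ]
}

fn main() {
    for stmt in staged_insert_sql("events") {
        println!("{stmt}");
    }
}
```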
```mermaid
sequenceDiagram
    participant App as Application
    participant Config as DuckLake Config
    participant Pool as Connection Pool
    participant DuckDB as DuckDB Engine
    participant Catalog as DuckLake Catalog
    App->>Config: provide catalog_url + data_path (+ optional S3)
    Config->>Config: catalog_conninfo_from_url / validate_data_path / build_setup_sql
    Config-->>App: setup SQL (attach catalog, create secret if S3)
    App->>Pool: create pool with setup SQL
    Pool->>DuckDB: get connection and execute setup SQL
    DuckDB->>Catalog: INSTALL/LOAD ducklake (and postgres/httpfs if needed)
    alt S3 used
        DuckDB->>Catalog: CREATE SECRET for httpfs (endpoint, creds)
    end
    Catalog-->>DuckDB: catalog attached and ready
    Pool-->>App: pool initialized and ready for use
```
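The setup flow in this diagram can be sketched as a hypothetical `build_setup_sql` returning the per-connection statements. The function shape, parameter names, and exact SQL below are assumptions based on DuckLake's documented ATTACH/SECRET syntax, not the crate's real API:

```rust
// Hypothetical sketch of build_setup_sql: the SQL the pool runs on each new
// DuckDB connection (install extensions, optionally create an S3 secret,
// then attach the Postgres-backed DuckLake catalog).
fn build_setup_sql(
    catalog_conninfo: &str,
    data_path: &str,
    s3: Option<(&str, &str, &str)>, // (key_id, secret, endpoint) — assumed shape
) -> Vec<String> {
    let mut sql = vec![
        "INSTALL ducklake; LOAD ducklake;".to_string(),
        "INSTALL postgres; LOAD postgres;".to_string(),
    ];
    if let Some((key_id, secret, endpoint)) = s3 {
        sql.push("INSTALL httpfs; LOAD httpfs;".to_string());
        sql.push(format!(
            "CREATE SECRET (TYPE s3, KEY_ID '{key_id}', SECRET '{secret}', ENDPOINT '{endpoint}');"
        ));
    }
    sql.push(format!(
        "ATTACH 'ducklake:postgres:{catalog_conninfo}' AS ducklake (DATA_PATH '{data_path}');"
    ));
    sql
}

fn main() {
    let stmts = build_setup_sql(
        "dbname=meta host=localhost",
        "s3://bucket/data",
        Some(("KEY", "SECRET", "s3.example.com")),
    );
    for s in &stmts {
        println!("{s}");
    }
}
```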
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-destinations/src/ducklake/config.rs`:
- Around line 142-147: The code pushes tcp_user_timeout using seconds (calling
as_secs()), but libpq requires milliseconds; in the block that checks
config.get_tcp_user_timeout() (the parts.push call that formats
"tcp_user_timeout={}" in config.rs) change the conversion to use as_millis() so
the URL encodes milliseconds rather than seconds and ensure the formatted value
is an integer millisecond value compatible with libpq.
- Around line 314-316: validate_data_path currently accepts the "az" scheme but
Azure is not initialized; either remove "az" from the allowed schemes in
validate_data_path or fully implement Azure support in the initialization code:
when handling az:// URIs ensure the DuckDB "azure" extension is installed (not
just httpfs), and create a DuckDB secret with TYPE azure and the Azure
credentials (instead of creating an S3 secret). Also update the setup logic that
installs httpfs and creates S3 secrets to branch on the scheme and
install/create the azure extension/secret for "az" URIs.
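The millisecond fix from the first comment, as a tiny self-contained sketch (`tcp_user_timeout_param` is a hypothetical helper; the real code formats the value inline in config.rs):

```rust
use std::time::Duration;

// libpq's tcp_user_timeout parameter is expressed in milliseconds, so the
// Duration must be converted with as_millis(), not as_secs().
fn tcp_user_timeout_param(timeout: Duration) -> String {
    format!("tcp_user_timeout={}", timeout.as_millis())
}

fn main() {
    // A 30-second timeout must reach libpq as 30000 (milliseconds).
    println!("{}", tcp_user_timeout_param(Duration::from_secs(30)));
}
```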
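For the second comment, here is a sketch of `validate_data_path` restricted to schemes the setup code actually initializes; the allowed set and the function's exact shape are assumptions, not the crate's real signature:

```rust
// Hypothetical validate_data_path: accept bare local paths and only the URI
// schemes that the initialization code actually supports ("az" is dropped
// until the azure extension and an Azure-typed secret are wired in).
fn validate_data_path(path: &str) -> Result<(), String> {
    const ALLOWED: [&str; 2] = ["s3", "file"];
    match path.split_once("://") {
        None => Ok(()), // bare local path, e.g. /var/lib/ducklake
        Some((scheme, rest)) if !rest.is_empty() && ALLOWED.contains(&scheme) => Ok(()),
        Some((scheme, _)) => Err(format!("unsupported data_path scheme: {scheme}")),
    }
}

fn main() {
    assert!(validate_data_path("s3://bucket/prefix").is_ok());
    assert!(validate_data_path("/var/lib/ducklake").is_ok());
    // "az" is rejected until Azure support is actually implemented.
    assert!(validate_data_path("az://container/path").is_err());
    println!("ok");
}
```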
In `@etl-destinations/src/ducklake/schema.rs`:
- Around line 73-85: Replace the debug formatting usage that produces invalid
SQL identifiers: call pg_escape::quote_identifier on the table name and on each
column name instead of using {:?} in the CREATE TABLE assembly (the code that
builds col_defs and the final format! call). Specifically, in the iterator
mapping that builds col_defs (referencing col.name and
postgres_column_type_to_ducklake_sql) wrap the column identifier with
quote_identifier and use that escaped identifier in the format string, and
likewise escape table_name with quote_identifier when constructing the final
CREATE TABLE string so both table and column identifiers are valid SQL
identifiers.
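A sketch of that escaping, using a simplified stand-in for `pg_escape::quote_identifier` (double-quote the identifier and double any embedded quote characters); the table and column names are illustrative:

```rust
// Simplified stand-in for pg_escape::quote_identifier: wrapping in double
// quotes and doubling embedded quotes yields a valid SQL identifier, unlike
// the {:?} debug formatting the review flags.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

fn main() {
    let table = "public_events";
    let cols = [("user id", "BIGINT"), ("payload", "VARCHAR")];
    // Build col_defs with escaped column identifiers, as the prompt asks.
    let col_defs: Vec<String> = cols
        .iter()
        .map(|(name, ty)| format!("{} {}", quote_ident(name), ty))
        .collect();
    let sql = format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        quote_ident(table),
        col_defs.join(", ")
    );
    println!("{sql}");
}
```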
In `@etl-examples/src/bin/ducklake.rs`:
- Around line 150-165: Remove the unsafe std::env::set_var use in set_log_level
and instead set the default log level via the EnvFilter fallback in
init_tracing: delete the unsafe block in set_log_level (or make set_log_level a
no-op) and change init_tracing to call
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_|
tracing_subscriber::EnvFilter::new("ducklake=info")) so the default
"ducklake=info" is provided without mutating process environment; reference
init_tracing and set_log_level to locate the code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 18b38e58-5cb2-46c6-9c8b-1a1851055534
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `replication_bis.db` is excluded by `!**/*.db`
📒 Files selected for processing (12)
- .gitignore
- Cargo.toml
- etl-destinations/Cargo.toml
- etl-destinations/src/ducklake/config.rs
- etl-destinations/src/ducklake/core.rs
- etl-destinations/src/ducklake/mod.rs
- etl-destinations/src/ducklake/schema.rs
- etl-destinations/src/lib.rs
- etl-destinations/tests/ducklake_destination.rs
- etl-examples/Cargo.toml
- etl-examples/README.md
- etl-examples/src/bin/ducklake.rs
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 267-293: The current write_events_inner treats Event::Update and
Event::Delete the same as inserts by pushing rows into table_id_to_rows and
calling insert_rows_with_retry; change this to honor the Destination trait
contract by handling each event type appropriately: keep Event::Insert behavior
using insert_rows_with_retry, for Event::Delete construct and route delete
keys/rows to a delete_rows_with_retry (or issue DELETE statements) instead of
inserting old_row, and for Event::Update either emit a delete for the old_row
and an insert for the new table_row or call an upsert/merge API (depending on
DuckLake capabilities) so the old version is removed/merged; update
write_events_inner to track separate collections (e.g., insert_rows_by_table,
delete_rows_by_table, upsert_rows_by_table) and call the corresponding helper
functions (insert_rows_with_retry, delete_rows_with_retry, or
upsert_rows_with_retry), referencing Event::Update, Event::Delete,
write_events_inner, insert_rows_with_retry, table_id_to_rows, and cell_to_value
to locate and modify the code paths.
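As a sketch of that routing (the `Event` enum, field names, and collection shapes here are minimal stand-ins for the etl crate's real types, not its actual API):

```rust
use std::collections::HashMap;

// Minimal, hypothetical stand-ins for the crate's types, just to show the
// routing the review asks for.
type TableId = u32;
type Row = Vec<String>;

#[allow(dead_code)]
enum Event {
    Insert { table_id: TableId, row: Row },
    Update { table_id: TableId, old_row: Row, new_row: Row },
    Delete { table_id: TableId, old_row: Row },
}

type RowsByTable = HashMap<TableId, Vec<Row>>;

// Split events into three collections so each kind can go through its own
// helper (insert_rows_with_retry / upsert_rows_with_retry /
// delete_rows_with_retry in the real code).
fn route(events: Vec<Event>) -> (RowsByTable, RowsByTable, RowsByTable) {
    let (mut inserts, mut upserts, mut deletes) =
        (RowsByTable::new(), RowsByTable::new(), RowsByTable::new());
    for event in events {
        match event {
            Event::Insert { table_id, row } => inserts.entry(table_id).or_default().push(row),
            // An update must remove the old version: route the new row to an
            // upsert (or delete-by-pk + insert) path, never a blind insert.
            Event::Update { table_id, new_row, .. } => {
                upserts.entry(table_id).or_default().push(new_row)
            }
            // A delete routes the old row's key to a delete path instead of
            // re-inserting old_row.
            Event::Delete { table_id, old_row } => {
                deletes.entry(table_id).or_default().push(old_row)
            }
        }
    }
    (inserts, upserts, deletes)
}

fn main() {
    let events = vec![
        Event::Insert { table_id: 1, row: vec!["a".into()] },
        Event::Delete { table_id: 1, old_row: vec!["a".into()] },
    ];
    let (inserts, upserts, deletes) = route(events);
    println!("{} insert(s), {} upsert(s), {} delete(s)",
        inserts.values().map(Vec::len).sum::<usize>(),
        upserts.values().map(Vec::len).sum::<usize>(),
        deletes.values().map(Vec::len).sum::<usize>());
}
```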
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: d25f47ea-50c9-4add-94df-edb6868728ad
📒 Files selected for processing (4)
- etl-destinations/Cargo.toml
- etl-destinations/src/ducklake/core.rs
- etl-destinations/tests/ducklake_destination.rs
- etl-examples/src/bin/ducklake.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- etl-destinations/tests/ducklake_destination.rs
Actionable comments posted: 1
♻️ Duplicate comments (2)
etl-destinations/src/ducklake/core.rs (2)
445-452: ⚠️ Potential issue | 🔴 Critical — The table-name encoding is still lossy and not safe for downstream SQL use.
This mapping is not injective (`a_.b` and `a._b` both collapse to `a___b`), so two source tables can end up sharing one DuckLake table. It also preserves characters like `"` that later break the raw SQL interpolation paths used for truncate/insert statements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@etl-destinations/src/ducklake/core.rs` around lines 445-452: the current table_name_to_ducklake_table_name implementation is lossy and unsafe; replace it with an injective, SQL-safe encoding: percent-encode (or otherwise escape) the entire schema and name components (including the escape char, delimiter, and quote characters like `"`) and then join them with TABLE_NAME_DELIMITER so the mapping is reversible and unique. Concretely, update table_name_to_ducklake_table_name (and any helpers/constants if needed) to first percent-encode each TableName.schema and TableName.name (ensuring `%` itself is encoded), then return format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks; keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and DuckLakeTableName to locate the change.
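A minimal sketch of the injective encoding that prompt describes; the delimiter and the exact set of escaped characters are assumptions chosen for illustration:

```rust
// Sketch of an injective, SQL-safe table-name encoding: percent-encode '%'
// itself plus the delimiter, '.', and '"' in each component before joining,
// so the mapping is reversible and collision-free.
const TABLE_NAME_DELIMITER: char = '_';

fn encode_component(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '%' | '_' | '"' | '.' => format!("%{:02X}", c as u32),
            _ => c.to_string(),
        })
        .collect()
}

fn table_name_to_ducklake_table_name(schema: &str, name: &str) -> String {
    format!(
        "{}{}{}",
        encode_component(schema),
        TABLE_NAME_DELIMITER,
        encode_component(name)
    )
}

fn main() {
    // The collision pair from the review now maps to distinct names.
    let a = table_name_to_ducklake_table_name("a_", "b");
    let b = table_name_to_ducklake_table_name("a", "_b");
    assert_ne!(a, b);
    println!("{a} / {b}");
}
```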
256-293: ⚠️ Potential issue | 🔴 Critical — Update/Delete events are still being written as inserts.
Lines 268-293 push both `Event::Update` and `Event::Delete` rows into `table_id_to_rows`, and that batch is later sent through `insert_rows_with_retry`. That means deletes re-insert old rows and updates leave the old version behind, so the destination diverges from the replication stream.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@etl-destinations/src/ducklake/core.rs` around lines 256-293: the current Event::Update and Event::Delete branches are accumulating rows into table_id_to_rows and later calling insert_rows_with_retry, causing updates to re-insert old data and deletes to resurrect deleted rows. Change the logic so inserts and upserts go to the existing table_id_to_rows (used with insert_rows_with_retry or an upsert variant), but collect deletes separately (e.g., table_id_to_deletes) using the primary-key values from delete.old_table_row and call the appropriate delete_rows_with_retry/delete_rows function; for updates, ensure you send the new row as an upsert (or perform delete by pk + insert of new row) rather than treating it as a blind insert. Update the Event::Update and Event::Delete branches to populate the correct collections and invoke the matching insert/upsert/delete helper functions instead of always calling insert_rows_with_retry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-examples/README.md`:
- Around line 46-55: The README overclaims support for GCS/Azure and autoloading
of the "ducklake" and "httpfs" extensions while the CLI binary (ducklake.rs)
currently only implements S3-specific cloud flags; update the documentation to
accurately reflect the shipped surface by restricting storage examples to the
supported S3/local flows and removing references to GCS/Azure and autoloaded
extensions, or alternatively implement the missing cloud backends and
extension-loading behavior in the ducklake CLI (the code that parses cloud flags
and loads extensions in ducklake.rs) before merging so the README and runtime
behavior match.
---
Duplicate comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 445-452: The current table_name_to_ducklake_table_name
implementation is lossy and unsafe; replace it with an injective, SQL-safe
encoding: percent-encode (or otherwise escape) the entire schema and name
components (including the escape char, delimiter, and quote characters like '"')
and then join them with TABLE_NAME_DELIMITER so the mapping is reversible and
unique. Concretely, update table_name_to_ducklake_table_name (and any
helpers/constants if needed) to first percent-encode each TableName.schema and
TableName.name (ensuring '%' itself is encoded), then return
format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions
like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks;
keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and
DuckLakeTableName to locate the change.
- Around line 256-293: The current Event::Update and Event::Delete branches are
accumulating rows into table_id_to_rows and later calling
insert_rows_with_retry, causing updates to re-insert old data and deletes to
resurrect deleted rows. Change the logic so inserts and upserts go to the
existing table_id_to_rows (used with insert_rows_with_retry or an upsert
variant), but collect deletes separately (e.g., table_id_to_deletes) using the
primary-key values from delete.old_table_row and call the appropriate
delete_rows_with_retry/delete_rows function; for updates, ensure you send the
new row as an upsert (or perform delete by pk + insert of new row) rather than
treating it as a blind insert. Update the Event::Update and Event::Delete
branches to populate the correct collections and invoke the matching
insert/upsert/delete helper functions instead of always calling
insert_rows_with_retry.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 600105be-7205-4915-a9f7-87d2d80d2cbe
📒 Files selected for processing (4)
- etl-destinations/src/ducklake/config.rs
- etl-destinations/src/ducklake/core.rs
- etl-destinations/src/ducklake/schema.rs
- etl-examples/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
- etl-destinations/src/ducklake/schema.rs
iambriccardo
left a comment
Great work, left a few minor comments!
iambriccardo
left a comment
LGTM, left some minor comments!
Add ducklake destination. DuckDB doesn't expose an async-compliant API, so to keep performance good without stalling the entire async runtime I had to combine a `Semaphore` with blocking tasks via `spawn_blocking`. The most important thing is to spend as little time as possible inside the `spawn_blocking` task, and that's one of the main reasons I use a `Semaphore`: without it we could end up waiting on `pool.get()` from r2d2 inside the blocking task, whereas sizing the `Semaphore` to match the pool size avoids waiting too long on `pool.get()`. Another point is that I put the retry mechanism outside of `spawn_blocking`, so retries sleep asynchronously instead of blocking for too long inside `spawn_blocking`.

Outside of these considerations, this destination uses the duckdb-rs crate and the `ducklake` extension.

TODO: