Search before asking
- I searched in the issues and found nothing similar.
Description
The Rust client currently rejects log scanning for PrimaryKey tables entirely, returning an UnsupportedOperation error in scanner.rs. However, the Java client supports this when the table's log format is ARROW — the wire format includes a per-record ChangeType byte array (Insert, UpdateBefore, UpdateAfter, Delete) before the Arrow IPC data in non-append-only batches.
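To make the layout described above concrete, here is a minimal sketch of splitting the bytes after the fixed batch header into per-record change types and the Arrow IPC payload. It is not the code from the fork. The names `split_batch_body` and `APPEND_ONLY_FLAG`, the `u8` width of the attributes field, and in particular the byte values 1 to 4 assigned to each `ChangeType` are assumptions for illustration and should be checked against the Java client.

```rust
/// Per-record change kinds named in this issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeType {
    Insert,
    UpdateBefore,
    UpdateAfter,
    Delete,
}

impl ChangeType {
    /// ASSUMPTION: the byte values 1..=4 are placeholders for illustration;
    /// verify them against the Java client's ChangeType before relying on them.
    pub fn from_byte(b: u8) -> Result<Self, String> {
        match b {
            1 => Ok(ChangeType::Insert),
            2 => Ok(ChangeType::UpdateBefore),
            3 => Ok(ChangeType::UpdateAfter),
            4 => Ok(ChangeType::Delete),
            other => Err(format!("unknown change type byte: {other}")),
        }
    }
}

/// Bit 0 of the batch attributes marks an append-only batch.
/// ASSUMPTION: attributes are modeled as a single byte here.
const APPEND_ONLY_FLAG: u8 = 0x01;

pub fn is_append_only(attributes: u8) -> bool {
    attributes & APPEND_ONLY_FLAG != 0
}

/// Splits the bytes that follow the fixed batch header into
/// (per-record change types, Arrow IPC payload).
/// Append-only batches carry no change-type bytes, so the whole body
/// is returned as the Arrow payload and the change types are `None`.
pub fn split_batch_body(
    attributes: u8,
    record_count: usize,
    body: &[u8],
) -> Result<(Option<Vec<ChangeType>>, &[u8]), String> {
    if is_append_only(attributes) {
        return Ok((None, body));
    }
    if body.len() < record_count {
        return Err(format!(
            "batch body has {} bytes but {} change type bytes are expected",
            body.len(),
            record_count
        ));
    }
    // The first `record_count` bytes are change types; the rest is Arrow IPC data.
    let (change_bytes, arrow_payload) = body.split_at(record_count);
    let change_types = change_bytes
        .iter()
        .map(|&b| ChangeType::from_byte(b))
        .collect::<Result<Vec<_>, _>>()?;
    Ok((Some(change_types), arrow_payload))
}
```

Returning the payload as a borrowed slice avoids copying the Arrow data, and rejecting unknown bytes up front keeps a corrupt batch from being surfaced as valid CDC events.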
Motivation / use case:
I'm an independent contributor (not affiliated with either project) working on adding an Apache Fluss data connector to SpiceAI — a portable accelerated SQL query engine written in Rust. SpiceAI already supports CDC streaming from sources like DynamoDB Streams and Debezium/Kafka, and I'd like to add Fluss as another CDC-capable data source. This requires the Rust client to support log scanning on PK tables so that CDC events (Insert, Update, Delete) can be streamed into SpiceAI's accelerated materialized views for real-time querying.
This work has been done with the assistance of Claude (Anthropic's AI).
I have a working implementation in my fork: J0hnG4lt/fluss-rust#2 (feat/pk-table-arrow-cdc-v2 branch, single commit). I'd love to contribute this upstream and am very open to reviews, suggestions, and any changes needed to align with the project's direction. I still need to prepare a cleaner PR before submitting it.
Changes in the implementation:
- `arrow.rs`: Parse `ChangeTypeVector` bytes for non-append-only `DefaultLogRecordBatch`. The `is_append_only()` flag is bit 0 of the batch attributes. When not append-only, the first `record_count` bytes after the fixed header are per-record `ChangeType` values, followed by the Arrow IPC payload.
- `scanner.rs`: Replace the blanket PK table rejection with a format check: only reject non-ARROW formats (INDEXED format scanning is genuinely unsupported). PK tables with ARROW format are allowed.
- Tests: Unit tests for ChangeType byte parsing (all 4 change types + error cases) and integration tests for PK table CDC (insert, update, delete change types) using the shared test cluster.
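The scanner-side change can be sketched roughly as follows. This is an illustrative outline, not the actual `scanner.rs` code: `TableInfo`, `LogFormat`, `ScanError`, and `check_log_scan_supported` are hypothetical stand-ins for whatever types the Rust client really uses.

```rust
/// Hypothetical stand-in for the table's configured log format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogFormat {
    Arrow,
    Indexed,
}

/// Hypothetical stand-in for the table metadata the scanner inspects.
pub struct TableInfo {
    pub has_primary_key: bool,
    pub log_format: LogFormat,
}

/// Hypothetical error type mirroring the UnsupportedOperation error
/// mentioned in this issue.
#[derive(Debug, PartialEq, Eq)]
pub enum ScanError {
    UnsupportedOperation(String),
}

/// Decides whether a log scan may be created for `table`.
/// The decision depends only on the log format: `has_primary_key` is
/// deliberately not consulted, so PK tables with ARROW format pass.
pub fn check_log_scan_supported(table: &TableInfo) -> Result<(), ScanError> {
    match table.log_format {
        LogFormat::Arrow => Ok(()),
        LogFormat::Indexed => Err(ScanError::UnsupportedOperation(
            "log scanning is only supported for ARROW log format".to_string(),
        )),
    }
}
```

With this shape, a PK table with ARROW format is accepted, while any table with INDEXED format still gets the unsupported-operation error.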
Reference: Java implementation in DefaultLogRecordBatch.columnRecordIterator() and LogScanner (which does not restrict by table type).
Willingness to contribute
- I'm willing to submit a PR!