Support log scanning for PrimaryKey tables with ARROW format #405

@J0hnG4lt

Search before asking

  • I searched in the issues and found nothing similar.

Description

The Rust client currently rejects log scanning for PrimaryKey tables entirely, returning an UnsupportedOperation error in scanner.rs. However, the Java client supports this when the table's log format is ARROW — the wire format includes a per-record ChangeType byte array (Insert, UpdateBefore, UpdateAfter, Delete) before the Arrow IPC data in non-append-only batches.
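As a rough illustration of the wire layout described above, here is a minimal Rust sketch of splitting a non-append-only batch body into its per-record ChangeType prefix and the trailing Arrow IPC payload. The byte values (0–3) and the names `from_byte` / `split_change_types` are assumptions for illustration, not the actual fluss-rust API.

```rust
/// Per-record change kinds carried before the Arrow IPC data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChangeType {
    Insert,       // +I
    UpdateBefore, // -U
    UpdateAfter,  // +U
    Delete,       // -D
}

impl ChangeType {
    /// Assumed byte encoding mirroring the Java client's ChangeType ordering.
    fn from_byte(b: u8) -> Result<Self, String> {
        match b {
            0 => Ok(ChangeType::Insert),
            1 => Ok(ChangeType::UpdateBefore),
            2 => Ok(ChangeType::UpdateAfter),
            3 => Ok(ChangeType::Delete),
            other => Err(format!("unknown ChangeType byte: {other}")),
        }
    }
}

/// Split a non-append-only batch body: the first `record_count` bytes are
/// per-record ChangeType values, the rest is the Arrow IPC payload.
fn split_change_types(
    body: &[u8],
    record_count: usize,
) -> Result<(Vec<ChangeType>, &[u8]), String> {
    if body.len() < record_count {
        return Err("batch body shorter than record_count".into());
    }
    let (ct_bytes, ipc) = body.split_at(record_count);
    let change_types = ct_bytes
        .iter()
        .map(|&b| ChangeType::from_byte(b))
        .collect::<Result<Vec<_>, _>>()?;
    Ok((change_types, ipc))
}
```

In the real batch the prefix begins right after the fixed record-batch header; the sketch only models the body slice from that offset onward.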

Motivation / use case:

I'm an independent contributor (not affiliated with either project) working on adding an Apache Fluss data connector to SpiceAI — a portable accelerated SQL query engine written in Rust. SpiceAI already supports CDC streaming from sources like DynamoDB Streams and Debezium/Kafka, and I'd like to add Fluss as another CDC-capable data source. This requires the Rust client to support log scanning on PK tables so that CDC events (Insert, Update, Delete) can be streamed into SpiceAI's accelerated materialized views for real-time querying.

This work has been done with the assistance of Claude (Anthropic's AI).

I have a working implementation in my fork: J0hnG4lt/fluss-rust#2 (feat/pk-table-arrow-cdc-v2 branch, single commit). I'd love to contribute it upstream and am very open to reviews, suggestions, and any changes needed to align with the project's direction; I still need to prepare a cleaner PR for this.

Changes in the implementation:

  1. arrow.rs — Parse ChangeTypeVector bytes for non-append-only DefaultLogRecordBatch. The is_append_only() flag is bit 0 of the batch attributes. When not append-only, the first record_count bytes after the fixed header are per-record ChangeType values, followed by the Arrow IPC payload.

  2. scanner.rs — Replace the blanket PK table rejection with a format check: only reject non-ARROW formats (INDEXED format scanning is genuinely unsupported). PK tables with ARROW format are allowed.

  3. Tests — Unit tests for ChangeType byte parsing (all 4 change types + error cases) and integration tests for PK table CDC (insert, update, delete change types) using the shared test cluster.
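The two checks in points 1 and 2 can be sketched as follows. The flag constant, the `LogFormat` enum variants, and the function names are illustrative assumptions, not the project's actual identifiers.

```rust
/// Bit 0 of the batch attributes byte marks an append-only batch
/// (assumed encoding, matching the description above).
const APPEND_ONLY_FLAG: u8 = 0x01;

fn is_append_only(attributes: u8) -> bool {
    attributes & APPEND_ONLY_FLAG != 0
}

/// Illustrative stand-in for the table's log format.
#[derive(Debug, PartialEq, Eq)]
enum LogFormat {
    Arrow,
    Indexed,
}

/// Scanner-side gate: instead of rejecting all PrimaryKey tables,
/// reject only non-ARROW log formats.
fn check_pk_scan_supported(is_pk_table: bool, format: &LogFormat) -> Result<(), String> {
    if is_pk_table && *format != LogFormat::Arrow {
        return Err(
            "log scanning on PrimaryKey tables is only supported for ARROW log format".into(),
        );
    }
    Ok(())
}
```

With this gate, log-table scans and ARROW PK-table scans both proceed, while INDEXED PK-table scans still fail with a clear error.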

Reference: Java implementation in DefaultLogRecordBatch.columnRecordIterator() and LogScanner (which does not restrict by table type).

Willingness to contribute

  • I'm willing to submit a PR!
