
[Feature] PyPaimon Real RowKind Support #6735

@kaori-seasons

Description


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I. Overview

1.1 Problem Description

The PyPaimon KeyValueDataWriter hardcoded every row's _VALUE_KIND to 0 (INSERT), discarding the actual operation type of each row. As a result, CDC (Change Data Capture) scenarios involving UPDATE and DELETE operations could not be handled correctly.

Example Scenario:

  • Write operation 1: User 'Alice' with RowKind=INSERT
  • Write operation 2: User 'Alice' updated to 'Alice_updated' with RowKind=UPDATE_AFTER
  • Write operation 3: User 'Alice' deleted with RowKind=DELETE
  • Problem: all three records are stored with _VALUE_KIND=0 (INSERT)
  • Expected: each record carries its own _VALUE_KIND (0, 2, and 3 respectively)

1.2 Root Cause Analysis

Write-side Issue: The _add_system_fields() method in KeyValueDataWriter hardcoded the _VALUE_KIND value for every row:

# Before: Always INSERT
value_kind_column = pa.array([0] * num_rows, type=pa.int32())

Impact Areas:

  • CDC scenarios: Cannot distinguish UPDATE_BEFORE/UPDATE_AFTER operations
  • Delete handling: Cannot mark DELETE rows (RowKind=3)
  • Read-side consistency: DropDeleteReader on the read path depends on correct RowKind values
  • Data semantics: Loss of operational metadata

1.3 RowKind Definition

PyPaimon defines four RowKind types:

  • INSERT (0): +I - New record insertion
  • UPDATE_BEFORE (1): -U - Previous value before update
  • UPDATE_AFTER (2): +U - New value after update
  • DELETE (3): -D - Record deletion
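For cross-checking values, the mapping above can be mirrored in a small IntEnum. This is a hypothetical stand-in (`RowKindSketch`), not the `pypaimon.table.row.row_kind.RowKind` class; the `is_add()` helper assumes Paimon's convention that +I and +U add rows while -U and -D retract them.

```python
from enum import IntEnum


class RowKindSketch(IntEnum):
    """Hypothetical mirror of the four RowKind values; not pypaimon's class."""

    INSERT = 0         # +I: new record
    UPDATE_BEFORE = 1  # -U: old value of an updated record
    UPDATE_AFTER = 2   # +U: new value of an updated record
    DELETE = 3         # -D: record deletion

    def short_string(self) -> str:
        """Return the changelog symbol (+I, -U, +U, -D) for this kind."""
        return ("+I", "-U", "+U", "-D")[self.value]

    def is_add(self) -> bool:
        """+I and +U add a row; -U and -D retract one."""
        return self in (RowKindSketch.INSERT, RowKindSketch.UPDATE_AFTER)


# The values stored in _VALUE_KIND are the plain integers
print([(kind.name, int(kind), kind.short_string()) for kind in RowKindSketch])
```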

II. Technical Solution Design

2.1 Core Design Approach

Implement real RowKind extraction from input data by:

  1. Reading an optional __row_kind__ column from the input RecordBatch
  2. Validating RowKind values (range 0-3)
  3. Using the extracted values to populate the _VALUE_KIND field
  4. Maintaining backward compatibility (defaulting to INSERT when no RowKind is given)

2.2 Solution Advantages

  1. Full CDC Support: Correctly handles all row operation types
  2. Backward Compatible: No breaking changes to existing API
  3. Type-Safe: Validates data types and value ranges
  4. Well-Tested: Comprehensive unit test coverage (12 tests)
  5. Production-Ready: Clear error messages and debug logging

2.3 API Design

User API - Method 1: Using __row_kind__ column

import pyarrow as pa
from pypaimon.table.row.row_kind import RowKind

data = pa.Table.from_pydict({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    '__row_kind__': pa.array([
        RowKind.INSERT.value,      # 0
        RowKind.UPDATE_AFTER.value, # 2
        RowKind.DELETE.value        # 3
    ], type=pa.int32())
})
write.write_arrow(data)  # write: a BatchTableWrite for the target primary-key table

User API - Method 2: Using row_kinds parameter

data = pa.Table.from_pydict({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})
write.write_arrow(data, row_kinds=[0, 2, 3])

III. Implementation Details

3.1 KeyValueDataWriter Modifications

File: paimon-python/pypaimon/write/writer/key_value_data_writer.py

New Method: _extract_row_kind_column()

def _extract_row_kind_column(self, data: pa.RecordBatch, num_rows: int) -> pa.Array:
    """Extract or generate RowKind column from input data.
    
    - Validates data type (must be int32)
    - Validates values are in range [0-3]
    - Returns extracted column or default INSERT (0) for all rows
    """

Key Features:

  • Checks for __row_kind__ column presence
  • Type validation: int32 only
  • Value validation: 0-3 range check
  • Default behavior: All INSERT when column missing
  • Logging for debugging

Updated Method: _add_system_fields()

  • Calls _extract_row_kind_column() instead of hardcoding
  • Removes the temporary __row_kind__ column after extraction so it is not stored as a data column
  • Maintains backward compatibility

New Method: _deduplicate_by_primary_key()

  • Deduplicates data by primary key
  • Keeps only the latest record per primary key (the one with the highest sequence number)
  • O(n) time: a single pass with a hash map keyed by primary key

3.2 BatchTableWrite API Enhancement

File: paimon-python/pypaimon/write/batch_table_write.py

Enhanced Methods:

  • write_arrow(table, row_kinds: Optional[List[int]] = None) - Table-level write with optional RowKind values
  • write_arrow_batch(batch, row_kinds: Optional[List[int]] = None) - Batch-level write with optional RowKind values

New Helper Methods:

  • _add_row_kind_column() - Adds RowKind column to Table
  • _add_row_kind_to_batch() - Adds RowKind column to RecordBatch

Improved Validation:

  • _validate_pyarrow_schema() - Now ignores temporary __row_kind__ column

IV. Before and After Comparison

4.1 Write Data Flow

| Aspect | Before | After |
|---|---|---|
| RowKind support | Hardcoded INSERT (0) | Real RowKind (0-3) |
| CDC scenarios | Not supported | Fully supported |
| UPDATE operations | Metadata lost | Preserved as UPDATE_BEFORE/UPDATE_AFTER |
| DELETE operations | Indistinguishable | Marked as DELETE (3) |
| API | No row_kinds parameter | Optional row_kinds parameter |
| Backward compatibility | N/A | 100% compatible |
| Default behavior | All INSERT | INSERT when no RowKind is provided |

4.2 Example Data Transformation

Input Data (with RowKind):

id=1, name='Alice',   __row_kind__=0 (INSERT)
id=2, name='Bob',     __row_kind__=2 (UPDATE_AFTER)
id=3, name='Charlie', __row_kind__=3 (DELETE)

Before Implementation:

_KEY_id=1, name='Alice',   _VALUE_KIND=0, _SEQUENCE_NUMBER=1
_KEY_id=2, name='Bob',     _VALUE_KIND=0, _SEQUENCE_NUMBER=2  ❌ Should be 2
_KEY_id=3, name='Charlie', _VALUE_KIND=0, _SEQUENCE_NUMBER=3  ❌ Should be 3

After Implementation:

_KEY_id=1, name='Alice',   _VALUE_KIND=0, _SEQUENCE_NUMBER=1  ✅ Correct
_KEY_id=2, name='Bob',     _VALUE_KIND=2, _SEQUENCE_NUMBER=2  ✅ Correct
_KEY_id=3, name='Charlie', _VALUE_KIND=3, _SEQUENCE_NUMBER=3  ✅ Correct

4.3 Read-side Benefits

Without Real RowKind:

  • DropDeleteReader cannot correctly identify DELETE rows
  • All rows appear as insertions
  • CDC semantics lost during reads

With Real RowKind:

  • DropDeleteReader correctly filters DELETE rows (RowKind=3)
  • UPDATE operations maintain semantic correctness
  • Full CDC consistency across write-read pipeline

V. Performance Analysis

5.1 Complexity Analysis

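| Operation | Time | Space | Notes |
|---|---|---|---|
| Extract RowKind | O(n) | O(1) | Single-pass validation |
| Deduplication | O(n) | O(m) | m = distinct primary keys |
| Add RowKind column | O(n) | O(n) | Converting row_kinds to an int32 array is O(n); appending the column is a native PyArrow operation |
| Schema validation | O(1) | O(1) | Column presence check (independent of row count) |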

5.2 Memory Impact

  • Per-row overhead: 4 bytes (int32 RowKind)
  • Temporary column: Automatically cleaned up after extraction
  • Hash map for dedup: O(m) where m = unique primary keys

Appendix: Code Examples

Example 1: Basic RowKind Usage

import pyarrow as pa
from pypaimon.table.row.row_kind import RowKind

# Create data with RowKind information
data = pa.Table.from_pydict({
    'user_id': [1, 2, 3, 4],
    'name': ['Alice', 'Bob', 'Charlie', 'Dave'],
    '__row_kind__': pa.array([
        RowKind.INSERT.value,       # 0 - New user
        RowKind.UPDATE_AFTER.value, # 2 - Updated name
        RowKind.DELETE.value,       # 3 - User deleted
        RowKind.INSERT.value        # 0 - New user
    ], type=pa.int32())
})

# Write with real RowKind values
write.write_arrow(data)

Example 2: Using row_kinds Parameter

# Data without __row_kind__ column
data = pa.Table.from_pydict({
    'user_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})

# Specify RowKind separately
write.write_arrow(data, row_kinds=[0, 2, 3])

Example 3: CDC Pipeline

# Simulate CDC events
cdc_events = [
    {'user_id': 1, 'name': 'Alice', 'kind': 0},        # Insert
    {'user_id': 2, 'name': 'Bob', 'kind': 0},          # Insert
    {'user_id': 2, 'name': 'Bob_Updated', 'kind': 2},  # Update
    {'user_id': 3, 'name': 'Charlie', 'kind': 3},      # Delete
]

# Process CDC events
for event in cdc_events:
    data = pa.Table.from_pydict({
        'user_id': [event['user_id']],
        'name': [event['name']],
        '__row_kind__': pa.array([event['kind']], type=pa.int32())
    })
    write.write_arrow(data)

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!


Labels: enhancement (New feature or request)
