
feat(caches): Add PyIceberg-based cache for Apache Iceberg format #970

Open
aaronsteers wants to merge 23 commits into main from devin/1769653838-iceberg-cache

Conversation


@aaronsteers aaronsteers commented Jan 29, 2026

Summary

Adds a new IcebergCache that stores synced data in Apache Iceberg format using PyIceberg. This enables users to cache data as Parquet files with Iceberg metadata for efficient querying with any Iceberg-compatible query engine.

Key components:

  • IcebergConfig: Configuration supporting SQLite catalogs (local dev), REST catalogs, and AWS Glue catalogs with S3 storage
  • IcebergProcessor: Writes data to Iceberg tables using a two-phase write approach for proper write method selection
  • IcebergTypeConverter: Maps JSON schema types to Iceberg types with sophisticated object typing configuration
  • IcebergCache: User-facing API with get_pandas_dataframe(), get_arrow_dataset(), and get_iceberg_table() methods

Object typing configuration:

  • object_typing: Choose between nested_types (structs, better query performance) or as_json_strings (JSON strings, more permissive)
  • additional_properties: Handle undeclared properties with fail, ignore, or stringify modes
  • anyof_properties: Handle union types with fail, branch (separate subcolumns), or stringify modes
  • _airbyte_meta is always stored as a StructType (never stringified), matching Kotlin S3 Data Lake destination behavior
  • Stringified columns use __json suffix (e.g., col_a__json) for clarity at query time

S3 and Glue support:

  • AWS credentials (aws_access_key_id, aws_secret_access_key)
  • S3 configuration (s3_bucket_name, s3_bucket_region, s3_endpoint)
  • Glue catalog support (glue_id for AWS Account ID)

Example script: examples/run_iceberg_cache.py demonstrates usage with source-spacex-api.
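For illustration, a minimal configuration sketch combining the options above. Field names are taken from the lists; whether they are accepted directly by IcebergCache or only via IcebergConfig, and the string forms of the enum values, are assumptions:

from airbyte.caches import IcebergCache

cache = IcebergCache(
    warehouse_path="s3://my-bucket/warehouse",  # hypothetical bucket
    namespace="analytics",
    catalog_type="glue",                 # supported catalog types: sql, rest, glue, hive
    glue_id="123456789012",              # AWS Account ID (placeholder)
    aws_access_key_id="...",             # or sourced from a secrets manager
    aws_secret_access_key="...",
    s3_bucket_name="my-bucket",
    s3_bucket_region="us-west-2",
    object_typing="nested_types",        # or "as_json_strings"
    additional_properties="stringify",   # fail | ignore | stringify
    anyof_properties="branch",           # fail | branch | stringify
)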

Updates since last revision

Merge-on-read implementation for incremental syncs (a dispatch sketch follows this list):

  • Implemented two-phase write approach: _write_files_to_new_table now stores Arrow tables in memory, and the appropriate write method performs the actual Iceberg write
  • _emulated_merge_temp_table_to_final_table now uses PyIceberg's native upsert() for merge-on-read semantics (delete files for existing records, append new records, deduplication at read time)
  • _append_temp_table_to_final_table uses append() for simple appends without deduplication
  • _swap_temp_table_with_final_table uses overwrite() for REPLACE write method
  • Falls back to append() when no primary keys are defined (no deduplication possible)
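A minimal sketch of that dispatch, assuming a PyIceberg Table object and the write-strategy names used in this summary (the exact enum and primary-key lookup are simplified):

# `iceberg_table` is a pyiceberg Table; `arrow_table` is the staged pa.Table held in
# memory by _write_files_to_new_table; `primary_keys` comes from the catalog provider.
# Names are illustrative, not the PR's exact code.
if write_strategy == "replace":
    iceberg_table.overwrite(arrow_table)          # swap: remove old data, write new
elif write_strategy == "merge" and primary_keys:
    iceberg_table.upsert(arrow_table, join_cols=primary_keys)  # merge-on-read dedup
else:
    iceberg_table.append(arrow_table)             # plain append, no deduplication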

Review & Testing Checklist for Human

  • Test incremental sync with deduplication - Run an incremental sync twice with overlapping records (same primary keys) and verify that records are deduplicated, not duplicated. This is the core new functionality.
  • Verify upsert behavior with primary keys - Confirm that catalog_provider.get_primary_keys() returns the correct columns and that upsert uses them properly
  • Test memory usage with large batches - The two-phase write stores Arrow tables in memory before writing. Verify this doesn't cause issues with large data volumes.
  • Verify object typing modes - Test both nested_types and as_json_strings modes with sources that have complex nested schemas
  • Test S3/Glue configuration - The S3 and Glue config fields are added but not integration-tested. Verify they work with actual AWS resources

Suggested test plan for merge-on-read:

import airbyte as ab
from airbyte.caches import IcebergCache

# Create cache
cache = IcebergCache(
    warehouse_path="/tmp/iceberg_test",
    namespace="test",
)

# First sync
source = ab.get_source("source-faker")
source.select_streams(["users"])
result1 = source.read(cache)
count1 = cache.get_iceberg_table("users").scan().to_arrow().num_rows

# Second sync (incremental - should deduplicate)
result2 = source.read(cache)
count2 = cache.get_iceberg_table("users").scan().to_arrow().num_rows

# Verify no duplicates (count should be similar, not doubled)
print(f"After first sync: {count1}, After second sync: {count2}")

Notes

Summary by CodeRabbit

  • New Features

    • Added an Apache Iceberg-backed cache with configurable catalog/warehouse/namespace and full read/write support.
  • Exports

    • IcebergCache and IcebergConfig are now exposed for easy import.
  • Examples

    • Added an end-to-end example showing writing to the Iceberg cache and reading back as pandas, PyArrow, or Iceberg table.
  • Tests

    • New unit tests covering config, type/schema mapping, cache behavior, and processor semantics.
  • Chores

    • Added pyiceberg extras for SQLite and PyArrow support.


This adds a new IcebergCache that stores data in Apache Iceberg format using PyIceberg.

Features:
- IcebergConfig for configuring Iceberg catalogs (SQLite for local, REST for production)
- IcebergProcessor for writing data to Iceberg tables
- IcebergTypeConverter for JSON schema to Iceberg type conversion
- Support for multiple catalog types: sql, rest, glue, hive
- Direct writes to final tables (Iceberg handles transactions natively)
- Methods for accessing data: get_pandas_dataframe, get_arrow_dataset, get_iceberg_table

The implementation follows the existing cache patterns while adapting to Iceberg's
native transaction handling and Parquet-based storage.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Tips and Tricks

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653838-iceberg-cache' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653838-iceberg-cache'

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /uv-lock - Updates uv.lock file
  • /test-pr - Runs tests with the updated PyAirbyte
  • /prerelease - Builds and publishes a prerelease version to PyPI
📚 Repo Guidance

Helpful Resources

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


@github-actions

github-actions bot commented Jan 29, 2026

PyTest Results (Fast Tests Only, No Creds)

345 tests  +19   345 ✅ +19   5m 32s ⏱️ -1s
  1 suites ± 0     0 💤 ± 0 
  1 files   ± 0     0 ❌ ± 0 

Results for commit 40b2ab5. ± Comparison against base commit 8418815.

♻️ This comment has been updated with latest results.

@coderabbitai
Contributor

coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

Adds an Iceberg-backed cache: new IcebergConfig (catalog/warehouse), IcebergTypeConverter (JSON Schema→Iceberg), IcebergProcessor (table lifecycle and writes), IcebergCache public API/exports, pyiceberg extras in pyproject.toml, unit tests, and an example script.

Changes

  • Iceberg Processor Core (airbyte/_processors/sql/iceberg.py): New IcebergProcessor, IcebergConfig, and IcebergTypeConverter implementing catalog/warehouse URL construction, Iceberg↔Arrow type mapping (including Airbyte internal columns), table existence/creation, and write/append/swap lifecycle methods.
  • Cache Implementation & Exports (airbyte/caches/iceberg.py, airbyte/caches/__init__.py): Adds IcebergCache (APIs: get_catalog, get_pandas_dataframe, get_arrow_dataset, get_iceberg_table, get_records), exposes IcebergCache/IcebergConfig, and adds iceberg to package exports and TYPE_CHECKING.
  • Dependency Update (pyproject.toml): Adds pyiceberg[sql-sqlite,pyarrow]>=0.9.0,<1.0 to enable SQLite catalog and PyArrow integration.
  • Tests (tests/unit_tests/test_iceberg_cache.py): New unit tests for IcebergConfig initialization/URL behavior, IcebergTypeConverter mappings, IcebergCache inheritance/API, and IcebergProcessor flags.
  • Example (examples/run_iceberg_cache.py): New example demonstrating local SQLite catalog + warehouse usage, loading SpaceX streams into the Iceberg cache and reading back via pandas/pyarrow/PyIceberg.

Sequence Diagram(s)

sequenceDiagram
    participant User as User
    participant Cache as IcebergCache
    participant Proc as IcebergProcessor
    participant Catalog as IcebergCatalog
    participant Warehouse as Warehouse

    User->>Cache: get_pandas_dataframe(stream_name)
    Cache->>Proc: resolve SQL table name
    Proc-->>Cache: table_name
    Cache->>Catalog: load_table(namespace, table_name)
    Catalog->>Warehouse: fetch metadata & data files
    Warehouse-->>Catalog: metadata + data
    Catalog-->>Cache: Table object
    Cache->>Catalog: scan().to_pandas()
    Catalog->>Warehouse: read data
    Warehouse-->>Catalog: data
    Catalog-->>User: pandas.DataFrame
sequenceDiagram
    participant User as User
    participant Proc as IcebergProcessor
    participant TypeConv as IcebergTypeConverter
    participant Writer as JsonlWriter
    participant Catalog as IcebergCatalog
    participant Warehouse as Warehouse

    User->>Proc: write cache(files, stream_name, batch_id)
    Proc->>TypeConv: json_schema_to_iceberg_type(schema)
    TypeConv-->>Proc: Iceberg schema
    Proc->>Catalog: get_or_create_table(namespace, name)
    Catalog->>Warehouse: prepare table
    Catalog-->>Proc: Table object
    Proc->>Writer: read & normalize JSONL files
    Writer-->>Proc: normalized Arrow batches
    Proc->>Catalog: append(PyArrow batches)
    Catalog->>Warehouse: write/commit data files
    Warehouse-->>Catalog: commit complete
    Catalog-->>User: write complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 3 passed

  • Description Check (✅ Passed): check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check (✅ Passed): the title accurately and clearly summarizes the main change, adding a PyIceberg-based cache implementation for Apache Iceberg format, and is concise and specific.
  • Docstring Coverage (✅ Passed): docstring coverage is 100.00%, above the required threshold of 80.00%.



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 160-175: get_sql_alchemy_url currently returns the constant
"iceberg://" for non-SQL catalogs which causes SqlConfig.config_hash collisions
across different Iceberg catalog types; change get_sql_alchemy_url to return a
placeholder that includes unique catalog attributes (e.g., incorporate
self.catalog_type and a catalog identifier such as self._get_catalog_uri()
and/or self.warehouse) so each catalog produces a distinct URL string, or
alternatively override SqlConfig.config_hash to incorporate those same unique
fields; update the get_sql_alchemy_url method (and/or implement a config_hash
override) to embed catalog_type/uri/warehouse into the returned SecretString to
avoid collisions.
- Around line 115-121: The current model_post_init always overwrites schema_name
with namespace which drops legacy schema_name-only inputs; update
model_post_init to inspect the pydantic model's __fields_set__ (or
_pydantic_fields_set) to detect which of namespace or schema_name were
explicitly provided: if both provided and values differ, raise a ValueError
describing the mismatch; if only namespace provided, set schema_name =
namespace; if only schema_name provided, set namespace = schema_name; if neither
provided do nothing. Apply these checks inside model_post_init (referencing
self.namespace and self.schema_name) and continue using setattr to mutate the
frozen model.
- Around line 321-373: The issue is that nested object/array columns remain as
Python dict/list and cause PyArrow to infer struct/list types that conflict with
Iceberg's StringType; in _write_files_to_new_table detect columns in combined_df
that contain dict/list (or mixed types with dict/list), JSON-serialize those
values to strings (e.g., via json.dumps applied elementwise or using pandas'
.applymap/.astype after handling NaNs) so the DataFrame columns are plain
strings before calling pa.Table.from_pandas, and then call
pa.Table.from_pandas(combined_df, preserve_index=False) as before; reference
combined_df, expected_columns, iceberg_table.schema(), and pa.Table.from_pandas
when making the change.
- Around line 412-439: The current no-op hooks cause deletion of the real final
table because _write_files_to_new_table writes directly to the final table; fix
by making Iceberg explicitly protect the final table: update
_write_files_to_new_table to return a distinct temp identifier or set a flag
when it wrote directly to final, then implement
_append_temp_table_to_final_table and _swap_temp_table_with_final_table in
icebergs' class to check that temp_table_name != final_table_name (or check the
flag) and if they are equal simply log/return without performing any
drop/replace, and additionally override _drop_temp_table as a true no-op when
the temp identifier equals the final table (or when the flag is set) to avoid
deleting the final table; use the methods named _write_files_to_new_table,
_append_temp_table_to_final_table, _swap_temp_table_with_final_table, and
_drop_temp_table to locate and implement these checks.
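For reference, a minimal sketch of the namespace/schema_name reconciliation suggested in the second item above (Pydantic v2 model_post_init; the surrounding class and frozen-model details are assumptions, not the PR's code):

def model_post_init(self, __context: object) -> None:
    provided = self.model_fields_set
    if "namespace" in provided and "schema_name" in provided:
        if self.namespace != self.schema_name:
            raise ValueError(
                "`namespace` and `schema_name` were both provided but do not match."
            )
    elif "namespace" in provided:
        object.__setattr__(self, "schema_name", self.namespace)  # mutate frozen model
    elif "schema_name" in provided:
        object.__setattr__(self, "namespace", self.schema_name)
    # If neither was set explicitly, leave the defaults untouched.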

devin-ai-integration bot and others added 2 commits January 29, 2026 02:55
- Fix model_post_init to properly handle both namespace and schema_name inputs
- Fix get_sql_alchemy_url to include unique attributes for non-SQL catalogs
- Add JSON serialization for dict/list columns before Arrow conversion

Co-Authored-By: AJ Steers <aj@airbyte.io>
Since Iceberg writes directly to the final table (not a temp table),
the base class may call _drop_temp_table with the final table name.
Making this a no-op prevents accidental deletion of data.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@aaronsteers aaronsteers marked this pull request as ready for review January 29, 2026 03:02
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 241-251: SqlProcessorBase builds self.type_converter from
type_converter_class but this Iceberg processor only annotates type_converter as
IcebergTypeConverter without changing type_converter_class, so the base __init__
will still instantiate SQLTypeConverter and later code (e.g., where
IcebergTypeConverter-specific methods are expected) will fail; set
type_converter_class = IcebergTypeConverter on the Iceberg processor class (or
alternatively override __init__ to instantiate IcebergTypeConverter and assign
to self.type_converter) so the correct converter type is created (refer to the
class attribute type_converter_class and the instance attribute type_converter).

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 potential issue.

View issue and 4 additional flags in Devin Review.


Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 2 new potential issues.

View issues and 6 additional flags in Devin Review.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 313-342: The current flow returns an existing table from
_get_or_create_iceberg_table and later _write_files_to_new_table blindly drops
incoming columns not present in iceberg_table.schema(), causing schema evolution
to be lost; update the table schema instead of dropping columns by detecting
missing columns (compare combined_df.columns or _get_iceberg_schema(stream_name)
against iceberg_table.schema().fields) and calling Iceberg's schema-evolution
API to add those columns (e.g., use the table/catalog schema update methods to
add_column for each new field and commit the change) before writing; implement
this either inside _get_or_create_iceberg_table (after load_table) or
immediately before the drop logic in _write_files_to_new_table so new fields are
appended to the Iceberg table schema using the real column names/types from
_get_iceberg_schema or the dataframe.
- Around line 446-474: The current Iceberg processor appends via
_write_files_to_new_table (uses iceberg_table.append()) and leaves
_swap_temp_table_with_final_table as a no-op, so REPLACE writes never remove old
data; override _write_temp_table_to_final_table to detect the REPLACE
write_strategy and perform an Iceberg overwrite (use the table overwrite API
instead of iceberg_table.append()) or implement
_swap_temp_table_with_final_table to call the overwrite operation during
REPLACE, ensuring the method names _write_files_to_new_table,
_write_temp_table_to_final_table, and _swap_temp_table_with_final_table are
updated to invoke the overwrite path when write_strategy ==
WriteDisposition.REPLACE (or equivalent) so old data is removed.

Comment on lines +313 to +342
@overrides
def _ensure_schema_exists(self) -> None:
    """Ensure the Iceberg namespace exists."""
    with contextlib.suppress(NamespaceAlreadyExistsError):
        self.catalog.create_namespace(self.namespace)

def _table_exists_iceberg(self, table_name: str) -> bool:
    """Check if an Iceberg table exists."""
    try:
        self.catalog.load_table(self._get_table_identifier(table_name))
    except NoSuchTableError:
        return False
    else:
        return True

def _create_iceberg_table(self, stream_name: str, table_name: str) -> Table:
    """Create an Iceberg table for the given stream."""
    schema = self._get_iceberg_schema(stream_name)
    return self.catalog.create_table(
        identifier=self._get_table_identifier(table_name),
        schema=schema,
    )

def _get_or_create_iceberg_table(self, stream_name: str, table_name: str) -> Table:
    """Get an existing Iceberg table or create a new one."""
    try:
        return self.catalog.load_table(self._get_table_identifier(table_name))
    except NoSuchTableError:
        return self._create_iceberg_table(stream_name, table_name)

Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find the file and understand its structure
find . -name "iceberg.py" -path "*/sql/*" | head -5

Repository: airbytehq/PyAirbyte

Length of output: 99


🏁 Script executed:

# Get the full context of the iceberg.py file to see related methods
wc -l airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 101


🏁 Script executed:

# Search for _write_files_to_new_table method to understand column handling
rg "_write_files_to_new_table" -A 30 airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 2146


🏁 Script executed:

# Search for schema-related methods and evolution patterns
rg "schema|evolve|update_schema|add_column" -i airbyte/_processors/sql/iceberg.py | head -20

Repository: airbytehq/PyAirbyte

Length of output: 1255


🏁 Script executed:

# Get the full _write_files_to_new_table method
rg "_write_files_to_new_table" -A 100 airbyte/_processors/sql/iceberg.py | head -150

Repository: airbytehq/PyAirbyte

Length of output: 4833


🏁 Script executed:

# Look for any schema update or column handling logic in the entire file
rg "update_schema|add_column|append|merge|overwrite" -i airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 581


🏁 Script executed:

# Check the get_sql_table_name and related table schema methods
rg "def _get_iceberg_schema|def get_sql_table_name" -A 15 airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 793


🏁 Script executed:

# Check imports to see what Iceberg APIs are available
head -50 airbyte/_processors/sql/iceberg.py | grep -E "^from pyiceberg|^import pyiceberg"

Repository: airbytehq/PyAirbyte

Length of output: 317


🏁 Script executed:

# Search for any test files that might show expected behavior
find . -name "*iceberg*test*" -o -name "*test*iceberg*" | head -10

Repository: airbytehq/PyAirbyte

Length of output: 103


🏁 Script executed:

# Check if there are any references to schema evolution in comments or docstrings
rg "schema.*evolv|evolv.*schema" -i airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Check the test file for context on expected behavior
cat -n ./tests/unit_tests/test_iceberg_cache.py | head -100

Repository: airbytehq/PyAirbyte

Length of output: 4033


🏁 Script executed:

# Search in the entire codebase for any schema update/evolution patterns with pyiceberg
rg "table.update_schema|table.add_column|catalog.*schema" airbyte/_processors/sql/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Check the full context around _get_or_create_iceberg_table to understand the flow
rg "_get_or_create_iceberg_table|_create_iceberg_table" -B 5 -A 20 airbyte/_processors/sql/iceberg.py | head -100

Repository: airbytehq/PyAirbyte

Length of output: 3095


Schema evolution needed for Iceberg table updates

When a table already exists, _get_or_create_iceberg_table returns it unchanged, but _write_files_to_new_table then silently drops any columns from the incoming data that aren't in the existing table schema:

expected_columns = {field.name for field in iceberg_table.schema().fields}
columns_to_drop = [col for col in combined_df.columns if col not in expected_columns]
if columns_to_drop:
    combined_df = combined_df.drop(columns=columns_to_drop)

If the source schema evolves and adds new fields, they'll be lost during write. Should we call Iceberg's schema update API to add missing columns to the table when the incoming data contains new fields, wdyt?

🤖 Prompt for AI Agents
In `@airbyte/_processors/sql/iceberg.py` around lines 313 - 342, The current flow
returns an existing table from _get_or_create_iceberg_table and later
_write_files_to_new_table blindly drops incoming columns not present in
iceberg_table.schema(), causing schema evolution to be lost; update the table
schema instead of dropping columns by detecting missing columns (compare
combined_df.columns or _get_iceberg_schema(stream_name) against
iceberg_table.schema().fields) and calling Iceberg's schema-evolution API to add
those columns (e.g., use the table/catalog schema update methods to add_column
for each new field and commit the change) before writing; implement this either
inside _get_or_create_iceberg_table (after load_table) or immediately before the
drop logic in _write_files_to_new_table so new fields are appended to the
Iceberg table schema using the real column names/types from _get_iceberg_schema
or the dataframe.
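A minimal sketch of that schema-evolution step, assuming PyIceberg's update_schema API and the existing _get_iceberg_schema helper (not the PR's actual code):

stream_schema = self._get_iceberg_schema(stream_name)
existing_names = {field.name for field in iceberg_table.schema().fields}
with iceberg_table.update_schema() as update:
    for field in stream_schema.fields:
        if field.name not in existing_names:
            # New columns are added as optional so existing rows remain valid.
            update.add_column(field.name, field.field_type)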

devin-ai-integration bot and others added 2 commits January 29, 2026 03:17
- Add run_iceberg_cache.py example using source-spacex-api
- Fix IcebergTypeConverter to include to_sql_type method for interface compatibility
- Override _ensure_compatible_table_schema and _emulated_merge_temp_table_to_final_table
- Demonstrate reading data via pandas DataFrame, Arrow dataset, and Iceberg table

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View issue and 7 additional flags in Devin Review.


…erter

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 2 new potential issues.

View issues and 8 additional flags in Devin Review.


Comment on lines +163 to +173
def get_records(
    self,
    stream_name: str,
) -> CachedDataset:
    """Get records from the cache as a CachedDataset.

    Note: For Iceberg caches, this returns a CachedDataset that wraps
    the Iceberg table. For more efficient access to Iceberg-specific
    features, use get_iceberg_table() or get_pandas_dataframe() directly.
    """
    return CachedDataset(self, stream_name)
Contributor


🔴 CachedDataset iteration fails because Iceberg data is not in SQLite

The get_records() method returns a CachedDataset that will fail when iterating or getting length.


Root Cause

IcebergCache.get_records() at airbyte/caches/iceberg.py:163-173 returns a CachedDataset. The CachedDataset class at airbyte/datasets/_sql.py:89-94 uses SQLAlchemy to iterate:

def __iter__(self) -> Iterator[dict[str, Any]]:
    with self._cache.processor.get_sql_connection() as conn:
        for row in conn.execute(self._query_statement):
            yield cast("dict[str, Any]", row._mapping)

The query select("*").select_from(text(f"{schema_name}.{table_name}")) queries the SQLite catalog database, but Iceberg data is stored in Parquet files, not in SQLite. The SQLite database only contains Iceberg metadata, not the actual data rows.

Impact

  • for record in cache.get_records("stream") will return no records or fail
  • len(cache.get_records("stream")) will return 0 or fail
  • cache.get_records("stream").to_sql_table() will fail

Note: to_pandas() and to_arrow() work because they call the overridden methods in IcebergCache.

Recommendation: Override get_records() to return a custom dataset class that reads from Iceberg tables instead of using SQLAlchemy queries, or implement custom __iter__ and __len__ methods that use get_iceberg_table().scan().to_arrow().



Contributor


This is a known limitation of the current implementation. The get_records() method returns a CachedDataset for API compatibility, but as noted in the docstring, users should prefer the Iceberg-specific methods for efficient access:

  • get_pandas_dataframe() - reads directly from Iceberg tables
  • get_arrow_dataset() - provides efficient Arrow-based access
  • get_iceberg_table() - gives direct access to the PyIceberg Table object

The example script demonstrates the recommended pattern using get_arrow_dataset() for iteration.

A future enhancement could implement a custom IcebergDataset class that properly iterates over Iceberg tables, but for the initial implementation, the Iceberg-specific methods provide the recommended access patterns.

Co-Authored-By: AJ Steers <aj@airbyte.io>
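A rough sketch of what such an IcebergDataset could look like, built only on the get_iceberg_table() API from this PR (class shape and method names are assumptions):

class IcebergDataset:
    """Hypothetical dataset that iterates an Iceberg table instead of using SQLAlchemy."""

    def __init__(self, cache, stream_name: str) -> None:
        self._cache = cache
        self._stream_name = stream_name

    def _to_arrow(self):
        return self._cache.get_iceberg_table(self._stream_name).scan().to_arrow()

    def __iter__(self):
        # Materializes the scan; a batched reader could be substituted for large tables.
        yield from self._to_arrow().to_pylist()

    def __len__(self) -> int:
        return self._to_arrow().num_rows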
@github-actions

github-actions bot commented Jan 29, 2026

PyTest Results (Full)

415 tests  +19   398 ✅ +19   23m 6s ⏱️ - 1m 51s
  1 suites ± 0    17 💤 ± 0 
  1 files   ± 0     0 ❌ ± 0 

Results for commit 40b2ab5. ± Comparison against base commit 8418815.

♻️ This comment has been updated with latest results.

…rgCache

- Add ObjectTypingMode enum with 'nested_types' and 'as_json_strings' modes
- Add AdditionalPropertiesMode enum with 'fail', 'ignore', 'stringify' options
- Add AnyOfPropertiesMode enum with 'fail', 'branch', 'stringify' options
- Add helper functions for JSON column naming (get_json_column_name, get_additional_properties_column_name)
- Implement __json suffix convention for stringified columns
- Add special handling for _airbyte_meta (always StructType, never stringified)
- Refactor IcebergTypeConverter to use new configuration modes
- Update IcebergProcessor to pass new config to type converter
- Arrays are always strongly typed (ListType) regardless of object_typing mode

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View issue and 11 additional flags in Devin Review.


…chemas

Use type_converter.next_field_id() for all fields (top-level and nested)
to ensure unique field IDs across the entire Iceberg schema. Previously,
top-level fields used a local counter starting at 1 while nested fields
used the class-level counter, which could cause duplicate field IDs.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 933-947: The code path in _write_files_to_new_table when
object_typing == ObjectTypingMode.NESTED_TYPES currently leaves dict/list
columns as-is and ignores schema-builder-generated placeholder columns like
__additional_properties__json and anyOf branch fields (e.g., field_str_val,
field_int_val), causing dropped data; update the logic in the block that
processes combined_df (the loop over combined_df.columns) to detect those
placeholder columns and any dict/list source columns and: (1) for
__additional_properties__json columns, collect keys present in the source dicts
that are not mapped to explicit schema fields, serialize those extra key/value
pairs into the __additional_properties__json column per-row; (2) for anyOf
branch fields (the branch placeholder columns created by the schema builder),
inspect the source dicts and populate the correct branch column(s) (e.g., set
field_str_val when the dict contains a string branch) instead of relying on
PyArrow coercion; and (3) if a configured fail/strict mode exists, raise a clear
error when unmapped branches/additional properties are found; implement these
steps near combined_df handling so schema placeholder columns are filled before
writing.

devin-ai-integration bot and others added 3 commits January 29, 2026 07:19
The nested_types mode requires exact schema matching which is complex
for real-world data. The as_json_strings mode (matching Kotlin destination
default behavior) is safer and works with any data structure.

Co-Authored-By: AJ Steers <aj@airbyte.io>
When object_typing=nested_types, columns with StringType in the Iceberg
schema (e.g., schemaless arrays without items definition) but containing
Python dict/list data now get serialized to JSON strings.

This is a partial fix - full nested_types support requires additional
work to handle null values for struct columns and Python dict to
PyArrow struct conversion.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 2 new potential issues.

View issues and 15 additional flags in Devin Review.


- Parse ISO 8601 timestamp strings to datetime objects for Arrow timestamp types
- Handle timezone-aware and naive timestamps (default to UTC for naive)
- Parse date strings using fromisoformat
- Remove pandas dependency from data processing pipeline
- All type conversions now work correctly with nested_types mode

Co-Authored-By: AJ Steers <aj@airbyte.io>
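A minimal sketch of the timestamp handling described in this commit (the helper name and the "Z" normalization are assumptions; the UTC default for naive values comes from the commit message):

from datetime import datetime, timezone

def _parse_iso_timestamp(value: str) -> datetime:
    # Normalize a trailing "Z" for older versions of fromisoformat.
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    # Naive timestamps default to UTC, per the commit description.
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)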
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View issue and 15 additional flags in Devin Review.


Addresses Devin Review feedback about class-level _field_id_counter causing
global state pollution across all schemas. The counter is now an instance
variable, ensuring field IDs are isolated between different type converter
instances.

Changes:
- Move _field_id_counter from class variable to instance variable
- Convert next_field_id() from classmethod to instance method
- Add reset_field_id_counter() method for explicit counter reset
- Convert json_schema_to_iceberg_type, _handle_anyof_types, and
  _convert_object_to_struct from classmethods to instance methods
- Update all cls. references to self.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View issue and 17 additional flags in Devin Review.


…_type

Addresses Devin Review feedback about bool() incorrectly converting string
'false' to True. Now explicitly handles string representations of booleans
by checking if the lowercase value is in {'true', '1', 'yes'}.

Co-Authored-By: AJ Steers <aj@airbyte.io>
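A minimal sketch of the boolean coercion described in this commit (the helper name is hypothetical; the accepted string set comes from the commit message):

def _coerce_bool(value) -> bool:
    # Avoids bool("false") == True by matching known string forms case-insensitively.
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes"}
    return bool(value)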
The None check inside the boolean handling block was unreachable because
we already check for None at the start of convert_value_to_arrow_type.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View issue and 19 additional flags in Devin Review.


Call reset_field_id_counter() at the beginning of _get_iceberg_schema()
to ensure each table's schema has field IDs starting from 1, rather than
continuing to increment across multiple stream schema generations.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 2 new potential issues.

View issues and 22 additional flags in Devin Review.


Comment on lines +289 to +292
for field in arrow_schema:
    field_name = field.name
    value = normalized_record.get(field_name)
    result[field_name] = convert_value_to_arrow_type(value, field.type)
Contributor


🔴 AdditionalPropertiesMode.FAIL silently drops properties instead of failing

The additional_properties configuration mode FAIL is documented as "Fail if we encounter an undeclared property at runtime" but the implementation silently drops additional properties instead of raising an error.


How it happens

In convert_record_to_arrow_schema at lines 289-292, the function only processes fields that exist in the Arrow schema:

for field in arrow_schema:
    field_name = field.name
    value = normalized_record.get(field_name)
    result[field_name] = convert_value_to_arrow_type(value, field.type)

When additional_properties=FAIL and the JSON schema has additionalProperties: true, the Iceberg schema is created without a placeholder column for additional properties (airbyte/_processors/sql/iceberg.py:814-816). At runtime, any properties not in the schema are simply ignored by the loop above.

Actual vs Expected

  • Expected: When mode is FAIL and additional properties are present in the data, a PyAirbyteInputError should be raised
  • Actual: Additional properties are silently dropped, making FAIL mode behave identically to IGNORE mode

Impact

Users who configure additional_properties="fail" expecting strict validation will have data silently dropped without any warning or error.

Recommendation: Add a check in convert_record_to_arrow_schema to compare record keys against schema fields, and raise an error when additional_properties mode is FAIL and undeclared properties are found in the data.
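A minimal sketch of the recommended check inside convert_record_to_arrow_schema (error type and the mode plumbing are assumptions, not the PR's code):

declared_fields = {field.name for field in arrow_schema}
extra_keys = set(normalized_record) - declared_fields
if extra_keys and additional_properties_mode == AdditionalPropertiesMode.FAIL:
    # The real implementation would likely raise PyAirbyteInputError here.
    raise ValueError(
        f"Undeclared properties found with additional_properties='fail': {sorted(extra_keys)}"
    )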



Contributor


Valid observation. The additional_properties=FAIL mode is designed to fail when undeclared properties are encountered at runtime (per the design discussion), but this runtime validation is not yet implemented in convert_record_to_arrow_schema.

This is a known limitation of the current implementation - the nested_types mode with strict validation is still being developed. For now, the as_json_strings mode is the recommended default for production use.

Will address this in a follow-up once the core functionality is stabilized.

Comment on lines +1117 to +1130
def _ensure_compatible_table_schema(
    self,
    stream_name: str,
    table_name: str,
) -> None:
    """Ensure the Iceberg table schema is compatible with the stream schema.

    For Iceberg, we handle schema evolution differently than SQL databases.
    Iceberg has native schema evolution support, so we don't need to add
    columns via ALTER TABLE. The schema is managed when creating the table.
    """
    # Iceberg handles schema evolution natively - no action needed here
    # The table schema is set when the table is created in _create_iceberg_table
    pass
Contributor


🔴 Schema evolution not implemented - new columns silently dropped

The _ensure_compatible_table_schema method is a no-op that claims Iceberg handles schema evolution natively, but Iceberg actually requires explicit API calls to add new columns. This causes data loss when source schemas evolve.


How it happens

At airbyte/_processors/sql/iceberg.py:1117-1130, the method is implemented as:

def _ensure_compatible_table_schema(
    self,
    stream_name: str,
    table_name: str,
) -> None:
    # Iceberg handles schema evolution natively - no action needed here
    pass

When a source's JSON schema changes to include new columns:

  1. _get_or_create_iceberg_table loads the existing table with the old schema
  2. arrow_schema = iceberg_table.schema().as_arrow() gets the old schema
  3. convert_record_to_arrow_schema only processes fields in the old schema (line 289-292)
  4. New columns in the data are silently dropped

Actual vs Expected

  • Expected: New columns in source data should either be added to the Iceberg table schema or raise an error
  • Actual: New columns are silently dropped because the schema conversion only includes fields from the existing table schema

Impact

When source schemas evolve (new fields added), all data for those new fields is permanently lost. This is especially problematic for incremental syncs where data cannot be easily re-synced.

Recommendation: Implement actual schema evolution by comparing the stream's JSON schema against the existing Iceberg table schema, then calling iceberg_table.update_schema().add_column(...).commit() for any new columns before writing data.



…rings

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@airbyte/caches/iceberg.py`:
- Around line 128-149: get_arrow_dataset currently accepts max_chunk_size but
ignores it (arrow_table is fully in-memory and ds.dataset(arrow_table) does not
chunk), so update the get_arrow_dataset docstring to state that max_chunk_size
is accepted for API compatibility but is ignored for in-memory Arrow tables and
explain why (PyArrow dataset built from pa.Table is not chunked) and suggest the
alternative (load Parquet files via ds.dataset on file paths or use
iceberg_table.scan(...).to_batches()/to_table with batching) so callers know how
to get chunked reads; reference the get_arrow_dataset function, the
max_chunk_size parameter, arrow_table variable, and the ds.dataset(arrow_table)
call in the docstring.
- Around line 174-184: Update the get_records docstring in the get_records
method to explicitly state that the returned CachedDataset wraps an Iceberg
table and that CachedDataset.to_sql_table() is not supported for Iceberg caches
(especially with REST catalogs/non-SQLAlchemy URLs); recommend using
get_iceberg_table(), get_pandas_dataframe(), or to_arrow() instead for direct
access to Iceberg-specific features. Reference the CachedDataset class and the
to_sql_table(), get_iceberg_table(), get_pandas_dataframe(), and to_arrow()
methods in the message so readers can find the relevant APIs.
🧹 Nitpick comments (1)
airbyte/_processors/sql/iceberg.py (1)

452-459: Consider removing or implementing the empty validator, wdyt?

The _validate_typing_config method currently does nothing except pass. If there's planned validation logic, perhaps add a TODO comment. Otherwise, removing it would reduce code noise.

🧹 Proposed cleanup
-    @model_validator(mode="after")
-    def _validate_typing_config(self) -> IcebergConfig:
-        """Validate that typing configuration options are consistent."""
-        if self.object_typing == ObjectTypingMode.AS_JSON_STRINGS:
-            # additional_properties and anyof_properties are ignored in this mode
-            # but we don't need to warn - they just have no effect
-            pass
-        return self

Comment on lines +128 to +149
def get_arrow_dataset(
    self,
    stream_name: str,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,  # noqa: ARG002
) -> ds.Dataset:
    """Return an Arrow Dataset with the stream's data.

    This method provides efficient access to the underlying Parquet files
    through PyArrow's dataset interface.
    """
    table_name = self._read_processor.get_sql_table_name(stream_name)
    catalog = self.get_catalog()

    try:
        iceberg_table = catalog.load_table((self.namespace, table_name))
    except NoSuchTableError as e:
        raise KeyError(f"Stream '{stream_name}' not found in cache") from e

    # Get the Arrow table from Iceberg and convert to dataset
    arrow_table: pa.Table = iceberg_table.scan().to_arrow()
    return ds.dataset(arrow_table)
Contributor


⚠️ Potential issue | 🟡 Minor

The max_chunk_size parameter is accepted but ignored.

Users calling cache.get_arrow_dataset(stream_name, max_chunk_size=1000) might expect chunking behavior, but the parameter has no effect since ds.dataset(arrow_table) creates an in-memory dataset. Could we add a note in the docstring about this limitation, wdyt?

📝 Suggested docstring update
     def get_arrow_dataset(
         self,
         stream_name: str,
         *,
         max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,  # noqa: ARG002
     ) -> ds.Dataset:
         """Return an Arrow Dataset with the stream's data.

         This method provides efficient access to the underlying Parquet files
         through PyArrow's dataset interface.
+
+        Note:
+            The max_chunk_size parameter is accepted for interface compatibility
+            but does not affect chunking behavior for Iceberg-backed datasets.
         """
🤖 Prompt for AI Agents
In `@airbyte/caches/iceberg.py` around lines 128 - 149, get_arrow_dataset
currently accepts max_chunk_size but ignores it (arrow_table is fully in-memory
and ds.dataset(arrow_table) does not chunk), so update the get_arrow_dataset
docstring to state that max_chunk_size is accepted for API compatibility but is
ignored for in-memory Arrow tables and explain why (PyArrow dataset built from
pa.Table is not chunked) and suggest the alternative (load Parquet files via
ds.dataset on file paths or use iceberg_table.scan(...).to_batches()/to_table
with batching) so callers know how to get chunked reads; reference the
get_arrow_dataset function, the max_chunk_size parameter, arrow_table variable,
and the ds.dataset(arrow_table) call in the docstring.

Contributor


Good suggestion. I can add a note to the docstring clarifying that max_chunk_size is accepted for interface compatibility but doesn't affect chunking for Iceberg-backed datasets. Will defer to @aaronsteers on whether to address this now or in a follow-up.

Comment on lines +174 to +184
def get_records(
    self,
    stream_name: str,
) -> CachedDataset:
    """Get records from the cache as a CachedDataset.

    Note: For Iceberg caches, this returns a CachedDataset that wraps
    the Iceberg table. For more efficient access to Iceberg-specific
    features, use get_iceberg_table() or get_pandas_dataframe() directly.
    """
    return CachedDataset(self, stream_name)
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the IcebergProcessor and check for get_sql_table method
echo "=== Checking IcebergProcessor for get_sql_table ==="
rg -n "class IcebergProcessor" -A 20 airbyte/_processors/

Repository: airbytehq/PyAirbyte

Length of output: 1664


🏁 Script executed:

#!/bin/bash
# Search for get_sql_table method definitions
echo "=== Searching for get_sql_table method ==="
rg -n "def get_sql_table" airbyte/ -B 2 -A 10

Repository: airbytehq/PyAirbyte

Length of output: 2892


🏁 Script executed:

#!/bin/bash
# Check CachedDataset implementation and to_sql_table
echo "=== Checking CachedDataset.to_sql_table ==="
rg -n "def to_sql_table" airbyte/ -B 2 -A 15

Repository: airbytehq/PyAirbyte

Length of output: 1522


🏁 Script executed:

#!/bin/bash
# Check if IcebergCache has any override for get_records
echo "=== Checking IcebergCache class structure ==="
rg -n "class IcebergCache" -A 50 airbyte/caches/

Repository: airbytehq/PyAirbyte

Length of output: 3579


🏁 Script executed:

#!/bin/bash
# Check _get_table_by_name implementation
echo "=== Checking _get_table_by_name in SqlProcessorBase ==="
rg -n "def _get_table_by_name" airbyte/ -B 2 -A 15

Repository: airbytehq/PyAirbyte

Length of output: 1423


🏁 Script executed:

#!/bin/bash
# Check if IcebergProcessor overrides _get_table_by_name
echo "=== Checking IcebergProcessor for _get_table_by_name override ==="
rg -n "_get_table_by_name" airbyte/_processors/sql/iceberg.py -B 2 -A 10

Repository: airbytehq/PyAirbyte

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Check tests or usage of to_sql_table with IcebergCache
echo "=== Checking for to_sql_table usage with Iceberg ==="
rg -n "to_sql_table" tests/ -B 2 -A 5

Repository: airbytehq/PyAirbyte

Length of output: 2591


🏁 Script executed:

#!/bin/bash
# Get more of the _get_table_by_name implementation
echo "=== Full _get_table_by_name implementation ==="
rg -n "def _get_table_by_name" airbyte/shared/sql_processor.py -A 50 | head -70

Repository: airbytehq/PyAirbyte

Length of output: 2358


🏁 Script executed:

#!/bin/bash
# Check if test_all_cache_types uses IcebergCache
echo "=== Check if test uses IcebergCache ==="
rg -n "IcebergCache" tests/integration_tests/test_all_cache_types.py

Repository: airbytehq/PyAirbyte

Length of output: 102


🏁 Script executed:

#!/bin/bash
# Check what get_sql_engine returns for IcebergProcessor
echo "=== Checking IcebergProcessor.get_sql_engine ==="
rg -n "def get_sql_engine" airbyte/_processors/sql/iceberg.py -A 20

Repository: airbytehq/PyAirbyte

Length of output: 111


🏁 Script executed:

#!/bin/bash
# Check if IcebergProcessor has a get_sql_engine method or inherits it
echo "=== Checking SqlProcessorBase.get_sql_engine ==="
rg -n "def get_sql_engine" airbyte/shared/sql_processor.py -A 10

Repository: airbytehq/PyAirbyte

Length of output: 1161


🏁 Script executed:

#!/bin/bash
# Check IcebergProcessor sql_config and connection type
echo "=== Checking IcebergConfig and sql_config ==="
rg -n "class IcebergConfig" airbyte/ -A 30 | head -50

Repository: airbytehq/PyAirbyte

Length of output: 2449


🏁 Script executed:

#!/bin/bash
# Check IcebergConfig's get_sql_alchemy_url and get_sql_engine methods
echo "=== Checking IcebergConfig methods ==="
rg -n "def get_sql_alchemy_url|def get_sql_engine" airbyte/_processors/sql/iceberg.py -A 10

Repository: airbytehq/PyAirbyte

Length of output: 694


🏁 Script executed:

#!/bin/bash
# Check SqlConfig to understand the inheritance
echo "=== Checking SqlConfig class ==="
rg -n "class SqlConfig" airbyte/shared/ -A 40 | head -60

Repository: airbytehq/PyAirbyte

Length of output: 2864


🏁 Script executed:

#!/bin/bash
# Check if IcebergProcessor overrides any critical methods
echo "=== Full IcebergProcessor class methods ==="
rg -n "    def " airbyte/_processors/sql/iceberg.py | grep -E "(get_sql|_get_table)"

Repository: airbytehq/PyAirbyte

Length of output: 233


🏁 Script executed:

#!/bin/bash
# Check if there are any Iceberg-specific tests for to_sql_table
echo "=== Searching for Iceberg tests ==="
find tests/ -name "*iceberg*" -type f

Repository: airbytehq/PyAirbyte

Length of output: 137


🏁 Script executed:

#!/bin/bash
# Look for any tests that call to_sql_table on Iceberg
echo "=== Searching for to_sql_table in Iceberg tests ==="
rg -n "to_sql_table" tests/ --glob="*iceberg*"

Repository: airbytehq/PyAirbyte

Length of output: 114


🏁 Script executed:

#!/bin/bash
# Check the full IcebergConfig.get_sql_alchemy_url to understand the returned URL
echo "=== Full get_sql_alchemy_url implementation ==="
rg -n "def get_sql_alchemy_url" airbyte/_processors/sql/iceberg.py -A 30

Repository: airbytehq/PyAirbyte

Length of output: 1619


🏁 Script executed:

#!/bin/bash
# Check the Iceberg test file
echo "=== Checking IcebergCache tests ==="
cat tests/unit_tests/test_iceberg_cache.py | head -100

Repository: airbytehq/PyAirbyte

Length of output: 3418


🏁 Script executed:

#!/bin/bash
# Check if there are any integration tests that use get_sql_table with Iceberg
echo "=== Check all integration tests for IcebergCache usage ==="
rg -n "IcebergCache" tests/integration_tests/ -B 2 -A 5

Repository: airbytehq/PyAirbyte

Length of output: 121


🏁 Script executed:

#!/bin/bash
# Check what happens when we try to use a non-SQL catalog URL as SQLAlchemy URL
echo "=== Checking how sqlalchemy.Table is created with iceberg:// URL ==="
rg -n "autoload_with" airbyte/shared/sql_processor.py -B 5 -A 5

Repository: airbytehq/PyAirbyte

Length of output: 528


🏁 Script executed:

#!/bin/bash
# Look at the full docstring of get_records in IcebergCache
echo "=== Check the full get_records docstring ==="
sed -n '174,184p' airbyte/caches/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 536


🏁 Script executed:

#!/bin/bash
# Check if CachedDataset handles Iceberg specially
echo "=== Check CachedDataset class definition ==="
rg -n "class CachedDataset" airbyte/datasets/ -A 30

Repository: airbytehq/PyAirbyte

Length of output: 2317


🏁 Script executed:

#!/bin/bash
# Final check: verify what error would occur if to_sql_table is called with REST catalog
echo "=== Verify the iceberg:// URL would fail with SQLAlchemy ==="
python3 << 'EOF'
from sqlalchemy import create_engine

# This is what would happen with a REST catalog
url = "iceberg://rest:https://example.com:warehouse_path"
try:
    engine = create_engine(url)
    print(f"URL '{url}' created engine successfully")
except Exception as e:
    print(f"URL '{url}' failed with error: {type(e).__name__}: {e}")

# This is what works with SQL catalog
url2 = "sqlite:///test.db"
try:
    engine = create_engine(url2)
    print(f"URL '{url2}' created engine successfully")
except Exception as e:
    print(f"URL '{url2}' failed with error: {type(e).__name__}: {e}")
EOF

Repository: airbytehq/PyAirbyte

Length of output: 263


🏁 Script executed:

#!/bin/bash
# Check if there's any error handling around to_sql_table for non-SQL backends
echo "=== Check if there are any safeguards in to_sql_table ==="
sed -n '207,210p' airbyte/datasets/_sql.py

Repository: airbytehq/PyAirbyte

Length of output: 303


🏁 Script executed:

#!/bin/bash
# Check if the docstring mentions the limitation adequately
echo "=== Full get_records method context ==="
sed -n '174,184p' airbyte/caches/iceberg.py

Repository: airbytehq/PyAirbyte

Length of output: 531


🏁 Script executed:

#!/bin/bash
# Check what other cache implementations do
echo "=== Check DuckDB cache get_records ==="
rg -n "def get_records" airbyte/caches/duckdb.py -A 15

Repository: airbytehq/PyAirbyte

Length of output: 101


Expand the docstring to explicitly document the to_sql_table() limitation.

The existing docstring recommends using get_iceberg_table() or get_pandas_dataframe() for efficiency, but users might not realize that to_sql_table() will fail (especially with REST catalogs using non-SQLAlchemy URLs). Consider clarifying that to_sql_table() is not supported for Iceberg caches, wdyt?

Something like:

Note: For Iceberg caches, this returns a CachedDataset that wraps
the Iceberg table. The `to_sql_table()` method is not supported—instead,
use `get_iceberg_table()`, `get_pandas_dataframe()`, or `to_arrow()` for
direct access to Iceberg-specific features.
🤖 Prompt for AI Agents
In `@airbyte/caches/iceberg.py` around lines 174 - 184, Update the get_records
docstring in the get_records method to explicitly state that the returned
CachedDataset wraps an Iceberg table and that CachedDataset.to_sql_table() is
not supported for Iceberg caches (especially with REST catalogs/non-SQLAlchemy
URLs); recommend using get_iceberg_table(), get_pandas_dataframe(), or
to_arrow() instead for direct access to Iceberg-specific features. Reference the
CachedDataset class and the to_sql_table(), get_iceberg_table(),
get_pandas_dataframe(), and to_arrow() methods in the message so readers can
find the relevant APIs.

Contributor


Valid point about the to_sql_table() limitation. I can expand the docstring to clarify that to_sql_table() is not supported for Iceberg caches (especially with REST catalogs). Will defer to @aaronsteers on whether to address this now or in a follow-up.

with contextlib.suppress(ValueError):
    return int(value)
# Try float first then int (for "123.0" -> 123)
with contextlib.suppress(ValueError):
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `airbyte/_processors/sql/iceberg.py`:
- Around line 465-472: _get_warehouse_path currently only creates directories
when warehouse_path is a Path, so string local paths never get mkdir'd; update
_get_warehouse_path to detect when warehouse_path is a str that represents a
local filesystem path (e.g., does not look like a URI/scheme such as contains
'://' or matches a known remote scheme) then convert it to a Path, call
mkdir(parents=True, exist_ok=True), and return its absolute string; keep the
existing branch that returns non-local strings unchanged and continue to handle
Path inputs the same way.
- Around line 1060-1117: The implementation of _write_files_to_new_table
accumulates all converted_records across files causing unbounded memory use;
change it to convert and build a PyArrow table per file and call
iceberg_table.append(...) for each file instead of after the loop. Concretely,
inside the for file_path in files loop, read and convert each line into a
per-file list (replace converted_records with a local per_file_records), build
arrays from arrow_schema for that file, construct a pa.Table and call
iceberg_table.append(pa_table) immediately, and only return table_name at the
end; keep the arrow_schema retrieval via iceberg_table.schema().as_arrow() and
reuse it across files. Ensure you skip appending if a file yields no records so
behavior matches the original early-return semantics.

Comment on lines 1060 to 1117
    def _write_files_to_new_table(
        self,
        files: list[Path],
        stream_name: str,
        batch_id: str,  # noqa: ARG002 # Required by base class interface
    ) -> str:
        """Write files to a new Iceberg table.

        Unlike SQL databases, we write directly to the final table in Iceberg
        since Iceberg handles transactions and versioning natively.

        This method reads JSONL files directly (without pandas) and uses a
        recursive converter to transform Python objects to match the Iceberg
        schema. This handles:
        - Null values for struct/list columns
        - Python dicts -> Arrow structs
        - Python lists -> Arrow lists
        - Schemaless arrays/objects -> JSON strings
        """
        table_name = self.get_sql_table_name(stream_name)

        # Get or create the Iceberg table (schema is declared before parsing records)
        iceberg_table = self._get_or_create_iceberg_table(stream_name, table_name)
        arrow_schema = iceberg_table.schema().as_arrow()

        # Read all JSONL files and convert records to match the Arrow schema
        converted_records: list[dict[str, Any]] = []
        for file_path in files:
            # Read gzip-compressed JSONL file line by line (progressive read for large files)
            with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    record = json.loads(line)
                    # Convert record to match the Arrow schema
                    converted = convert_record_to_arrow_schema(
                        record, arrow_schema, self.normalizer
                    )
                    converted_records.append(converted)

        if not converted_records:
            return table_name

        # Build PyArrow arrays for each column
        arrays: list[pa.Array] = []
        for field in arrow_schema:
            column_values = [record.get(field.name) for record in converted_records]
            # Create array with the correct type
            array = pa.array(column_values, type=field.type)
            arrays.append(array)

        # Create PyArrow table from arrays
        arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)

        # Append to the Iceberg table
        iceberg_table.append(arrow_table)

        return table_name

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

PyArrow append batch memory best practices Iceberg

💡 Result:

PyArrow: appending batches without blowing up memory

  • Prefer streaming RecordBatches over materializing a big Table. Use a pyarrow.RecordBatchReader (or an iterator of RecordBatches) as your pipeline boundary; only call read_all() (materializes a Table) when you truly need it. [8]
  • Avoid repeatedly “appending” by concatenating growing tables. Repeated concat_tables([t, new_t]) in a loop can create lots of intermediate objects; if you must concatenate, keep it zero‑copy by ensuring schemas match and leaving promote=False. [1]
  • Don’t call combine_chunks() unless required. It (logically) merges chunks and can require additional allocations via the memory pool. [6]
  • Write incrementally to Parquet from batches. Use pyarrow.parquet.ParquetWriter.write_batch(...) to stream batches into row groups instead of building a huge in-memory table first; tune row_group_size to your workload. [2]

Iceberg (PyIceberg): ingestion best practices with batches

  • Know the current limitation: PyIceberg’s table.append(...) API takes a PyArrow Table, so a pure “append RecordBatches directly” workflow isn’t (yet) first-class. [7]
  • Practical pattern for batch/stream ingestion:
    1. Stream your data as RecordBatches and write Parquet incrementally with ParquetWriter.write_batch(...). [2]
    2. Commit those Parquet files into Iceberg using table.add_files([...]), which avoids needing one giant pa.Table in memory. [7]
  • For reads (when validating writes), prefer batch streaming: tbl.scan().to_arrow_batch_reader() returns a RecordBatchReader specifically to reduce memory vs to_arrow() (which eagerly loads everything). [4]

Sources

[1] Apache Arrow docs: pyarrow.concat_tables (zero-copy when promote=False) (arrow.apache.org)
[2] Apache Arrow docs: pyarrow.parquet.ParquetWriter / write_batch (arrow.apache.org)
[4] PyIceberg table scan API: to_arrow_batch_reader memory note (py.iceberg.apache.org)
[6] Apache Arrow docs: pyarrow.Table.combine_chunks (arrow.apache.org)
[7] PyIceberg table API: append(pa.Table) and add_files(...) (py.iceberg.apache.org)
[8] Apache Arrow docs: pyarrow.RecordBatchReader / read_all materializes a Table (arrow.apache.org)



🌐 Web query:

PyArrow memory efficiency large datasets multiple batches

💡 Result:

For memory-efficient PyArrow processing of large datasets, prefer a streaming, batch-at-a-time approach and avoid materializing a full Table unless you truly need it:

  • Scan/iterate in RecordBatches (streaming) instead of dataset.to_table(): use pyarrow.dataset and a Scanner so you can process one batch at a time (and keep peak RAM bounded by batch size + readahead buffers). The dataset scan API is designed to only load/project needed columns and can push down filters where possible. [1][2]
  • Tune batch sizing for RAM: reduce batch_size if batches “overflow memory” (PyArrow explicitly calls this out). [1][2]
  • Limit read-ahead (often the hidden RAM multiplier): batch_readahead (batches per file) and fragment_readahead (files) increase RAM usage when raised; lower them if you’re memory-bound. [1][2]
  • Project columns early: scanning with a column subset avoids “loading, copying, and deserializing” columns you don’t need. [1][2]
  • Avoid combine_chunks() on very large/chunked tables unless required: it concatenates chunks into larger contiguous buffers, which can increase peak memory due to new allocations/copies. [3]

Practical pattern: dataset.scanner(columns=[...], filter=..., batch_size=..., batch_readahead=..., fragment_readahead=...) → iterate scanner.to_batches() / scanner.scan_batches() → process/write each batch incrementally (so you never hold the full dataset in memory). [1][2]

Sources: Apache Arrow PyArrow docs on Dataset/Scanner scan options (batch size + readahead + projection) [1][2] and Table.combine_chunks() behavior [3].
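For concreteness, a minimal sketch of the streaming-scan pattern summarized above; the dataset path, column names, tuning values, and per-batch handling are illustrative assumptions, not part of the PR.

import pyarrow.dataset as ds

dataset = ds.dataset("/tmp/warehouse/events", format="parquet")  # hypothetical path
scanner = dataset.scanner(
    columns=["id", "created_at"],  # project early: skip columns you don't need
    batch_size=64_000,             # lower this if batches overflow memory
    batch_readahead=4,             # fewer buffered batches -> lower peak RAM
    fragment_readahead=2,          # fewer files read ahead concurrently
)
for batch in scanner.to_batches():
    ...  # process one RecordBatch at a time; the full dataset is never held in memory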


🌐 Web query:

Iceberg table append multiple batches vs single batch

💡 Result:

In Apache Iceberg, each append commit creates a new snapshot (a new table version) and updates table metadata. So the main difference between many appends (multiple batches) vs one append (single batch) is how many commits/snapshots/manifests/files you create.

Many appends (multiple small batches)

Pros

  • Lower latency / better for streaming-style ingestion (commit as data arrives).
  • Iceberg supports “fast append” behavior that can add new manifests instead of rewriting existing ones, reducing write work per commit. [1]

Cons

  • Metadata growth: every batch ⇒ new snapshot; frequent commits accumulate snapshots quickly and increase metadata that must be tracked and planned against. [2]
  • Small files + many manifests: small batches often create many small data files; and fast append can produce many small manifest files unless you rewrite/compact them. This degrades query planning and read performance until you maintain the table. [2]
  • Requires regular maintenance (expire snapshots, rewrite data files/manifests, etc.). [3]

Iceberg explicitly recommends tuning commit rate for streaming (e.g., avoid very frequent triggers) because high commit rates produce lots of files/manifests/snapshots. [4]

One append (single larger batch)

Pros

  • Fewer snapshots/commits ⇒ less metadata churn and fewer manifest lists to track. (Each write creates a snapshot.) [3]
  • Typically produces larger files (closer to target size), which is better for read performance and metadata size.

Cons

  • Higher end-to-end latency (you wait to accumulate data).
  • Larger single commits can increase the “blast radius” of a failed job (more data to retry) and may contend more with other writers (depending on your engine and commit coordination).

Practical guidance (rule of thumb)

  • If you can, prefer fewer, larger appends to reduce snapshot/metadata overhead (especially for batch ingestion). (Each write ⇒ snapshot.) [3]
  • If you must write many small batches (streaming / micro-batching), plan for:
    • Less frequent commits (increase trigger interval) to reduce snapshot/file/manifest proliferation. [4]
    • Compaction: run rewrite_data_files to address small files. [2]
    • Manifest rewrite: run rewrite_manifests to reduce many small manifests created by fast appends. [2]
    • Snapshot expiration to control metadata growth. [2][3]
  • Use table properties to target sane file sizes (e.g., write.target-file-size-bytes). [5]
  • Be careful with “fastest write” settings that skip reshuffles: e.g., write.distribution-mode=none can speed writes (esp. streaming) but tends to produce many small files and hurt reads. [6]

Bottom line: Multiple batches are fine (and common) but usually cost you more snapshots + more small files/manifests, so you’ll need maintenance and/or larger micro-batches. Single batch generally yields cleaner metadata and file layout (better for reads) at the cost of higher latency.

Sources: Apache Iceberg spec and docs, plus AWS guidance. [1][2][3][4][5][6]


Cap memory usage by appending per-file instead of accumulating all records.

The current implementation loads all rows from all files into converted_records before building PyArrow arrays, which risks OOM on large syncs. Since iceberg_table.append() accepts individual PyArrow tables, could we move the array-building and append logic inside the file loop to batch per-file? This keeps peak memory bounded to file-size rather than total-sync-size. The suggested diff approach would work well here—thoughts?

💡 Suggested batching approach (per file)
-        # Read all JSONL files and convert records to match the Arrow schema
-        converted_records: list[dict[str, Any]] = []
         for file_path in files:
+            converted_records: list[dict[str, Any]] = []
             # Read gzip-compressed JSONL file line by line (progressive read for large files)
             with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
                 for line in f:
                     if not line.strip():
                         continue
                     record = json.loads(line)
                     # Convert record to match the Arrow schema
                     converted = convert_record_to_arrow_schema(
                         record, arrow_schema, self.normalizer
                     )
                     converted_records.append(converted)
 
-        if not converted_records:
-            return table_name
+            if not converted_records:
+                continue
 
-        # Build PyArrow arrays for each column
-        arrays: list[pa.Array] = []
-        for field in arrow_schema:
-            column_values = [record.get(field.name) for record in converted_records]
-            # Create array with the correct type
-            array = pa.array(column_values, type=field.type)
-            arrays.append(array)
+            # Build PyArrow arrays for each column
+            arrays: list[pa.Array] = []
+            for field in arrow_schema:
+                column_values = [record.get(field.name) for record in converted_records]
+                # Create array with the correct type
+                array = pa.array(column_values, type=field.type)
+                arrays.append(array)
 
-        # Create PyArrow table from arrays
-        arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
+            # Create PyArrow table from arrays
+            arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
 
-        # Append to the Iceberg table
-        iceberg_table.append(arrow_table)
+            # Append to the Iceberg table
+            iceberg_table.append(arrow_table)
 
         return table_name
🤖 Prompt for AI Agents
In `airbyte/_processors/sql/iceberg.py` around lines 1060 - 1117, the
implementation of _write_files_to_new_table accumulates all converted_records
across files causing unbounded memory use; change it to convert and build a
PyArrow table per file and call iceberg_table.append(...) for each file instead
of after the loop. Concretely, inside the for file_path in files loop, read and
convert each line into a per-file list (replace converted_records with a local
per_file_records), build arrays from arrow_schema for that file, construct a
pa.Table and call iceberg_table.append(pa_table) immediately, and only return
table_name at the end; keep the arrow_schema retrieval via
iceberg_table.schema().as_arrow() and reuse it across files. Ensure you skip
appending if a file yields no records so behavior matches the original
early-return semantics.

…syncs

- Modified _write_files_to_new_table to store Arrow tables in memory instead of writing immediately
- Updated _append_temp_table_to_final_table to use stored Arrow table with append()
- Implemented _emulated_merge_temp_table_to_final_table using PyIceberg upsert() for merge-on-read semantics
- Updated _swap_temp_table_with_final_table to use overwrite() for REPLACE write method
- Added _init_pending_tables helper to initialize the pending tables storage

This two-phase write approach allows using the correct Iceberg operation (append vs upsert vs overwrite) based on the write method, enabling proper deduplication for incremental syncs.

Co-Authored-By: AJ Steers <aj@airbyte.io>
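The operation dispatch this commit describes, as a minimal sketch. The function name, the write-method strings, and the way primary keys are passed in are illustrative assumptions, not the PR's exact code; the PyIceberg calls (append, upsert, overwrite) are the real API.

import pyarrow as pa
from pyiceberg.table import Table


def finalize_pending_table(
    iceberg_table: Table,
    pending: pa.Table,
    write_method: str,        # hypothetical: "append" | "merge" | "replace"
    primary_keys: list[str],  # e.g. from catalog_provider.get_primary_keys()
) -> None:
    """Write a buffered Arrow table using the Iceberg operation matching the write method."""
    if write_method == "replace":
        iceberg_table.overwrite(pending)  # REPLACE: overwrite the table contents
    elif write_method == "merge" and primary_keys:
        # Merge-on-read: write delete files for matching keys, append new records.
        iceberg_table.upsert(pending, join_cols=primary_keys)
    else:
        iceberg_table.append(pending)  # plain append; no deduplication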

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
airbyte/_processors/sql/iceberg.py (3)

465-472: Consider creating local directories for string paths too?

Currently, _get_warehouse_path only creates directories when warehouse_path is a Path object. If a user passes a local filesystem path as a plain string (e.g., "/path/to/warehouse"), the directory won't be created automatically, which could lead to runtime failures when PyIceberg tries to write.

Would it make sense to detect local path strings (those without a :// scheme) and create them too, wdyt?

💡 Suggested enhancement
     def _get_warehouse_path(self) -> str:
         """Get the warehouse path as a string."""
         warehouse = self.warehouse_path
         if isinstance(warehouse, Path):
             # Ensure the directory exists for local warehouses
             warehouse.mkdir(parents=True, exist_ok=True)
             return str(warehouse.absolute())
+        # Handle string paths that look like local filesystem paths
+        if isinstance(warehouse, str) and "://" not in warehouse:
+            local_path = Path(warehouse)
+            local_path.mkdir(parents=True, exist_ok=True)
+            return str(local_path.absolute())
         return warehouse

910-912: Class-level dict annotation may cause confusion.

_pending_arrow_tables is annotated at the class level without a default value, but _init_pending_tables assigns an instance-level dict. This works correctly (the annotation is just a type hint), but it might be clearer to either:

  1. Initialize it in __init__ (would require overriding), or
  2. Add a comment clarifying this is just a type annotation

The current lazy initialization pattern in _init_pending_tables is fine, just wanted to flag it in case it wasn't intentional, wdyt?
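For reference, a minimal sketch of the pattern being flagged (illustrative, not the PR's code): the class-level line is only a type annotation, and the dict itself is created lazily per instance.

import pyarrow as pa


class ProcessorSketch:
    # Annotation only: no class-level dict is created, so instances don't share state.
    _pending_arrow_tables: dict[str, pa.Table]

    def _init_pending_tables(self) -> None:
        # Lazy per-instance initialization on first use.
        if not hasattr(self, "_pending_arrow_tables"):
            self._pending_arrow_tables = {}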


1109-1137: Memory usage scales with total sync size.

The current implementation accumulates all records from all files into converted_records before building the Arrow table. For large syncs with many files, this could cause significant memory pressure.

Since iceberg_table.append() can be called multiple times (each call creates a new commit), would it make sense to process and append per-file instead of accumulating everything first? This would cap memory at file-size rather than total-sync-size.

The trade-off is more Iceberg commits (more metadata overhead), but it prevents OOM for large datasets. You could also batch by record count as a middle ground, wdyt?

💡 Per-file batching approach
-        # Read all JSONL files and convert records to match the Arrow schema
-        converted_records: list[dict[str, Any]] = []
         for file_path in files:
+            converted_records: list[dict[str, Any]] = []
             # Read gzip-compressed JSONL file line by line
             with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
                 for line in f:
                     if not line.strip():
                         continue
                     record = json.loads(line)
                     converted = convert_record_to_arrow_schema(
                         record, arrow_schema, self.normalizer
                     )
                     converted_records.append(converted)
 
-        if not converted_records:
-            return table_name
+            if not converted_records:
+                continue
 
-        # Build PyArrow arrays for each column
-        arrays: list[pa.Array] = []
-        # ... rest of array building and storage
+            # Build and store per-file Arrow table
+            arrays = [
+                pa.array([r.get(f.name) for r in converted_records], type=f.type)
+                for f in arrow_schema
+            ]
+            file_arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
+            
+            # Accumulate or write per-file based on strategy
+            if final_table_name in self._pending_arrow_tables:
+                self._pending_arrow_tables[final_table_name] = pa.concat_tables([
+                    self._pending_arrow_tables[final_table_name], file_arrow_table
+                ])
+            else:
+                self._pending_arrow_tables[final_table_name] = file_arrow_table

Comment on lines +1159 to +1173
    @overrides
    def _ensure_compatible_table_schema(
        self,
        stream_name: str,
        table_name: str,
    ) -> None:
        """Ensure the Iceberg table schema is compatible with the stream schema.

        For Iceberg, we handle schema evolution differently than SQL databases.
        Iceberg has native schema evolution support, so we don't need to add
        columns via ALTER TABLE. The schema is managed when creating the table.
        """
        # Iceberg handles schema evolution natively - no action needed here
        # The table schema is set when the table is created in _create_iceberg_table
        pass

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, check if the file exists and examine the method in question
fd "iceberg.py" --type f



🏁 Script executed:

# Look for the specific method and surrounding context
rg -n "_ensure_compatible_table_schema" airbyte/_processors/sql/iceberg.py -A 20



🏁 Script executed:

# Search for schema update logic in the iceberg file
rg -n "update_schema|schema.*evolution|add_column" airbyte/_processors/sql/iceberg.py -B 2 -A 2



🏁 Script executed:

# Search for _write_files_to_new_table method to understand how data is written
rg -n "_write_files_to_new_table" airbyte/_processors/sql/iceberg.py -A 30



🏁 Script executed:

# Check if update_schema is used anywhere in the codebase
rg -n "update_schema" airbyte/ --type py



🏁 Script executed:

# Find _get_or_create_iceberg_table method
rg -n "_get_or_create_iceberg_table" airbyte/_processors/sql/iceberg.py -A 25



🏁 Script executed:

# Look for schema comparison logic and see how records are converted
rg -n "converted_records|convert.*record|_normalize_record" airbyte/_processors/sql/iceberg.py -B 2 -A 10



🏁 Script executed:

# Check if there's any incoming stream schema handling
rg -n "stream.*schema|get_stream_schema" airbyte/_processors/sql/iceberg.py -B 2 -A 5



🏁 Script executed:

# Search for schema comparison in the broader codebase
rg -n "compare.*schema|schema.*mismatch|schema.*evolution" airbyte/_processors/sql/ -B 2 -A 2



🏁 Script executed:

# Look at the full convert_record_to_arrow_schema function implementation
rg -n "def convert_record_to_arrow_schema" airbyte/_processors/sql/iceberg.py -A 35



🏁 Script executed:

# Check if there's any logic in _create_iceberg_table that might handle schema evolution
rg -n "_create_iceberg_table" airbyte/_processors/sql/iceberg.py -A 20



🏁 Script executed:

# Check if there's any validation or comparison of stream schema vs table schema
rg -n "get_stream_schema|stream_schema|expected.*schema" airbyte/_processors/sql/iceberg.py



Silently dropping new fields on schema evolution.

The concern is valid: when the source schema evolves with new fields, convert_record_to_arrow_schema (lines 284-313) iterates only through the existing table schema and silently discards any extra fields from the incoming records. This happens at line 1130 where column values are extracted only for fields in the existing arrow_schema, causing data loss.

Additionally, _write_files_to_new_table (line 1107) loads the existing table's schema rather than comparing it against the current stream schema, so evolved columns are never detected or added.

The current _ensure_compatible_table_schema is a no-op, yet the docstring claims "Iceberg has native schema evolution support" — which is true for PyIceberg (via update_schema()), but this code doesn't use it.

Should we implement proper schema evolution by comparing the incoming stream schema against the existing table schema and calling iceberg_table.update_schema() to add missing columns, or would you prefer to document this as a known limitation for now, wdyt?
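If the first option is chosen, a minimal sketch of what it could look like with PyIceberg's schema-evolution API (illustrative, not the PR's code; it assumes `union_by_name` accepts the incoming Arrow schema directly):

import pyarrow as pa
from pyiceberg.table import Table


def evolve_table_schema(iceberg_table: Table, incoming: pa.Schema) -> None:
    """Add columns present in the incoming stream schema but missing from the table."""
    with iceberg_table.update_schema() as update:
        # union_by_name adds new fields by name without dropping existing columns;
        # incompatible type changes would still raise.
        update.union_by_name(incoming)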

When warehouse_path is passed as a string (not a Path object) representing
a local path (no URI scheme like s3://), the directory is now created to
avoid runtime failures. This matches the behavior for Path objects.

Co-Authored-By: AJ Steers <aj@airbyte.io>

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.


Comment on lines +828 to +832
# Check if field is nullable (default to True for safety)
prop_type = prop_def.get("type")
is_nullable = True
if isinstance(prop_type, list):
    is_nullable = "null" in prop_type

🟡 Union types without explicit null marked as required, may cause write failures

In _convert_object_to_struct, when the JSON schema property type is a list like ["string", "integer"] (without "null"), the field is marked as required=True in the Iceberg schema. This may cause write failures when records don't have values for these fields.


Code at airbyte/_processors/sql/iceberg.py:828-832:

# Check if field is nullable (default to True for safety)
prop_type = prop_def.get("type")
is_nullable = True
if isinstance(prop_type, list):
    is_nullable = "null" in prop_type

Behavior:

  • If prop_type = "string" (not a list): is_nullable = True → field is optional ✓
  • If prop_type = ["string", "null"]: is_nullable = True → field is optional ✓
  • If prop_type = ["string", "integer"]: is_nullable = False → field is required

Impact:

When a source sends records with missing values for fields that have union types without explicit null (e.g., anyOf(string, integer)), Iceberg may reject the write because the schema marks these fields as required. The comment states "default to True for safety" but the implementation doesn't apply this safety default to list-type unions without null.

Recommendation: Consider keeping is_nullable = True for all cases within _convert_object_to_struct to maintain the safety default, or explicitly handle union types without null as nullable since records may legitimately have missing values for these fields.
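As a small illustration of the first option, the nullable decision could keep the safety default and feed it into the Iceberg field as required=False. The field name and ID below are hypothetical; NestedField and StringType are the real PyIceberg types.

from pyiceberg.types import NestedField, StringType

prop_type = ["string", "integer"]  # union type without an explicit "null"
is_nullable = True  # keep the safety default even when "null" is absent

field = NestedField(
    field_id=1,                # hypothetical field ID
    name="example_field",      # hypothetical field name
    field_type=StringType(),
    required=not is_nullable,  # required=False keeps writes permissive
)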



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `airbyte/_processors/sql/iceberg.py`:
- Around line 99-100: Docstrings reference the wrong column name for the
placeholder JSON column; update the text to match the actual name used by the
helper. Locate the stringify function and any related docstrings (mentions of
`_additional_properties_json`) and change them to `_additional_properties__json`
(double underscore) so docs and code align; ensure any occurrence in function
docstrings or comments (including the section around the helper that documents
placeholder column behavior) is updated consistently.
- Around line 458-543: The code silently falls back to a SQLite URI in
_get_catalog_uri when catalog_uri is missing, causing later failures for
REST/Hive catalogs; add an early validation (e.g., a pydantic validator or a
model validate/__post_init__ hook) that checks catalog_type and ensures
catalog_uri is provided when catalog_type is "rest" or "hive" (or any
non-"sql"/non-"glue" types that require a URI) and raise a clear ValueError
mentioning catalog_type and missing catalog_uri; reference the fields
catalog_type, catalog_uri and the methods _get_catalog_uri and get_catalog so
the validator prevents calling get_catalog with an invalid config.

Comment on lines +99 to +100
- `stringify`: Create a placeholder column `_additional_properties_json` to hold
additional property data as a JSON string.

⚠️ Potential issue | 🟡 Minor

Align docstrings with the actual _additional_properties__json column name?

The docs mention _additional_properties_json, but the helper builds _additional_properties__json. Could we align the wording to avoid confusion, wdyt?

✏️ Docstring adjustment
-    - `stringify`: Create a placeholder column `_additional_properties_json` to hold
+    - `stringify`: Create a placeholder column `_additional_properties__json` to hold
-    - 'stringify': Create `_additional_properties_json` column for extra data."""
+    - 'stringify': Create `_additional_properties__json` column for extra data."""

Also applies to: 432-437

🤖 Prompt for AI Agents
In `airbyte/_processors/sql/iceberg.py` around lines 99 - 100, docstrings
reference the wrong column name for the placeholder JSON column; update the text
to match the actual name used by the helper. Locate the stringify function and
any related docstrings (mentions of `_additional_properties_json`) and change
them to `_additional_properties__json` (double underscore) so docs and code
align; ensure any occurrence in function docstrings or comments (including the
section around the helper that documents placeholder column behavior) is updated
consistently.

Comment on lines +458 to +543
    def _get_catalog_uri(self) -> str:
        """Get the catalog URI, using default if not specified."""
        if self.catalog_uri:
            return self.catalog_uri
        # Default to SQLite catalog in the cache directory
        return f"sqlite:///{DEFAULT_ICEBERG_CATALOG_DB}"

    def _get_warehouse_path(self) -> str:
        """Get the warehouse path as a string.

        For local paths (both Path objects and strings without URI schemes),
        ensures the directory exists before returning.
        """
        warehouse = self.warehouse_path
        if isinstance(warehouse, Path):
            # Ensure the directory exists for local warehouses
            warehouse.mkdir(parents=True, exist_ok=True)
            return str(warehouse.absolute())
        # Handle string paths - create directory for local paths (no URI scheme)
        if isinstance(warehouse, str) and "://" not in warehouse:
            local_path = Path(warehouse)
            local_path.mkdir(parents=True, exist_ok=True)
            return str(local_path.absolute())
        return warehouse

    def _get_s3_config(self) -> dict[str, str]:
        """Get S3-specific configuration for PyIceberg.

        Returns a dictionary of S3 configuration options that PyIceberg expects.
        See: https://py.iceberg.apache.org/configuration/#s3
        """
        s3_config: dict[str, str] = {}

        if self.aws_access_key_id:
            s3_config["s3.access-key-id"] = str(self.aws_access_key_id)
        if self.aws_secret_access_key:
            s3_config["s3.secret-access-key"] = str(self.aws_secret_access_key)
        if self.s3_bucket_region:
            s3_config["s3.region"] = self.s3_bucket_region
        if self.s3_endpoint:
            s3_config["s3.endpoint"] = self.s3_endpoint

        return s3_config

    def _get_glue_config(self) -> dict[str, str]:
        """Get Glue-specific configuration for PyIceberg.

        Returns a dictionary of Glue configuration options that PyIceberg expects.
        See: https://py.iceberg.apache.org/configuration/#glue-catalog
        """
        glue_config: dict[str, str] = {}

        if self.glue_id:
            glue_config["glue.id"] = self.glue_id
        if self.s3_bucket_region:
            glue_config["glue.region"] = self.s3_bucket_region

        return glue_config

    def get_catalog(self) -> Catalog:
        """Get or create the Iceberg catalog instance."""
        if self._catalog is not None:
            return self._catalog

        catalog_config: dict[str, Any] = {
            "warehouse": self._get_warehouse_path(),
        }

        # Add S3 configuration if any S3 settings are provided
        catalog_config.update(self._get_s3_config())

        if self.catalog_type == "sql":
            catalog_config["uri"] = self._get_catalog_uri()
            self._catalog = SqlCatalog(self.catalog_name, **catalog_config)
        elif self.catalog_type == "glue":
            # Glue catalog configuration
            catalog_config["type"] = "glue"
            catalog_config.update(self._get_glue_config())
            self._catalog = load_catalog(self.catalog_name, **catalog_config)
        else:
            # For REST, Hive, etc., use the generic load_catalog
            catalog_config["type"] = self.catalog_type
            catalog_config["uri"] = self._get_catalog_uri()
            if self.catalog_credential:
                catalog_config["credential"] = str(self.catalog_credential)
            self._catalog = load_catalog(self.catalog_name, **catalog_config)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the full IcebergConfig class to understand the model validator
head -n 500 airbyte/_processors/sql/iceberg.py | tail -n 200



🏁 Script executed:

# Find the model_validator and initialization code
rg -n "model_validator|catalog_type.*=|catalog_uri" airbyte/_processors/sql/iceberg.py | head -30



🏁 Script executed:

# Look for the full _validate_typing_config and related validators
ast-grep --pattern $'@model_validator
$$$
def $_($$$) {
  $$$
}'



🌐 Web query:

PyIceberg load_catalog REST catalog required uri configuration

💡 Result:

For a PyIceberg REST catalog, load_catalog(...) requires you to provide the catalog uri (the base URL of the Iceberg REST server). If uri is missing, RestCatalog reads it from properties["uri"] and will fail during initialization. [2]

Option A: .pyiceberg.yaml

catalog:
  prod:
    type: rest
    uri: http://rest-catalog/ws/
    # optional (auth), depending on your server:
    # credential: client_id:client_secret
    # token: <bearer token>
    # warehouse: <warehouse identifier>

[1][3]

Then:

from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod")

[3][4]

Option B: Environment variable

export PYICEBERG_CATALOG__PROD__URI="http://rest-catalog/ws/"

(then load_catalog("prod")). [1]

Option C: Pass properties directly

from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod", uri="http://127.0.0.1:8181")

[3][4]

Notes

  • type: rest is supported (and is the default catalog type), but uri is the key required setting for REST. [1][2]
  • The docs commonly show REST URIs ending with /ws/ (match your server’s configured base path). [1][3]

🌐 Web query:

PyIceberg hive catalog required parameters configuration

💡 Result:

For PyIceberg’s HiveCatalog, the required/commonly-needed configuration depends on what you’re doing:

Required (to initialize/connect)

Example (.pyiceberg.yaml):

catalog:
  hive:
    type: hive
    uri: thrift://hive-metastore:9083

(py.iceberg.apache.org)

Required if you want default table locations (i.e., you don’t pass location= when creating tables)

You must have either:

  • a catalog-level warehouse property, or
  • a namespace/database location property in the metastore

Otherwise PyIceberg raises: “No default path is set, please specify a location when creating a table”. (py.iceberg.apache.org)

Example with warehouse:

catalog:
  hive:
    type: hive
    uri: thrift://hive-metastore:9083
    warehouse: s3://my-bucket/warehouse

(py.iceberg.apache.org)

Optional Hive-specific properties

  • hive.hive2-compatible (needed when using Hive 2.x)
  • hive.kerberos-authentication
  • hive.kerberos-service-name
  • ugi (py.iceberg.apache.org)



Catch missing catalog_uri for REST/Hive catalogs at validation time?

Right now, when catalog_uri is missing for REST or Hive catalogs, _get_catalog_uri() silently falls back to a SQLite URI, which will fail later during catalog initialization since PyIceberg requires a proper uri for those catalog types. Would it make sense to add early validation in the model validator to catch this upfront with a clear error message?

🛠️ Possible validation addition
 @model_validator(mode="after")
 def _validate_typing_config(self) -> IcebergConfig:
     """Validate that typing configuration options are consistent."""
     if self.object_typing == ObjectTypingMode.AS_JSON_STRINGS:
         # additional_properties and anyof_properties are ignored in this mode
         # but we don't need to warn - they just have no effect
         pass
+    if self.catalog_type in {"rest", "hive"} and not self.catalog_uri:
+        raise PyAirbyteInputError(
+            message="catalog_uri is required for REST/Hive Iceberg catalogs.",
+            context={"catalog_type": self.catalog_type},
+        )
     return self
🤖 Prompt for AI Agents
In `airbyte/_processors/sql/iceberg.py` around lines 458 - 543, the code silently
falls back to a SQLite URI in _get_catalog_uri when catalog_uri is missing,
causing later failures for REST/Hive catalogs; add an early validation (e.g., a
pydantic validator or a model validate/__post_init__ hook) that checks
catalog_type and ensures catalog_uri is provided when catalog_type is "rest" or
"hive" (or any non-"sql"/non-"glue" types that require a URI) and raise a clear
ValueError mentioning catalog_type and missing catalog_uri; reference the fields
catalog_type, catalog_uri and the methods _get_catalog_uri and get_catalog so
the validator prevents calling get_catalog with an invalid config.
