feat(caches): Add PyIceberg-based cache for Apache Iceberg format #970
aaronsteers wants to merge 23 commits into main from devin/1769653838-iceberg-cache
Conversation
This adds a new IcebergCache that stores data in Apache Iceberg format using PyIceberg.

Features:
- IcebergConfig for configuring Iceberg catalogs (SQLite for local, REST for production)
- IcebergProcessor for writing data to Iceberg tables
- IcebergTypeConverter for JSON schema to Iceberg type conversion
- Support for multiple catalog types: sql, rest, glue, hive
- Direct writes to final tables (Iceberg handles transactions natively)
- Methods for accessing data: get_pandas_dataframe, get_arrow_dataset, get_iceberg_table

The implementation follows the existing cache patterns while adapting to Iceberg's native transaction handling and Parquet-based storage.

Co-Authored-By: AJ Steers <aj@airbyte.io>
🤖 Devin AI Engineer: I'll be helping with this pull request! Here's what you should know. ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653838-iceberg-cache' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653838-iceberg-cache'

PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
Helpful Resources
Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough
Adds an Iceberg-backed cache.
Sequence Diagram(s)
sequenceDiagram
participant User as User
participant Cache as IcebergCache
participant Proc as IcebergProcessor
participant Catalog as IcebergCatalog
participant Warehouse as Warehouse
User->>Cache: get_pandas_dataframe(stream_name)
Cache->>Proc: resolve SQL table name
Proc-->>Cache: table_name
Cache->>Catalog: load_table(namespace, table_name)
Catalog->>Warehouse: fetch metadata & data files
Warehouse-->>Catalog: metadata + data
Catalog-->>Cache: Table object
Cache->>Catalog: scan().to_pandas()
Catalog->>Warehouse: read data
Warehouse-->>Catalog: data
Catalog-->>User: pandas.DataFrame
sequenceDiagram
participant User as User
participant Proc as IcebergProcessor
participant TypeConv as IcebergTypeConverter
participant Writer as JsonlWriter
participant Catalog as IcebergCatalog
participant Warehouse as Warehouse
User->>Proc: write cache(files, stream_name, batch_id)
Proc->>TypeConv: json_schema_to_iceberg_type(schema)
TypeConv-->>Proc: Iceberg schema
Proc->>Catalog: get_or_create_table(namespace, name)
Catalog->>Warehouse: prepare table
Catalog-->>Proc: Table object
Proc->>Writer: read & normalize JSONL files
Writer-->>Proc: normalized Arrow batches
Proc->>Catalog: append(PyArrow batches)
Catalog->>Warehouse: write/commit data files
Warehouse-->>Catalog: commit complete
Catalog-->>User: write complete
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 160-175: get_sql_alchemy_url currently returns the constant
"iceberg://" for non-SQL catalogs which causes SqlConfig.config_hash collisions
across different Iceberg catalog types; change get_sql_alchemy_url to return a
placeholder that includes unique catalog attributes (e.g., incorporate
self.catalog_type and a catalog identifier such as self._get_catalog_uri()
and/or self.warehouse) so each catalog produces a distinct URL string, or
alternatively override SqlConfig.config_hash to incorporate those same unique
fields; update the get_sql_alchemy_url method (and/or implement a config_hash
override) to embed catalog_type/uri/warehouse into the returned SecretString to
avoid collisions.
- Around line 115-121: The current model_post_init always overwrites schema_name
with namespace which drops legacy schema_name-only inputs; update
model_post_init to inspect the pydantic model's __fields_set__ (or
_pydantic_fields_set) to detect which of namespace or schema_name were
explicitly provided: if both provided and values differ, raise a ValueError
describing the mismatch; if only namespace provided, set schema_name =
namespace; if only schema_name provided, set namespace = schema_name; if neither
provided do nothing. Apply these checks inside model_post_init (referencing
self.namespace and self.schema_name) and continue using setattr to mutate the
frozen model.
- Around line 321-373: The issue is that nested object/array columns remain as
Python dict/list and cause PyArrow to infer struct/list types that conflict with
Iceberg's StringType; in _write_files_to_new_table detect columns in combined_df
that contain dict/list (or mixed types with dict/list), JSON-serialize those
values to strings (e.g., via json.dumps applied elementwise or using pandas'
.applymap/.astype after handling NaNs) so the DataFrame columns are plain
strings before calling pa.Table.from_pandas, and then call
pa.Table.from_pandas(combined_df, preserve_index=False) as before; reference
combined_df, expected_columns, iceberg_table.schema(), and pa.Table.from_pandas
when making the change.
- Around line 412-439: The current no-op hooks cause deletion of the real final
table because _write_files_to_new_table writes directly to the final table; fix
by making Iceberg explicitly protect the final table: update
_write_files_to_new_table to return a distinct temp identifier or set a flag
when it wrote directly to final, then implement
_append_temp_table_to_final_table and _swap_temp_table_with_final_table in
the Iceberg processor class to check that temp_table_name != final_table_name (or check the
flag) and if they are equal simply log/return without performing any
drop/replace, and additionally override _drop_temp_table as a true no-op when
the temp identifier equals the final table (or when the flag is set) to avoid
deleting the final table; use the methods named _write_files_to_new_table,
_append_temp_table_to_final_table, _swap_temp_table_with_final_table, and
_drop_temp_table to locate and implement these checks.
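Two of the fixes above lend themselves to short sketches (both are illustrative, not the final implementation). For the config_hash collision, a placeholder URL that embeds the catalog-specific attributes named in the first item (catalog_type, _get_catalog_uri(), warehouse):

from airbyte.secrets import SecretString

def get_sql_alchemy_url(self) -> SecretString:
    """Return the real URL for SQL catalogs, or a unique placeholder otherwise."""
    if self.catalog_type == "sql":
        return SecretString(self._get_catalog_uri())
    # Embedding catalog_type, URI, and warehouse keeps config_hash distinct
    # across different Iceberg catalogs.
    return SecretString(
        f"iceberg://{self.catalog_type}:{self._get_catalog_uri()}:{self.warehouse}"
    )

And for the nested object/array columns, JSON-encoding dict/list cells before the Arrow conversion (the helper name is hypothetical):

import json

import pandas as pd
import pyarrow as pa

def to_arrow_with_json_columns(combined_df: pd.DataFrame) -> pa.Table:
    """JSON-encode dict/list cells so PyArrow infers plain string columns."""
    for col in combined_df.columns:
        if combined_df[col].map(lambda v: isinstance(v, (dict, list))).any():
            combined_df[col] = combined_df[col].map(
                lambda v: json.dumps(v) if isinstance(v, (dict, list)) else v
            )
    return pa.Table.from_pandas(combined_df, preserve_index=False)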
- Fix model_post_init to properly handle both namespace and schema_name inputs
- Fix get_sql_alchemy_url to include unique attributes for non-SQL catalogs
- Add JSON serialization for dict/list columns before Arrow conversion

Co-Authored-By: AJ Steers <aj@airbyte.io>
Since Iceberg writes directly to the final table (not a temp table), the base class may call _drop_temp_table with the final table name. Making this a no-op prevents accidental deletion of data. Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 241-251: SqlProcessorBase builds self.type_converter from
type_converter_class but this Iceberg processor only annotates type_converter as
IcebergTypeConverter without changing type_converter_class, so the base __init__
will still instantiate SQLTypeConverter and later code (e.g., where
IcebergTypeConverter-specific methods are expected) will fail; set
type_converter_class = IcebergTypeConverter on the Iceberg processor class (or
alternatively override __init__ to instantiate IcebergTypeConverter and assign
to self.type_converter) so the correct converter type is created (refer to the
class attribute type_converter_class and the instance attribute type_converter).
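In code, the smallest version of this fix is a pair of class attributes so the base __init__ builds the Iceberg converter (a sketch; the surrounding class body is omitted and the class name is illustrative):

class IcebergSqlProcessor(SqlProcessorBase):
    """Iceberg processor wired to its own type converter."""

    type_converter_class = IcebergTypeConverter  # instantiated by SqlProcessorBase.__init__
    type_converter: IcebergTypeConverter  # narrows the attribute type for type checkers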
Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 313-342: The current flow returns an existing table from
_get_or_create_iceberg_table and later _write_files_to_new_table blindly drops
incoming columns not present in iceberg_table.schema(), causing schema evolution
to be lost; update the table schema instead of dropping columns by detecting
missing columns (compare combined_df.columns or _get_iceberg_schema(stream_name)
against iceberg_table.schema().fields) and calling Iceberg's schema-evolution
API to add those columns (e.g., use the table/catalog schema update methods to
add_column for each new field and commit the change) before writing; implement
this either inside _get_or_create_iceberg_table (after load_table) or
immediately before the drop logic in _write_files_to_new_table so new fields are
appended to the Iceberg table schema using the real column names/types from
_get_iceberg_schema or the dataframe.
- Around line 446-474: The current Iceberg processor appends via
_write_files_to_new_table (uses iceberg_table.append()) and leaves
_swap_temp_table_with_final_table as a no-op, so REPLACE writes never remove old
data; override _write_temp_table_to_final_table to detect the REPLACE
write_strategy and perform an Iceberg overwrite (use the table overwrite API
instead of iceberg_table.append()) or implement
_swap_temp_table_with_final_table to call the overwrite operation during
REPLACE, ensuring the method names _write_files_to_new_table,
_write_temp_table_to_final_table, and _swap_temp_table_with_final_table are
updated to invoke the overwrite path when write_strategy ==
WriteDisposition.REPLACE (or equivalent) so old data is removed.
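A sketch of the overwrite-vs-append decision from the second item above, using PyIceberg's table API; WriteStrategy stands in for the prompt's WriteDisposition, and the helper name is hypothetical:

import pyarrow as pa
from pyiceberg.table import Table

from airbyte.strategies import WriteStrategy

def commit_arrow_table(
    iceberg_table: Table,
    arrow_table: pa.Table,
    write_strategy: WriteStrategy,
) -> None:
    """REPLACE clears existing rows in one commit; other strategies keep appending."""
    if write_strategy is WriteStrategy.REPLACE:
        iceberg_table.overwrite(arrow_table)
    else:
        iceberg_table.append(arrow_table)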
@overrides
def _ensure_schema_exists(self) -> None:
    """Ensure the Iceberg namespace exists."""
    with contextlib.suppress(NamespaceAlreadyExistsError):
        self.catalog.create_namespace(self.namespace)

def _table_exists_iceberg(self, table_name: str) -> bool:
    """Check if an Iceberg table exists."""
    try:
        self.catalog.load_table(self._get_table_identifier(table_name))
    except NoSuchTableError:
        return False
    else:
        return True

def _create_iceberg_table(self, stream_name: str, table_name: str) -> Table:
    """Create an Iceberg table for the given stream."""
    schema = self._get_iceberg_schema(stream_name)
    return self.catalog.create_table(
        identifier=self._get_table_identifier(table_name),
        schema=schema,
    )

def _get_or_create_iceberg_table(self, stream_name: str, table_name: str) -> Table:
    """Get an existing Iceberg table or create a new one."""
    try:
        return self.catalog.load_table(self._get_table_identifier(table_name))
    except NoSuchTableError:
        return self._create_iceberg_table(stream_name, table_name)
🧩 Analysis chain
🏁 Script executed:
# First, let's find the file and understand its structure
find . -name "iceberg.py" -path "*/sql/*" | head -5
🏁 Script executed:
# Get the full context of the iceberg.py file to see related methods
wc -l airbyte/_processors/sql/iceberg.py
🏁 Script executed:
# Search for _write_files_to_new_table method to understand column handling
rg "_write_files_to_new_table" -A 30 airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 2146
🏁 Script executed:
# Search for schema-related methods and evolution patterns
rg "schema|evolve|update_schema|add_column" -i airbyte/_processors/sql/iceberg.py | head -20Repository: airbytehq/PyAirbyte
Length of output: 1255
🏁 Script executed:
# Get the full _write_files_to_new_table method
rg "_write_files_to_new_table" -A 100 airbyte/_processors/sql/iceberg.py | head -150Repository: airbytehq/PyAirbyte
Length of output: 4833
🏁 Script executed:
# Look for any schema update or column handling logic in the entire file
rg "update_schema|add_column|append|merge|overwrite" -i airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 581
🏁 Script executed:
# Check the get_sql_table_name and related table schema methods
rg "def _get_iceberg_schema|def get_sql_table_name" -A 15 airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 793
🏁 Script executed:
# Check imports to see what Iceberg APIs are available
head -50 airbyte/_processors/sql/iceberg.py | grep -E "^from pyiceberg|^import pyiceberg"
🏁 Script executed:
# Search for any test files that might show expected behavior
find . -name "*iceberg*test*" -o -name "*test*iceberg*" | head -10Repository: airbytehq/PyAirbyte
Length of output: 103
🏁 Script executed:
# Check if there are any references to schema evolution in comments or docstrings
rg "schema.*evolv|evolv.*schema" -i airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Check the test file for context on expected behavior
cat -n ./tests/unit_tests/test_iceberg_cache.py | head -100
🏁 Script executed:
# Search in the entire codebase for any schema update/evolution patterns with pyiceberg
rg "table.update_schema|table.add_column|catalog.*schema" airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Check the full context around _get_or_create_iceberg_table to understand the flow
rg "_get_or_create_iceberg_table|_create_iceberg_table" -B 5 -A 20 airbyte/_processors/sql/iceberg.py | head -100Repository: airbytehq/PyAirbyte
Length of output: 3095
Schema evolution needed for Iceberg table updates
When a table already exists, _get_or_create_iceberg_table returns it unchanged, but _write_files_to_new_table then silently drops any columns from the incoming data that aren't in the existing table schema:
expected_columns = {field.name for field in iceberg_table.schema().fields}
columns_to_drop = [col for col in combined_df.columns if col not in expected_columns]
if columns_to_drop:
    combined_df = combined_df.drop(columns=columns_to_drop)

If the source schema evolves and adds new fields, they'll be lost during write. Should we call Iceberg's schema update API to add missing columns to the table when the incoming data contains new fields, wdyt?
🤖 Prompt for AI Agents
In `@airbyte/_processors/sql/iceberg.py` around lines 313 - 342, The current flow
returns an existing table from _get_or_create_iceberg_table and later
_write_files_to_new_table blindly drops incoming columns not present in
iceberg_table.schema(), causing schema evolution to be lost; update the table
schema instead of dropping columns by detecting missing columns (compare
combined_df.columns or _get_iceberg_schema(stream_name) against
iceberg_table.schema().fields) and calling Iceberg's schema-evolution API to add
those columns (e.g., use the table/catalog schema update methods to add_column
for each new field and commit the change) before writing; implement this either
inside _get_or_create_iceberg_table (after load_table) or immediately before the
drop logic in _write_files_to_new_table so new fields are appended to the
Iceberg table schema using the real column names/types from _get_iceberg_schema
or the dataframe.
- Add run_iceberg_cache.py example using source-spacex-api
- Fix IcebergTypeConverter to include to_sql_type method for interface compatibility
- Override _ensure_compatible_table_schema and _emulated_merge_temp_table_to_final_table
- Demonstrate reading data via pandas DataFrame, Arrow dataset, and Iceberg table

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
…erter Co-Authored-By: AJ Steers <aj@airbyte.io>
def get_records(
    self,
    stream_name: str,
) -> CachedDataset:
    """Get records from the cache as a CachedDataset.

    Note: For Iceberg caches, this returns a CachedDataset that wraps
    the Iceberg table. For more efficient access to Iceberg-specific
    features, use get_iceberg_table() or get_pandas_dataframe() directly.
    """
    return CachedDataset(self, stream_name)
🔴 CachedDataset iteration fails because Iceberg data is not in SQLite
The get_records() method returns a CachedDataset that will fail when iterating or getting length.
Root Cause
IcebergCache.get_records() at airbyte/caches/iceberg.py:163-173 returns a CachedDataset. The CachedDataset class at airbyte/datasets/_sql.py:89-94 uses SQLAlchemy to iterate:
def __iter__(self) -> Iterator[dict[str, Any]]:
    with self._cache.processor.get_sql_connection() as conn:
        for row in conn.execute(self._query_statement):
            yield cast("dict[str, Any]", row._mapping)

The query select("*").select_from(text(f"{schema_name}.{table_name}")) queries the SQLite catalog database, but Iceberg data is stored in Parquet files, not in SQLite. The SQLite database only contains Iceberg metadata, not the actual data rows.
Impact
- for record in cache.get_records("stream") will return no records or fail
- len(cache.get_records("stream")) will return 0 or fail
- cache.get_records("stream").to_sql_table() will fail
Note: to_pandas() and to_arrow() work because they call the overridden methods in IcebergCache.
Recommendation: Override get_records() to return a custom dataset class that reads from Iceberg tables instead of using SQLAlchemy queries, or implement custom __iter__ and __len__ methods that use get_iceberg_table().scan().to_arrow().
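A sketch of such a dataset class, assuming get_iceberg_table(stream_name) returns a PyIceberg Table as described in this PR (the class itself is hypothetical):

from collections.abc import Iterator
from typing import Any

class IcebergDataset:
    """Iterates an Iceberg-backed stream without going through SQLAlchemy."""

    def __init__(self, cache: "IcebergCache", stream_name: str) -> None:
        self._cache = cache
        self._stream_name = stream_name

    def __iter__(self) -> Iterator[dict[str, Any]]:
        scan = self._cache.get_iceberg_table(self._stream_name).scan()
        # Stream record batches so the whole table is never materialized.
        for batch in scan.to_arrow_batch_reader():
            yield from batch.to_pylist()

    def __len__(self) -> int:
        # Simple but eager; a metadata-based row count would avoid loading data.
        return self._cache.get_iceberg_table(self._stream_name).scan().to_arrow().num_rows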
This is a known limitation of the current implementation. The get_records() method returns a CachedDataset for API compatibility, but as noted in the docstring, users should prefer the Iceberg-specific methods for efficient access:
- get_pandas_dataframe() - reads directly from Iceberg tables
- get_arrow_dataset() - provides efficient Arrow-based access
- get_iceberg_table() - gives direct access to the PyIceberg Table object
The example script demonstrates the recommended pattern using get_arrow_dataset() for iteration.
A future enhancement could implement a custom IcebergDataset class that properly iterates over Iceberg tables, but for the initial implementation, the Iceberg-specific methods provide the recommended access patterns.
Co-Authored-By: AJ Steers <aj@airbyte.io>
…rgCache
- Add ObjectTypingMode enum with 'nested_types' and 'as_json_strings' modes
- Add AdditionalPropertiesMode enum with 'fail', 'ignore', 'stringify' options
- Add AnyOfPropertiesMode enum with 'fail', 'branch', 'stringify' options
- Add helper functions for JSON column naming (get_json_column_name, get_additional_properties_column_name)
- Implement __json suffix convention for stringified columns
- Add special handling for _airbyte_meta (always StructType, never stringified)
- Refactor IcebergTypeConverter to use new configuration modes
- Update IcebergProcessor to pass new config to type converter
- Arrays are always strongly typed (ListType) regardless of object_typing mode

Co-Authored-By: AJ Steers <aj@airbyte.io>
…chemas Use type_converter.next_field_id() for all fields (top-level and nested) to ensure unique field IDs across the entire Iceberg schema. Previously, top-level fields used a local counter starting at 1 while nested fields used the class-level counter, which could cause duplicate field IDs. Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 933-947: The code path in _write_files_to_new_table when
object_typing == ObjectTypingMode.NESTED_TYPES currently leaves dict/list
columns as-is and ignores schema-builder-generated placeholder columns like
__additional_properties__json and anyOf branch fields (e.g., field_str_val,
field_int_val), causing dropped data; update the logic in the block that
processes combined_df (the loop over combined_df.columns) to detect those
placeholder columns and any dict/list source columns and: (1) for
__additional_properties__json columns, collect keys present in the source dicts
that are not mapped to explicit schema fields, serialize those extra key/value
pairs into the __additional_properties__json column per-row; (2) for anyOf
branch fields (the branch placeholder columns created by the schema builder),
inspect the source dicts and populate the correct branch column(s) (e.g., set
field_str_val when the dict contains a string branch) instead of relying on
PyArrow coercion; and (3) if a configured fail/strict mode exists, raise a clear
error when unmapped branches/additional properties are found; implement these
steps near combined_df handling so schema placeholder columns are filled before
writing.
The nested_types mode requires exact schema matching which is complex for real-world data. The as_json_strings mode (matching Kotlin destination default behavior) is safer and works with any data structure. Co-Authored-By: AJ Steers <aj@airbyte.io>
When object_typing=nested_types, columns with StringType in the Iceberg schema (e.g., schemaless arrays without items definition) but containing Python dict/list data now get serialized to JSON strings. This is a partial fix - full nested_types support requires additional work to handle null values for struct columns and Python dict to PyArrow struct conversion. Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
- Parse ISO 8601 timestamp strings to datetime objects for Arrow timestamp types
- Handle timezone-aware and naive timestamps (default to UTC for naive)
- Parse date strings using fromisoformat
- Remove pandas dependency from data processing pipeline
- All type conversions now work correctly with nested_types mode

Co-Authored-By: AJ Steers <aj@airbyte.io>
Addresses Devin Review feedback about class-level _field_id_counter causing global state pollution across all schemas. The counter is now an instance variable, ensuring field IDs are isolated between different type converter instances.

Changes:
- Move _field_id_counter from class variable to instance variable
- Convert next_field_id() from classmethod to instance method
- Add reset_field_id_counter() method for explicit counter reset
- Convert json_schema_to_iceberg_type, _handle_anyof_types, and _convert_object_to_struct from classmethods to instance methods
- Update all cls. references to self.

Co-Authored-By: AJ Steers <aj@airbyte.io>
…_type
Addresses Devin Review feedback about bool() incorrectly converting string
'false' to True. Now explicitly handles string representations of booleans
by checking if the lowercase value is in {'true', '1', 'yes'}.
Co-Authored-By: AJ Steers <aj@airbyte.io>
The None check inside the boolean handling block was unreachable because we already check for None at the start of convert_value_to_arrow_type. Co-Authored-By: AJ Steers <aj@airbyte.io>
Call reset_field_id_counter() at the beginning of _get_iceberg_schema() to ensure each table's schema has field IDs starting from 1, rather than continuing to increment across multiple stream schema generations. Co-Authored-By: AJ Steers <aj@airbyte.io>
for field in arrow_schema:
    field_name = field.name
    value = normalized_record.get(field_name)
    result[field_name] = convert_value_to_arrow_type(value, field.type)
🔴 AdditionalPropertiesMode.FAIL silently drops properties instead of failing
The additional_properties configuration mode FAIL is documented as "Fail if we encounter an undeclared property at runtime" but the implementation silently drops additional properties instead of raising an error.
How it happens
In convert_record_to_arrow_schema at lines 289-292, the function only processes fields that exist in the Arrow schema:
for field in arrow_schema:
    field_name = field.name
    value = normalized_record.get(field_name)
    result[field_name] = convert_value_to_arrow_type(value, field.type)

When additional_properties=FAIL and the JSON schema has additionalProperties: true, the Iceberg schema is created without a placeholder column for additional properties (airbyte/_processors/sql/iceberg.py:814-816). At runtime, any properties not in the schema are simply ignored by the loop above.
Actual vs Expected
- Expected: When mode is FAIL and additional properties are present in the data, a PyAirbyteInputError should be raised
- Actual: Additional properties are silently dropped, making FAIL mode behave identically to IGNORE mode
Impact
Users who configure additional_properties="fail" expecting strict validation will have data silently dropped without any warning or error.
Recommendation: Add a check in convert_record_to_arrow_schema to compare record keys against schema fields, and raise an error when additional_properties mode is FAIL and undeclared properties are found in the data.
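A sketch of that check in the record-conversion path; the enum and exception names come from this PR, but the import locations and call signature are assumptions:

import pyarrow as pa

from airbyte._processors.sql.iceberg import AdditionalPropertiesMode  # assumed location
from airbyte.exceptions import PyAirbyteInputError

def ensure_no_undeclared_properties(
    record: dict,
    arrow_schema: pa.Schema,
    mode: AdditionalPropertiesMode,
) -> None:
    """Raise instead of silently dropping keys when additional_properties == FAIL."""
    declared = {field.name for field in arrow_schema}
    undeclared = set(record) - declared
    if undeclared and mode is AdditionalPropertiesMode.FAIL:
        raise PyAirbyteInputError(
            message=f"Undeclared properties found in record: {sorted(undeclared)}",
        )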
Valid observation. The additional_properties=FAIL mode is designed to fail when undeclared properties are encountered at runtime (per the design discussion), but this runtime validation is not yet implemented in convert_record_to_arrow_schema.
This is a known limitation of the current implementation - the nested_types mode with strict validation is still being developed. For now, the as_json_strings mode is the recommended default for production use.
Will address this in a follow-up once the core functionality is stabilized.
def _ensure_compatible_table_schema(
    self,
    stream_name: str,
    table_name: str,
) -> None:
    """Ensure the Iceberg table schema is compatible with the stream schema.

    For Iceberg, we handle schema evolution differently than SQL databases.
    Iceberg has native schema evolution support, so we don't need to add
    columns via ALTER TABLE. The schema is managed when creating the table.
    """
    # Iceberg handles schema evolution natively - no action needed here
    # The table schema is set when the table is created in _create_iceberg_table
    pass
🔴 Schema evolution not implemented - new columns silently dropped
The _ensure_compatible_table_schema method is a no-op that claims Iceberg handles schema evolution natively, but Iceberg actually requires explicit API calls to add new columns. This causes data loss when source schemas evolve.
How it happens
At airbyte/_processors/sql/iceberg.py:1117-1130, the method is implemented as:
def _ensure_compatible_table_schema(
    self,
    stream_name: str,
    table_name: str,
) -> None:
    # Iceberg handles schema evolution natively - no action needed here
    pass

When a source's JSON schema changes to include new columns:
- _get_or_create_iceberg_table loads the existing table with the old schema
- arrow_schema = iceberg_table.schema().as_arrow() gets the old schema
- convert_record_to_arrow_schema only processes fields in the old schema (line 289-292)
- New columns in the data are silently dropped
Actual vs Expected
- Expected: New columns in source data should either be added to the Iceberg table schema or raise an error
- Actual: New columns are silently dropped because the schema conversion only includes fields from the existing table schema
Impact
When source schemas evolve (new fields added), all data for those new fields is permanently lost. This is especially problematic for incremental syncs where data cannot be easily re-synced.
Recommendation: Implement actual schema evolution by comparing the stream's JSON schema against the existing Iceberg table schema, then calling iceberg_table.update_schema().add_column(...).commit() for any new columns before writing data.
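A sketch of that evolution step using PyIceberg's update_schema() API; json_schema_to_iceberg_type is the converter method named in this PR, while the helper itself is hypothetical:

from pyiceberg.table import Table

def add_missing_columns(
    iceberg_table: Table,
    stream_json_schema: dict,
    type_converter,  # expected to expose json_schema_to_iceberg_type()
) -> None:
    """Add stream properties the existing Iceberg table does not yet have."""
    existing = {field.name for field in iceberg_table.schema().fields}
    new_props = {
        name: prop
        for name, prop in stream_json_schema.get("properties", {}).items()
        if name not in existing
    }
    if not new_props:
        return
    with iceberg_table.update_schema() as update:  # commits on context exit
        for name, prop in new_props.items():
            update.add_column(name, type_converter.json_schema_to_iceberg_type(prop))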
…rings Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/caches/iceberg.py`:
- Around line 128-149: get_arrow_dataset currently accepts max_chunk_size but
ignores it (arrow_table is fully in-memory and ds.dataset(arrow_table) does not
chunk), so update the get_arrow_dataset docstring to state that max_chunk_size
is accepted for API compatibility but is ignored for in-memory Arrow tables and
explain why (PyArrow dataset built from pa.Table is not chunked) and suggest the
alternative (load Parquet files via ds.dataset on file paths or use
iceberg_table.scan(...).to_batches()/to_table with batching) so callers know how
to get chunked reads; reference the get_arrow_dataset function, the
max_chunk_size parameter, arrow_table variable, and the ds.dataset(arrow_table)
call in the docstring.
- Around line 174-184: Update the get_records docstring in the get_records
method to explicitly state that the returned CachedDataset wraps an Iceberg
table and that CachedDataset.to_sql_table() is not supported for Iceberg caches
(especially with REST catalogs/non-SQLAlchemy URLs); recommend using
get_iceberg_table(), get_pandas_dataframe(), or to_arrow() instead for direct
access to Iceberg-specific features. Reference the CachedDataset class and the
to_sql_table(), get_iceberg_table(), get_pandas_dataframe(), and to_arrow()
methods in the message so readers can find the relevant APIs.
🧹 Nitpick comments (1)
airbyte/_processors/sql/iceberg.py (1)
452-459: Consider removing or implementing the empty validator, wdyt?

The _validate_typing_config method currently does nothing except pass. If there's planned validation logic, perhaps add a TODO comment. Otherwise, removing it would reduce code noise.

🧹 Proposed cleanup

-    @model_validator(mode="after")
-    def _validate_typing_config(self) -> IcebergConfig:
-        """Validate that typing configuration options are consistent."""
-        if self.object_typing == ObjectTypingMode.AS_JSON_STRINGS:
-            # additional_properties and anyof_properties are ignored in this mode
-            # but we don't need to warn - they just have no effect
-            pass
-        return self
def get_arrow_dataset(
    self,
    stream_name: str,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,  # noqa: ARG002
) -> ds.Dataset:
    """Return an Arrow Dataset with the stream's data.

    This method provides efficient access to the underlying Parquet files
    through PyArrow's dataset interface.
    """
    table_name = self._read_processor.get_sql_table_name(stream_name)
    catalog = self.get_catalog()

    try:
        iceberg_table = catalog.load_table((self.namespace, table_name))
    except NoSuchTableError as e:
        raise KeyError(f"Stream '{stream_name}' not found in cache") from e

    # Get the Arrow table from Iceberg and convert to dataset
    arrow_table: pa.Table = iceberg_table.scan().to_arrow()
    return ds.dataset(arrow_table)
The max_chunk_size parameter is accepted but ignored.
Users calling cache.get_arrow_dataset(stream_name, max_chunk_size=1000) might expect chunking behavior, but the parameter has no effect since ds.dataset(arrow_table) creates an in-memory dataset. Could we add a note in the docstring about this limitation, wdyt?
📝 Suggested docstring update
def get_arrow_dataset(
self,
stream_name: str,
*,
max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE, # noqa: ARG002
) -> ds.Dataset:
"""Return an Arrow Dataset with the stream's data.
This method provides efficient access to the underlying Parquet files
through PyArrow's dataset interface.
+
+ Note:
+ The max_chunk_size parameter is accepted for interface compatibility
+ but does not affect chunking behavior for Iceberg-backed datasets.
"""🤖 Prompt for AI Agents
In `@airbyte/caches/iceberg.py` around lines 128 - 149, get_arrow_dataset
currently accepts max_chunk_size but ignores it (arrow_table is fully in-memory
and ds.dataset(arrow_table) does not chunk), so update the get_arrow_dataset
docstring to state that max_chunk_size is accepted for API compatibility but is
ignored for in-memory Arrow tables and explain why (PyArrow dataset built from
pa.Table is not chunked) and suggest the alternative (load Parquet files via
ds.dataset on file paths or use iceberg_table.scan(...).to_batches()/to_table
with batching) so callers know how to get chunked reads; reference the
get_arrow_dataset function, the max_chunk_size parameter, arrow_table variable,
and the ds.dataset(arrow_table) call in the docstring.
Good suggestion. I can add a note to the docstring clarifying that max_chunk_size is accepted for interface compatibility but doesn't affect chunking for Iceberg-backed datasets. Will defer to @aaronsteers on whether to address this now or in a follow-up.
def get_records(
    self,
    stream_name: str,
) -> CachedDataset:
    """Get records from the cache as a CachedDataset.

    Note: For Iceberg caches, this returns a CachedDataset that wraps
    the Iceberg table. For more efficient access to Iceberg-specific
    features, use get_iceberg_table() or get_pandas_dataframe() directly.
    """
    return CachedDataset(self, stream_name)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, find the IcebergProcessor and check for get_sql_table method
echo "=== Checking IcebergProcessor for get_sql_table ==="
rg -n "class IcebergProcessor" -A 20 airbyte/_processors/Repository: airbytehq/PyAirbyte
Length of output: 1664
🏁 Script executed:
#!/bin/bash
# Search for get_sql_table method definitions
echo "=== Searching for get_sql_table method ==="
rg -n "def get_sql_table" airbyte/ -B 2 -A 10Repository: airbytehq/PyAirbyte
Length of output: 2892
🏁 Script executed:
#!/bin/bash
# Check CachedDataset implementation and to_sql_table
echo "=== Checking CachedDataset.to_sql_table ==="
rg -n "def to_sql_table" airbyte/ -B 2 -A 15Repository: airbytehq/PyAirbyte
Length of output: 1522
🏁 Script executed:
#!/bin/bash
# Check if IcebergCache has any override for get_records
echo "=== Checking IcebergCache class structure ==="
rg -n "class IcebergCache" -A 50 airbyte/caches/Repository: airbytehq/PyAirbyte
Length of output: 3579
🏁 Script executed:
#!/bin/bash
# Check _get_table_by_name implementation
echo "=== Checking _get_table_by_name in SqlProcessorBase ==="
rg -n "def _get_table_by_name" airbyte/ -B 2 -A 15Repository: airbytehq/PyAirbyte
Length of output: 1423
🏁 Script executed:
#!/bin/bash
# Check if IcebergProcessor overrides _get_table_by_name
echo "=== Checking IcebergProcessor for _get_table_by_name override ==="
rg -n "_get_table_by_name" airbyte/_processors/sql/iceberg.py -B 2 -A 10Repository: airbytehq/PyAirbyte
Length of output: 128
🏁 Script executed:
#!/bin/bash
# Check tests or usage of to_sql_table with IcebergCache
echo "=== Checking for to_sql_table usage with Iceberg ==="
rg -n "to_sql_table" tests/ -B 2 -A 5Repository: airbytehq/PyAirbyte
Length of output: 2591
🏁 Script executed:
#!/bin/bash
# Get more of the _get_table_by_name implementation
echo "=== Full _get_table_by_name implementation ==="
rg -n "def _get_table_by_name" airbyte/shared/sql_processor.py -A 50 | head -70Repository: airbytehq/PyAirbyte
Length of output: 2358
🏁 Script executed:
#!/bin/bash
# Check if test_all_cache_types uses IcebergCache
echo "=== Check if test uses IcebergCache ==="
rg -n "IcebergCache" tests/integration_tests/test_all_cache_types.pyRepository: airbytehq/PyAirbyte
Length of output: 102
🏁 Script executed:
#!/bin/bash
# Check what get_sql_engine returns for IcebergProcessor
echo "=== Checking IcebergProcessor.get_sql_engine ==="
rg -n "def get_sql_engine" airbyte/_processors/sql/iceberg.py -A 20Repository: airbytehq/PyAirbyte
Length of output: 111
🏁 Script executed:
#!/bin/bash
# Check if IcebergProcessor has a get_sql_engine method or inherits it
echo "=== Checking SqlProcessorBase.get_sql_engine ==="
rg -n "def get_sql_engine" airbyte/shared/sql_processor.py -A 10Repository: airbytehq/PyAirbyte
Length of output: 1161
🏁 Script executed:
#!/bin/bash
# Check IcebergProcessor sql_config and connection type
echo "=== Checking IcebergConfig and sql_config ==="
rg -n "class IcebergConfig" airbyte/ -A 30 | head -50Repository: airbytehq/PyAirbyte
Length of output: 2449
🏁 Script executed:
#!/bin/bash
# Check IcebergConfig's get_sql_alchemy_url and get_sql_engine methods
echo "=== Checking IcebergConfig methods ==="
rg -n "def get_sql_alchemy_url|def get_sql_engine" airbyte/_processors/sql/iceberg.py -A 10Repository: airbytehq/PyAirbyte
Length of output: 694
🏁 Script executed:
#!/bin/bash
# Check SqlConfig to understand the inheritance
echo "=== Checking SqlConfig class ==="
rg -n "class SqlConfig" airbyte/shared/ -A 40 | head -60Repository: airbytehq/PyAirbyte
Length of output: 2864
🏁 Script executed:
#!/bin/bash
# Check if IcebergProcessor overrides any critical methods
echo "=== Full IcebergProcessor class methods ==="
rg -n " def " airbyte/_processors/sql/iceberg.py | grep -E "(get_sql|_get_table)"Repository: airbytehq/PyAirbyte
Length of output: 233
🏁 Script executed:
#!/bin/bash
# Check if there are any Iceberg-specific tests for to_sql_table
echo "=== Searching for Iceberg tests ==="
find tests/ -name "*iceberg*" -type f
🏁 Script executed:
#!/bin/bash
# Look for any tests that call to_sql_table on Iceberg
echo "=== Searching for to_sql_table in Iceberg tests ==="
rg -n "to_sql_table" tests/ --glob="*iceberg*"Repository: airbytehq/PyAirbyte
Length of output: 114
🏁 Script executed:
#!/bin/bash
# Check the full IcebergConfig.get_sql_alchemy_url to understand the returned URL
echo "=== Full get_sql_alchemy_url implementation ==="
rg -n "def get_sql_alchemy_url" airbyte/_processors/sql/iceberg.py -A 30Repository: airbytehq/PyAirbyte
Length of output: 1619
🏁 Script executed:
#!/bin/bash
# Check the Iceberg test file
echo "=== Checking IcebergCache tests ==="
cat tests/unit_tests/test_iceberg_cache.py | head -100
🏁 Script executed:
#!/bin/bash
# Check if there are any integration tests that use get_sql_table with Iceberg
echo "=== Check all integration tests for IcebergCache usage ==="
rg -n "IcebergCache" tests/integration_tests/ -B 2 -A 5Repository: airbytehq/PyAirbyte
Length of output: 121
🏁 Script executed:
#!/bin/bash
# Check what happens when we try to use a non-SQL catalog URL as SQLAlchemy URL
echo "=== Checking how sqlalchemy.Table is created with iceberg:// URL ==="
rg -n "autoload_with" airbyte/shared/sql_processor.py -B 5 -A 5Repository: airbytehq/PyAirbyte
Length of output: 528
🏁 Script executed:
#!/bin/bash
# Look at the full docstring of get_records in IcebergCache
echo "=== Check the full get_records docstring ==="
sed -n '174,184p' airbyte/caches/iceberg.py
🏁 Script executed:
#!/bin/bash
# Check if CachedDataset handles Iceberg specially
echo "=== Check CachedDataset class definition ==="
rg -n "class CachedDataset" airbyte/datasets/ -A 30Repository: airbytehq/PyAirbyte
Length of output: 2317
🏁 Script executed:
#!/bin/bash
# Final check: verify what error would occur if to_sql_table is called with REST catalog
echo "=== Verify the iceberg:// URL would fail with SQLAlchemy ==="
python3 << 'EOF'
from sqlalchemy import create_engine
# This is what would happen with a REST catalog
url = "iceberg://rest:https://example.com:warehouse_path"
try:
engine = create_engine(url)
print(f"URL '{url}' created engine successfully")
except Exception as e:
print(f"URL '{url}' failed with error: {type(e).__name__}: {e}")
# This is what works with SQL catalog
url2 = "sqlite:///test.db"
try:
engine = create_engine(url2)
print(f"URL '{url2}' created engine successfully")
except Exception as e:
print(f"URL '{url2}' failed with error: {type(e).__name__}: {e}")
EOF
🏁 Script executed:
#!/bin/bash
# Check if there's any error handling around to_sql_table for non-SQL backends
echo "=== Check if there are any safeguards in to_sql_table ==="
sed -n '207,210p' airbyte/datasets/_sql.py
🏁 Script executed:
#!/bin/bash
# Check if the docstring mentions the limitation adequately
echo "=== Full get_records method context ==="
sed -n '174,184p' airbyte/caches/iceberg.py
🏁 Script executed:
#!/bin/bash
# Check what other cache implementations do
echo "=== Check DuckDB cache get_records ==="
rg -n "def get_records" airbyte/caches/duckdb.py -A 15Repository: airbytehq/PyAirbyte
Length of output: 101
Expand the docstring to explicitly document the to_sql_table() limitation.
The existing docstring recommends using get_iceberg_table() or get_pandas_dataframe() for efficiency, but users might not realize that to_sql_table() will fail (especially with REST catalogs using non-SQLAlchemy URLs). Consider clarifying that to_sql_table() is not supported for Iceberg caches, wdyt?
Something like:
Note: For Iceberg caches, this returns a CachedDataset that wraps
the Iceberg table. The `to_sql_table()` method is not supported—instead,
use `get_iceberg_table()`, `get_pandas_dataframe()`, or `to_arrow()` for
direct access to Iceberg-specific features.
🤖 Prompt for AI Agents
In `@airbyte/caches/iceberg.py` around lines 174 - 184, Update the get_records
docstring in the get_records method to explicitly state that the returned
CachedDataset wraps an Iceberg table and that CachedDataset.to_sql_table() is
not supported for Iceberg caches (especially with REST catalogs/non-SQLAlchemy
URLs); recommend using get_iceberg_table(), get_pandas_dataframe(), or
to_arrow() instead for direct access to Iceberg-specific features. Reference the
CachedDataset class and the to_sql_table(), get_iceberg_table(),
get_pandas_dataframe(), and to_arrow() methods in the message so readers can
find the relevant APIs.
Valid point about the to_sql_table() limitation. I can expand the docstring to clarify that to_sql_table() is not supported for Iceberg caches (especially with REST catalogs). Will defer to @aaronsteers on whether to address this now or in a follow-up.
Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 465-472: _get_warehouse_path currently only creates directories
when warehouse_path is a Path, so string local paths never get mkdir'd; update
_get_warehouse_path to detect when warehouse_path is a str that represents a
local filesystem path (e.g., does not look like a URI/scheme such as contains
'://' or matches a known remote scheme) then convert it to a Path, call
mkdir(parents=True, exist_ok=True), and return its absolute string; keep the
existing branch that returns non-local strings unchanged and continue to handle
Path inputs the same way.
- Around line 1060-1117: The implementation of _write_files_to_new_table
accumulates all converted_records across files causing unbounded memory use;
change it to convert and build a PyArrow table per file and call
iceberg_table.append(...) for each file instead of after the loop. Concretely,
inside the for file_path in files loop, read and convert each line into a
per-file list (replace converted_records with a local per_file_records), build
arrays from arrow_schema for that file, construct a pa.Table and call
iceberg_table.append(pa_table) immediately, and only return table_name at the
end; keep the arrow_schema retrieval via iceberg_table.schema().as_arrow() and
reuse it across files. Ensure you skip appending if a file yields no records so
behavior matches the original early-return semantics.
def _write_files_to_new_table(
    self,
    files: list[Path],
    stream_name: str,
    batch_id: str,  # noqa: ARG002 # Required by base class interface
) -> str:
    """Write files to a new Iceberg table.

    Unlike SQL databases, we write directly to the final table in Iceberg
    since Iceberg handles transactions and versioning natively.

    This method reads JSONL files directly (without pandas) and uses a
    recursive converter to transform Python objects to match the Iceberg
    schema. This handles:
    - Null values for struct/list columns
    - Python dicts -> Arrow structs
    - Python lists -> Arrow lists
    - Schemaless arrays/objects -> JSON strings
    """
    table_name = self.get_sql_table_name(stream_name)

    # Get or create the Iceberg table (schema is declared before parsing records)
    iceberg_table = self._get_or_create_iceberg_table(stream_name, table_name)
    arrow_schema = iceberg_table.schema().as_arrow()

    # Read all JSONL files and convert records to match the Arrow schema
    converted_records: list[dict[str, Any]] = []
    for file_path in files:
        # Read gzip-compressed JSONL file line by line (progressive read for large files)
        with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)
                # Convert record to match the Arrow schema
                converted = convert_record_to_arrow_schema(
                    record, arrow_schema, self.normalizer
                )
                converted_records.append(converted)

    if not converted_records:
        return table_name

    # Build PyArrow arrays for each column
    arrays: list[pa.Array] = []
    for field in arrow_schema:
        column_values = [record.get(field.name) for record in converted_records]
        # Create array with the correct type
        array = pa.array(column_values, type=field.type)
        arrays.append(array)

    # Create PyArrow table from arrays
    arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)

    # Append to the Iceberg table
    iceberg_table.append(arrow_table)

    return table_name
🧩 Analysis chain
🌐 Web query:
PyArrow append batch memory best practices Iceberg
💡 Result:
PyArrow: appending batches without blowing up memory
- Prefer streaming RecordBatches over materializing a big Table. Use a pyarrow.RecordBatchReader (or an iterator of RecordBatches) as your pipeline boundary; only call read_all() (materializes a Table) when you truly need it. [8]
- Avoid repeatedly "appending" by concatenating growing tables. Repeated concat_tables([t, new_t]) in a loop can create lots of intermediate objects; if you must concatenate, keep it zero-copy by ensuring schemas match and leaving promote=False. [1]
- Don't call combine_chunks() unless required. It (logically) merges chunks and can require additional allocations via the memory pool. [6]
- Write incrementally to Parquet from batches. Use pyarrow.parquet.ParquetWriter.write_batch(...) to stream batches into row groups instead of building a huge in-memory table first; tune row_group_size to your workload. [2]
Iceberg (PyIceberg): ingestion best practices with batches
- Know the current limitation: PyIceberg's table.append(...) API takes a PyArrow Table, so a pure "append RecordBatches directly" workflow isn't (yet) first-class. [7]
- Practical pattern for batch/stream ingestion:
  - Stream your data as RecordBatches and write Parquet incrementally with ParquetWriter.write_batch(...). [2]
  - Commit those Parquet files into Iceberg using table.add_files([...]), which avoids needing one giant pa.Table in memory. [7]
- For reads (when validating writes), prefer batch streaming: tbl.scan().to_arrow_batch_reader() returns a RecordBatchReader specifically to reduce memory vs to_arrow() (which eagerly loads everything). [4]
[1] Apache Arrow docs: pyarrow.concat_tables (zero-copy when promote=False) (arrow.apache.org)
[2] Apache Arrow docs: pyarrow.parquet.ParquetWriter / write_batch (arrow.apache.org)
[4] PyIceberg table scan API: to_arrow_batch_reader memory note (py.iceberg.apache.org)
[6] Apache Arrow docs: pyarrow.Table.combine_chunks (arrow.apache.org)
[7] PyIceberg table API: append(pa.Table) and add_files(...) (py.iceberg.apache.org)
[8] Apache Arrow docs: pyarrow.RecordBatchReader / read_all materializes a Table (arrow.apache.org)
Citations:
- 1: https://arrow.apache.org/docs/5.0/python/generated/pyarrow.concat_tables.html?utm_source=openai
- 2: https://arrow.apache.org/docs/11.0/python/generated/pyarrow.parquet.ParquetWriter.html?utm_source=openai
- 3: https://py.iceberg.apache.org/reference/pyiceberg/table/?utm_source=openai
- 4: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html?utm_source=openai
- 5: https://py.iceberg.apache.org/reference/pyiceberg/table/?utm_source=openai
- 6: https://arrow.apache.org/docs/10.0/python/generated/pyarrow.RecordBatchReader.html?utm_source=openai
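A sketch of the write_batch + add_files pattern summarized above. The Parquet path would need to live under the table's warehouse location, and add_files has its own constraints (schema compatibility, no duplicate registrations), so this shows the shape of the approach rather than a drop-in replacement for the PR's append() path:

from collections.abc import Iterable

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.table import Table

def ingest_batches(
    table: Table,
    batches: Iterable[pa.RecordBatch],
    schema: pa.Schema,
    parquet_path: str,
) -> None:
    """Stream batches into one Parquet file, then commit that file to Iceberg."""
    # Write row groups incrementally so peak memory stays near one batch.
    with pq.ParquetWriter(parquet_path, schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
    # Register the finished data file with the table in a single commit.
    table.add_files([parquet_path])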
🌐 Web query:
PyArrow memory efficiency large datasets multiple batches
💡 Result:
For memory-efficient PyArrow processing of large datasets, prefer a streaming, batch-at-a-time approach and avoid materializing a full Table unless you truly need it:
- Scan/iterate in RecordBatches (streaming) instead of dataset.to_table(): use pyarrow.dataset and a Scanner so you can process one batch at a time (and keep peak RAM bounded by batch size + readahead buffers). The dataset scan API is designed to only load/project needed columns and can push down filters where possible. [1][2]
- Tune batch sizing for RAM: reduce batch_size if batches "overflow memory" (PyArrow explicitly calls this out). [1][2]
- Limit read-ahead (often the hidden RAM multiplier): batch_readahead (batches per file) and fragment_readahead (files) increase RAM usage when raised; lower them if you're memory-bound. [1][2]
- Project columns early: scanning with a column subset avoids "loading, copying, and deserializing" columns you don't need. [1][2]
- Avoid combine_chunks() on very large/chunked tables unless required: it concatenates chunks into larger contiguous buffers, which can increase peak memory due to new allocations/copies. [3]
Practical pattern: dataset.scanner(columns=[...], filter=..., batch_size=..., batch_readahead=..., fragment_readahead=...) → iterate scanner.to_batches() / scanner.scan_batches() → process/write each batch incrementally (so you never hold the full dataset in memory). [1][2]
Sources: Apache Arrow PyArrow docs on Dataset/Scanner scan options (batch size + readahead + projection) [1][2] and Table.combine_chunks() behavior [3].
🌐 Web query:
Iceberg table append multiple batches vs single batch
💡 Result:
In Apache Iceberg, each append commit creates a new snapshot (a new table version) and updates table metadata. So the main difference between many appends (multiple batches) vs one append (single batch) is how many commits/snapshots/manifests/files you create.
Many appends (multiple small batches)
Pros
- Lower latency / better for streaming-style ingestion (commit as data arrives).
- Iceberg supports “fast append” behavior that can add new manifests instead of rewriting existing ones, reducing write work per commit. [1]
Cons
- Metadata growth: every batch ⇒ new snapshot; frequent commits accumulate snapshots quickly and increase metadata that must be tracked and planned against. [2]
- Small files + many manifests: small batches often create many small data files; and fast append can produce many small manifest files unless you rewrite/compact them. This degrades query planning and read performance until you maintain the table. [2]
- Requires regular maintenance (expire snapshots, rewrite data files/manifests, etc.). [3]
Iceberg explicitly recommends tuning commit rate for streaming (e.g., avoid very frequent triggers) because high commit rates produce lots of files/manifests/snapshots. [4]
One append (single larger batch)
Pros
- Fewer snapshots/commits ⇒ less metadata churn and fewer manifest lists to track. (Each write creates a snapshot.) [3]
- Typically produces larger files (closer to target size), which is better for read performance and metadata size.
Cons
- Higher end-to-end latency (you wait to accumulate data).
- Larger single commits can increase the “blast radius” of a failed job (more data to retry) and may contend more with other writers (depending on your engine and commit coordination).
Practical guidance (rule of thumb)
- If you can, prefer fewer, larger appends to reduce snapshot/metadata overhead (especially for batch ingestion). (Each write ⇒ snapshot.) [3]
- If you must write many small batches (streaming / micro-batching), plan for:
- Less frequent commits (increase trigger interval) to reduce snapshot/file/manifest proliferation. [4]
- Compaction: run rewrite_data_files to address small files. [2]
- Manifest rewrite: run rewrite_manifests to reduce many small manifests created by fast appends. [2]
- Snapshot expiration to control metadata growth. [2][3]
- Use table properties to target sane file sizes (e.g., write.target-file-size-bytes). [5]
- Be careful with "fastest write" settings that skip reshuffles: e.g., write.distribution-mode=none can speed writes (esp. streaming) but tends to produce many small files and hurt reads. [6]
Bottom line: Multiple batches are fine (and common) but usually cost you more snapshots + more small files/manifests, so you’ll need maintenance and/or larger micro-batches. Single batch generally yields cleaner metadata and file layout (better for reads) at the cost of higher latency.
Sources: Apache Iceberg spec and docs, plus AWS guidance. [1][2][3][4][5][6]
Cap memory usage by appending per-file instead of accumulating all records.
The current implementation loads all rows from all files into converted_records before building PyArrow arrays, which risks OOM on large syncs. Since iceberg_table.append() accepts individual PyArrow tables, could we move the array-building and append logic inside the file loop to batch per-file? This keeps peak memory bounded to file-size rather than total-sync-size. The suggested diff approach would work well here—thoughts?
💡 Suggested batching approach (per file)
- # Read all JSONL files and convert records to match the Arrow schema
- converted_records: list[dict[str, Any]] = []
for file_path in files:
+ converted_records: list[dict[str, Any]] = []
# Read gzip-compressed JSONL file line by line (progressive read for large files)
with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
record = json.loads(line)
# Convert record to match the Arrow schema
converted = convert_record_to_arrow_schema(
record, arrow_schema, self.normalizer
)
converted_records.append(converted)
- if not converted_records:
- return table_name
+ if not converted_records:
+ continue
- # Build PyArrow arrays for each column
- arrays: list[pa.Array] = []
- for field in arrow_schema:
- column_values = [record.get(field.name) for record in converted_records]
- # Create array with the correct type
- array = pa.array(column_values, type=field.type)
- arrays.append(array)
+ # Build PyArrow arrays for each column
+ arrays: list[pa.Array] = []
+ for field in arrow_schema:
+ column_values = [record.get(field.name) for record in converted_records]
+ # Create array with the correct type
+ array = pa.array(column_values, type=field.type)
+ arrays.append(array)
- # Create PyArrow table from arrays
- arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
+ # Create PyArrow table from arrays
+ arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
- # Append to the Iceberg table
- iceberg_table.append(arrow_table)
+ # Append to the Iceberg table
+ iceberg_table.append(arrow_table)
      return table_name

🤖 Prompt for AI Agents
In `@airbyte/_processors/sql/iceberg.py` around lines 1060 - 1117, The
implementation of _write_files_to_new_table accumulates all converted_records
across files causing unbounded memory use; change it to convert and build a
PyArrow table per file and call iceberg_table.append(...) for each file instead
of after the loop. Concretely, inside the for file_path in files loop, read and
convert each line into a per-file list (replace converted_records with a local
per_file_records), build arrays from arrow_schema for that file, construct a
pa.Table and call iceberg_table.append(pa_table) immediately, and only return
table_name at the end; keep the arrow_schema retrieval via
iceberg_table.schema().as_arrow() and reuse it across files. Ensure you skip
appending if a file yields no records so behavior matches the original
early-return semantics.
…syncs

- Modified _write_files_to_new_table to store Arrow tables in memory instead of writing immediately
- Updated _append_temp_table_to_final_table to use stored Arrow table with append()
- Implemented _emulated_merge_temp_table_to_final_table using PyIceberg upsert() for merge-on-read semantics
- Updated _swap_temp_table_with_final_table to use overwrite() for REPLACE write method
- Added _init_pending_tables helper to initialize the pending tables storage

This two-phase write approach allows using the correct Iceberg operation (append vs upsert vs overwrite) based on the write method, enabling proper deduplication for incremental syncs.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
🧹 Nitpick comments (3)
airbyte/_processors/sql/iceberg.py (3)
465-472: Consider creating local directories for string paths too?

Currently, _get_warehouse_path only creates directories when warehouse_path is a Path object. If a user passes a local filesystem path as a plain string (e.g., "/path/to/warehouse"), the directory won't be created automatically, which could lead to runtime failures when PyIceberg tries to write.

Would it make sense to detect local path strings (those without a :// scheme) and create them too, wdyt?

💡 Suggested enhancement

 def _get_warehouse_path(self) -> str:
     """Get the warehouse path as a string."""
     warehouse = self.warehouse_path
     if isinstance(warehouse, Path):
         # Ensure the directory exists for local warehouses
         warehouse.mkdir(parents=True, exist_ok=True)
         return str(warehouse.absolute())
+    # Handle string paths that look like local filesystem paths
+    if isinstance(warehouse, str) and "://" not in warehouse:
+        local_path = Path(warehouse)
+        local_path.mkdir(parents=True, exist_ok=True)
+        return str(local_path.absolute())
     return warehouse
910-912: Class-level dict annotation may cause confusion.

_pending_arrow_tables is annotated at the class level without a default value, but _init_pending_tables assigns an instance-level dict. This works correctly (the annotation is just a type hint), but it might be clearer to either:

- Initialize it in __init__ (would require overriding), or
- Add a comment clarifying this is just a type annotation

The current lazy initialization pattern in _init_pending_tables is fine, just wanted to flag it in case it wasn't intentional, wdyt?
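For illustration only, the two options look roughly like this (class and attribute names borrowed from the review; this is a sketch, not the PR's code):

import pyarrow as pa


class ProcessorSketch:
    # Option A (current): class-level type annotation only. No attribute exists until
    # _init_pending_tables() runs, so there is no shared mutable class default.
    _pending_arrow_tables: dict[str, pa.Table]

    def _init_pending_tables(self) -> None:
        # Lazily create the instance-level dict on first use.
        if not hasattr(self, "_pending_arrow_tables"):
            self._pending_arrow_tables = {}

    # Option B (alternative): initialize eagerly in __init__, which makes the attribute's
    # lifecycle explicit at the cost of overriding the constructor.
    # def __init__(self) -> None:
    #     self._pending_arrow_tables = {}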
1109-1137: Memory usage scales with total sync size.

The current implementation accumulates all records from all files into converted_records before building the Arrow table. For large syncs with many files, this could cause significant memory pressure.

Since iceberg_table.append() can be called multiple times (each call creates a new commit), would it make sense to process and append per-file instead of accumulating everything first? This would cap memory at file-size rather than total-sync-size.

The trade-off is more Iceberg commits (more metadata overhead), but it prevents OOM for large datasets. You could also batch by record count as a middle ground, wdyt?

💡 Per-file batching approach

-        # Read all JSONL files and convert records to match the Arrow schema
-        converted_records: list[dict[str, Any]] = []
         for file_path in files:
+            converted_records: list[dict[str, Any]] = []
             # Read gzip-compressed JSONL file line by line
             with gzip.open(file_path, mode="rt", encoding="utf-8") as f:
                 for line in f:
                     if not line.strip():
                         continue
                     record = json.loads(line)
                     converted = convert_record_to_arrow_schema(
                         record, arrow_schema, self.normalizer
                     )
                     converted_records.append(converted)

-        if not converted_records:
-            return table_name
+            if not converted_records:
+                continue

-        # Build PyArrow arrays for each column
-        arrays: list[pa.Array] = []
-        # ... rest of array building and storage
+            # Build and store per-file Arrow table
+            arrays = [
+                pa.array([r.get(f.name) for r in converted_records], type=f.type)
+                for f in arrow_schema
+            ]
+            file_arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)
+
+            # Accumulate or write per-file based on strategy
+            if final_table_name in self._pending_arrow_tables:
+                self._pending_arrow_tables[final_table_name] = pa.concat_tables([
+                    self._pending_arrow_tables[final_table_name], file_arrow_table
+                ])
+            else:
+                self._pending_arrow_tables[final_table_name] = file_arrow_table
@overrides
def _ensure_compatible_table_schema(
    self,
    stream_name: str,
    table_name: str,
) -> None:
    """Ensure the Iceberg table schema is compatible with the stream schema.

    For Iceberg, we handle schema evolution differently than SQL databases.
    Iceberg has native schema evolution support, so we don't need to add
    columns via ALTER TABLE. The schema is managed when creating the table.
    """
    # Iceberg handles schema evolution natively - no action needed here
    # The table schema is set when the table is created in _create_iceberg_table
    pass
🧩 Analysis chain
🏁 Script executed:
# First, check if the file exists and examine the method in question
fd "iceberg.py" --type fRepository: airbytehq/PyAirbyte
Length of output: 123
🏁 Script executed:
# Look for the specific method and surrounding context
rg -n "_ensure_compatible_table_schema" airbyte/_processors/sql/iceberg.py -A 20Repository: airbytehq/PyAirbyte
Length of output: 1013
🏁 Script executed:
# Search for schema update logic in the iceberg file
rg -n "update_schema|schema.*evolution|add_column" airbyte/_processors/sql/iceberg.py -B 2 -A 2Repository: airbytehq/PyAirbyte
Length of output: 612
🏁 Script executed:
# Search for _write_files_to_new_table method to understand how data is written
rg -n "_write_files_to_new_table" airbyte/_processors/sql/iceberg.py -A 30Repository: airbytehq/PyAirbyte
Length of output: 4341
🏁 Script executed:
# Check if update_schema is used anywhere in the codebase
rg -n "update_schema" airbyte/ --type pyRepository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Find _get_or_create_iceberg_table method
rg -n "_get_or_create_iceberg_table" airbyte/_processors/sql/iceberg.py -A 25Repository: airbytehq/PyAirbyte
Length of output: 2684
🏁 Script executed:
# Look for schema comparison logic and see how records are converted
rg -n "converted_records|convert.*record|_normalize_record" airbyte/_processors/sql/iceberg.py -B 2 -A 10Repository: airbytehq/PyAirbyte
Length of output: 2762
🏁 Script executed:
# Check if there's any incoming stream schema handling
rg -n "stream.*schema|get_stream_schema" airbyte/_processors/sql/iceberg.py -B 2 -A 5Repository: airbytehq/PyAirbyte
Length of output: 880
🏁 Script executed:
# Search for schema comparison in the broader codebase
rg -n "compare.*schema|schema.*mismatch|schema.*evolution" airbyte/_processors/sql/ -B 2 -A 2Repository: airbytehq/PyAirbyte
Length of output: 927
🏁 Script executed:
# Look at the full convert_record_to_arrow_schema function implementation
rg -n "def convert_record_to_arrow_schema" airbyte/_processors/sql/iceberg.py -A 35Repository: airbytehq/PyAirbyte
Length of output: 1352
🏁 Script executed:
# Check if there's any logic in _create_iceberg_table that might handle schema evolution
rg -n "_create_iceberg_table" airbyte/_processors/sql/iceberg.py -A 20Repository: airbytehq/PyAirbyte
Length of output: 4547
🏁 Script executed:
# Check if there's any validation or comparison of stream schema vs table schema
rg -n "get_stream_schema|stream_schema|expected.*schema" airbyte/_processors/sql/iceberg.pyRepository: airbytehq/PyAirbyte
Length of output: 195
Silently dropping new fields on schema evolution.
The concern is valid: when the source schema evolves with new fields, convert_record_to_arrow_schema (lines 284-313) iterates only through the existing table schema and silently discards any extra fields from the incoming records. This happens at line 1130 where column values are extracted only for fields in the existing arrow_schema, causing data loss.
Additionally, _write_files_to_new_table (line 1107) loads the existing table's schema rather than comparing it against the current stream schema, so evolved columns are never detected or added.
The current _ensure_compatible_table_schema is a no-op, yet the docstring claims "Iceberg has native schema evolution support" — which is true for PyIceberg (via update_schema()), but this code doesn't use it.
Should we implement proper schema evolution by comparing the incoming stream schema against the existing table schema and calling iceberg_table.update_schema() to add missing columns, or would you prefer to document this as a known limitation for now, wdyt?
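If we go the update_schema() route, a minimal sketch of what that could look like (the incoming Iceberg Schema is assumed to come from this PR's JSON-schema-to-Iceberg converter; union_by_name is PyIceberg's additive schema-union API, so it only ever adds columns):

from pyiceberg.schema import Schema
from pyiceberg.table import Table


def evolve_table_schema(iceberg_table: Table, incoming_schema: Schema) -> None:
    """Add columns present in the incoming stream schema but missing from the table.

    Sketch only: `incoming_schema` is assumed to be built from the current stream's
    JSON schema via the type converter added in this PR. Existing columns and data
    files are untouched; reads of old snapshots simply return NULL for new columns.
    """
    with iceberg_table.update_schema() as update:
        update.union_by_name(incoming_schema)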
When warehouse_path is passed as a string (not Path object) representing a local path (no URI scheme like s3://), now creates the directory to avoid runtime failures. This matches the behavior for Path objects.

Co-Authored-By: AJ Steers <aj@airbyte.io>
# Check if field is nullable (default to True for safety)
prop_type = prop_def.get("type")
is_nullable = True
if isinstance(prop_type, list):
    is_nullable = "null" in prop_type
🟡 Union types without explicit null marked as required, may cause write failures
In _convert_object_to_struct, when the JSON schema property type is a list like ["string", "integer"] (without "null"), the field is marked as required=True in the Iceberg schema. This may cause write failures when records don't have values for these fields.
Code at airbyte/_processors/sql/iceberg.py:828-832:
# Check if field is nullable (default to True for safety)
prop_type = prop_def.get("type")
is_nullable = True
if isinstance(prop_type, list):
    is_nullable = "null" in prop_type

Behavior:

- If prop_type = "string" (not a list): is_nullable = True → field is optional ✓
- If prop_type = ["string", "null"]: is_nullable = True → field is optional ✓
- If prop_type = ["string", "integer"]: is_nullable = False → field is required ✗
Impact:
When a source sends records with missing values for fields that have union types without explicit null (e.g., anyOf(string, integer)), Iceberg may reject the write because the schema marks these fields as required. The comment states "default to True for safety" but the implementation doesn't apply this safety default to list-type unions without null.
Recommendation: Consider keeping is_nullable = True for all cases within _convert_object_to_struct to maintain the safety default, or explicitly handle union types without null as nullable since records may legitimately have missing values for these fields.
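If it helps, here's a rough sketch of a nullability helper along those lines (the helper name, its arguments, and the optional required-list handling are ours for illustration, not code from this PR):

def is_nullable_property(
    prop_name: str,
    prop_def: dict,
    parent_required: list[str] | None = None,
) -> bool:
    """Decide nullability with a permissive default.

    - type "string"              -> nullable (safety default)
    - type ["string", "null"]    -> nullable (explicitly declared)
    - type ["string", "integer"] -> nullable (union without null keeps the safety default)

    Only an explicit `required` listing at the parent object level marks a field as
    required, and even then only when "null" is not among the declared types.
    """
    prop_type = prop_def.get("type")
    if isinstance(prop_type, list) and "null" in prop_type:
        return True
    if parent_required and prop_name in parent_required:
        return False
    return True  # Default to nullable for everything else, including unions.

Defaulting to nullable keeps Iceberg writes from failing on records that legitimately omit union-typed fields, at the cost of slightly looser schemas.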
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/_processors/sql/iceberg.py`:
- Around line 99-100: Docstrings reference the wrong column name for the
placeholder JSON column; update the text to match the actual name used by the
helper. Locate the stringify function and any related docstrings (mentions of
`_additional_properties_json`) and change them to `_additional_properties__json`
(double underscore) so docs and code align; ensure any occurrence in function
docstrings or comments (including the section around the helper that documents
placeholder column behavior) is updated consistently.
- Around line 458-543: The code silently falls back to a SQLite URI in
_get_catalog_uri when catalog_uri is missing, causing later failures for
REST/Hive catalogs; add an early validation (e.g., a pydantic validator or a
model validate/__post_init__ hook) that checks catalog_type and ensures
catalog_uri is provided when catalog_type is "rest" or "hive" (or any
non-"sql"/non-"glue" types that require a URI) and raise a clear ValueError
mentioning catalog_type and missing catalog_uri; reference the fields
catalog_type, catalog_uri and the methods _get_catalog_uri and get_catalog so
the validator prevents calling get_catalog with an invalid config.
- `stringify`: Create a placeholder column `_additional_properties_json` to hold
  additional property data as a JSON string.
Align docstrings with the actual _additional_properties__json column name?
The docs mention _additional_properties_json, but the helper builds _additional_properties__json. Could we align the wording to avoid confusion, wdyt?
✏️ Docstring adjustment
- - `stringify`: Create a placeholder column `_additional_properties_json` to hold
+ - `stringify`: Create a placeholder column `_additional_properties__json` to hold

- - 'stringify': Create `_additional_properties_json` column for extra data."""
+ - 'stringify': Create `_additional_properties__json` column for extra data."""

Also applies to: 432-437
🤖 Prompt for AI Agents
In `@airbyte/_processors/sql/iceberg.py` around lines 99 - 100, Docstrings
reference the wrong column name for the placeholder JSON column; update the text
to match the actual name used by the helper. Locate the stringify function and
any related docstrings (mentions of `_additional_properties_json`) and change
them to `_additional_properties__json` (double underscore) so docs and code
align; ensure any occurrence in function docstrings or comments (including the
section around the helper that documents placeholder column behavior) is updated
consistently.
def _get_catalog_uri(self) -> str:
    """Get the catalog URI, using default if not specified."""
    if self.catalog_uri:
        return self.catalog_uri
    # Default to SQLite catalog in the cache directory
    return f"sqlite:///{DEFAULT_ICEBERG_CATALOG_DB}"

def _get_warehouse_path(self) -> str:
    """Get the warehouse path as a string.

    For local paths (both Path objects and strings without URI schemes),
    ensures the directory exists before returning.
    """
    warehouse = self.warehouse_path
    if isinstance(warehouse, Path):
        # Ensure the directory exists for local warehouses
        warehouse.mkdir(parents=True, exist_ok=True)
        return str(warehouse.absolute())
    # Handle string paths - create directory for local paths (no URI scheme)
    if isinstance(warehouse, str) and "://" not in warehouse:
        local_path = Path(warehouse)
        local_path.mkdir(parents=True, exist_ok=True)
        return str(local_path.absolute())
    return warehouse

def _get_s3_config(self) -> dict[str, str]:
    """Get S3-specific configuration for PyIceberg.

    Returns a dictionary of S3 configuration options that PyIceberg expects.
    See: https://py.iceberg.apache.org/configuration/#s3
    """
    s3_config: dict[str, str] = {}

    if self.aws_access_key_id:
        s3_config["s3.access-key-id"] = str(self.aws_access_key_id)
    if self.aws_secret_access_key:
        s3_config["s3.secret-access-key"] = str(self.aws_secret_access_key)
    if self.s3_bucket_region:
        s3_config["s3.region"] = self.s3_bucket_region
    if self.s3_endpoint:
        s3_config["s3.endpoint"] = self.s3_endpoint

    return s3_config

def _get_glue_config(self) -> dict[str, str]:
    """Get Glue-specific configuration for PyIceberg.

    Returns a dictionary of Glue configuration options that PyIceberg expects.
    See: https://py.iceberg.apache.org/configuration/#glue-catalog
    """
    glue_config: dict[str, str] = {}

    if self.glue_id:
        glue_config["glue.id"] = self.glue_id
    if self.s3_bucket_region:
        glue_config["glue.region"] = self.s3_bucket_region

    return glue_config

def get_catalog(self) -> Catalog:
    """Get or create the Iceberg catalog instance."""
    if self._catalog is not None:
        return self._catalog

    catalog_config: dict[str, Any] = {
        "warehouse": self._get_warehouse_path(),
    }

    # Add S3 configuration if any S3 settings are provided
    catalog_config.update(self._get_s3_config())

    if self.catalog_type == "sql":
        catalog_config["uri"] = self._get_catalog_uri()
        self._catalog = SqlCatalog(self.catalog_name, **catalog_config)
    elif self.catalog_type == "glue":
        # Glue catalog configuration
        catalog_config["type"] = "glue"
        catalog_config.update(self._get_glue_config())
        self._catalog = load_catalog(self.catalog_name, **catalog_config)
    else:
        # For REST, Hive, etc., use the generic load_catalog
        catalog_config["type"] = self.catalog_type
        catalog_config["uri"] = self._get_catalog_uri()
        if self.catalog_credential:
            catalog_config["credential"] = str(self.catalog_credential)
        self._catalog = load_catalog(self.catalog_name, **catalog_config)
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the full IcebergConfig class to understand the model validator
head -n 500 airbyte/_processors/sql/iceberg.py | tail -n 200

Repository: airbytehq/PyAirbyte
Length of output: 8165
🏁 Script executed:
# Find the model_validator and initialization code
rg -n "model_validator|catalog_type.*=|catalog_uri" airbyte/_processors/sql/iceberg.py | head -30Repository: airbytehq/PyAirbyte
Length of output: 989
🏁 Script executed:
# Look for the full _validate_typing_config and related validators
ast-grep --pattern $'@model_validator
$$$
def $_($$$) {
$$$
}'

Repository: airbytehq/PyAirbyte
Length of output: 45
🌐 Web query:
PyIceberg load_catalog REST catalog required uri configuration
💡 Result:
For a PyIceberg REST catalog, load_catalog(...) requires you to provide the catalog uri (the base URL of the Iceberg REST server). If uri is missing, RestCatalog reads it from properties["uri"] and will fail during initialization. [2]
Option A: .pyiceberg.yaml
catalog:
  prod:
    type: rest
    uri: http://rest-catalog/ws/
    # optional (auth), depending on your server:
    # credential: client_id:client_secret
    # token: <bearer token>
    # warehouse: <warehouse identifier>

[1][3]
Then:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod")[3][4]
Option B: Environment variable
export PYICEBERG_CATALOG__PROD__URI="http://rest-catalog/ws/"

(then load_catalog("prod")). [1]
Option C: Pass properties directly
from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod", uri="http://127.0.0.1:8181")[3][4]
Notes
- type: rest is supported (and is the default catalog type), but uri is the key required setting for REST. [1][2]
- The docs commonly show REST URIs ending with /ws/ (match your server's configured base path). [1][3]
🌐 Web query:
PyIceberg hive catalog required parameters configuration
💡 Result:
For PyIceberg’s HiveCatalog, the required/commonly-needed configuration depends on what you’re doing:
Required (to initialize/connect)
uri: Hive Metastore Thrift URI (can be a comma-separated list for failover). (py.iceberg.apache.org)
Example (.pyiceberg.yaml):
catalog:
  hive:
    type: hive
    uri: thrift://hive-metastore:9083

Required if you want default table locations (i.e., you don't pass location= when creating tables)
You must have either:
- a catalog-level warehouse property, or
- a namespace/database location property in the metastore
Otherwise PyIceberg raises: “No default path is set, please specify a location when creating a table”. (py.iceberg.apache.org)
Example with warehouse:
catalog:
  hive:
    type: hive
    uri: thrift://hive-metastore:9083
    warehouse: s3://my-bucket/warehouse

Optional Hive-specific properties
- hive.hive2-compatible (needed when using Hive 2.x)
- hive.kerberos-authentication
- hive.kerberos-service-name
- ugi

(py.iceberg.apache.org)
Citations:
- 1: https://py.iceberg.apache.org/reference/pyiceberg/catalog/hive/
- 2: https://py.iceberg.apache.org/configuration/
- 3: https://py.iceberg.apache.org/reference/pyiceberg/catalog/?utm_source=openai
- 4: https://py.iceberg.apache.org/reference/pyiceberg/catalog/?utm_source=openai
- 5: https://py.iceberg.apache.org/configuration/
Catch missing catalog_uri for REST/Hive catalogs at validation time?
Right now, when catalog_uri is missing for REST or Hive catalogs, _get_catalog_uri() silently falls back to a SQLite URI, which will fail later during catalog initialization since PyIceberg requires a proper uri for those catalog types. Would it make sense to add early validation in the model validator to catch this upfront with a clear error message?
🛠️ Possible validation addition
 @model_validator(mode="after")
 def _validate_typing_config(self) -> IcebergConfig:
     """Validate that typing configuration options are consistent."""
     if self.object_typing == ObjectTypingMode.AS_JSON_STRINGS:
         # additional_properties and anyof_properties are ignored in this mode
         # but we don't need to warn - they just have no effect
         pass
+    if self.catalog_type in {"rest", "hive"} and not self.catalog_uri:
+        raise PyAirbyteInputError(
+            message="catalog_uri is required for REST/Hive Iceberg catalogs.",
+            context={"catalog_type": self.catalog_type},
+        )
     return self

🤖 Prompt for AI Agents
In `@airbyte/_processors/sql/iceberg.py` around lines 458 - 543, The code silently
falls back to a SQLite URI in _get_catalog_uri when catalog_uri is missing,
causing later failures for REST/Hive catalogs; add an early validation (e.g., a
pydantic validator or a model validate/__post_init__ hook) that checks
catalog_type and ensures catalog_uri is provided when catalog_type is "rest" or
"hive" (or any non-"sql"/non-"glue" types that require a URI) and raise a clear
ValueError mentioning catalog_type and missing catalog_uri; reference the fields
catalog_type, catalog_uri and the methods _get_catalog_uri and get_catalog so
the validator prevents calling get_catalog with an invalid config.
Summary
Adds a new IcebergCache that stores synced data in Apache Iceberg format using PyIceberg. This enables users to cache data as Parquet files with Iceberg metadata for efficient querying with any Iceberg-compatible query engine.

Key components:
- IcebergConfig: Configuration supporting SQLite catalogs (local dev), REST catalogs, and AWS Glue catalogs with S3 storage
- IcebergProcessor: Writes data to Iceberg tables using a two-phase write approach for proper write method selection
- IcebergTypeConverter: Maps JSON schema types to Iceberg types with sophisticated object typing configuration
- IcebergCache: User-facing API with get_pandas_dataframe(), get_arrow_dataset(), and get_iceberg_table() methods

Object typing configuration:
- object_typing: Choose between nested_types (structs, better query performance) or as_json_strings (JSON strings, more permissive)
- additional_properties: Handle undeclared properties with fail, ignore, or stringify modes
- anyof_properties: Handle union types with fail, branch (separate subcolumns), or stringify modes
- _airbyte_meta is always stored as a StructType (never stringified), matching Kotlin S3 Data Lake destination behavior
- Stringified columns use the __json suffix (e.g., col_a__json) for clarity at query time

S3 and Glue support:
- AWS credentials (aws_access_key_id, aws_secret_access_key)
- S3 bucket settings (s3_bucket_name, s3_bucket_region, s3_endpoint)
- AWS Glue catalog support (glue_id for AWS Account ID)

Example script:
examples/run_iceberg_cache.py demonstrates usage with source-spacex-api; a rough usage sketch follows below.
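The sketch below shows what usage along those lines might look like. The import path, constructor arguments, and the stream name are assumptions inferred from the IcebergConfig fields and cache methods described in this PR, not a copy of the example script:

import airbyte as ab
from airbyte.caches import IcebergCache  # assumed export path for the new cache

# Local SQLite catalog + local warehouse; field names follow IcebergConfig above.
cache = IcebergCache(
    catalog_type="sql",
    warehouse_path="/tmp/iceberg_warehouse",
)

# "launches" is an assumed stream name for source-spacex-api.
source = ab.get_source("source-spacex-api", config={}, streams=["launches"])
source.check()
source.read(cache=cache)

df = cache.get_pandas_dataframe("launches")           # pandas view of the Iceberg table
iceberg_table = cache.get_iceberg_table("launches")   # pyiceberg Table for direct scans
print(df.head())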
Updates since last revision

Merge-on-read implementation for incremental syncs:
- _write_files_to_new_table now stores Arrow tables in memory, and the appropriate write method performs the actual Iceberg write
- _emulated_merge_temp_table_to_final_table now uses PyIceberg's native upsert() for merge-on-read semantics (delete files for existing records, append new records, deduplication at read time); see the sketch after this list
- _append_temp_table_to_final_table uses append() for simple appends without deduplication
- _swap_temp_table_with_final_table uses overwrite() for the REPLACE write method
- Falls back to append() when no primary keys are defined (no deduplication possible)
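To make the upsert path concrete, a minimal PyIceberg-level sketch (assumes PyIceberg >= 0.9's upsert API; the catalog/table names are hypothetical, and the "id" column stands in for whatever catalog_provider.get_primary_keys() returns):

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

# Hypothetical local catalog, mirroring the SqlCatalog setup quoted earlier in this review.
catalog = SqlCatalog("default", uri="sqlite:///catalog.db", warehouse="file:///tmp/warehouse")
table = catalog.load_table("airbyte_raw.users")  # hypothetical namespace.table

# Incoming batch, already normalized to the table's Arrow schema.
incoming = pa.table({"id": [1, 2], "name": ["a", "b"]})

# Rows whose "id" matches an existing row are updated, new ids are appended;
# deduplication is resolved at read time (merge-on-read).
table.upsert(incoming, join_cols=["id"])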
catalog_provider.get_primary_keys()returns the correct columns and that upsert uses them properlynested_typesandas_json_stringsmodes with sources that have complex nested schemasSuggested test plan for merge-on-read:
Notes
Summary by CodeRabbit
New Features
Exports
Examples
Tests
Chores