diff --git a/docs/docs.json b/docs/docs.json
index 023a1a7..0b15e36 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -176,6 +176,7 @@
       "pages": [
         "geneva/jobs/contexts",
         "geneva/jobs/backfilling",
+        "geneva/jobs/bulk-load-columns",
        "geneva/jobs/materialized-views",
        "geneva/jobs/lifecycle",
        "geneva/jobs/conflicts",
diff --git a/docs/geneva/jobs/bulk-load-columns.mdx b/docs/geneva/jobs/bulk-load-columns.mdx
new file mode 100644
index 0000000..c44adb5
--- /dev/null
+++ b/docs/geneva/jobs/bulk-load-columns.mdx
@@ -0,0 +1,207 @@
---
title: Bulk Loading & Updating Columns
sidebarTitle: Bulk Load / Update Columns
description: Load or update column data from external sources (Parquet, Lance, IPC) into your LanceDB table using a primary-key join.
icon: columns-3
---

Introduced in Geneva 0.13.0

## Overview

A common scenario is having column data that **already exists** in an external dataset — embeddings from a vendor, features exported from a data warehouse, or columnar data in cloud storage — that you want to load into an existing LanceDB table.

`load_columns` joins value columns from an external source into your table by primary key. It supports both use cases:

- **Adding new columns:** If the specified columns don't exist in the destination table, they are created automatically.
- **Updating existing columns:** If the columns already exist, matched rows are updated with the source values. How unmatched rows are handled is controlled by the `on_missing` parameter.
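
The two behaviors above can be sketched as a toy model over plain Python dicts. This is illustrative only: `load_columns_model` is not part of the API, and the real implementation operates on Arrow data across distributed workers.

```python
def load_columns_model(rows, source, column, pk="pk", on_missing="carry"):
    """Toy model of the primary-key join: rows are dicts, source maps pk -> value."""
    for row in rows:
        if row[pk] in source:
            row[column] = source[row[pk]]   # matched: take the source value
        elif on_missing == "null":
            row[column] = None              # unmatched: explicitly set NULL
        elif on_missing == "error":
            raise KeyError(f"no source value for pk={row[pk]}")
        else:                               # "carry": keep the existing value,
            row.setdefault(column, None)    # NULL if the column is new
    return rows

# First pass creates the column; a later pass updates only the rows it covers.
rows = [{"pk": 1, "col_a": "x"}, {"pk": 2, "col_a": "y"}]
load_columns_model(rows, {1: [0.1, 0.2]}, "embedding")
load_columns_model(rows, {2: [0.3, 0.4]}, "embedding")
```

The tables below walk through the same join on a concrete example.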
**Destination table (before):**

| pk | col_a | col_b |
|----|-------|-------|
| 1 | x | 10 |
| 2 | y | 20 |
| 3 | z | 30 |

**External source (Parquet / Lance / IPC):**

| pk | embedding |
|----|-----------|
| 1 | [.1, .2] |
| 2 | [.3, .4] |
| 3 | [.5, .6] |

**Destination table (after `load_columns` join on `pk`):**

| pk | col_a | col_b | embedding |
|----|-------|-------|-----------|
| 1 | x | 10 | [.1, .2] |
| 2 | y | 20 | [.3, .4] |
| 3 | z | 30 | [.5, .6] |

### When to use

- **Loading new columns:** Attach pre-computed embeddings from a vendor, or add features exported from Spark/BigQuery as Parquet.
- **Updating existing columns:** Replace outdated embeddings with a newer model's output, or refresh feature values from an updated export.
- **Partial updates:** Update a subset of rows (e.g., only rows whose embeddings were recomputed) while preserving all other values via carry semantics.
- **Format consolidation:** Merge columnar data spread across Parquet files into an existing Lance table.

## Basic usage

`load_columns` supports Parquet, Lance, and IPC sources. The format is auto-detected from the URI suffix, or can be overridden with `source_format`. You can load one or more columns in a single call.

```python Python icon="python"
import lancedb

db = lancedb.connect("my_db")
table = db.open_table("my_table")

# Add a new embedding column from a Parquet source
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
)

# Later, update the same column with refreshed embeddings
table.load_columns(
    source="s3://bucket/embeddings_v2/",
    pk="document_id",
    columns=["embedding"],
)
```

For non-blocking execution, use `load_columns_async`, which returns a `JobFuture`; call `.result()` to block until completion.
## Handling missing keys

When the source doesn't cover every row in the destination, the `on_missing` parameter controls what happens to unmatched rows:

| Mode | Behavior |
|------|----------|
| `"carry"` (default) | Keep existing value. NULL if the column is new. |
| `"null"` | Explicitly set to NULL. |
| `"error"` | Raise an error on the first unmatched row. |

```python Python icon="python"
# Default: unmatched rows keep their current value
table.load_columns(
    source="s3://bucket/partial_embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="carry",
)

# Strict mode: fail if source doesn't cover all rows
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="error",
)
```

The `carry` mode is what makes partial and multi-pass loads safe: rows not matched by the current source keep their previously loaded values instead of being reset to NULL.

## Performance tuning

### Concurrency

The `concurrency` parameter controls the number of worker processes. The default is 8; set it to match your available cluster resources.

```python Python icon="python"
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    concurrency=16,
)
```

### Checkpointing

Bulk load jobs checkpoint each batch for fault tolerance, using the same infrastructure as [backfill jobs](/geneva/jobs/backfilling#adaptive-checkpoint-sizing). Key parameters:

- `checkpoint_interval_seconds`: Target seconds per checkpoint batch (default 60s). The adaptive sizer grows or shrinks batch sizes to hit this target.
- `min_checkpoint_size` / `max_checkpoint_size`: Bounds for adaptive sizing.

If your job is small enough to complete without needing fault tolerance, you can get better performance by effectively disabling checkpoints.
Increase `checkpoint_interval_seconds` to a large value and set `min_checkpoint_size` high enough that each worker processes its entire workload in a single batch. + + +### Commit visibility + +For long-running jobs, `commit_granularity` controls how many fragments complete before an intermediate commit makes partial results visible to readers. + + +```python Python icon="python" +table.load_columns( + source="s3://bucket/embeddings/", + pk="document_id", + columns=["embedding"], + commit_granularity=10, +) +``` + + +### Multi-pass loads for large sources + +`load_columns` builds an in-memory primary-key index from the source. If the source is too large to fit in memory, split it into chunks and run sequential calls. Carry semantics guarantee correctness across passes. + +Index memory depends on primary key type: + +| PK type | ~Memory per row | 100M rows | 1B rows | +|---------|----------------|-----------|---------| +| int64 | ~8 bytes | ~800 MB | ~8 GB | +| string (avg 32 bytes) | ~32 bytes | ~3.2 GB | ~32 GB | + +Choose N so that `source_size / N` fits in memory: + + +```python Python icon="python" +import pyarrow.dataset as pads + +# Discover source files (metadata only, no data I/O) +source_files = pads.dataset("s3://bucket/embeddings/", format="parquet").files + +# Split into N chunks and run sequentially +N = 4 +total = len(source_files) +for i in range(N): + chunk = source_files[i * total // N : (i + 1) * total // N] + table.load_columns( + source=chunk, + pk="document_id", + columns=["embedding"], + ) +``` + + +Each pass reads only its assigned files, so total source I/O stays at 1x. If the source is already partitioned into subdirectories, pass each URI directly: + + +```python Python icon="python" +for shard in range(4): + table.load_columns( + source=f"s3://bucket/embeddings/shard_{shard}/", + pk="document_id", + columns=["embedding"], + ) +``` + + + +Multi-pass loads must run **sequentially**, not concurrently. 
Two `load_columns` calls running at the same time against the same column can interleave their writes, leaving the column in an unpredictable mixed state. Use a plain `for` loop, not `concurrent.futures`.

## Reference

* [`load_columns` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.load_columns)
* [`load_columns_async` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.load_columns_async)