
Commit 2775b55

Iceberg sink: upsert rendering pass. (#34033)
The Iceberg sink is a multistage dataflow composed of the following operators (a minimal sketch of this shape follows the description below):

- Batch description minting: the batch minter picks time bounds `x` time wide into the future, which all subsequent operators use.
- Iceberg writer: writes data files (Parquet) with times bounded by the batch descriptions.
- Iceberg committer: coalesces the files from all writers and commits them to the catalog.

## Missing

- [ ] metrics
- [ ] tests

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
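A minimal sketch of the three-stage shape described above, not the actual Materialize operators: `BatchDescription`, `DataFile`, and the stage functions are hypothetical stand-ins for the batch minter, writer, and committer.

```rust
/// Time bounds minted by the batch-description operator; all downstream
/// operators write and commit only data inside these bounds.
#[derive(Clone, Copy, Debug)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// A data file (e.g. Parquet) produced by one writer for one batch.
#[derive(Debug)]
struct DataFile {
    batch: BatchDescription,
    path: String,
}

/// Stage 1: pick time bounds `width` ticks into the future.
fn mint_batch_description(now: u64, width: u64) -> BatchDescription {
    BatchDescription { lower: now, upper: now + width }
}

/// Stage 2: each writer emits files bounded by the batch description.
fn write_data_files(desc: BatchDescription, worker: usize) -> Vec<DataFile> {
    vec![DataFile {
        batch: desc,
        path: format!("data/worker-{worker}-{}-{}.parquet", desc.lower, desc.upper),
    }]
}

/// Stage 3: the committer coalesces files from all writers and commits
/// them to the catalog as a single transaction.
fn commit_batch(desc: BatchDescription, files: Vec<DataFile>) {
    println!("committing {} files for [{}, {})", files.len(), desc.lower, desc.upper);
}

fn main() {
    let desc = mint_batch_description(100, 10);
    let files: Vec<DataFile> = (0..4).flat_map(|w| write_data_files(desc, w)).collect();
    commit_batch(desc, files);
}
```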
1 parent c83f4eb commit 2775b55

File tree

33 files changed (+1782 −181 lines)


Cargo.lock

Lines changed: 132 additions & 91 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 2 deletions
@@ -349,8 +349,9 @@ async-compression = { git = "https://github.com/MaterializeInc/async-compression
 
 # Custom iceberg features for mz
 # All changes should go to the `mz_changes` branch.
-iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c2d70cbfc4822ea826198c7787c709beb652fa31" }
-iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c2d70cbfc4822ea826198c7787c709beb652fa31" }
+iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cf1e6e0c9de3" }
+iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cf1e6e0c9de3" }
+iceberg-catalog-s3tables = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cf1e6e0c9de3" }
 
 
 # BEGIN LINT CONFIG

deny.toml

Lines changed: 2 additions & 2 deletions
@@ -127,8 +127,8 @@ skip = [
     { name = "darling_macro", version = "0.20.11" },
     { name = "darling_core", version = "0.20.11" },
     { name = "darling", version = "0.20.11" },
-    { name = "typed-builder", version = "0.21.2" },
-    { name = "typed-builder-macro", version = "0.21.2" },
+    { name = "typed-builder", version = "0.20.1" },
+    { name = "typed-builder-macro", version = "0.20.1" },
     # chrono-tz
     { name = "phf", version = "0.11.3" },
     { name = "phf_shared", version = "0.11.3" },

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
@@ -1418,6 +1418,7 @@ impl CatalogState {
                 with_snapshot,
                 resolved_ids,
                 cluster_id: in_cluster,
+                commit_interval: sink.commit_interval,
             }),
             Plan::CreateType(CreateTypePlan { typ, .. }) => {
                 // Even if we don't need the `RelationDesc` here, error out

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
@@ -2862,6 +2862,7 @@ impl Coordinator {
                         version: sink.version,
                         from_storage_metadata: (),
                         to_storage_metadata: (),
+                        commit_interval: sink.commit_interval,
                     },
                     instance_id: sink.cluster_id,
                 },

src/adapter/src/coord/ddl.rs

Lines changed: 1 addition & 0 deletions
@@ -987,6 +987,7 @@ impl Coordinator {
                 version: sink.version,
                 from_storage_metadata: (),
                 to_storage_metadata: (),
+                commit_interval: sink.commit_interval,
             };
 
             let collection_desc = CollectionDescription {

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 3 additions & 0 deletions
@@ -1105,6 +1105,7 @@ impl Coordinator {
             with_snapshot,
             resolved_ids,
             cluster_id: in_cluster,
+            commit_interval: sink.commit_interval,
         };
 
         let ops = vec![catalog::Op::CreateItem {
@@ -3413,6 +3414,7 @@ impl Coordinator {
             with_snapshot,
             resolved_ids: resolved_ids.clone(),
             cluster_id: in_cluster,
+            commit_interval: sink_plan.commit_interval,
         };
 
         let ops = vec![catalog::Op::UpdateItem {
@@ -3448,6 +3450,7 @@ impl Coordinator {
             version: sink_plan.version,
             from_storage_metadata: (),
             to_storage_metadata: (),
+            commit_interval: sink_plan.commit_interval,
         };
 
         self.controller
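The diffs above all follow the same pattern: wherever a sink plan is turned into a storage sink description (create, alter, or catalog replay), the new `commit_interval` option is copied along. A hedged sketch of that plumbing, using simplified stand-in types rather than the real adapter/storage structs:

```rust
use std::time::Duration;

// Hypothetical, simplified stand-ins for the plan/description types touched
// above; the real types live in the adapter and storage crates.
struct CreateSinkPlan {
    commit_interval: Option<Duration>,
    // ...other sink fields elided
}

struct StorageSinkDesc {
    commit_interval: Option<Duration>,
    // ...other fields elided
}

// Copy the commit interval through whenever the plan becomes a storage
// description, so the Iceberg committer knows how often to commit.
fn to_storage_desc(plan: &CreateSinkPlan) -> StorageSinkDesc {
    StorageSinkDesc {
        commit_interval: plan.commit_interval,
    }
}

fn main() {
    let plan = CreateSinkPlan {
        commit_interval: Some(Duration::from_secs(60)),
    };
    let desc = to_storage_desc(&plan);
    assert_eq!(desc.commit_interval, Some(Duration::from_secs(60)));
}
```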

src/arrow-util/src/builder.rs

Lines changed: 93 additions & 36 deletions
@@ -27,11 +27,52 @@ use mz_ore::cast::CastFrom;
 use mz_repr::adt::jsonb::JsonbRef;
 use mz_repr::{Datum, RelationDesc, Row, SqlScalarType};
 
+const EXTENSION_PREFIX: &str = "materialize.v1.";
+
 pub struct ArrowBuilder {
     columns: Vec<ArrowColumn>,
     /// A crude estimate of the size of the data in the builder
     /// based on the size of the rows added to it.
     row_size_bytes: usize,
+    /// The original schema, if provided. Used to preserve metadata in to_record_batch().
+    original_schema: Option<Arc<Schema>>,
+}
+
+/// Converts a RelationDesc to an Arrow Schema.
+pub fn desc_to_schema(desc: &RelationDesc) -> Result<Schema, anyhow::Error> {
+    let mut fields = vec![];
+    let mut errs = vec![];
+    let mut seen_names = BTreeMap::new();
+    for (col_name, col_type) in desc.iter() {
+        let mut col_name = col_name.to_string();
+        // If we allow columns with the same name we encounter two issues:
+        // 1. The arrow crate will accidentally reuse the same buffers for the columns
+        // 2. Many parquet readers will error when trying to read the file metadata
+        // Instead we append a number to the end of the column name for any duplicates.
+        // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
+        seen_names
+            .entry(col_name.clone())
+            .and_modify(|e: &mut u32| {
+                *e += 1;
+                col_name += &e.to_string();
+            })
+            .or_insert(1);
+        match scalar_to_arrow_datatype(&col_type.scalar_type) {
+            Ok((data_type, extension_type_name)) => {
+                fields.push(field_with_typename(
+                    &col_name,
+                    data_type,
+                    col_type.nullable,
+                    &extension_type_name,
+                ));
+            }
+            Err(err) => errs.push(err.to_string()),
+        }
+    }
+    if !errs.is_empty() {
+        anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
+    }
+    Ok(Schema::new(fields))
 }
 
 impl ArrowBuilder {
@@ -60,43 +101,51 @@ impl ArrowBuilder {
         item_capacity: usize,
         data_capacity: usize,
     ) -> Result<Self, anyhow::Error> {
+        let schema = desc_to_schema(desc)?;
         let mut columns = vec![];
-        let mut errs = vec![];
-        let mut seen_names = BTreeMap::new();
-        for (col_name, col_type) in desc.iter() {
-            let mut col_name = col_name.to_string();
-            // If we allow columns with the same name we encounter two issues:
-            // 1. The arrow crate will accidentally reuse the same buffers for the columns
-            // 2. Many parquet readers will error when trying to read the file metadata
-            // Instead we append a number to the end of the column name for any duplicates.
-            // TODO(roshan): We should document this when writing the copy-to-s3 MZ docs.
-            seen_names
-                .entry(col_name.clone())
-                .and_modify(|e: &mut u32| {
-                    *e += 1;
-                    col_name += &e.to_string();
-                })
-                .or_insert(1);
-            match scalar_to_arrow_datatype(&col_type.scalar_type) {
-                Ok((data_type, extension_type_name)) => {
-                    columns.push(ArrowColumn::new(
-                        col_name,
-                        col_type.nullable,
-                        data_type,
-                        extension_type_name,
-                        item_capacity,
-                        data_capacity,
-                    )?);
-                }
-                Err(err) => errs.push(err.to_string()),
-            }
+        for field in schema.fields() {
+            columns.push(ArrowColumn::new(
+                field.name().clone(),
+                field.is_nullable(),
+                field.data_type().clone(),
+                typename_from_field(field)?,
+                item_capacity,
+                data_capacity,
+            )?);
         }
-        if !errs.is_empty() {
-            anyhow::bail!("Relation contains unimplemented arrow types: {:?}", errs);
+        Ok(Self {
+            columns,
+            row_size_bytes: 0,
+            original_schema: None,
+        })
+    }
+
+    /// Initializes a new ArrowBuilder with a pre-built Arrow Schema.
+    /// This is useful when you need to preserve schema metadata (e.g., field IDs for Iceberg).
+    /// `item_capacity` is used to initialize the capacity of each column's builder which defines
+    /// the number of values that can be appended to each column before reallocating.
+    /// `data_capacity` is used to initialize the buffer size of the string and binary builders.
+    /// Errors if the schema contains an unimplemented type.
+    pub fn new_with_schema(
+        schema: Arc<Schema>,
+        item_capacity: usize,
+        data_capacity: usize,
+    ) -> Result<Self, anyhow::Error> {
+        let mut columns = vec![];
+        for field in schema.fields() {
+            columns.push(ArrowColumn::new(
+                field.name().clone(),
+                field.is_nullable(),
+                field.data_type().clone(),
+                typename_from_field(field)?,
+                item_capacity,
+                data_capacity,
+            )?);
         }
         Ok(Self {
             columns,
             row_size_bytes: 0,
+            original_schema: Some(schema),
         })
     }
 
@@ -118,7 +167,15 @@ impl ArrowBuilder {
             arrays.push(col.finish());
             fields.push((&col).into());
         }
-        RecordBatch::try_new(Schema::new(fields).into(), arrays)
+
+        // If we have an original schema, use it to preserve metadata (e.g., field IDs)
+        let schema = if let Some(original_schema) = self.original_schema {
+            original_schema
+        } else {
+            Arc::new(Schema::new(fields))
+        };
+
+        RecordBatch::try_new(schema, arrays)
     }
 
     /// Appends a row to the builder.
@@ -437,7 +494,7 @@ fn field_with_typename(
 ) -> Field {
     Field::new(name, data_type, nullable).with_metadata(HashMap::from([(
         "ARROW:extension:name".to_string(),
-        format!("materialize.v1.{}", extension_type_name),
+        format!("{}{}", EXTENSION_PREFIX, extension_type_name),
     )]))
 }
 
@@ -447,7 +504,7 @@ fn typename_from_field(field: &Field) -> Result<String, anyhow::Error> {
     let extension_name = metadata
         .get("ARROW:extension:name")
        .ok_or_else(|| anyhow::anyhow!("Missing extension name in metadata"))?;
-    if let Some(name) = extension_name.strip_prefix("materialize.v1") {
+    if let Some(name) = extension_name.strip_prefix(EXTENSION_PREFIX) {
        Ok(name.to_string())
    } else {
        anyhow::bail!("Extension name {} does not match expected", extension_name,)
@@ -709,8 +766,8 @@ impl ArrowColumn {
                 }
                 builder.append(true).unwrap()
             }
-            (_builder, datum) => {
-                anyhow::bail!("Datum {:?} does not match builder", datum)
+            (builder, datum) => {
+                anyhow::bail!("Datum {:?} does not match builder {:?}", datum, builder)
             }
         }
         Ok(())
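The point of the `original_schema` change above is that rebuilding a fresh `Schema` from the finished columns would drop any field metadata the caller attached (for Iceberg, the field IDs). A standalone, hedged sketch of the same idea using the `arrow` crate directly; the `"PARQUET:field_id"` metadata key is the common convention for carrying field IDs and is an assumption here, not taken from this diff.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Attach per-field metadata (e.g. Iceberg field IDs) to the schema up front.
    let fields = vec![
        Field::new("id", DataType::Int64, false)
            .with_metadata(HashMap::from([("PARQUET:field_id".to_string(), "1".to_string())])),
        Field::new("name", DataType::Utf8, true)
            .with_metadata(HashMap::from([("PARQUET:field_id".to_string(), "2".to_string())])),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Building the batch with the *original* schema keeps that metadata intact;
    // a schema rebuilt from the finished arrays would not carry it.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),
        Arc::new(StringArray::from(vec![Some("a"), None])),
    ];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
    assert_eq!(
        batch.schema().field(0).metadata().get("PARQUET:field_id").map(String::as_str),
        Some("1")
    );
    Ok(())
}
```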

src/aws-secrets-controller/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -15,9 +15,9 @@ workspace = true
 [dependencies]
 anyhow = "1.0.100"
 async-trait = "0.1.89"
-aws-config = { version = "1.2.0", default-features = false }
-aws-credential-types = { version = "1.2.10", features = ["hardcoded-credentials"] }
-aws-sdk-secretsmanager = { version = "1.45.0", default-features = false, features = ["rt-tokio"] }
+aws-config = { version = "1.8.10", default-features = false }
+aws-credential-types = { version = "1.2.8", features = ["hardcoded-credentials"] }
+aws-sdk-secretsmanager = { version = "1.93.0", default-features = false, features = ["rt-tokio"] }
 aws-types = "1.3.9"
 futures = { version = "0.3.31" }
 mz-aws-util = { path = "../aws-util", default-features = false }

src/aws-util/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -11,8 +11,8 @@ workspace = true
 
 [dependencies]
 anyhow = "1.0.100"
-aws-config = { version = "1.2.0", default-features = false }
-aws-sdk-s3 = { version = "1.48.0", default-features = false, features = [
+aws-config = { version = "1.8.10", default-features = false }
+aws-sdk-s3 = { version = "1.94.0", default-features = false, features = [
     "rt-tokio",
 ], optional = true }
 aws-smithy-runtime-api = "1.9.2"
