Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,94 @@ impl RowType {
))
}

/// Checks whether this RowType is equal to another, including field IDs.
///
/// Unlike the standard `PartialEq` implementation, this method also compares
/// the field IDs of each field, not just the names and data types.
/// It also recursively checks nested RowTypes for equality including field IDs.
///
/// # Examples
///
/// ```
/// use fluss::metadata::{DataField, DataTypes, RowType};
///
/// let fields1 = vec![
/// DataField::new_with_field_id("id", DataTypes::int(), None, 0),
/// DataField::new_with_field_id("name", DataTypes::string(), None, 1),
/// ];
/// let row_type1 = RowType::new(fields1);
///
/// let fields2 = vec![
/// DataField::new_with_field_id("id", DataTypes::int(), None, 0),
/// DataField::new_with_field_id("name", DataTypes::string(), None, 1),
/// ];
/// let row_type2 = RowType::new(fields2);
///
/// assert!(row_type1.equals_with_field_id(&row_type2));
///
/// // Different field IDs
/// let fields3 = vec![
/// DataField::new_with_field_id("id", DataTypes::int(), None, 0),
/// DataField::new_with_field_id("name", DataTypes::string(), None, 2), // Different ID
/// ];
/// let row_type3 = RowType::new(fields3);
///
/// assert!(!row_type1.equals_with_field_id(&row_type3));
/// ```
pub fn equals_with_field_id(&self, other: &RowType) -> bool {
// First check standard equality (nullable, field count, names, types)
if self.nullable != other.nullable || self.fields.len() != other.fields.len() {
return false;
}

// Check each field for equality including field_id
for (self_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
if self_field.name != other_field.name
|| self_field.field_id != other_field.field_id
{
return false;
}

// Recursively check data types for nested RowTypes
if !Self::data_type_equals_with_field_id(
&self_field.data_type,
&other_field.data_type,
) {
return false;
}
}

true
}

/// Helper method to check if two DataTypes are equal, including field IDs for RowTypes.
fn data_type_equals_with_field_id(left: &DataType, right: &DataType) -> bool {
match (left, right) {
(DataType::Row(left_row), DataType::Row(right_row)) => {
left_row.equals_with_field_id(right_row)
}
(DataType::Array(left_arr), DataType::Array(right_arr)) => {
left_arr.nullable == right_arr.nullable
&& Self::data_type_equals_with_field_id(
&left_arr.element_type,
&right_arr.element_type,
)
}
(DataType::Map(left_map), DataType::Map(right_map)) => {
left_map.nullable == right_map.nullable
&& Self::data_type_equals_with_field_id(
&left_map.key_type,
&right_map.key_type,
)
&& Self::data_type_equals_with_field_id(
&left_map.value_type,
&right_map.value_type,
)
}
_ => left == right,
}
}

#[cfg(test)]
pub fn with_data_types(data_types: Vec<DataType>) -> Self {
let mut fields: Vec<DataField> = Vec::new();
Expand Down Expand Up @@ -1171,6 +1259,7 @@ pub struct DataField {
pub name: String,
pub data_type: DataType,
pub description: Option<String>,
pub field_id: i32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Derived PartialEq/Hash now includes field_id, Java explicitly excludes it. This silently breaks equality

}

impl DataField {
Expand All @@ -1183,6 +1272,21 @@ impl DataField {
name: name.into(),
data_type,
description,
field_id: -1,
}
}

pub fn new_with_field_id<N: Into<String>>(
name: N,
data_type: DataType,
description: Option<String>,
field_id: i32,
) -> DataField {
DataField {
name: name.into(),
data_type,
description,
field_id,
}
}

Expand All @@ -1193,6 +1297,10 @@ impl DataField {
pub fn data_type(&self) -> &DataType {
&self.data_type
}

pub fn field_id(&self) -> i32 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs full wiring, JSON serde doesn't read/write field_id yet

self.field_id
}
}

impl Display for DataField {
Expand Down
4 changes: 3 additions & 1 deletion crates/fluss/src/metadata/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,12 @@ impl SchemaBuilder {

let data_fields = columns
.iter()
.map(|c| DataField {
.enumerate()
.map(|(idx, c)| DataField {
name: c.name.clone(),
data_type: c.data_type.clone(),
description: c.comment.clone(),
field_id: idx as i32,
})
.collect();

Expand Down