diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index 6d888d9d..21e1ce59 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -979,6 +979,94 @@ impl RowType { )) } + /// Checks whether this RowType is equal to another, including field IDs. + /// + /// Unlike the standard `PartialEq` implementation, this method also compares + /// the field IDs of each field, not just the names and data types. + /// It also recursively checks nested RowTypes for equality including field IDs. + /// + /// # Examples + /// + /// ``` + /// use fluss::metadata::{DataField, DataTypes, RowType}; + /// + /// let fields1 = vec![ + /// DataField::new_with_field_id("id", DataTypes::int(), None, 0), + /// DataField::new_with_field_id("name", DataTypes::string(), None, 1), + /// ]; + /// let row_type1 = RowType::new(fields1); + /// + /// let fields2 = vec![ + /// DataField::new_with_field_id("id", DataTypes::int(), None, 0), + /// DataField::new_with_field_id("name", DataTypes::string(), None, 1), + /// ]; + /// let row_type2 = RowType::new(fields2); + /// + /// assert!(row_type1.equals_with_field_id(&row_type2)); + /// + /// // Different field IDs + /// let fields3 = vec![ + /// DataField::new_with_field_id("id", DataTypes::int(), None, 0), + /// DataField::new_with_field_id("name", DataTypes::string(), None, 2), // Different ID + /// ]; + /// let row_type3 = RowType::new(fields3); + /// + /// assert!(!row_type1.equals_with_field_id(&row_type3)); + /// ``` + pub fn equals_with_field_id(&self, other: &RowType) -> bool { + // First check standard equality (nullable, field count, names, types) + if self.nullable != other.nullable || self.fields.len() != other.fields.len() { + return false; + } + + // Check each field for equality including field_id + for (self_field, other_field) in self.fields.iter().zip(other.fields.iter()) { + if self_field.name != other_field.name + || self_field.field_id != other_field.field_id + { + return false; + } + + // Recursively check data types for nested RowTypes + if !Self::data_type_equals_with_field_id( + &self_field.data_type, + &other_field.data_type, + ) { + return false; + } + } + + true + } + + /// Helper method to check if two DataTypes are equal, including field IDs for RowTypes. + fn data_type_equals_with_field_id(left: &DataType, right: &DataType) -> bool { + match (left, right) { + (DataType::Row(left_row), DataType::Row(right_row)) => { + left_row.equals_with_field_id(right_row) + } + (DataType::Array(left_arr), DataType::Array(right_arr)) => { + left_arr.nullable == right_arr.nullable + && Self::data_type_equals_with_field_id( + &left_arr.element_type, + &right_arr.element_type, + ) + } + (DataType::Map(left_map), DataType::Map(right_map)) => { + left_map.nullable == right_map.nullable + && Self::data_type_equals_with_field_id( + &left_map.key_type, + &right_map.key_type, + ) + && Self::data_type_equals_with_field_id( + &left_map.value_type, + &right_map.value_type, + ) + } + _ => left == right, + } + } + #[cfg(test)] pub fn with_data_types(data_types: Vec) -> Self { let mut fields: Vec = Vec::new(); @@ -1171,6 +1259,7 @@ pub struct DataField { pub name: String, pub data_type: DataType, pub description: Option, + pub field_id: i32, } impl DataField { @@ -1183,6 +1272,21 @@ impl DataField { name: name.into(), data_type, description, + field_id: -1, + } + } + + pub fn new_with_field_id>( + name: N, + data_type: DataType, + description: Option, + field_id: i32, + ) -> DataField { + DataField { + name: name.into(), + data_type, + description, + field_id, } } @@ -1193,6 +1297,10 @@ impl DataField { pub fn data_type(&self) -> &DataType { &self.data_type } + + pub fn field_id(&self) -> i32 { + self.field_id + } } impl Display for DataField { diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 7f0d2e2a..94f5eeda 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -250,10 +250,12 @@ impl SchemaBuilder { let data_fields = columns .iter() - .map(|c| DataField { + .enumerate() + .map(|(idx, c)| DataField { name: c.name.clone(), data_type: c.data_type.clone(), description: c.comment.clone(), + field_id: idx as i32, }) .collect();