Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec<u8> {
let mut buffer = Vec::with_capacity(1024);
{
let buf = TrackedWrite::new(&mut buffer);
let writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata);
let writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata)
.with_write_path_in_schema(false);
writer.finish().unwrap();
}

Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ struct Args {
#[clap(long)]
write_page_header_statistics: Option<bool>,

/// Write path_in_schema to the column metadata.
#[clap(long)]
write_path_in_schema: Option<bool>,

/// Sets whether bloom filter is enabled for all columns.
#[clap(long)]
bloom_filter_enabled: Option<bool>,
Expand Down Expand Up @@ -406,6 +410,9 @@ fn main() {
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
if let Some(value) = args.write_path_in_schema {
writer_properties_builder = writer_properties_builder.set_write_path_in_schema(value);
}
if let Some(value) = args.write_batch_size {
writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
}
Expand Down
17 changes: 13 additions & 4 deletions parquet/src/file/metadata/thrift/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,10 +1328,15 @@ pub(super) fn serialize_column_meta_data<W: Write>(
.encodings()
.collect::<Vec<_>>()
.write_thrift_field(w, 2, 1)?;
let path = column_chunk.column_descr.path().parts();
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
path.write_thrift_field(w, 3, 2)?;
column_chunk.compression.write_thrift_field(w, 4, 3)?;
if w.write_path_in_schema() {
let path = column_chunk.column_descr.path().parts();
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
path.write_thrift_field(w, 3, 2)?;
column_chunk.compression.write_thrift_field(w, 4, 3)?;
} else {
column_chunk.compression.write_thrift_field(w, 4, 2)?;
}

column_chunk.num_values.write_thrift_field(w, 5, 4)?;
column_chunk
.total_uncompressed_size
Expand Down Expand Up @@ -1401,6 +1406,8 @@ pub(super) fn serialize_column_meta_data<W: Write>(
pub(super) struct FileMeta<'a> {
pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
pub(super) row_groups: &'a Vec<RowGroupMetaData>,
// If true, then write the `path_in_schema` field in the ColumnMetaData struct.
pub(super) write_path_in_schema: bool,
}

// struct FileMetaData {
Expand All @@ -1420,6 +1427,8 @@ impl<'a> WriteThrift for FileMeta<'a> {
// needed for last_field_id w/o encryption
#[allow(unused_assignments)]
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
writer.set_write_path_in_schema(self.write_path_in_schema);

self.file_metadata
.version
.write_thrift_field(writer, 1, 0)?;
Expand Down
21 changes: 20 additions & 1 deletion parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
created_by: Option<String>,
object_writer: MetadataObjectWriter,
writer_version: i32,
write_path_in_schema: bool,
}

impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
Expand Down Expand Up @@ -259,6 +260,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
let file_meta = FileMeta {
file_metadata: &file_metadata,
row_groups: &row_groups,
write_path_in_schema: self.write_path_in_schema,
};

// Write file metadata
Expand Down Expand Up @@ -293,6 +295,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
row_groups: Vec<RowGroupMetaData>,
created_by: Option<String>,
writer_version: i32,
write_path_in_schema: bool,
) -> Self {
Self {
buf,
Expand All @@ -304,6 +307,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
created_by,
object_writer: Default::default(),
writer_version,
write_path_in_schema,
}
}

Expand Down Expand Up @@ -415,6 +419,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub struct ParquetMetaDataWriter<'a, W: Write> {
buf: TrackedWrite<W>,
metadata: &'a ParquetMetaData,
write_path_in_schema: bool,
}

impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
Expand All @@ -436,7 +441,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
///
/// See example on the struct level documentation
pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
Self { buf, metadata }
Self {
buf,
metadata,
write_path_in_schema: true,
}
}

/// Set whether or not to write the `path_in_schema` field in the Thrift `ColumnMetaData`
/// struct.
pub fn with_write_path_in_schema(self, val: bool) -> Self {
Self {
write_path_in_schema: val,
..self
}
}

/// Write the metadata to the buffer
Expand All @@ -460,6 +478,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
row_groups,
created_by,
file_metadata.version(),
self.write_path_in_schema,
);

if let Some(column_indexes) = column_indexes {
Expand Down
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Default values for [`WriterProperties::coerce_types`]
pub const DEFAULT_COERCE_TYPES: bool = false;
/// Default value for [`WriterProperties::write_path_in_schema`]
pub const DEFAULT_WRITE_PATH_IN_SCHEMA: bool = true;
/// Default minimum chunk size for content-defined chunking: 256 KiB.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
/// Default maximum chunk size for content-defined chunking: 1024 KiB.
Expand Down Expand Up @@ -233,6 +235,7 @@ pub struct WriterProperties {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
content_defined_chunking: Option<CdcOptions>,
write_path_in_schema: bool,
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
Expand Down Expand Up @@ -429,6 +432,14 @@ impl WriterProperties {
self.coerce_types
}

/// Returns `true` if the `path_in_schema` field of the `ColumnMetaData` Thrift struct
/// should be written.
///
/// For more details see [`WriterPropertiesBuilder::set_write_path_in_schema`]
pub fn write_path_in_schema(&self) -> bool {
self.write_path_in_schema
}

/// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled.
///
/// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`]
Expand Down Expand Up @@ -560,6 +571,7 @@ pub struct WriterPropertiesBuilder {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
content_defined_chunking: Option<CdcOptions>,
write_path_in_schema: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
Expand All @@ -584,6 +596,7 @@ impl Default for WriterPropertiesBuilder {
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
content_defined_chunking: None,
write_path_in_schema: DEFAULT_WRITE_PATH_IN_SCHEMA,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
Expand Down Expand Up @@ -622,6 +635,7 @@ impl WriterPropertiesBuilder {
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
content_defined_chunking: self.content_defined_chunking,
write_path_in_schema: self.write_path_in_schema,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
}
Expand Down Expand Up @@ -837,6 +851,27 @@ impl WriterPropertiesBuilder {
self
}

/// Should the writer should emit the `path_in_schema` element of the
/// `ColumnMetaData` Thrift struct.
Comment on lines +854 to +855
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth making it more apparently that this will cause incompatibilities with some older readers:

Suggested change
/// Should the writer should emit the `path_in_schema` element of the
/// `ColumnMetaData` Thrift struct.
/// Should the writer should emit the `path_in_schema` element of the
/// `ColumnMetaData` Thrift struct. WARNING: this will make the parquet
/// file unreadable by some older parquet readers. See LINK HERE for details

///
/// WARNING: setting this to `true` will break compatibility with Parquet readers that
/// still expect this field to be present. For more context, see [GH-563].
///
/// The `path_in_schema` field in the Thrift metadata is redundant and wastes a great
/// deal of space. Parquet file footers can be made much smaller by omitting this field.
/// Because the field was originally a mandatory field, this property defaults to `true`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we choose to go this way I think it would help to give some more context here on what types of readers would be affected (basically all the parquet-java based readers prior to late 2026)

We could also perhaps provide a link to the discussion: https://lists.apache.org/thread/czm2bk45wwtkhhpqxqvmx9dk5wkwk1kt

/// to maintain compatibility with older readers that expect this field to be present.
/// If one knows that all readers one plans to use are tolerant of the absence of this field,
/// this may be safely set to `false`.
///
/// At some point in the future this may default to `false`.
///
/// [GH-563]: https://github.com/apache/parquet-format/issues/563
pub fn set_write_path_in_schema(mut self, write_path_in_schema: bool) -> Self {
self.write_path_in_schema = write_path_in_schema;
self
}

/// EXPERIMENTAL: Sets content-defined chunking options, or disables CDC with `None`.
///
/// When enabled, data page boundaries are determined by a rolling hash of the
Expand Down Expand Up @@ -1157,6 +1192,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
statistics_truncate_length: props.statistics_truncate_length,
coerce_types: props.coerce_types,
content_defined_chunking: props.content_defined_chunking,
write_path_in_schema: props.write_path_in_schema,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
}
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,14 @@ impl<W: Write + Send> SerializedFileWriter<W> {
let column_indexes = std::mem::take(&mut self.column_indexes);
let offset_indexes = std::mem::take(&mut self.offset_indexes);

let write_path_in_schema = self.props.write_path_in_schema();
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
&self.descr,
row_groups,
Some(self.props.created_by().to_string()),
self.props.writer_version().as_num(),
write_path_in_schema,
);

#[cfg(feature = "encryption")]
Expand Down
19 changes: 18 additions & 1 deletion parquet/src/parquet_thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,29 @@ where
/// [compact output]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
pub(crate) struct ThriftCompactOutputProtocol<W: Write> {
writer: W,
write_path_in_schema: bool,
}

impl<W: Write> ThriftCompactOutputProtocol<W> {
/// Create a new `ThriftCompactOutputProtocol` wrapping the byte sink `writer`.
pub(crate) fn new(writer: W) -> Self {
Self { writer }
Self {
writer,
write_path_in_schema: true,
}
}

// TODO(ets): at some point there should probably be a properties object
// to control aspects of thrift output. But since this is the only option to date
// I'm choosing a simpler API.
/// Control the writing of the `path_in_schema` element of the `ColumnMetaData`
pub(crate) fn set_write_path_in_schema(&mut self, val: bool) {
self.write_path_in_schema = val;
}

/// Indicate whether or not to emit `path_in_schema`.
pub(crate) fn write_path_in_schema(&self) -> bool {
self.write_path_in_schema
}

/// Write a single byte to the output stream.
Expand Down
Loading