AvroDeserializationSchema.java

@@ -71,7 +71,28 @@ public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema)
*/
public static AvroDeserializationSchema<GenericRecord> forGeneric(
Schema schema, AvroEncoding encoding) {
return new AvroDeserializationSchema<>(GenericRecord.class, schema, encoding);
return forGeneric(null, schema, encoding, null, false);
}

/**
* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided
* schema.
*
* @param actualSchema schema that the upstream writer actually used
* @param schema schema of the produced records
* @param encoding Avro serialization approach to use for decoding
* @param actualSchemaString the Avro writer's schema as a JSON string; if set it takes
* precedence over {@code actualSchema}
* @param fastRead option to enable or disable Avro fast read
* @return deserialized record in form of {@link GenericRecord}
*/
public static AvroDeserializationSchema<GenericRecord> forGeneric(
Schema actualSchema,
Schema schema,
AvroEncoding encoding,
String actualSchemaString,
boolean fastRead) {
return new AvroDeserializationSchema<>(
GenericRecord.class, schema, actualSchema, encoding, actualSchemaString, fastRead);
}
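A minimal sketch of how a caller might use the new overload to prune columns. The `User` schemas are hypothetical, and the import path assumed for the `AvroEncoding` enum is `AvroFormatOptions.AvroEncoding`; only the `forGeneric` signature itself comes from the PR:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

public class ForGenericUsageSketch {
    static AvroDeserializationSchema<GenericRecord> build() {
        // Full schema the upstream producer writes with (hypothetical).
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"},"
                        + "{\"name\":\"email\",\"type\":\"string\"}]}");
        // Pruned schema containing only the fields this job needs.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");
        return AvroDeserializationSchema.forGeneric(
                writerSchema,   // actualSchema: what the upstream actually wrote
                readerSchema,   // schema: what this job materializes
                AvroEncoding.BINARY,
                null,           // actualSchemaString: derived from actualSchema when null
                true);          // fastRead
    }
}
```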

/**
@@ -104,9 +125,12 @@ public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecifi
/** Class to deserialize to. */
private final Class<T> recordClazz;

/** Schema in case of GenericRecord for serialization purpose. */
/** Schema in case of GenericRecord for downstream serialization purpose. */
private final String schemaString;

/** Schema in case of GenericRecord for upstream serialization purpose. */
private final String writeSchemaString;

/** Reader that deserializes byte array into a record. */
private transient GenericDatumReader<T> datumReader;

@@ -119,9 +143,15 @@ public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecifi
/** Avro decoder that decodes data. */
private transient Decoder decoder;

/** Avro schema for the reader. */
/** Avro schema for the downstream reader. */
private transient Schema reader;

/** Avro schema for the upstream writer. */
private transient Schema writer;

/** Option to enable or disable Avro fast read. */
private final boolean fastRead;

/**
* Creates an Avro deserialization schema.
*
@@ -133,6 +163,28 @@ public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecifi
*/
AvroDeserializationSchema(
Class<T> recordClazz, @Nullable Schema reader, AvroEncoding encoding) {
this(recordClazz, reader, null, encoding, null, false);
}

/**
* Creates an Avro deserialization schema.
*
* @param recordClazz class to which deserialize. Should be one of: {@link
* org.apache.avro.specific.SpecificRecord}, {@link org.apache.avro.generic.GenericRecord}.
* @param reader reader's Avro schema. Should be provided if recordClazz is {@link
* GenericRecord}
* @param writer writer's Avro schema. Should be provided if the user wants column pruning.
* @param encoding encoding approach to use. Identifies the Avro decoder class to use.
* @param writeSchemaString optional writer schema string for advanced users to specify
* manually; if set it takes precedence over {@code writer}.
* @param fastRead option to enable or disable Avro fast read.
*/
AvroDeserializationSchema(
Class<T> recordClazz,
@Nullable Schema reader,
@Nullable Schema writer,
AvroEncoding encoding,
@Nullable String writeSchemaString,
boolean fastRead) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
this.reader = reader;
@@ -141,15 +193,24 @@ public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecifi
} else {
this.schemaString = null;
}
this.writer = writer;
if (writeSchemaString != null) {
this.writeSchemaString = writeSchemaString;
} else if (this.writer != null) {
this.writeSchemaString = this.writer.toString();
} else {
this.writeSchemaString = null;
}
this.encoding = encoding;
this.fastRead = fastRead;
}

GenericDatumReader<T> getDatumReader() {
return datumReader;
}

Schema getReaderSchema() {
return reader;
return writer != null ? writer : reader;
}

MutableByteArrayInputStream getInputStream() {
@@ -172,15 +233,9 @@ public T deserialize(@Nullable byte[] message) throws IOException {
// read record
checkAvroInitialized();
inputStream.setBuffer(message);
Schema readerSchema = getReaderSchema();
GenericDatumReader<T> datumReader = getDatumReader();

datumReader.setSchema(readerSchema);

if (encoding == AvroEncoding.JSON) {
((JsonDecoder) this.decoder).configure(inputStream);
}

return datumReader.read(null, decoder);
}

@@ -195,12 +250,21 @@ void checkAvroInitialized() throws IOException {
SpecificData specificData =
AvroFactory.getSpecificDataForClass(
(Class<? extends SpecificData>) recordClazz, cl);
specificData.setFastReaderEnabled(fastRead);
this.datumReader = new SpecificDatumReader<>(specificData);
this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
datumReader.setSchema(reader);
} else {
this.reader = new Schema.Parser().parse(schemaString);
GenericData genericData = new GenericData(cl);
this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
genericData.setFastReaderEnabled(fastRead);
this.writer =
new Schema.Parser()
.parse(
writeSchemaString == null || writeSchemaString.isEmpty()
? schemaString
: writeSchemaString);
this.datumReader = new GenericDatumReader<>(this.writer, this.reader, genericData);
}

this.inputStream = new MutableByteArrayInputStream();
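The generic branch above now builds the GenericDatumReader with distinct writer and reader schemas, so column pruning falls out of Avro's standard schema resolution rather than any custom logic. A standalone sketch of that mechanism using plain Avro, with hypothetical User schemas:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class PruningResolutionDemo {
    public static void main(String[] args) throws IOException {
        Schema writer = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"},"
                        + "{\"name\":\"email\",\"type\":\"string\"}]}");
        Schema reader = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Encode one record with the full writer schema.
        GenericRecord user = new GenericData.Record(writer);
        user.put("id", 42L);
        user.put("name", "alice");
        user.put("email", "alice@example.com");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(user, enc);
        enc.flush();

        // Decode with (writer, reader): Avro skips "email" during resolution
        // instead of materializing it, which is the pruning effect the PR relies on.
        GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(writer, reader);
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord pruned = datumReader.read(null, dec);
        System.out.println(pruned); // {"id": 42, "name": "alice"}
    }
}
```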
@@ -236,11 +300,23 @@ public boolean equals(Object o) {
return false;
}
AvroDeserializationSchema<?> that = (AvroDeserializationSchema<?>) o;
return recordClazz.equals(that.recordClazz) && Objects.equals(reader, that.reader);
return fastRead == that.fastRead
&& recordClazz.equals(that.recordClazz)
&& Objects.equals(schemaString, that.schemaString)
&& Objects.equals(
getEffectiveWriteSchemaString(), that.getEffectiveWriteSchemaString())
&& encoding == that.encoding;
}

private String getEffectiveWriteSchemaString() {
return (writeSchemaString == null || writeSchemaString.isEmpty())
? schemaString
: writeSchemaString;
}
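Because equals() and hashCode() normalize through getEffectiveWriteSchemaString(), an instance whose writer schema was left unset compares equal to one where it was set explicitly to the reader schema. A hedged sketch of the kind of assertion a unit test might make (no such test exists in this PR yet; AssertJ and a `userSchema` variable are assumed):

```java
// Both instances resolve to the same effective writer schema string.
AvroDeserializationSchema<GenericRecord> a =
        AvroDeserializationSchema.forGeneric(
                null, userSchema, AvroEncoding.BINARY, null, false);
AvroDeserializationSchema<GenericRecord> b =
        AvroDeserializationSchema.forGeneric(
                null, userSchema, AvroEncoding.BINARY, userSchema.toString(), false);

assertThat(a).isEqualTo(b);
assertThat(a.hashCode()).isEqualTo(b.hashCode());
```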

@Override
public int hashCode() {
return Objects.hash(recordClazz, reader);
return Objects.hash(
recordClazz, schemaString, getEffectiveWriteSchemaString(), encoding, fastRead);
}
}
AvroFormatFactory.java

@@ -45,7 +45,9 @@
import java.util.Set;

import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_ENCODING;
import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_FAST_READ;
import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING;
import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_WRITER_SCHEMA_STRING;

/**
* Table format factory for providing configured instances of Avro to RowData {@link
@@ -63,6 +65,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(

AvroEncoding encoding = formatOptions.get(AVRO_ENCODING);
boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING);
boolean fastRead = formatOptions.get(AVRO_FAST_READ);
String writerSchemaString = formatOptions.get(AVRO_WRITER_SCHEMA_STRING);

return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -75,8 +79,18 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
// producedDataType denotes the schema after column pruning, whereas
// actualRowType maintains the upstream table's original structure.
final RowType actualRowType = (RowType) physicalDataType.getLogicalType();
return new AvroRowDataDeserializationSchema(
rowType, rowDataTypeInfo, encoding, legacyTimestampMapping);
actualRowType,
rowType,
rowDataTypeInfo,
encoding,
legacyTimestampMapping,
fastRead,
writerSchemaString);
}

@Override
@@ -125,6 +139,8 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(AVRO_ENCODING);
options.add(AVRO_TIMESTAMP_LEGACY_MAPPING);
options.add(AVRO_FAST_READ);
options.add(AVRO_WRITER_SCHEMA_STRING);
return options;
}
}
AvroFormatOptions.java

@@ -83,5 +83,23 @@ public InlineElement getDescription() {
+ "you can obtain the correct mapping by disable using this legacy mapping."
+ " Use legacy behavior by default for compatibility consideration.");

public static final ConfigOption<Boolean> AVRO_FAST_READ =
ConfigOptions.key("fast-read.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Optional for avro fast reader. "
+ "Avro Fastread improves Avro read speeds by constructing a resolution chain. "
+ "get more information about this feature, please visit https://issues.apache.org/jira/browse/AVRO-3230");

public static final ConfigOption<String> AVRO_WRITER_SCHEMA_STRING =
ConfigOptions.key("writer.schemaString")
.stringType()
.defaultValue(null)
.withDescription(
"Optional for Avro deserialization. This string serves as the writer schema "
+ "of the upstream table. Configuring this allows for effective column pruning, "
+ "as only the fields specified in the reader schema will be read.");

Review comment from @davidradl (Contributor), Feb 5, 2026:

I can't see any test for this option.

In the Confluent Avro format, which will inherit the Avro options, you can specify a schema to use there; the Confluent schema registry can also supply the real schema. I think we should understand and document which options take precedence.

Also, I suggest we say that this writer schema needs to be compatible with the table definition, and what that means. I am thinking about:

  • compatibility between nullable and non-nullable fields
  • what it means for pruning nested columns
  • changing to castable types

Author's reply:

I'm glad to see your reply, and I will add more unit tests and documentation as suggested.

private AvroFormatOptions() {}
}
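For orientation, a sketch of wiring the two new options into a table definition from Java. The option keys come from the diff; the connector, topic, the placeholder writer schema, and the usual `avro.` prefix for format options in DDL are assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroPruningDdlDemo {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Connector, topic, and the writer schema value below are placeholders.
        env.executeSql(
                "CREATE TABLE users (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'users',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'avro',\n"
                        + "  'avro.fast-read.enabled' = 'true',\n"
                        + "  'avro.writer.schemaString' = '{\"type\":\"record\", ...}'\n"
                        + ")");
    }
}
```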
AvroRowDataDeserializationSchema.java

@@ -108,6 +108,34 @@ public AvroRowDataDeserializationSchema(
typeInfo);
}

/**
* Creates an Avro deserialization schema for the given logical type.
*
* @param actualRowType The upstream table's original logical type, before column pruning.
* @param rowType The logical type used to deserialize the data, after column pruning.
* @param typeInfo The TypeInformation to be used by {@link
* AvroRowDataDeserializationSchema#getProducedType()}.
* @param encoding The serialization approach used to deserialize the data.
* @param legacyTimestampMapping Whether to use legacy timestamp mapping.
* @param fastRead Whether to enable Avro fast read.
* @param writerSchemaString Optional writer schema string; if set it takes precedence over the
* schema derived from {@code actualRowType}.
*/
public AvroRowDataDeserializationSchema(
RowType actualRowType,
RowType rowType,
TypeInformation<RowData> typeInfo,
AvroEncoding encoding,
boolean legacyTimestampMapping,
boolean fastRead,
String writerSchemaString) {
this(
AvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(actualRowType, legacyTimestampMapping),
AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping),
encoding,
writerSchemaString,
fastRead),
AvroToRowDataConverters.createRowConverter(rowType, legacyTimestampMapping),
typeInfo);
}
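A sketch of invoking the new constructor directly, with hypothetical row types. In practice AvroFormatFactory supplies these from the planner; InternalTypeInfo here stands in for the context-provided type information:

```java
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class RowDataSchemaSketch {
    static AvroRowDataDeserializationSchema build() {
        // Upstream table's full structure (hypothetical).
        RowType actualRowType = RowType.of(
                new LogicalType[] {
                    new BigIntType(),
                    new VarCharType(VarCharType.MAX_LENGTH),
                    new VarCharType(VarCharType.MAX_LENGTH)
                },
                new String[] {"id", "name", "email"});
        // Projection after column pruning.
        RowType rowType = RowType.of(
                new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"id", "name"});
        return new AvroRowDataDeserializationSchema(
                actualRowType,
                rowType,
                InternalTypeInfo.of(rowType), // typeInfo for the produced rows
                AvroEncoding.BINARY,
                true,   // legacyTimestampMapping (the documented default)
                false,  // fastRead
                null);  // writerSchemaString: derived from actualRowType when null
    }
}
```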

/**
* Creates an Avro deserialization schema for the given logical type.
*