From 47f1d384b2eccf8f6097766c387f5fd5dabd06cb Mon Sep 17 00:00:00 2001
From: xiaolong
Date: Thu, 5 Feb 2026 20:01:08 +0800
Subject: [PATCH 1/3] [FLINK-39035][format][avro] Support Avro fast-read and
 column pruning in AvroDeserializationSchema via configuration

---
 .../avro/AvroDeserializationSchema.java       |  81 ++++++-
 .../flink/formats/avro/AvroFormatFactory.java |  18 +-
 .../flink/formats/avro/AvroFormatOptions.java |  18 ++
 .../AvroRowDataDeserializationSchema.java     |  28 +++
 .../avro/AvroDeserializationSchemaTest.java   | 174 ++++++++++++++
 .../AvroRowDataDeSerializationSchemaTest.java | 225 ++++++++++++++++++
 6 files changed, 533 insertions(+), 11 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index 226f47a339b21..f30254437656f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -71,7 +71,26 @@ public static AvroDeserializationSchema forGeneric(Schema schema)
      */
     public static AvroDeserializationSchema<GenericRecord> forGeneric(
             Schema schema, AvroEncoding encoding) {
-        return new AvroDeserializationSchema<>(GenericRecord.class, schema, encoding);
+        return forGeneric(null, schema, encoding, null, false);
+    }
+
+    /**
+     * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using the
+     * provided schema.
+     *
+     * @param actualSchema schema that the upstream producer actually wrote the data with
+     * @param schema schema of produced records
+     * @param encoding Avro serialization approach to use for decoding
+     * @return deserialized record in form of {@link GenericRecord}
+     */
+    public static AvroDeserializationSchema<GenericRecord> forGeneric(
+            Schema actualSchema,
+            Schema schema,
+            AvroEncoding encoding,
+            String actualSchemaString,
+            boolean fastRead) {
+        return new AvroDeserializationSchema<>(
+                GenericRecord.class, schema, actualSchema, encoding, actualSchemaString, fastRead);
     }
 
     /**
@@ -104,9 +123,12 @@ public static AvroDeserializationSchema forSpecifi
     /** Class to deserialize to. */
     private final Class<T> recordClazz;
 
-    /** Schema in case of GenericRecord for serialization purpose. */
+    /** Reader (downstream) schema string in case of GenericRecord, for serialization purposes. */
     private final String schemaString;
 
+    /** Writer (upstream) schema string in case of GenericRecord, for serialization purposes. */
+    private final String writeSchemaString;
+
     /** Reader that deserializes byte array into a record. */
     private transient GenericDatumReader<T> datumReader;
 
@@ -119,9 +141,15 @@ public static AvroDeserializationSchema forSpecifi
     /** Avro decoder that decodes data. */
     private transient Decoder decoder;
 
-    /** Avro schema for the reader. */
+    /** Avro schema for the downstream reader. */
     private transient Schema reader;
 
+    /** Avro schema for the upstream writer. */
+    private transient Schema writer;
+
+    /** Whether Avro fast-read is enabled. */
+    private final boolean fastRead;
+
     /**
      * Creates a Avro deserialization schema.
      *
     * @param recordClazz class to which deserialize. Should be one of: {@link
      *     org.apache.avro.specific.SpecificRecord}, {@link org.apache.avro.generic.GenericRecord}.
      * @param reader reader's Avro schema. Should be provided if recordClazz is {@link
      *     GenericRecord}
      * @param encoding encoding approach to use. Identifies the Avro decoder class to use.
      */
@@ -133,6 +161,27 @@ public static AvroDeserializationSchema forSpecifi
     AvroDeserializationSchema(
             Class<T> recordClazz, @Nullable Schema reader, AvroEncoding encoding) {
+        this(recordClazz, reader, null, encoding, null, false);
+    }
+
+    /**
+     * Creates an Avro deserialization schema.
+     *
+     * @param recordClazz class to which to deserialize. Should be one of: {@link
+     *     org.apache.avro.specific.SpecificRecord}, {@link org.apache.avro.generic.GenericRecord}.
+     * @param reader reader's Avro schema. Should be provided if recordClazz is {@link
+     *     GenericRecord}
+     * @param writer writer's Avro schema. Should be provided if the user wants column pruning.
+     * @param encoding encoding approach to use. Identifies the Avro decoder class to use.
+     * @param writeSchemaString optional writer schema string that advanced users may specify
+     *     manually.
+     */
+    AvroDeserializationSchema(
+            Class<T> recordClazz,
+            @Nullable Schema reader,
+            @Nullable Schema writer,
+            AvroEncoding encoding,
+            @Nullable String writeSchemaString,
+            boolean fastRead) {
         Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
         this.recordClazz = recordClazz;
         this.reader = reader;
@@ -141,7 +190,16 @@ public static AvroDeserializationSchema forSpecifi
         } else {
             this.schemaString = null;
         }
+        this.writer = writer;
+        if (writeSchemaString != null) {
+            this.writeSchemaString = writeSchemaString;
+        } else if (this.writer != null) {
+            this.writeSchemaString = this.writer.toString();
+        } else {
+            this.writeSchemaString = null;
+        }
         this.encoding = encoding;
+        this.fastRead = fastRead;
     }
 
     GenericDatumReader<T> getDatumReader() {
@@ -172,15 +230,9 @@ public T deserialize(@Nullable byte[] message) throws IOException {
         // read record
         checkAvroInitialized();
         inputStream.setBuffer(message);
-        Schema readerSchema = getReaderSchema();
-        GenericDatumReader<T> datumReader = getDatumReader();
-
-        datumReader.setSchema(readerSchema);
-
         if (encoding == AvroEncoding.JSON) {
             ((JsonDecoder) this.decoder).configure(inputStream);
         }
-
         return datumReader.read(null, decoder);
     }
 
@@ -195,12 +247,21 @@ void checkAvroInitialized() throws IOException {
             SpecificData specificData =
                     AvroFactory.getSpecificDataForClass(
                             (Class) recordClazz, cl);
+            specificData.setFastReaderEnabled(fastRead);
             this.datumReader = new SpecificDatumReader<>(specificData);
             this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
+            datumReader.setSchema(reader);
         } else {
             this.reader = new Schema.Parser().parse(schemaString);
             GenericData genericData = new GenericData(cl);
-            this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
+            genericData.setFastReaderEnabled(fastRead);
+            this.writer =
+                    new Schema.Parser()
+                            .parse(
+                                    writeSchemaString == null || writeSchemaString.isEmpty()
+                                            ? schemaString
+                                            : writeSchemaString);
+            this.datumReader = new GenericDatumReader<>(this.writer, this.reader, genericData);
         }
 
         this.inputStream = new MutableByteArrayInputStream();

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
index eda1cc1a898ff..a567c12af031d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -45,7 +45,9 @@ import java.util.Set;
 
 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_ENCODING;
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_FAST_READ;
 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING;
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_WRITER_SCHEMA_STRING;
 
 /**
  * Table format factory for providing configured instances of Avro to RowData {@link
@@ -63,6 +65,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
         AvroEncoding encoding = formatOptions.get(AVRO_ENCODING);
         boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING);
+        boolean fastRead = formatOptions.get(AVRO_FAST_READ);
+        String writerSchemaString = formatOptions.get(AVRO_WRITER_SCHEMA_STRING);
 
         return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
@@ -75,8 +79,18 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
+                // producedDataType denotes the schema after column pruning, whereas actualRowType
+                // maintains the upstream table's original structure.
+                final RowType actualRowType = (RowType) physicalDataType.getLogicalType();
                 return new AvroRowDataDeserializationSchema(
-                        rowType, rowDataTypeInfo, encoding, legacyTimestampMapping);
+                        actualRowType,
+                        rowType,
+                        rowDataTypeInfo,
+                        encoding,
+                        legacyTimestampMapping,
+                        fastRead,
+                        writerSchemaString);
             }
 
             @Override
@@ -125,6 +139,8 @@ public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(AVRO_ENCODING);
         options.add(AVRO_TIMESTAMP_LEGACY_MAPPING);
+        options.add(AVRO_FAST_READ);
+        options.add(AVRO_WRITER_SCHEMA_STRING);
         return options;
     }
 }

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
index 6a9d81ea19564..3ecde0c6f147d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
@@ -83,5 +83,23 @@ public InlineElement getDescription() {
                                     + "you can obtain the correct mapping by disable using this legacy mapping."
                                     + " Use legacy behavior by default for compatibility consideration.");
 
+    public static final ConfigOption<Boolean> AVRO_FAST_READ =
+            ConfigOptions.key("avro.fast-read.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag for the Avro fast reader. "
+                                    + "Avro fast-read improves read speed by pre-building a resolution chain. 
" + + "get more information about this feature, please visit https://issues.apache.org/jira/browse/AVRO-3230"); + + public static final ConfigOption AVRO_WRITER_SCHEMA_STRING = + ConfigOptions.key("avro.writer.schemaString") + .stringType() + .defaultValue(null) + .withDescription( + "Optional for Avro deserialization. This string serves as the reader schema " + + "for the upstream table. Configuring this allows for effective column pruning, " + + "as only the fields specified in this schema will be read."); + private AvroFormatOptions() {} } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java index 799eb90b98f0d..d95632a7a0229 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java @@ -108,6 +108,34 @@ public AvroRowDataDeserializationSchema( typeInfo); } + /** + * Creates an Avro deserialization schema for the given logical type. + * + * @param rowType The logical type used to deserialize the data. + * @param typeInfo The TypeInformation to be used by {@link + * AvroRowDataDeserializationSchema#getProducedType()}. + * @param encoding The serialization approach used to deserialize the data. + * @param legacyTimestampMapping Whether to use legacy timestamp mapping. + */ + public AvroRowDataDeserializationSchema( + RowType actualRowType, + RowType rowType, + TypeInformation typeInfo, + AvroEncoding encoding, + boolean legacyTimestampMapping, + boolean fastRead, + String writerSchemaString) { + this( + AvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(actualRowType, legacyTimestampMapping), + AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping), + encoding, + writerSchemaString, + fastRead), + AvroToRowDataConverters.createRowConverter(rowType, legacyTimestampMapping), + typeInfo); + } + /** * Creates a Avro deserialization schema for the given logical type. 
      *

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
index fd0d05ffa4bc9..c96063dcb5fe5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
@@ -24,13 +24,21 @@ import org.apache.flink.formats.avro.generated.UnionLogicalType;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Encoder;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+import java.io.ByteArrayOutputStream;
 import java.time.Instant;
 import java.util.Random;
 
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.createEncoder;
 import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -85,4 +93,170 @@ void testSpecificRecordWithUnionLogicalType(AvroEncoding encoding) throws Except
         UnionLogicalType deserializedData = deserializer.deserialize(encodedData);
         assertThat(deserializedData).isEqualTo(data);
     }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testFastReadWithGenericRecord(AvroEncoding encoding) throws Exception {
+        // Create a schema with multiple fields
+        Schema schema =
+                SchemaBuilder.record("TestRecord")
+                        .namespace("org.apache.flink.formats.avro.test")
+                        .fields()
+                        .requiredString("name")
+                        .requiredInt("age")
+                        .requiredDouble("score")
+                        .endRecord();
+
+        // Create a GenericRecord with test data
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("name", "Alice");
+        record.put("age", 30);
+        record.put("score", 95.5);
+
+        // Serialize the record
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        Encoder encoder = createEncoder(encoding, schema, outputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] encodedData = outputStream.toByteArray();
+
+        // Create deserializer with fast read enabled
+        DeserializationSchema<GenericRecord> deserializer =
+                AvroDeserializationSchema.forGeneric(null, schema, encoding, null, true);
+
+        // Deserialize and verify
+        GenericRecord deserializedRecord = deserializer.deserialize(encodedData);
+        assertThat(deserializedRecord).isNotNull();
+        assertThat(deserializedRecord.get("name").toString()).isEqualTo("Alice");
+        assertThat(deserializedRecord.get("age")).isEqualTo(30);
+        assertThat(deserializedRecord.get("score")).isEqualTo(95.5);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testFastReadWithSpecificRecord(AvroEncoding encoding) throws Exception {
+        // Use the existing Address test data
+        Address address = TestDataGenerator.generateRandomAddress(new Random(42));
+
+        // Serialize the address
+        byte[] encodedAddress = writeRecord(address, encoding);
+
+        // Create deserializer with fast read enabled
+        DeserializationSchema<Address> deserializer =
+                new AvroDeserializationSchema<>(Address.class, null, null, encoding, null, true);
+
+        // Deserialize and verify
+        Address deserializedAddress = deserializer.deserialize(encodedAddress);
+        assertThat(deserializedAddress).isEqualTo(address);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testColumnPruningWithGenericRecord(AvroEncoding encoding) throws Exception {
+        // Create a full schema with 5 fields
+        Schema fullSchema =
+                SchemaBuilder.record("FullRecord")
+                        .namespace("org.apache.flink.formats.avro.test")
+                        .fields()
+                        .requiredLong("id")
+                        .requiredString("name")
+                        .requiredInt("age")
+                        .requiredString("email")
+                        .requiredDouble("score")
+                        .endRecord();
+
+        // Create a projected schema with only 3 fields (id, name, score)
+        Schema projectedSchema =
+                SchemaBuilder.record("ProjectedRecord")
+                        .namespace("org.apache.flink.formats.avro.test")
+                        .fields()
+                        .requiredLong("id")
+                        .requiredString("name")
+                        .requiredDouble("score")
+                        .endRecord();
+
+        // Create a GenericRecord with all 5 fields
+        GenericRecord fullRecord = new GenericData.Record(fullSchema);
+        fullRecord.put("id", 123L);
+        fullRecord.put("name", "Bob");
+        fullRecord.put("age", 25);
+        fullRecord.put("email", "bob@example.com");
+        fullRecord.put("score", 88.5);
+
+        // Serialize the full record
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, outputStream);
+        datumWriter.write(fullRecord, encoder);
+        encoder.flush();
+        byte[] encodedData = outputStream.toByteArray();
+
+        // Create deserializer with column pruning (writer schema = full, reader schema
+        // = projected)
+        DeserializationSchema<GenericRecord> deserializer =
+                AvroDeserializationSchema.forGeneric(
+                        fullSchema, projectedSchema, encoding, null, false);
+
+        // Deserialize and verify only projected fields are present
+        GenericRecord deserializedRecord = deserializer.deserialize(encodedData);
+        assertThat(deserializedRecord).isNotNull();
+        assertThat(deserializedRecord.get("id")).isEqualTo(123L);
+        assertThat(deserializedRecord.get("name").toString()).isEqualTo("Bob");
+        assertThat(deserializedRecord.get("score")).isEqualTo(88.5);
+
+        // Verify that the projected schema only has 3 fields
+        assertThat(deserializedRecord.getSchema().getFields()).hasSize(3);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testFastReadAndColumnPruningCombined(AvroEncoding encoding) throws Exception {
+        // Create a full schema with 4 fields
+        Schema fullSchema =
+                SchemaBuilder.record("FullRecord")
+                        .namespace("org.apache.flink.formats.avro.test")
+                        .fields()
+                        .requiredLong("id")
+                        .requiredString("name")
+                        .requiredInt("age")
+                        .requiredDouble("score")
+                        .endRecord();
+
+        // Create a projected schema with only 2 fields (id, score)
+        Schema projectedSchema =
+                SchemaBuilder.record("ProjectedRecord")
+                        .namespace("org.apache.flink.formats.avro.test")
+                        .fields()
+                        .requiredLong("id")
+                        .requiredDouble("score")
+                        .endRecord();
+
+        // Create a GenericRecord with all 4 fields
+        GenericRecord fullRecord = new GenericData.Record(fullSchema);
+        fullRecord.put("id", 789L);
+        fullRecord.put("name", "David");
+        fullRecord.put("age", 35);
+        fullRecord.put("score", 87.3);
+
+        // Serialize the full record
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, outputStream);
+        datumWriter.write(fullRecord, encoder);
+        encoder.flush();
+        byte[] encodedData = outputStream.toByteArray();
+
+        // Create deserializer with both fast read and column pruning enabled
+        DeserializationSchema<GenericRecord> deserializer =
+                AvroDeserializationSchema.forGeneric(
+                        fullSchema, projectedSchema, encoding, null, true);
+
+        // Deserialize and verify
+        GenericRecord deserializedRecord = deserializer.deserialize(encodedData);
+        assertThat(deserializedRecord).isNotNull();
+        assertThat(deserializedRecord.getSchema().getFields()).hasSize(2);
+        assertThat(deserializedRecord.get("id")).isEqualTo(789L);
+        assertThat(deserializedRecord.get("score")).isEqualTo(87.3);
+    }
 }

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 84f9994f6665c..38a1d3cfc8e77 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -425,4 +425,229 @@ private AvroRowDataDeserializationSchema createDeserializationSchema(
         deserializationSchema.open(null);
         return deserializationSchema;
     }
+
+    private AvroRowDataDeserializationSchema createDeserializationSchemaWithFastRead(
+            DataType dataType, AvroEncoding encoding, boolean legacyTimestampMapping)
+            throws Exception {
+        final RowType rowType = (RowType) dataType.getLogicalType();
+        final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                new AvroRowDataDeserializationSchema(
+                        rowType, rowType, typeInfo, encoding, legacyTimestampMapping, true, null);
+        deserializationSchema.open(null);
+        return deserializationSchema;
+    }
+
+    private AvroRowDataDeserializationSchema createDeserializationSchemaWithColumnPruning(
+            RowType actualRowType,
+            RowType projectedRowType,
+            AvroEncoding encoding,
+            boolean legacyTimestampMapping)
+            throws Exception {
+        final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(projectedRowType);
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                new AvroRowDataDeserializationSchema(
+                        actualRowType,
+                        projectedRowType,
+                        typeInfo,
+                        encoding,
+                        legacyTimestampMapping,
+                        false,
+                        null);
+        deserializationSchema.open(null);
+        return deserializationSchema;
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testFastReadEnabled(AvroEncoding encoding) throws Exception {
+        final DataType dataType =
+                ROW(FIELD("name", STRING()), FIELD("age", INT()), FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType rowType = (RowType) dataType.getLogicalType();
+
+        final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put(0, "Alice");
+        record.put(1, 30);
+        record.put(2, 95.5);
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchemaWithFastRead(dataType, encoding, true);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        Encoder encoder = createEncoder(encoding, schema, byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        RowData rowData = deserializationSchema.deserialize(input);
+
+        assertThat(rowData).isNotNull();
+        assertThat(rowData.getString(0).toString()).isEqualTo("Alice");
+        assertThat(rowData.getInt(1)).isEqualTo(30);
+        assertThat(rowData.getDouble(2)).isEqualTo(95.5);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testColumnPruning(AvroEncoding encoding) throws Exception {
+        // Create a full schema with 5 fields
+        final DataType fullDataType =
+                ROW(
+                                FIELD("id", BIGINT()),
+                                FIELD("name", STRING()),
+                                FIELD("age", INT()),
+                                FIELD("email", STRING()),
+                                FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType fullRowType = (RowType) fullDataType.getLogicalType();
+
+        // Create a projected schema with only 3 fields (id, name, score)
+        final DataType projectedDataType =
+                ROW(FIELD("id", BIGINT()), FIELD("name", STRING()), FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType projectedRowType = (RowType) projectedDataType.getLogicalType();
+
+        final Schema fullSchema = AvroSchemaConverter.convertToSchema(fullRowType);
+        final GenericRecord record = new GenericData.Record(fullSchema);
+        record.put(0, 123L);
+        record.put(1, "Bob");
+        record.put(2, 25);
+        record.put(3, "bob@example.com");
+        record.put(4, 88.5);
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchemaWithColumnPruning(
+                        fullRowType, projectedRowType, encoding, true);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        RowData rowData = deserializationSchema.deserialize(input);
+
+        // Verify only the projected fields are present
+        assertThat(rowData).isNotNull();
+        assertThat(rowData.getArity()).isEqualTo(3);
+        assertThat(rowData.getLong(0)).isEqualTo(123L);
+        assertThat(rowData.getString(1).toString()).isEqualTo("Bob");
+        assertThat(rowData.getDouble(2)).isEqualTo(88.5);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testColumnPruningWithNestedTypes(AvroEncoding encoding) throws Exception {
+        // Create a full schema with nested types
+        final DataType fullDataType =
+                ROW(
+                                FIELD("id", BIGINT()),
+                                FIELD("name", STRING()),
+                                FIELD("tags", ARRAY(STRING())),
+                                FIELD("metadata", MAP(STRING(), STRING())),
+                                FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType fullRowType = (RowType) fullDataType.getLogicalType();
+
+        // Project only id, tags, and score
+        final DataType projectedDataType =
+                ROW(FIELD("id", BIGINT()), FIELD("tags", ARRAY(STRING())), FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType projectedRowType = (RowType) projectedDataType.getLogicalType();
+
+        final Schema fullSchema = AvroSchemaConverter.convertToSchema(fullRowType);
+        final GenericRecord record = new GenericData.Record(fullSchema);
+        record.put(0, 456L);
+        record.put(1, "Charlie");
+        record.put(2, Arrays.asList("tag1", "tag2", "tag3"));
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put("key1", "value1");
+        record.put(3, metadata);
+        record.put(4, 92.0);
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchemaWithColumnPruning(
+                        fullRowType, projectedRowType, encoding, true);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        RowData rowData = deserializationSchema.deserialize(input);
+
+        assertThat(rowData).isNotNull();
+        assertThat(rowData.getArity()).isEqualTo(3);
+        assertThat(rowData.getLong(0)).isEqualTo(456L);
+        assertThat(rowData.getArray(1).size()).isEqualTo(3);
+        assertThat(rowData.getDouble(2)).isEqualTo(92.0);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testFastReadWithColumnPruning(AvroEncoding encoding) throws Exception {
+        // Test both features together
+        final DataType fullDataType =
+                ROW(
+                                FIELD("id", BIGINT()),
+                                FIELD("name", STRING()),
+                                FIELD("age", INT()),
+                                FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType fullRowType = (RowType) fullDataType.getLogicalType();
+
+        final DataType projectedDataType =
+                ROW(FIELD("id", BIGINT()), FIELD("score", DOUBLE())).notNull();
+        final RowType projectedRowType = (RowType) projectedDataType.getLogicalType();
+
+        final Schema fullSchema = AvroSchemaConverter.convertToSchema(fullRowType);
+        final GenericRecord record = new GenericData.Record(fullSchema);
+        record.put(0, 789L);
+        record.put(1, "David");
+        record.put(2, 35);
+        record.put(3, 87.3);
+
+        final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(projectedRowType);
+        AvroRowDataDeserializationSchema deserializationSchema =
+                new AvroRowDataDeserializationSchema(
+                        fullRowType, projectedRowType, typeInfo, encoding, true, true, null);
+        deserializationSchema.open(null);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        RowData rowData = deserializationSchema.deserialize(input);
+
+        assertThat(rowData).isNotNull();
+        assertThat(rowData.getArity()).isEqualTo(2);
+        assertThat(rowData.getLong(0)).isEqualTo(789L);
+        assertThat(rowData.getDouble(1)).isEqualTo(87.3);
+    }
+
+    @Test
+    void testSerializability() throws Exception {
+        final DataType dataType = ROW(FIELD("name", STRING()), FIELD("age", INT())).notNull();
+        final RowType rowType = (RowType) dataType.getLogicalType();
+        final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
+
+        AvroRowDataDeserializationSchema original =
+                new AvroRowDataDeserializationSchema(
+                        rowType, rowType, typeInfo, AvroEncoding.BINARY, true, true, null);
+
+        AvroRowDataDeserializationSchema deserialized = InstantiationUtil.clone(original);
+
+        assertThat(deserialized).isNotNull();
+    }
 }
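
Usage sketch (illustrative only, not part of the patch): PATCH 1's new forGeneric overload pairs the
upstream writer schema with a projected reader schema to get column pruning. The record name, fields,
and values below are made up; only the API calls mirror the patch and its tests. Note that both schemas
use the same record name so that Avro schema resolution matches them.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

    public class ColumnPruningSketch {
        public static DeserializationSchema<GenericRecord> prunedDeserializer() {
            // Writer schema: what the upstream producer actually serialized.
            Schema writerSchema =
                    SchemaBuilder.record("User")
                            .fields()
                            .requiredLong("id")
                            .requiredString("email")
                            .requiredDouble("score")
                            .endRecord();
            // Reader schema: only the projected columns survive deserialization.
            Schema readerSchema =
                    SchemaBuilder.record("User")
                            .fields()
                            .requiredLong("id")
                            .requiredDouble("score")
                            .endRecord();
            // Writer schema + projected reader schema = column pruning; the last
            // argument additionally enables Avro fast-read.
            return AvroDeserializationSchema.forGeneric(
                    writerSchema, readerSchema, AvroEncoding.BINARY, null, true);
        }
    }

Records written with writerSchema are then decoded straight into the two-field readerSchema, so the
"email" column is skipped during decoding rather than materialized and dropped afterwards.
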
From 7d1074673d020b7e3b9f0370a37cca3ba26ee5ae Mon Sep 17 00:00:00 2001
From: xiaolong
Date: Mon, 9 Feb 2026 15:49:40 +0800
Subject: [PATCH 2/3] [FLINK-39035][format][avro] add test for avro.writer.schemaString

---
 .../avro/AvroDeserializationSchema.java       | 42 +++++++++-----
 .../flink/formats/avro/AvroFormatOptions.java |  4 +-
 .../avro/AvroDeserializationSchemaTest.java   | 56 +++++++++++++++++++
 .../formats/avro/AvroFormatFactoryTest.java   | 36 ++++++++++++
 .../AvroRowDataDeSerializationSchemaTest.java | 48 ++++++++++++++++
 5 files changed, 170 insertions(+), 16 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index f30254437656f..2504228efe7df 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.formats.avro;
 
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
-import org.apache.flink.formats.avro.typeutils.AvroFactory;
-import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
-import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
-import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
-import org.apache.flink.util.Preconditions;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -37,12 +28,20 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-
-import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
+import org.apache.flink.formats.avro.typeutils.AvroFactory;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
 /**
  * Deserialization schema that deserializes from Avro binary format.
  *
@@ -81,6 +80,8 @@ public static AvroDeserializationSchema forGeneric(
      * @param actualSchema schema that the upstream producer actually wrote the data with
      * @param schema schema of produced records
      * @param encoding Avro serialization approach to use for decoding
+     * @param actualSchemaString the writer's Avro schema as a JSON string, if specified manually.
+     * @param fastRead whether to enable Avro fast-read.
      * @return deserialized record in form of {@link GenericRecord}
      */
     public static AvroDeserializationSchema<GenericRecord> forGeneric(
@@ -174,6 +175,7 @@ public static AvroDeserializationSchema forSpecifi
      * @param writer writer's Avro schema. Should be provided if the user wants column pruning.
      * @param encoding encoding approach to use. Identifies the Avro decoder class to use.
      * @param writeSchemaString optional writer schema string that advanced users may specify
      *     manually.
+     * @param fastRead whether to enable Avro fast-read.
      */
     AvroDeserializationSchema(
             Class<T> recordClazz,
@@ -207,7 +209,7 @@ GenericDatumReader getDatumReader() {
     }
 
     Schema getReaderSchema() {
-        return reader;
+        return writer != null ? writer : reader;
     }
 
     MutableByteArrayInputStream getInputStream() {
@@ -297,11 +299,23 @@ public boolean equals(Object o) {
             return false;
         }
         AvroDeserializationSchema<?> that = (AvroDeserializationSchema<?>) o;
-        return recordClazz.equals(that.recordClazz) && Objects.equals(reader, that.reader);
+        return fastRead == that.fastRead
+                && recordClazz.equals(that.recordClazz)
+                && Objects.equals(schemaString, that.schemaString)
+                && Objects.equals(
+                        getEffectiveWriteSchemaString(), that.getEffectiveWriteSchemaString())
+                && encoding == that.encoding;
+    }
+
+    private String getEffectiveWriteSchemaString() {
+        return (writeSchemaString == null || writeSchemaString.isEmpty())
+                ? schemaString
schemaString + : writeSchemaString; } @Override public int hashCode() { - return Objects.hash(recordClazz, reader); + return Objects.hash( + recordClazz, schemaString, getEffectiveWriteSchemaString(), encoding, fastRead); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java index 3ecde0c6f147d..662e0b7aed26c 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java @@ -84,7 +84,7 @@ public InlineElement getDescription() { + " Use legacy behavior by default for compatibility consideration."); public static final ConfigOption AVRO_FAST_READ = - ConfigOptions.key("avro.fast-read.enabled") + ConfigOptions.key("fast-read.enabled") .booleanType() .defaultValue(false) .withDescription( @@ -93,7 +93,7 @@ public InlineElement getDescription() { + "get more information about this feature, please visit https://issues.apache.org/jira/browse/AVRO-3230"); public static final ConfigOption AVRO_WRITER_SCHEMA_STRING = - ConfigOptions.key("avro.writer.schemaString") + ConfigOptions.key("writer.schemaString") .stringType() .defaultValue(null) .withDescription( diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java index c96063dcb5fe5..bac14e9901ec6 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java @@ -259,4 +259,60 @@ void testFastReadAndColumnPruningCombined(AvroEncoding encoding) throws Exceptio assertThat(deserializedRecord.get("id")).isEqualTo(789L); assertThat(deserializedRecord.get("score")).isEqualTo(87.3); } + + @ParameterizedTest + @EnumSource(AvroEncoding.class) + void testSchemaEvolution(AvroEncoding encoding) throws Exception { + // Writer schema (older version) + Schema writerSchema = + SchemaBuilder.record("EvolutionRecord") + .namespace("org.apache.flink.formats.avro.test") + .fields() + .requiredLong("id") + .requiredString("name") + .endRecord(); + + // Reader schema (newer version with reordered fields and a new field with default) + Schema readerSchema = + SchemaBuilder.record("EvolutionRecord") + .namespace("org.apache.flink.formats.avro.test") + .fields() + .name("name") + .type() + .stringType() + .noDefault() + .name("new_field") + .type() + .stringType() + .stringDefault("default_val") + .name("id") + .type() + .longType() + .noDefault() + .endRecord(); + + GenericRecord writerRecord = new GenericData.Record(writerSchema); + writerRecord.put("id", 101L); + writerRecord.put("name", "Eve"); + + // Serialize with writer schema + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + GenericDatumWriter datumWriter = new GenericDatumWriter<>(writerSchema); + Encoder encoder = createEncoder(encoding, writerSchema, outputStream); + datumWriter.write(writerRecord, encoder); + encoder.flush(); + byte[] encodedData = outputStream.toByteArray(); + + // Create deserializer with writer and reader schema + DeserializationSchema deserializer = + AvroDeserializationSchema.forGeneric( + writerSchema, readerSchema, encoding, null, false); + + // Deserialize and 
+        GenericRecord deserializedRecord = deserializer.deserialize(encodedData);
+        assertThat(deserializedRecord).isNotNull();
+        assertThat(deserializedRecord.get("name").toString()).isEqualTo("Eve");
+        assertThat(deserializedRecord.get("id")).isEqualTo(101L);
+        assertThat(deserializedRecord.get("new_field").toString()).isEqualTo("default_val");
+    }
 }

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
index 0b9d572a13b07..565de8092c7c9 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -78,6 +79,7 @@ void testSeDeSchema(AvroEncoding encoding) {
                         ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), encoding);
 
         final Map<String, String> options = getAllOptions(true);
+        options.put("avro.encoding", encoding.toString());
 
         final DynamicTableSource actualSource = FactoryMocks.createTableSource(SCHEMA, options);
         assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
@@ -104,6 +106,40 @@ void testSeDeSchema(AvroEncoding encoding) {
         assertThat(actualSer).isEqualTo(expectedSer);
     }
 
+    @Test
+    void testFastReadAndWriterSchemaOptions() {
+        final Map<String, String> options = getAllOptions(true);
+        options.put("avro.fast-read.enabled", "true");
+        final String writerSchema =
+                SchemaBuilder.record("Writer")
+                        .fields()
+                        .requiredInt("a")
+                        .requiredInt("b")
+                        .endRecord()
+                        .toString();
+        options.put("avro.writer.schemaString", writerSchema);
+
+        final DynamicTableSource actualSource = FactoryMocks.createTableSource(SCHEMA, options);
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                scanSourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+        final AvroRowDataDeserializationSchema expectedDeser =
+                new AvroRowDataDeserializationSchema(
+                        ROW_TYPE,
+                        ROW_TYPE,
+                        InternalTypeInfo.of(ROW_TYPE),
+                        AvroEncoding.BINARY,
+                        true,
+                        true,
+                        writerSchema);
+
+        assertThat(actualDeser).isEqualTo(expectedDeser);
+    }
+
     @Test
     void testOldSeDeNewSchema() {
         assertThatThrownBy(

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 38a1d3cfc8e77..71d25fa379b39 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -636,6 +636,54 @@ void testFastReadWithColumnPruning(AvroEncoding encoding) throws Exception {
         assertThat(rowData.getDouble(1)).isEqualTo(87.3);
     }
 
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testWriterSchemaString(AvroEncoding encoding) throws Exception {
+        // Full schema with 3 fields
+        final DataType fullDataType =
+                ROW(FIELD("id", BIGINT()), FIELD("name", STRING()), FIELD("score", DOUBLE()))
+                        .notNull();
+        final RowType fullRowType = (RowType) fullDataType.getLogicalType();
+        final Schema fullSchema = AvroSchemaConverter.convertToSchema(fullRowType);
+
+        // Projected schema with 2 fields (id, score)
+        final DataType projectedDataType =
+                ROW(FIELD("id", BIGINT()), FIELD("score", DOUBLE())).notNull();
+        final RowType projectedRowType = (RowType) projectedDataType.getLogicalType();
+
+        final GenericRecord record = new GenericData.Record(fullSchema);
+        record.put(0, 101L);
+        record.put(1, "Eve");
+        record.put(2, 99.0);
+
+        final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(projectedRowType);
+        // Use fullSchema.toString() as writerSchemaString
+        AvroRowDataDeserializationSchema deserializationSchema =
+                new AvroRowDataDeserializationSchema(
+                        fullRowType,
+                        projectedRowType,
+                        typeInfo,
+                        encoding,
+                        true,
+                        false,
+                        fullSchema.toString());
+        deserializationSchema.open(null);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(fullSchema);
+        Encoder encoder = createEncoder(encoding, fullSchema, byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        RowData rowData = deserializationSchema.deserialize(input);
+
+        assertThat(rowData).isNotNull();
+        assertThat(rowData.getArity()).isEqualTo(2);
+        assertThat(rowData.getLong(0)).isEqualTo(101L);
+        assertThat(rowData.getDouble(1)).isEqualTo(99.0);
+    }
+
     @Test
     void testSerializability() throws Exception {
         final DataType dataType = ROW(FIELD("name", STRING()), FIELD("age", INT())).notNull();
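
Configuration sketch (illustrative only, not part of the patch): after the key rename in this patch, the
runtime keys seen by users carry the 'avro.' format-identifier prefix that Flink adds for a table declared
with 'format' = 'avro', exactly as exercised by AvroFormatFactoryTest above. The option map below is a
hypothetical assembly of those keys; 'writerSchemaJson' stands in for a real writer schema string.

    import java.util.HashMap;
    import java.util.Map;

    public class AvroFormatOptionsSketch {
        // Builds the option map for a hypothetical Avro table source.
        public static Map<String, String> avroFormatOptions(String writerSchemaJson) {
            Map<String, String> options = new HashMap<>();
            options.put("format", "avro");
            // 'fast-read.enabled' and 'writer.schemaString' receive the 'avro.'
            // prefix from the format identifier, mirroring AvroFormatFactoryTest.
            options.put("avro.fast-read.enabled", "true");
            options.put("avro.writer.schemaString", writerSchemaJson);
            return options;
        }
    }
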
ROW(FIELD("id", BIGINT()), FIELD("name", STRING()), FIELD("score", DOUBLE())) + .notNull(); + final RowType fullRowType = (RowType) fullDataType.getLogicalType(); + final Schema fullSchema = AvroSchemaConverter.convertToSchema(fullRowType); + + // Projected schema with 2 fields (id, score) + final DataType projectedDataType = + ROW(FIELD("id", BIGINT()), FIELD("score", DOUBLE())).notNull(); + final RowType projectedRowType = (RowType) projectedDataType.getLogicalType(); + + final GenericRecord record = new GenericData.Record(fullSchema); + record.put(0, 101L); + record.put(1, "Eve"); + record.put(2, 99.0); + + final TypeInformation typeInfo = InternalTypeInfo.of(projectedRowType); + // Use fullSchema.toString() as writerSchemaString + AvroRowDataDeserializationSchema deserializationSchema = + new AvroRowDataDeserializationSchema( + fullRowType, + projectedRowType, + typeInfo, + encoding, + true, + false, + fullSchema.toString()); + deserializationSchema.open(null); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GenericDatumWriter datumWriter = new GenericDatumWriter<>(fullSchema); + Encoder encoder = createEncoder(encoding, fullSchema, byteArrayOutputStream); + datumWriter.write(record, encoder); + encoder.flush(); + byte[] input = byteArrayOutputStream.toByteArray(); + + RowData rowData = deserializationSchema.deserialize(input); + + assertThat(rowData).isNotNull(); + assertThat(rowData.getArity()).isEqualTo(2); + assertThat(rowData.getLong(0)).isEqualTo(101L); + assertThat(rowData.getDouble(1)).isEqualTo(99.0); + } + @Test void testSerializability() throws Exception { final DataType dataType = ROW(FIELD("name", STRING()), FIELD("age", INT())).notNull(); From 241507ec5389469a8a2ccfdd4417a1107e79d811 Mon Sep 17 00:00:00 2001 From: xiaolong Date: Mon, 9 Feb 2026 15:51:57 +0800 Subject: [PATCH 3/3] [FLINK-39035][format][avro] format --- .../avro/AvroDeserializationSchema.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java index 2504228efe7df..1ca96e9a475dd 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java @@ -18,6 +18,15 @@ package org.apache.flink.formats.avro; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; +import org.apache.flink.formats.avro.typeutils.AvroFactory; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -28,20 +37,12 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import 
-import org.apache.flink.formats.avro.typeutils.AvroFactory;
-import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
-import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
-import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
-import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Objects;
 
-import javax.annotation.Nullable;
-
 /**
  * Deserialization schema that deserializes from Avro binary format.
  *
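
Schema-evolution sketch (illustrative only, not part of the patch series): the writer/reader pair
introduced in PATCH 1 also covers ordinary Avro schema evolution, as exercised by testSchemaEvolution in
PATCH 2. The record name "Event" and the defaulted "source" field below are made up for illustration.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

    public class SchemaEvolutionSketch {
        public static DeserializationSchema<GenericRecord> evolvingDeserializer() {
            // Older writer schema, without the new field.
            Schema writerSchema =
                    SchemaBuilder.record("Event").fields().requiredLong("id").endRecord();
            // Newer reader schema; the added field is filled from its default value.
            Schema readerSchema =
                    SchemaBuilder.record("Event")
                            .fields()
                            .requiredLong("id")
                            .name("source")
                            .type()
                            .stringType()
                            .stringDefault("unknown")
                            .endRecord();
            return AvroDeserializationSchema.forGeneric(
                    writerSchema, readerSchema, AvroEncoding.BINARY, null, false);
        }
    }

Records serialized with the old writer schema then deserialize into the new reader schema, with "source"
set to "unknown" wherever the upstream data predates the field.
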