From 317568c997e0e98a4cdc5f2bcd398366e696ad0b Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Thu, 16 Apr 2026 23:12:40 -0700 Subject: [PATCH 1/3] Arrow reader: reject unsigned Parquet integer columns with clear error The vectorized Arrow reader was silently reading unsigned Parquet integer columns (uint8, uint16, uint32, uint64) as signed, producing incorrect values for any value exceeding the signed maximum for that bit width. Since Iceberg has no unsigned integer type, throw UnsupportedOperationException when the Arrow reader encounters an unsigned integer logical type annotation, consistent with how the schema conversion layer already rejects uint64. Fixes #14547 --- .../vectorized/VectorizedArrowReader.java | 8 +++ .../arrow/vectorized/TestArrowReader.java | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 2cc7cde4541a..3f9c3504ca45 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -584,6 +584,14 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (!intLogicalType.isSigned()) { + throw new UnsupportedOperationException( + String.format( + java.util.Locale.ROOT, + "Cannot read unsigned integer column '%s' (uint%d): " + + "Iceberg does not support unsigned integer types", + primitive.getName(), intLogicalType.getBitWidth())); + } FieldVector vector = arrowField.createVector(rootAlloc); int bitWidth = intLogicalType.getBitWidth(); diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index 34e83de15207..212f2dd835a3 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.Files.localInput; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -383,6 +384,69 @@ public void testTimestampMillisAreReadCorrectly() throws Exception { assertThat(totalRowsRead).as("Should read all rows").isEqualTo(millisValues.size()); } + @Test + public void testUnsignedIntegerColumnThrowsException() throws Exception { + tables = new HadoopTables(); + + for (int[] spec : new int[][] {{8, 32}, {16, 32}, {32, 32}, {64, 64}}) { + int unsignedBitWidth = spec[0]; + int physicalBitWidth = spec[1]; + PrimitiveType.PrimitiveTypeName physicalType = + physicalBitWidth == 32 + ? PrimitiveType.PrimitiveTypeName.INT32 + : PrimitiveType.PrimitiveTypeName.INT64; + + Schema schema = + new Schema(Types.NestedField.optional(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/uint" + unsignedBitWidth); + + MessageType parquetSchema = + new MessageType( + "test", + primitive(physicalType, Type.Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.intType(unsignedBitWidth, false)) + .id(1) + .named("col")); + + File testFile = new File(tempDir, "unsigned-int" + unsignedBitWidth + ".parquet"); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(testFile.toURI())) + .withType(parquetSchema) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + Group group = factory.newGroup(); + if (physicalBitWidth == 32) { + group.add("col", 100); + } else { + group.add("col", 100L); + } + writer.write(group); + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + assertThatThrownBy( + () -> { + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + batch.createVectorSchemaRootFromVectors().close(); + } + } + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("unsigned integer") + .hasMessageContaining("col"); + } + } + /** * Run the following verifications: * From 2e6657242f78487913b99b9699be789bb97d2fa0 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Thu, 16 Apr 2026 23:22:25 -0700 Subject: [PATCH 2/3] Apply spotless formatting --- .../apache/iceberg/arrow/vectorized/VectorizedArrowReader.java | 3 ++- .../org/apache/iceberg/arrow/vectorized/TestArrowReader.java | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 3f9c3504ca45..87d25257df0d 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -590,7 +590,8 @@ public Optional visit( java.util.Locale.ROOT, "Cannot read unsigned integer column '%s' (uint%d): " + "Iceberg does not support unsigned integer types", - primitive.getName(), intLogicalType.getBitWidth())); + primitive.getName(), + intLogicalType.getBitWidth())); } FieldVector vector = arrowField.createVector(rootAlloc); int bitWidth = intLogicalType.getBitWidth(); diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index 212f2dd835a3..1ed933d90412 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -396,8 +396,7 @@ public void testUnsignedIntegerColumnThrowsException() throws Exception { ? PrimitiveType.PrimitiveTypeName.INT32 : PrimitiveType.PrimitiveTypeName.INT64; - Schema schema = - new Schema(Types.NestedField.optional(1, "col", Types.IntegerType.get())); + Schema schema = new Schema(Types.NestedField.optional(1, "col", Types.IntegerType.get())); Table table = tables.create(schema, tempDir.toURI() + "/uint" + unsignedBitWidth); MessageType parquetSchema = From 041164473193357f4d45d8f9cc5d0ffd5fac6b94 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Fri, 17 Apr 2026 19:31:44 -0700 Subject: [PATCH 3/3] address comments --- .../vectorized/VectorizedArrowReader.java | 17 ++- .../arrow/vectorized/TestArrowReader.java | 121 ++++++++++++++---- 2 files changed, 103 insertions(+), 35 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 87d25257df0d..e9ebed2826f4 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -584,23 +584,22 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - if (!intLogicalType.isSigned()) { - throw new UnsupportedOperationException( - String.format( - java.util.Locale.ROOT, - "Cannot read unsigned integer column '%s' (uint%d): " - + "Iceberg does not support unsigned integer types", - primitive.getName(), - intLogicalType.getBitWidth())); - } FieldVector vector = arrowField.createVector(rootAlloc); int bitWidth = intLogicalType.getBitWidth(); if (bitWidth == 8 || bitWidth == 16 || bitWidth == 32) { + // Iceberg has no unsigned integer type. Reading UINT32 into a 32-bit signed value would + // silently produce negative results for inputs above Integer.MAX_VALUE. UINT8 and UINT16 + // both fit losslessly in a signed int32 and are allowed, matching the policy in + // BaseParquetReaders for the non-vectorized path. + Preconditions.checkArgument( + intLogicalType.isSigned() || bitWidth < 32, "Cannot read UINT32 as an int value"); ((IntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.INT, (int) IntVector.TYPE_WIDTH)); } else if (bitWidth == 64) { + Preconditions.checkArgument( + intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); ((BigIntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.LONG, (int) BigIntVector.TYPE_WIDTH)); diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index 1ed933d90412..e5412317ea33 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; @@ -102,6 +103,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; /** * Test cases for {@link ArrowReader}. @@ -384,17 +388,82 @@ public void testTimestampMillisAreReadCorrectly() throws Exception { assertThat(totalRowsRead).as("Should read all rows").isEqualTo(millisValues.size()); } + private static Stream rejectedUnsignedIntegerCases() { + return Stream.of( + Arguments.of( + 32, + PrimitiveType.PrimitiveTypeName.INT32, + new Schema(Types.NestedField.optional(1, "col", Types.IntegerType.get())), + "Cannot read UINT32 as an int value"), + Arguments.of( + 64, + PrimitiveType.PrimitiveTypeName.INT64, + new Schema(Types.NestedField.optional(1, "col", Types.LongType.get())), + "Cannot read UINT64 as a long value")); + } + + @ParameterizedTest + @MethodSource("rejectedUnsignedIntegerCases") + public void testUnsignedIntegerColumnThrowsException( + int unsignedBitWidth, + PrimitiveType.PrimitiveTypeName physicalType, + Schema schema, + String expectedMessage) + throws Exception { + tables = new HadoopTables(); + Table table = tables.create(schema, tempDir.toURI() + "/uint" + unsignedBitWidth); + + MessageType parquetSchema = + new MessageType( + "test", + primitive(physicalType, Type.Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.intType(unsignedBitWidth, false)) + .id(1) + .named("col")); + + File testFile = + new File(tempDir, "unsigned-int" + unsignedBitWidth + "-" + System.nanoTime() + ".parquet"); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(testFile.toURI())).withType(parquetSchema).build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + Group group = factory.newGroup(); + if (unsignedBitWidth == 64) { + group.add("col", 100L); + } else { + group.add("col", 100); + } + writer.write(group); + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + assertThatThrownBy( + () -> { + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + batch.createVectorSchemaRootFromVectors().close(); + } + } + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining(expectedMessage); + } + @Test - public void testUnsignedIntegerColumnThrowsException() throws Exception { + public void testUnsignedSmallIntegerColumnRoundtrips() throws Exception { tables = new HadoopTables(); - for (int[] spec : new int[][] {{8, 32}, {16, 32}, {32, 32}, {64, 64}}) { + for (int[] spec : new int[][] {{8, 250}, {16, 50000}}) { int unsignedBitWidth = spec[0]; - int physicalBitWidth = spec[1]; - PrimitiveType.PrimitiveTypeName physicalType = - physicalBitWidth == 32 - ? PrimitiveType.PrimitiveTypeName.INT32 - : PrimitiveType.PrimitiveTypeName.INT64; + int value = spec[1]; Schema schema = new Schema(Types.NestedField.optional(1, "col", Types.IntegerType.get())); Table table = tables.create(schema, tempDir.toURI() + "/uint" + unsignedBitWidth); @@ -402,23 +471,21 @@ public void testUnsignedIntegerColumnThrowsException() throws Exception { MessageType parquetSchema = new MessageType( "test", - primitive(physicalType, Type.Repetition.OPTIONAL) + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL) .as(LogicalTypeAnnotation.intType(unsignedBitWidth, false)) .id(1) .named("col")); - File testFile = new File(tempDir, "unsigned-int" + unsignedBitWidth + ".parquet"); + File testFile = + new File( + tempDir, "unsigned-int" + unsignedBitWidth + "-" + System.nanoTime() + ".parquet"); try (ParquetWriter writer = ExampleParquetWriter.builder(new Path(testFile.toURI())) .withType(parquetSchema) .build()) { SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); Group group = factory.newGroup(); - if (physicalBitWidth == 32) { - group.add("col", 100); - } else { - group.add("col", 100L); - } + group.add("col", value); writer.write(group); } @@ -431,18 +498,20 @@ public void testUnsignedIntegerColumnThrowsException() throws Exception { .build(); table.newAppend().appendFile(dataFile).commit(); - assertThatThrownBy( - () -> { - try (VectorizedTableScanIterable vectorizedReader = - new VectorizedTableScanIterable(table.newScan(), 1024, false)) { - for (ColumnarBatch batch : vectorizedReader) { - batch.createVectorSchemaRootFromVectors().close(); - } - } - }) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("unsigned integer") - .hasMessageContaining("col"); + int totalRows = 0; + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); + assertThat(((IntVector) root.getVector("col")).get(0)) + .as("UINT%d value should round-trip through int", unsignedBitWidth) + .isEqualTo(value); + totalRows += root.getRowCount(); + root.close(); + } + } + + assertThat(totalRows).isEqualTo(1); } }