10 changes: 10 additions & 0 deletions .palantir/revapi.yml
@@ -65,6 +65,16 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method java.util.List<java.lang.Integer> org.apache.iceberg.ContentFile<F>::partialFieldIds()"
justification: "Add new feature"
org.apache.iceberg:iceberg-data:
- code: "java.method.abstractMethodAdded"
new: "method T org.apache.iceberg.data.DeleteFilter<T>::combineRecord(T, org.apache.iceberg.StructLike,\
\ org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
justification: "Add new feature"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -104,6 +104,8 @@ public interface ContentFile<F> {
*/
List<Integer> equalityFieldIds();

/**
 * Returns the partial comparison field IDs used by partial update files, or null if this file
 * carries no partial update information.
 */
List<Integer> partialFieldIds();

/**
* Returns the sort order id of this file, which describes how the file is ordered. This
* information will be useful for merging data and equality delete files more efficiently when
17 changes: 15 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
@@ -102,7 +102,14 @@ public interface DataFile extends ContentFile<DataFile> {
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 142

Types.NestedField PARTIAL_IDS =
optional(
142,
"partial_ids",
ListType.ofRequired(143, IntegerType.get()),
"partial comparison field IDs");
// NEXT ID TO ASSIGN: 144

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
@@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
PARTIAL_IDS);
}

/** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
@@ -136,4 +144,9 @@ default FileContent content() {
default List<Integer> equalityFieldIds() {
return null;
}

@Override
default List<Integer> partialFieldIds() {
return null;
}
}
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/FileContent.java
@@ -22,7 +22,8 @@
public enum FileContent {
DATA(0),
POSITION_DELETES(1),
EQUALITY_DELETES(2);
EQUALITY_DELETES(2),
PARTIAL_UPDATE(3);

private final int id;

18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -73,6 +73,7 @@ public PartitionData copy() {
private Map<Integer, ByteBuffer> upperBounds = null;
private long[] splitOffsets = null;
private int[] equalityIds = null;
private int[] partialIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;

@@ -132,6 +133,7 @@ public PartitionData copy() {
Map<Integer, ByteBuffer> upperBounds,
List<Long> splitOffsets,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
this.partitionSpecId = specId;
@@ -159,6 +161,7 @@ public PartitionData copy() {
this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
this.splitOffsets = ArrayUtil.toLongArray(splitOffsets);
this.equalityIds = equalityFieldIds;
this.partialIds = partialFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}
@@ -207,6 +210,10 @@ public PartitionData copy() {
toCopy.equalityIds != null
? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length)
: null;
this.partialIds =
toCopy.partialIds != null
? Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length)
: null;
this.sortOrderId = toCopy.sortOrderId;
}

@@ -294,6 +301,9 @@ public void put(int i, Object value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.partialIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 18:
this.fileOrdinal = (long) value;
return;
default:
@@ -349,6 +359,8 @@ public Object get(int i) {
case 16:
return sortOrderId;
case 17:
return partialFieldIds();
case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -445,6 +457,11 @@ public List<Integer> equalityFieldIds() {
return ArrayUtil.toIntList(equalityIds);
}

@Override
public List<Integer> partialFieldIds() {
return ArrayUtil.toIntList(partialIds);
}

@Override
public Integer sortOrderId() {
return sortOrderId;
@@ -478,6 +495,7 @@ public String toString() {
.add("split_offsets", splitOffsets == null ? "null" : splitOffsets())
.add("equality_ids", equalityIds == null ? "null" : equalityFieldIds())
.add("sort_order_id", sortOrderId)
.add("partial_ids", equalityIds == null ? "null" : partialFieldIds())
.toString();
}
}
90 changes: 90 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -156,6 +156,80 @@ private static boolean canContainDeletesForFile(

case EQUALITY_DELETES:
return canContainEqDeletesForFile(dataFile, deleteFile, schema);

case PARTIAL_UPDATE:
return canContainPartialDeletesForFile(dataFile, deleteFile, schema);
}

return true;
}

// TODO: add a partial-update-specific implementation; for now this mirrors
// canContainEqDeletesForFile
private static boolean canContainPartialDeletesForFile(
DataFile dataFile, DeleteFile deleteFile, Schema schema) {
// whether to check data ranges or to assume that the ranges match
// if upper/lower bounds are missing, null counts may still be used to determine delete files
// can be skipped
boolean checkRanges =
dataFile.lowerBounds() != null
&& dataFile.upperBounds() != null
&& deleteFile.lowerBounds() != null
&& deleteFile.upperBounds() != null;

Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();

Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();

for (int id : deleteFile.equalityFieldIds()) {
Types.NestedField field = schema.findField(id);
if (!field.type().isPrimitiveType()) {
// stats are not kept for nested types. assume that the delete file may match
continue;
}

if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
// the data has null values and null has been deleted, so the deletes must be applied
continue;
}

if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
// the data file contains only null values for this field, but there are no deletes for null
// values
return false;
}

if (allNull(deleteNullCounts, deleteValueCounts, field)
&& allNonNull(dataNullCounts, field)) {
// the delete file removes only rows with null for this field, but there are no data
// rows with null
return false;
}

if (!checkRanges) {
// some upper and lower bounds are missing, assume they match
continue;
}

ByteBuffer dataLower = dataLowers.get(id);
ByteBuffer dataUpper = dataUppers.get(id);
ByteBuffer deleteLower = deleteLowers.get(id);
ByteBuffer deleteUpper = deleteUppers.get(id);
if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
// at least one bound is not known, assume the delete file may match
continue;
}

if (!rangesOverlap(
field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
// no values overlap between the data file and the deletes
return false;
}
}

return true;
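
For intuition, here is a standalone sketch of the pruning rule the checks above implement (an editor's illustration, not code from this PR): a delete file can be skipped only when, for some checked field, the data file's value range and the delete file's value range are disjoint. The real rangesOverlap compares serialized ByteBuffer bounds with the field type's comparator; plain ints stand in for them here.

// Hypothetical helper, not part of DeleteFileIndex: interval intersection over int bounds.
static boolean boundsOverlap(int dataLower, int dataUpper, int deleteLower, int deleteUpper) {
  // the ranges are disjoint iff one range ends before the other begins
  return dataLower <= deleteUpper && deleteLower <= dataUpper;
}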
@@ -474,6 +548,22 @@ DeleteFileIndex build() {
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

// FIXME: this replaces the global equality deletes computed above instead of merging
// with them
List<Pair<Long, DeleteFile>> partialDeleteSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE)
.map(
entry ->
// a delete file is indexed by the sequence number it should be applied to
Pair.of(entry.dataSequenceNumber(), entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
if (!partialDeleteSortedBySeq.isEmpty()) {
globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes =
partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
}

List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
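
One possible direction for the FIXME above, sketched by the editor under the assumption that the surrounding builder code stays as-is: index equality deletes and partial updates into a single sequence-ordered global array instead of letting the partial-update pass replace the equality result.

// Editor's sketch, not part of this PR: build one combined, sequence-ordered index.
List<Pair<Long, DeleteFile>> globalSortedBySeq =
    deleteFilesByPartition.get(partition).stream()
        .filter(
            entry ->
                entry.file().content() == FileContent.EQUALITY_DELETES
                    || entry.file().content() == FileContent.PARTIAL_UPDATE)
        .map(entry -> Pair.of(entry.dataSequenceNumber(), entry.file()))
        .sorted(Comparator.comparingLong(Pair::first))
        .collect(Collectors.toList());
globalApplySeqs = globalSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = globalSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);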
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -41,6 +41,7 @@ public static class Builder {
private final int specId;
private FileContent content = null;
private int[] equalityFieldIds = null;
private int[] partialFieldIds = null;
private PartitionData partitionData;
private String filePath = null;
private FileFormat format = null;
@@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... fieldIds) {
return this;
}

public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) {
this.content = FileContent.PARTIAL_UPDATE;
this.equalityFieldIds = newEqualityFieldIds;
this.partialFieldIds = newPartialFieldIds;
return this;
}
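
To make the intended call pattern concrete, a usage sketch (an editor's illustration: the field IDs, path, and sizes are made up, and an unpartitioned spec is assumed so no partition tuple is set) using the existing deleteFileBuilder entry point:

// Field ID 1 is the equality key; fields 3 and 4 carry the partially updated columns.
DeleteFile partialFile =
    FileMetadata.deleteFileBuilder(spec)
        .ofPartialDeletes(new int[] {1}, new int[] {3, 4})
        .withPath("/warehouse/db/tbl/partial-00000.parquet")
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(4096L)
        .withRecordCount(100L)
        .build();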

public Builder withStatus(FileStatus stat) {
this.filePath = stat.getPath().toString();
this.fileSizeInBytes = stat.getLen();
@@ -222,6 +230,8 @@ public DeleteFile build() {
sortOrderId == null, "Position delete file should not have sort order");
break;
case EQUALITY_DELETES: // fall through: validated the same way as partial updates
case PARTIAL_UPDATE:
if (sortOrderId == null) {
sortOrderId = SortOrder.unsorted().orderId();
}
@@ -246,6 +256,7 @@
lowerBounds,
upperBounds),
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
metrics.upperBounds(),
splitOffsets,
null,
null,
sortOrderId,
keyMetadata);
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
long fileSizeInBytes,
Metrics metrics,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
super(
@@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
metrics.upperBounds(),
null,
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -221,12 +221,14 @@ private static class UpdateMetrics {
private int addedPosDeleteFiles = 0;
private int removedPosDeleteFiles = 0;
private int addedDeleteFiles = 0;
private int addedPartialFiles = 0;
private int removedDeleteFiles = 0;
private long addedRecords = 0L;
private long deletedRecords = 0L;
private long addedPosDeletes = 0L;
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long addedPartialUpdates = 0L;
private long removedEqDeletes = 0L;
private boolean trustSizeAndDeleteCounts = true;

@@ -290,6 +292,12 @@ void addedFile(ContentFile<?> file) {
this.addedEqDeleteFiles += 1;
this.addedEqDeletes += file.recordCount();
break;
case PARTIAL_UPDATE:
this.addedDeleteFiles += 1;
this.addedPartialFiles += 1;
this.addedPartialUpdates += file.recordCount();
break;
default:
throw new UnsupportedOperationException(
"Unsupported file content type: " + file.content());
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.PARTIAL_IDS);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
@@ -456,6 +457,8 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
return wrapped.partialFieldIds();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
@@ -550,6 +553,11 @@ public List<Integer> equalityFieldIds() {
return wrapped.equalityFieldIds();
}

@Override
public List<Integer> partialFieldIds() {
return wrapped.partialFieldIds();
}

@Override
public Integer sortOrderId() {
return wrapped.sortOrderId();