Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val fileSizes = new JArrayList[JLong]()
val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
val metadataColumns = new JArrayList[JMap[String, String]]
val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
val emptyMetadataColumn: JMap[String, String] =
java.util.Collections.emptyMap[String, String]()
val metadataColumns = new JArrayList[JMap[String, String]](f.files.length)
val otherMetadataColumns = new JArrayList[JMap[String, Object]]
f.files.foreach {
file =>
paths.add(new URI(file.filePath.toString()).toASCIIString)
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val metadataColumn =
SparkShimLoader.getSparkShims
.generateMetadataColumns(file, metadataColumnNames)
.asJava
if (needMetadataColumns) {
SparkShimLoader.getSparkShims
.generateMetadataColumns(file, metadataColumnNames)
.asJava
} else {
emptyMetadataColumn
}
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,19 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.unzip

val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
val metadataColumns = partitionFiles
.map(
f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
val emptyMetadataColumn: java.util.Map[String, String] =
java.util.Collections.emptyMap[String, String]()
val metadataColumns: java.util.List[java.util.Map[String, String]] =
if (needMetadataColumns) {
partitionFiles
.map(
f =>
SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
.asJava
} else {
java.util.Collections.nCopies(partitionFiles.size, emptyMetadataColumn)
}
val otherMetadataColumns = partitionFiles
.map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))

Expand All @@ -106,7 +116,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
fileSizes.asJava,
modificationTimes.asJava,
partitionColumns.map(_.asJava).asJava,
metadataColumns.asJava,
metadataColumns,
fileFormat,
locations.toList.asJava,
mapAsJavaMap(properties),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,26 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
case _ => Seq(partition)
}

val metadataFromSpark = getMetadataColumns().map(_.name)

val inputFileRelatedMetadataKeys = Seq(
InputFileName().prettyName,
InputFileBlockStart().prettyName,
InputFileBlockLength().prettyName)

val neededInputFileRelatedMetadataKeys =
inputFileRelatedMetadataKeys.filter(k => output.exists(_.name == k))

val metadataColumnNames = (metadataFromSpark ++ neededInputFileRelatedMetadataKeys).distinct

BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
partition.index,
part,
getPartitionSchema,
getDataSchema,
readFileFormat,
getMetadataColumns().map(_.name),
metadataColumnNames,
getProperties)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,33 @@ trait SparkShims {
/**
 * Builds the input-file metadata column values for a single partitioned file.
 *
 * Only the columns actually requested in `metadataColumnNames` are populated; the
 * supported keys are the pretty names of `InputFileName`, `InputFileBlockStart`
 * and `InputFileBlockLength`.
 *
 * @param file the partitioned file whose path / block start / block length are exposed
 * @param metadataColumnNames metadata column names requested by the scan; `null` or an
 *                            empty sequence yields an empty map
 * @return a map from each requested (and supported) metadata column name to its value
 */
def generateMetadataColumns(
    file: PartitionedFile,
    metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] = {
  // Guard against a null Seq coming from Java callers as well as the empty case.
  if (metadataColumnNames == null || metadataColumnNames.isEmpty) {
    Map.empty
  } else {
    // Candidate (name -> value) pairs for every supported metadata column; keep only
    // the requested ones. If none of the three names are requested this naturally
    // produces an empty map, so no separate early-exit check is needed.
    Seq(
      InputFileName().prettyName -> file.filePath.toString,
      InputFileBlockStart().prettyName -> file.start.toString,
      InputFileBlockLength().prettyName -> file.length.toString
    ).collect { case kv @ (name, _) if metadataColumnNames.contains(name) => kv }.toMap
  }
}

// For compatibility with Spark-3.5.
Expand Down
Loading