From 60340bbfda58e11cd4bff8c294be4e1029e608cf Mon Sep 17 00:00:00 2001
From: "liyang.127"
Date: Thu, 9 Apr 2026 20:22:33 +0800
Subject: [PATCH] avoid driver oom caused by unnecessary metadata columns in splitinfo

---
 .../clickhouse/CHIteratorApi.scala            | 15 +++++++++++----
 .../backendsapi/velox/VeloxIteratorApi.scala  | 18 ++++++++++++++----
 .../execution/BasicScanExecTransformer.scala  | 14 +++++++++++++-
 .../apache/gluten/sql/shims/SparkShims.scala  | 18 +++++++++++++-----
 4 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index a705675e2d68..aaa11a5b6d6f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -161,7 +161,10 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
         val fileSizes = new JArrayList[JLong]()
         val modificationTimes = new JArrayList[JLong]()
         val partitionColumns = new JArrayList[JMap[String, String]]
-        val metadataColumns = new JArrayList[JMap[String, String]]
+        val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
+        val emptyMetadataColumn: JMap[String, String] =
+          java.util.Collections.emptyMap[String, String]()
+        val metadataColumns = new JArrayList[JMap[String, String]](f.files.length)
         val otherMetadataColumns = new JArrayList[JMap[String, Object]]
         f.files.foreach {
           file =>
@@ -169,9 +172,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
             starts.add(JLong.valueOf(file.start))
             lengths.add(JLong.valueOf(file.length))
             val metadataColumn =
-              SparkShimLoader.getSparkShims
-                .generateMetadataColumns(file, metadataColumnNames)
-                .asJava
+              if (needMetadataColumns) {
+                SparkShimLoader.getSparkShims
+                  .generateMetadataColumns(file, metadataColumnNames)
+                  .asJava
+              } else {
+                emptyMetadataColumn
+              }
             metadataColumns.add(metadataColumn)
             val partitionColumn = new JHashMap[String, String]()
             for (i <- 0 until file.partitionValues.numFields) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 60018554dd5c..846be7eae62c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -91,9 +91,19 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       .unzip
 
     val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
-    val metadataColumns = partitionFiles
-      .map(
-        f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
+    val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
+    val emptyMetadataColumn: java.util.Map[String, String] =
+      java.util.Collections.emptyMap[String, String]()
+    val metadataColumns: java.util.List[java.util.Map[String, String]] =
+      if (needMetadataColumns) {
+        partitionFiles
+          .map(
+            f =>
+              SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
+          .asJava
+      } else {
+        java.util.Collections.nCopies(partitionFiles.size, emptyMetadataColumn)
+      }
     val otherMetadataColumns = partitionFiles
       .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))
 
@@ -106,7 +116,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         fileSizes.asJava,
         modificationTimes.asJava,
         partitionColumns.map(_.asJava).asJava,
-        metadataColumns.asJava,
+        metadataColumns,
         fileFormat,
         locations.toList.asJava,
         mapAsJavaMap(properties),
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 3b142fecb952..9e3c9f4d474d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -112,6 +112,18 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
       case _ => Seq(partition)
     }
 
+    val metadataFromSpark = getMetadataColumns().map(_.name)
+
+    val inputFileRelatedMetadataKeys = Seq(
+      InputFileName().prettyName,
+      InputFileBlockStart().prettyName,
+      InputFileBlockLength().prettyName)
+
+    val neededInputFileRelatedMetadataKeys =
+      inputFileRelatedMetadataKeys.filter(k => output.exists(_.name == k))
+
+    val metadataColumnNames = (metadataFromSpark ++ neededInputFileRelatedMetadataKeys).distinct
+
     BackendsApiManager.getIteratorApiInstance
       .genSplitInfo(
         partition.index,
@@ -119,7 +131,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         getPartitionSchema,
         getDataSchema,
         readFileFormat,
-        getMetadataColumns().map(_.name),
+        metadataColumnNames,
         getProperties)
   }
 
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 2f5350f38a08..e3fed795b1b5 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -171,11 +171,19 @@ trait SparkShims {
   def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] = {
-    Map(
-      InputFileName().prettyName -> file.filePath.toString,
-      InputFileBlockStart().prettyName -> file.start.toString,
-      InputFileBlockLength().prettyName -> file.length.toString
-    )
+    // Emit only the input-file entries whose key was requested; a null or empty request
+    // yields an empty map, so split infos do not carry metadata that nothing asked for.
+    if (metadataColumnNames == null || metadataColumnNames.isEmpty) {
+      Map.empty
+    } else {
+      val requested = metadataColumnNames.toSet
+      // Values are thunks so that strings for unrequested keys are never built.
+      Seq[(String, () => String)](
+        InputFileName().prettyName -> (() => file.filePath.toString),
+        InputFileBlockStart().prettyName -> (() => file.start.toString),
+        InputFileBlockLength().prettyName -> (() => file.length.toString)
+      ).collect { case (key, value) if requested.contains(key) => key -> value() }.toMap
+    }
   }
 
   // For compatibility with Spark-3.5.