diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index d591454596..5a0bc9f6d3 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -24,10 +24,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import scala.Option; @@ -36,9 +32,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -87,7 +81,10 @@ * reader.close(); * } * + * + * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only. */ +@Deprecated @IcebergApi public class BatchReader extends RecordReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); @@ -110,8 +107,6 @@ public class BatchReader extends RecordReader implements Cl protected AbstractColumnReader[] columnReaders; private CometSchemaImporter importer; protected ColumnarBatch currentBatch; - private Future> prefetchTask; - private LinkedBlockingQueue> prefetchQueue; private FileReader fileReader; private boolean[] missingColumns; protected boolean isInitialized; @@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException { } } - // Pre-fetching - boolean preFetchEnabled = - conf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED().key(), - (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get()); - - if (preFetchEnabled) { - LOG.info("Prefetch enabled for BatchReader."); - this.prefetchQueue = new LinkedBlockingQueue<>(); - } - isInitialized = true; - synchronized (this) { - // if prefetch is enabled, `init()` is called in separate thread. When - // `BatchReader.nextBatch()` is called asynchronously, it is possibly that - // `init()` is not called or finished. We need to hold on `nextBatch` until - // initialization of `BatchReader` is done. Once we are close to finish - // initialization, we notify the waiting thread of `nextBatch` to continue. - notifyAll(); - } } /** @@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() { return currentBatch; } - // Only for testing - public Future> getPrefetchTask() { - return this.prefetchTask; - } - - // Only for testing - public LinkedBlockingQueue> getPrefetchQueue() { - return this.prefetchQueue; - } - /** * Loads the next batch of rows. * * @return true if there are no more rows to read, false otherwise. */ public boolean nextBatch() throws IOException { - if (this.prefetchTask == null) { - Preconditions.checkState(isInitialized, "init() should be called first!"); - } else { - // If prefetch is enabled, this reader will be initialized asynchronously from a - // different thread. Wait until it is initialized - while (!isInitialized) { - synchronized (this) { - try { - // Wait until initialization of current `BatchReader` is finished (i.e., `init()`), - // is done. It is possibly that `init()` is done after entering this while loop, - // so a short timeout is given. - wait(100); - - // Checks if prefetch task is finished. 
If so, tries to get exception if any. - if (prefetchTask.isDone()) { - Option exception = prefetchTask.get(); - if (exception.isDefined()) { - throw exception.get(); - } - } - } catch (RuntimeException e) { - // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`. - throw e; - } catch (Throwable e) { - throw new IOException(e); - } - } - } - } + Preconditions.checkState(isInitialized, "init() should be called first!"); if (rowsRead >= totalRowCount) return false; boolean hasMore; @@ -547,7 +485,6 @@ public void close() throws IOException { } } - @SuppressWarnings("deprecation") private boolean loadNextRowGroupIfNecessary() throws Throwable { // More rows can be read from loaded row group. No need to load next one. if (rowsRead != totalRowsLoaded) return true; @@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups"); long startNs = System.nanoTime(); - PageReadStore rowGroupReader = null; - if (prefetchTask != null && prefetchQueue != null) { - // Wait for pre-fetch task to finish. - Pair rowGroupReaderPair = prefetchQueue.take(); - rowGroupReader = rowGroupReaderPair.getLeft(); - - // Update incremental byte read metric. Because this metric in Spark is maintained - // by thread local variable, we need to manually update it. - // TODO: We may expose metrics from `FileReader` and get from it directly. - long incBytesRead = rowGroupReaderPair.getRight(); - FileSystem.getAllStatistics().stream() - .forEach(statistic -> statistic.incrementBytesRead(incBytesRead)); - } else { - rowGroupReader = fileReader.readNextRowGroup(); - } + PageReadStore rowGroupReader = fileReader.readNextRowGroup(); if (rowGroupTimeMetric != null) { rowGroupTimeMetric.add(System.nanoTime() - startNs); @@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { totalRowsLoaded += rowGroupReader.getRowCount(); return true; } - - // Submits a prefetch task for this reader. - public void submitPrefetchTask(ExecutorService threadPool) { - this.prefetchTask = threadPool.submit(new PrefetchTask()); - } - - // A task for prefetching parquet row groups. - private class PrefetchTask implements Callable> { - private long getBytesRead() { - return FileSystem.getAllStatistics().stream() - .mapToLong(s -> s.getThreadStatistics().getBytesRead()) - .sum(); - } - - @Override - public Option call() throws Exception { - // Gets the bytes read so far. - long baseline = getBytesRead(); - - try { - init(); - - while (true) { - PageReadStore rowGroupReader = fileReader.readNextRowGroup(); - - if (rowGroupReader == null) { - // Reaches the end of row groups. - return Option.empty(); - } else { - long incBytesRead = getBytesRead() - baseline; - - prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead)); - } - } - } catch (Throwable e) { - // Returns exception thrown from the reader. The reader will re-throw it. 
- return Option.apply(e); - } finally { - if (fileReader != null) { - fileReader.closeStream(); - } - } - } - } } diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java index d4bfa2b878..bd66f2deab 100644 --- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java @@ -24,9 +24,11 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.comet.IcebergApi; import org.apache.comet.vector.CometVector; /** This class is a public interface used by Apache Iceberg to read batches using Comet */ +@IcebergApi public class IcebergCometBatchReader extends BatchReader { public IcebergCometBatchReader(int numColumns, StructType schema) { this.columnReaders = new AbstractColumnReader[numColumns]; diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 522ccbc94c..fc209ec462 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -651,23 +651,6 @@ object CometConf extends ShimCometConf { .doubleConf .createWithDefault(1.0) - val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.scan.preFetch.enabled") - .category(CATEGORY_SCAN) - .doc("Whether to enable pre-fetching feature of CometScan.") - .booleanConf - .createWithDefault(false) - - val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] = - conf("spark.comet.scan.preFetch.threadNum") - .category(CATEGORY_SCAN) - .doc( - "The number of threads running pre-fetching for CometScan. Effective if " + - s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " + - "pre-fetching threads means more memory requirement to store pre-fetched row groups.") - .intConf - .createWithDefault(2) - val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = conf("spark.comet.nativeLoadRequired") .category(CATEGORY_EXEC) .doc( diff --git a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala index ca13bba0c4..1759ea2765 100644 --- a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala +++ b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala @@ -54,11 +54,6 @@ abstract class CometReaderThreadPool { } -// A thread pool used for pre-fetching files. -object CometPrefetchThreadPool extends CometReaderThreadPool { - override def threadNamePrefix: String = "prefetch_thread" -} - // Thread pool used by the Parquet parallel reader object CometFileReaderThreadPool extends CometReaderThreadPool { override def threadNamePrefix: String = "file_reader_thread" diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index e07d16d4dd..7874f3774d 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector * in [[org.apache.comet.CometSparkSessionExtensions]] * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values. 
*/ -class CometParquetFileFormat(session: SparkSession, scanImpl: String) +class CometParquetFileFormat(session: SparkSession) extends ParquetFileFormat with MetricsSupport with ShimSQLConf { @@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String) // Comet specific configurations val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf) - val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT - (file: PartitionedFile) => { val sharedConf = broadcastedHadoopConf.value.value val footer = FooterReader.readFooter(sharedConf, file) @@ -135,85 +133,42 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String) isCaseSensitive, datetimeRebaseSpec) - val recordBatchReader = - if (nativeIcebergCompat) { - // We still need the predicate in the conf to allow us to generate row indexes based on - // the actual row groups read - val pushed = if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates - // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why - // a `flatMap` is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) - val pushedNative = if (parquetFilterPushDown) { - parquetFilters.createNativeFilters(filters) - } else { - None - } - val batchReader = new NativeBatchReader( - sharedConf, - file, - footer, - pushedNative.orNull, - capacity, - requiredSchema, - dataSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava, - CometMetricNode(metrics)) - try { - batchReader.init() - } catch { - case e: Throwable => - batchReader.close() - throw e - } - batchReader - } else { - val pushed = if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates - // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why - // a `flatMap` is used here. 
- .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) - - val batchReader = new BatchReader( - sharedConf, - file, - footer, - capacity, - requiredSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava) - try { - batchReader.init() - } catch { - case e: Throwable => - batchReader.close() - throw e - } - batchReader - } + val pushed = if (parquetFilterPushDown) { + filters + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } + pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) + val pushedNative = if (parquetFilterPushDown) { + parquetFilters.createNativeFilters(filters) + } else { + None + } + val recordBatchReader = new NativeBatchReader( + sharedConf, + file, + footer, + pushedNative.orNull, + capacity, + requiredSchema, + dataSchema, + isCaseSensitive, + useFieldId, + ignoreMissingIds, + datetimeRebaseSpec.mode == CORRECTED, + partitionSchema, + file.partitionValues, + metrics.asJava, + CometMetricNode(metrics)) + try { + recordBatchReader.init() + } catch { + case e: Throwable => + recordBatchReader.close() + throw e + } val iter = new RecordReaderIterator(recordBatchReader) try { iter.asInstanceOf[Iterator[InternalRow]] diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala deleted file mode 100644 index 495054fc81..0000000000 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.parquet - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.hadoop.ParquetInputFormat -import org.apache.parquet.hadoop.metadata.ParquetMetadata -import org.apache.spark.TaskContext -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.connector.read.PartitionReader -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration - -import org.apache.comet.{CometConf, CometRuntimeException} -import org.apache.comet.shims.ShimSQLConf - -case class CometParquetPartitionReaderFactory( - usingDataFusionReader: Boolean, - @transient sqlConf: SQLConf, - broadcastedConf: Broadcast[SerializableConfiguration], - readDataSchema: StructType, - partitionSchema: StructType, - filters: Array[Filter], - options: ParquetOptions, - metrics: Map[String, SQLMetric]) - extends FilePartitionReaderFactory - with ShimSQLConf - with Logging { - - private val isCaseSensitive = sqlConf.caseSensitiveAnalysis - private val useFieldId = CometParquetUtils.readFieldId(sqlConf) - private val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf) - private val pushDownDate = sqlConf.parquetFilterPushDownDate - private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate - private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead - private val parquetFilterPushDown = sqlConf.parquetFilterPushDown - - // Comet specific configurations - private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf) - - // This is only called at executor on a Broadcast variable, so we don't want it to be - // materialized at driver. 
- @transient private lazy val preFetchEnabled = { - val conf = broadcastedConf.value.value - - conf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED.key, - CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) && - !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled - } - - private var cometReaders: Iterator[BatchReader] = _ - private val cometReaderExceptionMap = new mutable.HashMap[PartitionedFile, Throwable]() - - // TODO: we may want to revisit this as we're going to only support flat types at the beginning - override def supportColumnarReads(partition: InputPartition): Boolean = true - - override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { - if (preFetchEnabled) { - val filePartition = partition.asInstanceOf[FilePartition] - val conf = broadcastedConf.value.value - - val threadNum = conf.getInt( - CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.key, - CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.defaultValue.get) - val prefetchThreadPool = CometPrefetchThreadPool.getOrCreateThreadPool(threadNum) - - this.cometReaders = filePartition.files - .map { file => - // `init()` call is deferred to when the prefetch task begins. - // Otherwise we will hold too many resources for readers which are not ready - // to prefetch. - val cometReader = buildCometReader(file) - if (cometReader != null) { - cometReader.submitPrefetchTask(prefetchThreadPool) - } - - cometReader - } - .toSeq - .toIterator - } - - super.createColumnarReader(partition) - } - - override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = - throw new UnsupportedOperationException("Comet doesn't support 'buildReader'") - - private def buildCometReader(file: PartitionedFile): BatchReader = { - val conf = broadcastedConf.value.value - - try { - val (datetimeRebaseSpec, footer, filters) = getFilter(file) - filters.foreach(pushed => ParquetInputFormat.setFilterPredicate(conf, pushed)) - val cometReader = new BatchReader( - conf, - file, - footer, - batchSize, - readDataSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava) - val taskContext = Option(TaskContext.get) - taskContext.foreach(_.addTaskCompletionListener[Unit](_ => cometReader.close())) - return cometReader - } catch { - case e: Throwable if preFetchEnabled => - // Keep original exception - cometReaderExceptionMap.put(file, e) - } - null - } - - override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { - val cometReader = if (!preFetchEnabled) { - // Prefetch is not enabled, create comet reader and initiate it. - val cometReader = buildCometReader(file) - cometReader.init() - - cometReader - } else { - // If prefetch is enabled, we already tried to access the file when in `buildCometReader`. - // It is possibly we got an exception like `FileNotFoundException` and we need to throw it - // now to let Spark handle it. 
- val reader = cometReaders.next() - val exception = cometReaderExceptionMap.get(file) - exception.foreach(e => throw e) - - if (reader == null) { - throw new CometRuntimeException(s"Cannot find comet file reader for $file") - } - reader - } - CometPartitionReader(cometReader) - } - - def getFilter(file: PartitionedFile): (RebaseSpec, ParquetMetadata, Option[FilterPredicate]) = { - val sharedConf = broadcastedConf.value.value - val footer = FooterReader.readFooter(sharedConf, file) - val footerFileMetaData = footer.getFileMetaData - val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec( - file, - readDataSchema, - sharedConf, - footerFileMetaData, - datetimeRebaseModeInRead) - - val pushed = if (parquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - readDataSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - (datetimeRebaseSpec, footer, pushed) - } - - override def createReader(inputPartition: InputPartition): PartitionReader[InternalRow] = - throw new UnsupportedOperationException("Only 'createColumnarReader' is supported.") - - /** - * A simple adapter on Comet's [[BatchReader]]. - */ - protected case class CometPartitionReader(reader: BatchReader) - extends PartitionReader[ColumnarBatch] { - - override def next(): Boolean = { - reader.nextBatch() - } - - override def get(): ColumnarBatch = { - reader.currentBatch() - } - - override def close(): Unit = { - reader.close() - } - } -} diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala deleted file mode 100644 index 3f50255761..0000000000 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.parquet - -import scala.jdk.CollectionConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.comet.CometMetricNode -import org.apache.spark.sql.connector.read.PartitionReaderFactory -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.FileScan -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.util.SerializableConfiguration - -import org.apache.comet.MetricsSupport - -// TODO: Consider creating a case class and patch SQL tests if needed, will make life easier. -// currently hacking around this by setting the metrics within the object's apply method. -trait CometParquetScan extends FileScan with MetricsSupport { - def sparkSession: SparkSession - def hadoopConf: Configuration - def readDataSchema: StructType - def readPartitionSchema: StructType - def pushedFilters: Array[Filter] - def options: CaseInsensitiveStringMap - - override def equals(obj: Any): Boolean = obj match { - case other: CometParquetScan => - super.equals(other) && readDataSchema == other.readDataSchema && - readPartitionSchema == other.readPartitionSchema && - equivalentFilters(pushedFilters, other.pushedFilters) - case _ => false - } - - override def hashCode(): Int = getClass.hashCode() - - override def createReaderFactory(): PartitionReaderFactory = { - val sqlConf = sparkSession.sessionState.conf - CometParquetFileFormat.populateConf(sqlConf, hadoopConf) - val broadcastedConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - CometParquetPartitionReaderFactory( - usingDataFusionReader = false, // this value is not used since this is v2 scan - sqlConf, - broadcastedConf, - readDataSchema, - readPartitionSchema, - pushedFilters, - new ParquetOptions(options.asScala.toMap, sqlConf), - metrics) - } -} - -object CometParquetScan { - def apply(session: SparkSession, scan: ParquetScan): CometParquetScan = { - val newScan = new ParquetScan( - scan.sparkSession, - scan.hadoopConf, - scan.fileIndex, - scan.dataSchema, - scan.readDataSchema, - scan.readPartitionSchema, - scan.pushedFilters, - scan.options, - partitionFilters = scan.partitionFilters, - dataFilters = scan.dataFilters) with CometParquetScan - - newScan.metrics = CometMetricNode.nativeScanMetrics(session.sparkContext) ++ CometMetricNode - .parquetScanMetrics(session.sparkContext) - - newScan - } -} diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index ec33363525..ce57624b75 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -22,14 +22,13 @@ package org.apache.comet.rules import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.sideBySide -import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec} +import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, 
CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec} import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.QueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.comet.CometConf -import org.apache.comet.parquet.CometParquetScan // This rule is responsible for eliminating redundant transitions between row-based and // columnar-based operators for Comet. Currently, three potential redundant transitions are: @@ -157,7 +156,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa * This includes: * - CometScanExec with native_iceberg_compat and partition columns - uses * ConstantColumnReader - * - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader */ private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = { op match { @@ -168,7 +166,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT && scan.relation.partitionSchema.nonEmpty - case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan] case _ => false } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 841bc21aa2..17c31c1485 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -36,7 +36,6 @@ import org.apache.spark.util.collection._ import com.google.common.base.Objects -import org.apache.comet.CometConf import org.apache.comet.parquet.CometParquetFileFormat import org.apache.comet.serde.OperatorOuterClass.Operator @@ -141,8 +140,7 @@ object CometNativeScanExec { // https://github.com/apache/arrow-datafusion-comet/issues/190 def transform(arg: Any): AnyRef = arg match { case _: HadoopFsRelation => - scanExec.relation.copy(fileFormat = - new CometParquetFileFormat(session, CometConf.SCAN_NATIVE_DATAFUSION))(session) + scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session) case other: AnyRef => other case null => null } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index e283f6b2cf..2707f0c040 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -37,15 +37,13 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} -import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} +import org.apache.comet.parquet.CometParquetFileFormat /** * Comet physical 
scan node for DataSource V1. Most of the code here follow Spark's @@ -476,43 +474,13 @@ case class CometScanExec( fsRelation: HadoopFsRelation, readFile: (PartitionedFile) => Iterator[InternalRow], partitions: Seq[FilePartition]): RDD[InternalRow] = { - val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options) - val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT - - val prefetchEnabled = hadoopConf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED.key, - CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) && - !usingDataFusionReader - val sqlConf = fsRelation.sparkSession.sessionState.conf - if (prefetchEnabled) { - CometParquetFileFormat.populateConf(sqlConf, hadoopConf) - val broadcastedConf = - fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val partitionReaderFactory = CometParquetPartitionReaderFactory( - scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT, - sqlConf, - broadcastedConf, - requiredSchema, - relation.partitionSchema, - pushedDownFilters.toArray, - new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf), - metrics) - - new DataSourceRDD( - fsRelation.sparkSession.sparkContext, - partitions.map(Seq(_)), - partitionReaderFactory, - true, - Map.empty) - } else { - newFileScanRDD( - fsRelation, - readFile, - partitions, - new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), - new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf)) - } + newFileScanRDD( + fsRelation, + readFile, + partitions, + new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), + new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf)) } override def doCanonicalize(): CometScanExec = { @@ -556,8 +524,7 @@ object CometScanExec { // https://github.com/apache/arrow-datafusion-comet/issues/190 def transform(arg: Any): AnyRef = arg match { case _: HadoopFsRelation => - scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session, scanImpl))( - session) + scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session) case other: AnyRef => other case null => null } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 928e66b29b..1495eb34ef 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -19,7 +19,7 @@ package org.apache.comet.parquet -import java.io.{File, FileFilter} +import java.io.File import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} @@ -31,20 +31,17 @@ import scala.util.control.Breaks.breakable import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.schema.MessageTypeParser import org.apache.spark.SparkException import org.apache.spark.sql.{CometTestBase, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String import com.google.common.primitives.UnsignedLong @@ -703,76 +700,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - test("partition column types") { - withTempPath { dir => - Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath) - - val dataTypes = - Seq( - StringType, - BooleanType, - ByteType, - BinaryType, - ShortType, - IntegerType, - LongType, - FloatType, - DoubleType, - DecimalType(25, 5), - DateType, - TimestampType) - - // TODO: support `NullType` here, after we add the support in `ColumnarBatchRow` - val constantValues = - Seq( - UTF8String.fromString("a string"), - true, - 1.toByte, - "Spark SQL".getBytes, - 2.toShort, - 3, - Long.MaxValue, - 0.25.toFloat, - 0.75d, - Decimal("1234.23456"), - DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), - DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))) - - dataTypes.zip(constantValues).foreach { case (dt, v) => - val schema = StructType(StructField("pcol", dt) :: Nil) - val conf = SQLConf.get - val partitionValues = new GenericInternalRow(Array(v)) - val file = dir - .listFiles(new FileFilter { - override def accept(pathname: File): Boolean = - pathname.isFile && pathname.toString.endsWith("parquet") - }) - .head - val reader = new BatchReader( - file.toString, - CometConf.COMET_BATCH_SIZE.get(conf), - schema, - partitionValues) - reader.init() - - try { - reader.nextBatch() - val batch = reader.currentBatch() - val actual = batch.getRow(0).get(1, dt) - val expected = v - if (dt.isInstanceOf[BinaryType]) { - assert( - actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]]) - } else { - assert(actual == expected) - } - } finally { - reader.close() - } - } - } - } - test("partition columns - multiple batch") { withSQLConf( CometConf.COMET_BATCH_SIZE.key -> 7.toString, @@ -1535,116 +1462,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - test("test pre-fetching multiple files") { - def makeRawParquetFile( - path: Path, - dictionaryEnabled: Boolean, - n: Int, - pageSize: Int): Seq[Option[Int]] = { - val schemaStr = - """ - |message root { - | optional boolean _1; - | optional int32 _2(INT_8); - | optional int32 _3(INT_16); - | optional int32 _4; - | optional int64 _5; - | optional float _6; - | optional double _7; - | optional binary _8(UTF8); - | optional int32 _9(UINT_8); - | optional int32 _10(UINT_16); - | optional int32 _11(UINT_32); - | optional int64 _12(UINT_64); - | optional binary _13(ENUM); - |} - """.stripMargin - - val schema = MessageTypeParser.parseMessageType(schemaStr) - val writer = createParquetWriter( - schema, - path, - dictionaryEnabled = dictionaryEnabled, - pageSize = pageSize, - dictionaryPageSize = pageSize) - - val rand = new scala.util.Random(42) - val expected = (0 until n).map { i => - if (rand.nextBoolean()) { - None - } else { - Some(i) - } - } - expected.foreach { opt => - val record = new SimpleGroup(schema) - opt match { - case Some(i) => - record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) - record.add(3, i) - record.add(4, i.toLong) - record.add(5, i.toFloat) - record.add(6, i.toDouble) - record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) - record.add(10, -i) - record.add(11, (-i).toLong) - record.add(12, i.toString) - case _ => - } - writer.write(record) - } - - writer.close() - expected - } - - val conf = new Configuration() - 
conf.set("spark.comet.scan.preFetch.enabled", "true"); - conf.set("spark.comet.scan.preFetch.threadNum", "4"); - - withTempDir { dir => - val threadPool = CometPrefetchThreadPool.getOrCreateThreadPool(2) - - val readers = (0 to 10).map { idx => - val path = new Path(dir.toURI.toString, s"part-r-$idx.parquet") - makeRawParquetFile(path, dictionaryEnabled = false, 10000, 500) - - val reader = new BatchReader(conf, path.toString, 1000, null, null) - reader.submitPrefetchTask(threadPool) - - reader - } - - // Wait for all pre-fetch tasks - readers.foreach { reader => - val task = reader.getPrefetchTask() - task.get() - } - - val totolRows = readers.map { reader => - val queue = reader.getPrefetchQueue() - var rowCount = 0L - - while (!queue.isEmpty) { - val rowGroup = queue.take().getLeft - rowCount += rowGroup.getRowCount - } - - reader.close() - - rowCount - }.sum - - readParquetFile(dir.toString) { df => - assert(df.count() == totolRows) - } - } - } - test("test merge scan range") { def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = { val dictionaryPageSize = 1024 @@ -1753,23 +1570,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - Seq(true, false).foreach { prefetch => - val cometTestName = if (prefetch) { - testName + " (prefetch enabled)" - } else { - testName - } - - super.test(cometTestName, testTags: _*) { - withSQLConf(CometConf.COMET_SCAN_PREFETCH_ENABLED.key -> prefetch.toString) { - testFun - } - } - } - } - private def withId(id: Int) = new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() @@ -2036,11 +1836,7 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { val scans = collect(r.filter(f).queryExecution.executedPlan) { case p: CometBatchScanExec => p.scan } - if (CometConf.COMET_ENABLED.get()) { - assert(scans.nonEmpty && scans.forall(_.isInstanceOf[CometParquetScan])) - } else { - assert(!scans.exists(_.isInstanceOf[CometParquetScan])) - } + assert(scans.isEmpty) } } diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala index a349ab2b93..18dec68171 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.comet.CometConf -import org.apache.comet.parquet.CometParquetScan import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} /** @@ -127,11 +126,6 @@ class CometScanRuleSuite extends CometTestBase { if (cometEnabled) { assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0) assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1) - - // CometScanRule should have replaced the underlying scan - val scan = transformedPlan.collect { case scan: CometBatchScanExec => scan }.head - assert(scan.wrapped.scan.isInstanceOf[CometParquetScan]) - } else { assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1) assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index a2f196a4fc..5b0371b277 100644 --- 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector import org.apache.comet.{CometConf, WithHdfsCluster} -import org.apache.comet.parquet.BatchReader /** * Benchmark to measure Comet read performance. To run this benchmark: @@ -179,29 +178,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { } } - sqlBenchmark.addCase("ParquetReader Comet") { _ => - files.map(_.asInstanceOf[String]).foreach { p => - val reader = new BatchReader(p, vectorizedReaderBatchSize) - reader.init() - try { - var totalNumRows = 0 - while (reader.nextBatch()) { - val batch = reader.currentBatch() - val column = batch.column(0) - val numRows = batch.numRows() - var i = 0 - while (i < numRows) { - if (!column.isNullAt(i)) aggregateValue(column, i) - i += 1 - } - totalNumRows += batch.numRows() - } - } finally { - reader.close() - } - } - } - sqlBenchmark.run() } }
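Note: with the prefetch task, queue, and CometPrefetchThreadPool gone, the deprecated BatchReader (retained only for Iceberg via IcebergCometBatchReader) is driven purely synchronously: init() now always completes on the calling thread before nextBatch() is used. The sketch below is illustrative only; it mirrors the "ParquetReader Comet" benchmark case removed in this patch and assumes the (path, batchSize) constructor that case used is still available on the deprecated class, with placeholder names parquetFilePath and batchSize.

import org.apache.comet.parquet.BatchReader

// Minimal synchronous read loop over a single Parquet file (sketch).
def countRows(parquetFilePath: String, batchSize: Int): Long = {
  val reader = new BatchReader(parquetFilePath, batchSize)
  // Prefetching is removed, so init() must finish here before nextBatch().
  reader.init()
  var totalRows = 0L
  try {
    while (reader.nextBatch()) {          // load the next batch, if any
      val batch = reader.currentBatch()   // ColumnarBatch backed by Comet vectors
      totalRows += batch.numRows()        // consume the batch here
    }
  } finally {
    reader.close()                        // release column readers and native memory
  }
  totalRows
}

This is also the usage pattern Iceberg-side callers are expected to follow against IcebergCometBatchReader; row-group prefetching for the remaining native scans is handled elsewhere (e.g. CometFileReaderThreadPool for the parallel file reader), not by this class.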