Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ jobs:
fail-fast: false
matrix:
os: [ "ubuntu:20.04", "ubuntu:22.04" ]
spark: [ "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
spark: [ "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1", "spark-4.2", "spark-main" ]
java: [ "java-8", "java-11", "java-17", "java-21" ]
# Spark supports JDK17 since 3.3.
exclude:
Expand Down Expand Up @@ -139,6 +139,14 @@ jobs:
java: java-8
- spark: spark-4.1
java: java-11
- spark: spark-4.2
java: java-8
- spark: spark-4.2
java: java-11
- spark: spark-main
java: java-8
- spark: spark-main
java: java-11

runs-on: ubuntu-22.04
container: ${{ matrix.os }}
Expand Down Expand Up @@ -197,7 +205,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/${{ matrix.java }}-openjdk-amd64
echo "JAVA_HOME: $JAVA_HOME"
case "${{ matrix.spark }}" in
spark-4.0|spark-4.1)
spark-4.0|spark-4.1|spark-4.2|spark-main)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
;;
*)
Expand Down Expand Up @@ -240,7 +248,7 @@ jobs:
fail-fast: false
matrix:
os: [ "centos:8" ]
spark: [ "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
spark: [ "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1", "spark-4.2", "spark-main" ]
java: [ "java-8", "java-11", "java-17" ]
# Spark supports JDK17 since 3.3.
exclude:
Expand All @@ -260,6 +268,14 @@ jobs:
java: java-8
- spark: spark-4.1
java: java-11
- spark: spark-4.2
java: java-8
- spark: spark-4.2
java: java-11
- spark: spark-main
java: java-8
- spark: spark-main
java: java-11

runs-on: ubuntu-22.04
container: ${{ matrix.os }}
Expand Down Expand Up @@ -304,7 +320,7 @@ jobs:
echo "JAVA_HOME: $JAVA_HOME"
cd $GITHUB_WORKSPACE/
case "${{ matrix.spark }}" in
spark-4.0|spark-4.1)
spark-4.0|spark-4.1|spark-4.2|spark-main)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
;;
*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public void testRoundTrip() {
JniFilesystem.WriteFile writeFile = fs.openFileForWrite(path);
try {
byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
ByteBuffer buf = PlatformDependent.allocateDirectNoCleaner(bytes.length);
ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length);
buf.put(bytes);
buf.flip();
writeFile.append(bytes.length, PlatformDependent.directBufferAddress(buf));
writeFile.flush();
fileSize = writeFile.size();
Expand All @@ -47,8 +48,10 @@ public void testRoundTrip() {

JniFilesystem.ReadFile readFile = fs.openFileForRead(path);
Assert.assertEquals(fileSize, readFile.size());
ByteBuffer buf = PlatformDependent.allocateDirectNoCleaner((int) fileSize);
ByteBuffer buf = ByteBuffer.allocateDirect((int) fileSize);
readFile.pread(0, fileSize, PlatformDependent.directBufferAddress(buf));
buf.limit((int) fileSize);
buf.position(0);
byte[] out = new byte[(int) fileSize];
buf.get(out);
String decoded = new String(out, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ object ConverterUtils extends Logging {
sigName = sigName.concat(getTypeSigName(valueType))
sigName = sigName.concat(">")
sigName
case CharType(_) =>
case c: CharType =>
"fchar"
case NullType =>
"nothing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ColumnarShuffleManager(conf: SparkConf)
import ColumnarShuffleManager._

private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
override lazy val shuffleBlockResolver = SparkIndexShuffleBlockResolverUtil.create(conf)

/** A mapping from shuffle ids to the number of mappers producing output for those shuffles. */
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
Expand Down
174 changes: 174 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,180 @@
</plugins>
</build>
</profile>
    <profile>
      <!-- Build against the Spark 4.2 preview release. Requires Scala 2.13 and
           JDK 17+; both constraints are enforced by the maven-enforcer rules below. -->
      <id>spark-4.2</id>
      <properties>
        <sparkbundle.version>4.2</sparkbundle.version>
        <sparkshim.artifactId>spark-sql-columnar-shims-spark42</sparkshim.artifactId>
        <!-- Preview build published to Maven Central ahead of the 4.2.0 GA release. -->
        <spark.version>4.2.0-preview3</spark.version>
        <iceberg.version>1.10.0</iceberg.version>
        <delta.package.name>delta-spark</delta.package.name>
        <delta.version>4.0.0</delta.version>
        <delta.binary.version>40</delta.binary.version>
        <hudi.version>1.1.0</hudi.version>
        <paimon.version>1.3.0</paimon.version>
        <fasterxml.version>2.18.2</fasterxml.version>
        <fasterxml.jackson.version>2.18.2</fasterxml.jackson.version>
        <fasterxml.jackson.databind.version>2.18.2</fasterxml.jackson.databind.version>
        <hadoop.version>3.4.1</hadoop.version>
        <antlr4.version>4.13.1</antlr4.version>
        <guava.version>33.4.0-jre</guava.version>
        <slf4j.version>2.0.16</slf4j.version>
        <log4j.version>2.24.3</log4j.version>
        <commons-lang3.version>3.17.0</commons-lang3.version>
        <arrow.version>18.1.0</arrow.version>
        <arrow-gluten.version>18.1.0</arrow-gluten.version>
        <scala.compiler.version>4.9.5</scala.compiler.version>
      </properties>
      <dependencies>
        <!-- Provided-scope logging deps: supplied by the Spark runtime at execution time. -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j2-impl</artifactId>
          <version>${log4j.version}</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-enforcer-plugin</artifactId>
            <executions>
              <execution>
                <id>enforce-java-17+</id>
                <goals>
                  <goal>enforce</goal>
                </goals>
                <configuration>
                  <rules>
                    <requireActiveProfile>
                      <!-- Spark 4.2 requires Java 17+; <all>false</all> means ANY of the
                           listed profiles satisfies the rule. -->
                      <profiles>java-17,java-21</profiles>
                      <all>false</all>
                      <message>"-P spark-4.2" requires JDK 17+</message>
                    </requireActiveProfile>
                  </rules>
                </configuration>
              </execution>
              <execution>
                <id>enforce-scala-213</id>
                <goals>
                  <goal>enforce</goal>
                </goals>
                <configuration>
                  <rules>
                    <requireActiveProfile>
                      <!-- Spark 4.2 requires Scala 2.13 -->
                      <profiles>scala-2.13</profiles>
                      <message>"-P spark-4.2" requires Scala 2.13</message>
                    </requireActiveProfile>
                  </rules>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
<profile>
<id>spark-main</id>
<properties>
<sparkbundle.version>4.2</sparkbundle.version>
<sparkshim.artifactId>spark-sql-columnar-shims-sparkmain</sparkshim.artifactId>
<spark.version>4.2.0-SNAPSHOT</spark.version>
<iceberg.version>1.10.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>4.0.0</delta.version>
<delta.binary.version>40</delta.binary.version>
<hudi.version>1.1.0</hudi.version>
<paimon.version>1.3.0</paimon.version>
<fasterxml.version>2.18.2</fasterxml.version>
<fasterxml.jackson.version>2.18.2</fasterxml.jackson.version>
<fasterxml.jackson.databind.version>2.18.2</fasterxml.jackson.databind.version>
<hadoop.version>3.4.1</hadoop.version>
<antlr4.version>4.13.1</antlr4.version>
<guava.version>33.4.0-jre</guava.version>
<slf4j.version>2.0.16</slf4j.version>
<log4j.version>2.24.3</log4j.version>
<commons-lang3.version>3.17.0</commons-lang3.version>
<arrow.version>18.1.0</arrow.version>
<arrow-gluten.version>18.1.0</arrow-gluten.version>
<scala.compiler.version>4.9.9</scala.compiler.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>apache-spark-snapshots</id>
<url>https://repository.apache.org/content/repositories/snapshots/org/apache/spark/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-java-17+</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireActiveProfile>
<!-- Spark main branch requires Java 17+ -->
<profiles>java-17,java-21</profiles>
<all>false</all>
<message>"-P spark-main" requires JDK 17+</message>
</requireActiveProfile>
</rules>
</configuration>
</execution>
<execution>
<id>enforce-scala-213</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireActiveProfile>
<!-- Spark main branch requires Scala 2.13 -->
<profiles>scala-2.13</profiles>
<message>"-P spark-main" requires Scala 2.13</message>
</requireActiveProfile>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop-2.7.4</id>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,10 @@ trait SparkShims {
* similar to LeftOuter. Default implementation returns false for Spark 3.x compatibility.
*/
def isLeftSingleJoinType(joinType: JoinType): Boolean = false

  /**
   * Extracts the random seed from a `SampleExec` node as a plain `Long`.
   *
   * The type of `SampleExec.seed` differs across Spark versions (the 3.x shims in this
   * repository read it as a plain `Long`; newer versions reportedly expose it as
   * `Option[Long]` — confirm against the 4.x shim implementations). Each version-specific
   * shim implements this method so callers get a single, uniform accessor.
   *
   * @param plan the SampleExec node to read the seed from
   * @return the sampling seed
   */
  def getSampleExecSeed(plan: SampleExec): Long
}
12 changes: 12 additions & 0 deletions shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@
<module>spark41</module>
</modules>
</profile>
    <profile>
      <!-- Activated by "-P spark-4.2": builds the Spark 4.2 shim module. -->
      <id>spark-4.2</id>
      <modules>
        <module>spark42</module>
      </modules>
    </profile>
    <profile>
      <!-- Activated by "-P spark-main": builds the shim module tracking Spark's main branch. -->
      <id>spark-main</id>
      <modules>
        <module>sparkmain</module>
      </modules>
    </profile>
<profile>
<id>default</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,9 @@ class Spark33Shims extends SparkShims {
assert(index >= 0)
args.substring(index + "isFinalPlan=".length).trim.toBoolean
}

override def getSampleExecSeed(plan: SampleExec): Long = {
// In Spark 3.3, seed is Long (not Option[Long])
plan.seed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

Expand All @@ -30,3 +30,9 @@ object SparkSortShuffleWriterUtil {
new SortShuffleWriter(handle, mapId, context, shuffleExecutorComponents)
}
}

/**
 * Version-specific factory for [[IndexShuffleBlockResolver]]. Centralizing construction
 * here lets each Spark shim adapt to constructor-signature changes across versions.
 */
object SparkIndexShuffleBlockResolverUtil {

  /** Builds the resolver for this Spark version from the given configuration. */
  def create(conf: SparkConf): IndexShuffleBlockResolver =
    new IndexShuffleBlockResolver(conf)
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,9 @@ class Spark34Shims extends SparkShims {
override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
p.isFinalPlan
}

override def getSampleExecSeed(plan: SampleExec): Long = {
// In Spark 3.4, seed is Long (not Option[Long])
plan.seed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.SortShuffleWriter

Expand All @@ -30,3 +30,9 @@ object SparkSortShuffleWriterUtil {
new SortShuffleWriter(handle, mapId, context, writeMetrics, shuffleExecutorComponents)
}
}

/**
 * Version-specific factory for [[IndexShuffleBlockResolver]]. Centralizing construction
 * here lets each Spark shim adapt to constructor-signature changes across versions.
 */
object SparkIndexShuffleBlockResolverUtil {

  /** Builds the resolver for this Spark version from the given configuration. */
  def create(conf: SparkConf): IndexShuffleBlockResolver =
    new IndexShuffleBlockResolver(conf)
}
Original file line number Diff line number Diff line change
Expand Up @@ -595,4 +595,9 @@ class Spark35Shims extends SparkShims {
override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
p.isFinalPlan
}

override def getSampleExecSeed(plan: SampleExec): Long = {
// In Spark 3.5, seed is Long (not Option[Long])
plan.seed
}
}
Loading
Loading