From 28deb2016ba799b7dc2ac2c1f7d8dc88275b2aa6 Mon Sep 17 00:00:00 2001 From: xuba Date: Thu, 8 Jan 2026 14:31:06 +0800 Subject: [PATCH 01/10] [WAP] Add JDK 17 compatibility --- .../amoro-mixed-flink-common/pom.xml | 24 +++++- .../amoro/flink/read/TestFlinkSource.java | 3 + .../flink/read/TestMixedFormatSource.java | 3 + .../apache/amoro/flink/table/TestKeyed.java | 3 + .../flink/table/TestLookupSecondary.java | 3 + .../apache/amoro/flink/table/TestUnkeyed.java | 3 + .../flink/table/TestUnkeyedOverwrite.java | 3 + .../flink/write/TestAdaptHiveWriter.java | 3 + amoro-format-paimon/pom.xml | 9 ++ pom.xml | 82 ++++++++++++++++++- 10 files changed, 134 insertions(+), 2 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml index 60d5d7b364..6ab95facc5 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml @@ -324,6 +324,14 @@ test + + org.apache.amoro + amoro-common + ${project.version} + test-jar + test + + org.apache.amoro amoro-mixed-hive @@ -422,7 +430,21 @@ org.apache.amoro.listener.AmoroRunListener - -verbose:class + ${surefire.excludedGroups.jdk} + -verbose:class + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java index ef5d08a68b..2af22bc1c8 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java @@ -29,6 +29,7 @@ import org.apache.amoro.flink.table.FlinkSource; import org.apache.amoro.flink.table.MixedFormatTableLoader; import org.apache.amoro.flink.util.DataUtil; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.CheckpointingMode; @@ -49,6 +50,7 @@ import org.apache.iceberg.io.WriteResult; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.categories.Category; import java.io.IOException; import java.time.LocalDateTime; @@ -63,6 +65,7 @@ import java.util.Optional; import java.util.Set; +@Category(FailsOnJava17.class) public class TestFlinkSource extends FlinkTestBase { protected static final FileFormat FILE_FORMAT = diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java index ecdf4c47e9..78d48b7702 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java @@ -41,6 +41,7 @@ import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.table.UnkeyedTable; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.amoro.utils.TableFileUtil; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; @@ -86,6 +87,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +106,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +@Category(FailsOnJava17.class) public class TestMixedFormatSource extends TestRowDataReaderFunction implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(TestMixedFormatSource.class); private static final long serialVersionUID = 7418812854449034756L; diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java index 05fc24eb23..4fdee6b75c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java @@ -38,6 +38,7 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.table.TableProperties; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.ApiExpression; @@ -57,6 +58,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.junit.runner.RunWith; @@ -77,6 +79,7 @@ import java.util.Set; @RunWith(Parameterized.class) +@Category(FailsOnJava17.class) public class TestKeyed extends FlinkTestBase { public static final Logger LOG = LoggerFactory.getLogger(TestKeyed.class); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java index 659f4e955f..80b8af79d2 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java @@ -26,6 +26,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.data.RowData; @@ -39,6 +40,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.ArrayList; @@ -48,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +@Category(FailsOnJava17.class) public class TestLookupSecondary extends CatalogITCaseBase implements FlinkTaskWriterBaseTest { private String db; diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java index ccab05a761..80e3c4898a 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java @@ -39,6 +39,7 @@ import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; @@ -57,6 +58,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -74,6 +76,7 @@ import java.util.Set; @RunWith(Parameterized.class) +@Category(FailsOnJava17.class) public class TestUnkeyed extends FlinkTestBase { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java index fcb092e3d6..2f762adf06 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java @@ -28,6 +28,7 @@ import org.apache.amoro.hive.TestHMS; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; @@ -38,6 +39,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -49,6 +51,7 @@ import java.util.List; @RunWith(Parameterized.class) +@Category(FailsOnJava17.class) public class TestUnkeyedOverwrite extends FlinkTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(TestUnkeyedOverwrite.class); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java index 057765df1c..f6cc6b0212 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java @@ -36,6 +36,7 @@ import org.apache.amoro.table.LocationKind; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.WriteOperationKind; +import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -54,6 +55,7 @@ import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -69,6 +71,7 @@ import java.util.stream.Collectors; @RunWith(Parameterized.class) +@Category(FailsOnJava17.class) public class TestAdaptHiveWriter extends TableTestBase { @ClassRule public static TestHMS TEST_HMS = new TestHMS(); diff --git a/amoro-format-paimon/pom.xml b/amoro-format-paimon/pom.xml index 87b29c4282..ff98e9559f 100644 --- a/amoro-format-paimon/pom.xml +++ b/amoro-format-paimon/pom.xml @@ -78,4 +78,13 @@ + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + diff --git a/pom.xml b/pom.xml index 506b887538..b75887430d 100644 --- a/pom.xml +++ b/pom.xml @@ -171,6 +171,26 @@ compile compile provided + + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + -XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${jvm.module.opens} @@ -1370,6 +1390,23 @@ log4j-1.2-api test + + + javax.xml.bind + jaxb-api + 2.3.1 + + + org.glassfish.jaxb + jaxb-runtime + 2.3.9 + + + + javax.annotation + javax.annotation-api + 1.3.2 + @@ -1491,6 +1528,8 @@ ${maven-surefire-plugin.version} false + ${amoro.surefire.baseArgLine} + ${surefire.excludedGroups.jdk} @@ -1864,12 +1903,53 @@ java11 - [11,) + [11,17) 11 + + java17 + + 17 + + + 17 + 17 + org.apache.amoro.testutils.FailsOnJava17 + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + 17 + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + + + + + + spark-3.3 From 6b4b4010b3e613fc204e66de9049bbfc07216f8b Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:11:49 +0800 Subject: [PATCH 02/10] [WAP] Update CI configuration to support JDK 17 --- .github/workflows/core-hadoop3-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index e93a32aa4f..a6105ebf8a 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' ] + jdk: [ '11' '17'] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: From b6fd760f871c3fd3fee22540cca82dca7e2df6a8 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:42:13 +0800 Subject: [PATCH 03/10] [WAP] Add FailsOnJava17 marker interface to exclude tests on Java 17 --- .../apache/amoro/testutils/FailsOnJava17.java | 36 +++++++++++++++++++ pom.xml | 17 --------- 2 files changed, 36 insertions(+), 17 deletions(-) create mode 100644 amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java diff --git a/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java b/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java new file mode 100644 index 0000000000..8e9b2ed593 --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.testutils; + +/** + * Marker interface for tests that fail on Java 17. + * + *

Tests annotated with this interface will be excluded when running on Java 17. Use this for + * tests that depend on internal JDK APIs that have been removed or restricted in Java 17. + * + *

Usage: + * + *

{@code
+ * @Category(FailsOnJava17.class)
+ * public class MyTest {
+ *     // test code
+ * }
+ * }
+ */ +public interface FailsOnJava17 {} diff --git a/pom.xml b/pom.xml index b75887430d..20d74e95ea 100644 --- a/pom.xml +++ b/pom.xml @@ -1390,23 +1390,6 @@ log4j-1.2-api test - - - javax.xml.bind - jaxb-api - 2.3.1 - - - org.glassfish.jaxb - jaxb-runtime - 2.3.9 - - - - javax.annotation - javax.annotation-api - 1.3.2 - From ad233daf19e1ad065c151b01252eb2615082ba47 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:50:40 +0800 Subject: [PATCH 04/10] Update SLF4J dependency to version 2.0.17 and adjust README for JDK compatibility --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e07afe7b0c..4746c444e1 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Amoro contains modules as below: ## Building -Amoro is built using Maven with JDK 8, 11 and 17(required for `amoro-format-mixed/amoro-mixed-trino` module). +Amoro is built using Maven with JDK 11 and 17(required for `amoro-format-mixed/amoro-mixed-trino` module, experimental for other modules). * Build all modules without `amoro-mixed-trino`: `./mvnw clean package` * Build and skip tests: `./mvnw clean package -DskipTests` From a40347124d7987b551c31f2ccb3e0c1a0dab855a Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:51:56 +0800 Subject: [PATCH 05/10] Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml --- .github/workflows/core-hadoop3-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index a6105ebf8a..cbf93ac426 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' '17'] + jdk: [ '11' '17' ] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: From 4fc9437c3379b42c673aa313a4d6cc4326ce4777 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:52:45 +0800 Subject: [PATCH 06/10] Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml --- .github/workflows/core-hadoop3-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index cbf93ac426..004f179afe 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' '17' ] + jdk: [ '11', '17' ] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: From 9bcc4a5b50b48ec77ec76503a618c28f561fb1ef Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 17 Mar 2026 22:41:56 +0800 Subject: [PATCH 07/10] Refactor flink module for JDK 17 support --- .github/workflows/core-hadoop2-ci.yml | 2 +- .../apache/amoro/testutils/FailsOnJava17.java | 36 --- amoro-format-hudi/pom.xml | 2 - .../amoro-mixed-flink-common/pom.xml | 15 +- .../KerberosInvocationHandler.java | 1 - .../flink/lookup/BasicLookupFunction.java | 19 +- .../reader/MixedFormatSourceReader.java | 51 ++-- .../apache/amoro/flink/table/FlinkSource.java | 33 ++- .../flink/table/KerberosAwareInputFormat.java | 165 ++++++++++++ .../flink/table/MixedFormatTableLoader.java | 247 +++++++++++++++++- .../UnkeyedInputFormatOperatorFactory.java | 8 +- .../table/UnkeyedStreamingReaderOperator.java | 167 ++++++++++++ .../flink/util/FlinkClassReflectionUtil.java | 65 ----- .../amoro/flink/util/IcebergClassUtil.java | 107 +++----- .../apache/amoro/flink/write/FlinkSink.java | 8 +- .../amoro/flink/read/TestFlinkSource.java | 3 - .../flink/read/TestMixedFormatSource.java | 3 - .../apache/amoro/flink/table/TestKeyed.java | 3 - .../flink/table/TestLookupSecondary.java | 3 - .../apache/amoro/flink/table/TestUnkeyed.java | 3 - .../flink/table/TestUnkeyedOverwrite.java | 3 - .../amoro/flink/table/TestWatermark.java | 66 +---- .../flink/write/TestAdaptHiveWriter.java | 3 - .../write/TestMixedFormatFileWriter.java | 11 +- amoro-format-mixed/amoro-mixed-trino/pom.xml | 1 - pom.xml | 5 +- 26 files changed, 711 insertions(+), 319 deletions(-) delete mode 100644 amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java create mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java create mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java delete mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml index ad891366c5..b292c044fa 100644 --- a/.github/workflows/core-hadoop2-ci.yml +++ b/.github/workflows/core-hadoop2-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' ] + jdk: [ '11' , '17' ] name: Build Amoro with JDK ${{ matrix.jdk }} steps: - uses: actions/checkout@v3 diff --git a/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java b/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java deleted file mode 100644 index 8e9b2ed593..0000000000 --- a/amoro-common/src/test/java/org/apache/amoro/testutils/FailsOnJava17.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.testutils; - -/** - * Marker interface for tests that fail on Java 17. - * - *

Tests annotated with this interface will be excluded when running on Java 17. Use this for - * tests that depend on internal JDK APIs that have been removed or restricted in Java 17. - * - *

Usage: - * - *

{@code
- * @Category(FailsOnJava17.class)
- * public class MyTest {
- *     // test code
- * }
- * }
- */ -public interface FailsOnJava17 {} diff --git a/amoro-format-hudi/pom.xml b/amoro-format-hudi/pom.xml index ace84c0a17..7e40a26959 100644 --- a/amoro-format-hudi/pom.xml +++ b/amoro-format-hudi/pom.xml @@ -30,8 +30,6 @@ Amoro Project Hudi Format - 8 - 8 UTF-8 diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml index 6ab95facc5..44b7277abc 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml @@ -431,20 +431,7 @@
${surefire.excludedGroups.jdk} - -verbose:class - --add-opens=java.base/java.lang=ALL-UNNAMED - --add-opens=java.base/java.lang.invoke=ALL-UNNAMED - --add-opens=java.base/java.lang.reflect=ALL-UNNAMED - --add-opens=java.base/java.io=ALL-UNNAMED - --add-opens=java.base/java.net=ALL-UNNAMED - --add-opens=java.base/java.nio=ALL-UNNAMED - --add-opens=java.base/java.util=ALL-UNNAMED - --add-opens=java.base/java.util.concurrent=ALL-UNNAMED - --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=ALL-UNNAMED - --add-opens=java.base/sun.nio.cs=ALL-UNNAMED - --add-opens=java.base/sun.security.action=ALL-UNNAMED - --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + ${amoro.surefire.baseArgLine}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java index 25dce7fb0d..7349a61cba 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java @@ -56,7 +56,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl authenticatedFileIO.doAs( () -> { try { - method.setAccessible(true); return method.invoke(obj, args); } catch (Throwable e) { throw new RuntimeException(e); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java index 114245de93..3acce39b1c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java @@ -30,7 +30,6 @@ import org.apache.amoro.table.MixedTable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.util.FlinkRuntimeException; @@ -45,12 +44,10 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -244,20 +241,16 @@ private void checkErrorAndRethrow() { } private String generateRocksDBPath(FunctionContext context, String tableName) { - String tmpPath = getTmpDirectoryFromTMContainer(context); + String tmpPath = getTmpDirectory(context); File db = new File(tmpPath, tableName + "-lookup-" + UUID.randomUUID()); return db.toString(); } - private static String getTmpDirectoryFromTMContainer(FunctionContext context) { - try { - Field field = context.getClass().getDeclaredField("context"); - field.setAccessible(true); - StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context); - String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories(); - return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)]; - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); + private static String getTmpDirectory(FunctionContext context) { + String configuredTmpDir = context.getJobParameter("java.io.tmpdir", null); + if (configuredTmpDir != null && !configuredTmpDir.isEmpty()) { + return configuredTmpDir; } + return System.getProperty("java.io.tmpdir"); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java index caa7b6f837..654100d31f 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java @@ -23,11 +23,9 @@ import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit; import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplitState; import org.apache.amoro.flink.read.hybrid.split.SplitRequestEvent; -import org.apache.amoro.flink.util.FlinkClassReflectionUtil; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; @@ -35,13 +33,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -132,25 +130,21 @@ public ReaderOutput wrapOutput(ReaderOutput output) { return new MixedFormatReaderOutput<>(output); } - /** - * There is a case that the watermark in {@link WatermarkOutputMultiplexer.OutputState} has been - * updated, but watermark has not been emitted for that when {@link - * WatermarkOutputMultiplexer#onPeriodicEmit} called, the outputState has been removed by {@link - * WatermarkOutputMultiplexer#unregisterOutput(String)} after split finished. Wrap {@link - * ReaderOutput} to call {@link - * ProgressiveTimestampsAndWatermarks.SplitLocalOutputs#emitPeriodicWatermark()} when split - * finishes. - */ + /** Wrap split outputs so we can flush any pending periodic watermark before release. */ static class MixedFormatReaderOutput implements ReaderOutput { private final ReaderOutput internal; + private final SourceOutputWithWatermarks watermarkOutput; + private final Map> splitOutputs = new HashMap<>(); + @SuppressWarnings("unchecked") public MixedFormatReaderOutput(ReaderOutput readerOutput) { Preconditions.checkArgument( readerOutput instanceof SourceOutputWithWatermarks, "readerOutput should be SourceOutputWithWatermarks, but was %s", readerOutput.getClass()); this.internal = readerOutput; + this.watermarkOutput = (SourceOutputWithWatermarks) readerOutput; } @Override @@ -180,14 +174,41 @@ public void markActive() { @Override public SourceOutput createOutputForSplit(String splitId) { - return internal.createOutputForSplit(splitId); + SourceOutput splitOutput = internal.createOutputForSplit(splitId); + splitOutputs.put(splitId, splitOutput); + return splitOutput; } @Override public void releaseOutputForSplit(String splitId) { - Object splitLocalOutput = FlinkClassReflectionUtil.getSplitLocalOutput(internal); - FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput); + emitPeriodicWatermark(splitOutputs.remove(splitId)); internal.releaseOutputForSplit(splitId); } + + private void emitPeriodicWatermark(SourceOutput splitOutput) { + if (splitOutput == null) { + return; + } + + if (splitOutput instanceof SourceOutputWithWatermarks) { + ((SourceOutputWithWatermarks) splitOutput).emitPeriodicWatermark(); + return; + } + + try { + java.lang.reflect.Method method = + splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark"); + method.setAccessible(true); + method.invoke(splitOutput); + return; + } catch (ReflectiveOperationException e) { + LOGGER.debug( + "Failed to invoke emitPeriodicWatermark on split output {}, fallback to reader output", + splitOutput.getClass(), + e); + } + + watermarkOutput.emitPeriodicWatermark(); + } } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java index 3e4080e8a8..2d0f5fe44a 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -49,7 +48,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; @@ -243,37 +244,55 @@ public DataStream buildUnkeyedTableSource(String scanStartupMode) { .properties(properties) .flinkConf(flinkConf) .limit(limit); + Long startSnapshotId = null; if (MixedFormatValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(scanStartupMode)) { Optional startSnapshotOptional = Optional.ofNullable(tableLoader.loadTable().currentSnapshot()); if (startSnapshotOptional.isPresent()) { Snapshot snapshot = startSnapshotOptional.get(); + startSnapshotId = snapshot.snapshotId(); LOG.info( "Get starting snapshot id {} based on scan startup mode {}", snapshot.snapshotId(), scanStartupMode); - builder.startSnapshotId(snapshot.snapshotId()); + builder.startSnapshotId(startSnapshotId); } } DataStream origin = builder.build(); - return wrapKrb(origin).assignTimestampsAndWatermarks(watermarkStrategy); + return wrapKrb(origin, startSnapshotId).assignTimestampsAndWatermarks(watermarkStrategy); } /** extract op from dataStream, and wrap krb support */ - private DataStream wrapKrb(DataStream ds) { + private DataStream wrapKrb(DataStream ds, Long startSnapshotId) { IcebergClassUtil.clean(env); Transformation origin = ds.getTransformation(); int scanParallelism = flinkConf .getOptional(MixedFormatValidator.SCAN_PARALLELISM) .orElse(origin.getParallelism()); + Table table = mixedTable.asUnkeyedTable(); + Schema projectedIcebergSchema = + projectedSchema == null + ? mixedTable.schema() + : FlinkSchemaUtil.convert( + mixedTable.schema(), + org.apache.amoro.flink.FlinkSchemaUtil.filterWatermark(projectedSchema)); if (origin instanceof OneInputTransformation) { OneInputTransformation tf = (OneInputTransformation) ds.getTransformation(); - OneInputStreamOperatorFactory op = (OneInputStreamOperatorFactory) tf.getOperatorFactory(); ProxyFactory inputFormatProxyFactory = - IcebergClassUtil.getInputFormatProxyFactory(op, mixedTable.io(), mixedTable.schema()); + IcebergClassUtil.getInputFormatProxyFactory( + tableLoader, + table, + mixedTable.io(), + mixedTable.schema(), + projectedIcebergSchema, + flinkConf, + properties, + filters, + limit, + startSnapshotId); if (tf.getInputs().isEmpty()) { return env.addSource( @@ -305,7 +324,7 @@ private DataStream wrapKrb(DataStream ds) { (InputFormatSourceFunction) IcebergClassUtil.getSourceFunction(source); InputFormat inputFormatProxy = - (InputFormat) ProxyUtil.getProxy(function.getFormat(), mixedTable.io()); + new KerberosAwareInputFormat<>(function.getFormat(), mixedTable.io()); DataStreamSource sourceStream = env.createInput(inputFormatProxy, tfSource.getOutputType()) .setParallelism(scanParallelism); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java new file mode 100644 index 0000000000..92d3687d99 --- /dev/null +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.flink.table; + +import org.apache.amoro.io.AuthenticatedFileIO; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * A concrete {@link InputFormat} wrapper that runs delegate calls inside {@link + * AuthenticatedFileIO#doAs(Callable)} without using JDK dynamic proxies. + */ +public class KerberosAwareInputFormat extends RichInputFormat { + private static final long serialVersionUID = 1L; + + private final InputFormat delegate; + private final AuthenticatedFileIO authenticatedFileIO; + + public KerberosAwareInputFormat( + InputFormat delegate, AuthenticatedFileIO authenticatedFileIO) { + this.delegate = delegate; + this.authenticatedFileIO = authenticatedFileIO; + } + + @Override + public void configure(org.apache.flink.configuration.Configuration parameters) { + authenticatedFileIO.doAs( + () -> { + delegate.configure(parameters); + return null; + }); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.getStatistics(cachedStatistics)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public T[] createInputSplits(int minNumSplits) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.createInputSplits(minNumSplits)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(T[] inputSplits) { + return authenticatedFileIO.doAs(() -> delegate.getInputSplitAssigner(inputSplits)); + } + + @Override + public void open(T split) throws IOException { + try { + authenticatedFileIO.doAs( + () -> { + delegate.open(split); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public boolean reachedEnd() throws IOException { + try { + return authenticatedFileIO.doAs(delegate::reachedEnd); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public OT nextRecord(OT reuse) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.nextRecord(reuse)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void close() throws IOException { + try { + authenticatedFileIO.doAs( + () -> { + delegate.close(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void openInputFormat() throws IOException { + if (!(delegate instanceof RichInputFormat)) { + return; + } + RichInputFormat richInputFormat = (RichInputFormat) delegate; + richInputFormat.setRuntimeContext(getRuntimeContext()); + try { + authenticatedFileIO.doAs( + () -> { + richInputFormat.openInputFormat(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void closeInputFormat() throws IOException { + if (!(delegate instanceof RichInputFormat)) { + return; + } + RichInputFormat richInputFormat = (RichInputFormat) delegate; + try { + authenticatedFileIO.doAs( + () -> { + richInputFormat.closeInputFormat(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + private IOException unwrapIOException(RuntimeException exception) { + Throwable cause = exception.getCause(); + if (cause instanceof IOException) { + return (IOException) cause; + } + throw exception; + } +} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java index d7282739fb..d8a79243c7 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java @@ -21,14 +21,33 @@ import org.apache.amoro.flink.InternalCatalogBuilder; import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions; import org.apache.amoro.flink.interceptor.FlinkTablePropertiesInvocationHandler; +import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; +import org.apache.amoro.table.BasicKeyedTable; +import org.apache.amoro.table.BasicUnkeyedTable; +import org.apache.amoro.table.ChangeTable; +import org.apache.amoro.table.KeyedTable; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.table.UnkeyedTable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.StructLikeMap; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -93,7 +112,7 @@ protected MixedFormatTableLoader( Boolean loadBaseForKeyedTable) { this.catalogBuilder = catalogBuilder; this.tableIdentifier = tableIdentifier; - this.flinkTableProperties = flinkTableProperties; + this.flinkTableProperties = new HashMap<>(flinkTableProperties); this.loadBaseForKeyedTable = loadBaseForKeyedTable == null || loadBaseForKeyedTable; } @@ -108,10 +127,8 @@ public boolean isOpen() { } public MixedTable loadMixedFormatTable() { - return ((MixedTable) - new FlinkTablePropertiesInvocationHandler( - flinkTableProperties, mixedFormatCatalog.loadTable(tableIdentifier)) - .getProxy()); + MixedTable table = mixedFormatCatalog.loadTable(tableIdentifier); + return wrapWithFlinkTableProperties(table, flinkTableProperties); } public void switchLoadInternalTableForKeyedTable(boolean loadBaseForKeyedTable) { @@ -142,6 +159,13 @@ public TableLoader clone() { tableIdentifier, catalogBuilder, flinkTableProperties, loadBaseForKeyedTable); } + public MixedFormatTableLoader copyWithFlinkTableProperties(Map extraProperties) { + Map merged = new HashMap<>(flinkTableProperties); + merged.putAll(extraProperties); + return new MixedFormatTableLoader( + tableIdentifier, catalogBuilder, merged, loadBaseForKeyedTable); + } + @Override public void close() throws IOException {} @@ -149,4 +173,217 @@ public void close() throws IOException {} public String toString() { return MoreObjects.toStringHelper(this).add("tableIdentifier", tableIdentifier).toString(); } + + @VisibleForTesting + static MixedTable wrapWithFlinkTableProperties( + MixedTable mixedTable, Map flinkTableProperties) { + if (flinkTableProperties == null || flinkTableProperties.isEmpty()) { + return mixedTable; + } + + if (mixedTable instanceof SupportHive) { + return (MixedTable) + new FlinkTablePropertiesInvocationHandler(flinkTableProperties, mixedTable).getProxy(); + } + + if (mixedTable.isUnkeyedTable()) { + return new FlinkTablePropertiesUnkeyedTable( + mixedTable.asUnkeyedTable(), flinkTableProperties); + } + + if (mixedTable.isKeyedTable()) { + return new FlinkTablePropertiesKeyedTable(mixedTable.asKeyedTable(), flinkTableProperties); + } + + return mixedTable; + } + + private static class FlinkTablePropertiesSupport implements Serializable { + private static final long serialVersionUID = 1L; + + protected final Map flinkTableProperties; + + protected FlinkTablePropertiesSupport(Map flinkTableProperties) { + this.flinkTableProperties = new HashMap<>(flinkTableProperties); + } + + protected Map withFlinkTableProperties(Map tableProperties) { + Map merged = new HashMap<>(tableProperties); + merged.putAll(flinkTableProperties); + return merged; + } + } + + private static class FlinkTablePropertiesUnkeyedTable extends BasicUnkeyedTable + implements UnkeyedTable { + private static final long serialVersionUID = 1L; + + private final UnkeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesUnkeyedTable( + UnkeyedTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + + @Override + public UpdateSchema updateSchema() { + return delegate.updateSchema(); + } + + @Override + public AppendFiles newAppend() { + return delegate.newAppend(); + } + + @Override + public AppendFiles newFastAppend() { + return delegate.newFastAppend(); + } + + @Override + public RewriteFiles newRewrite() { + return delegate.newRewrite(); + } + + @Override + public OverwriteFiles newOverwrite() { + return delegate.newOverwrite(); + } + + @Override + public RowDelta newRowDelta() { + return delegate.newRowDelta(); + } + + @Override + public ReplacePartitions newReplacePartitions() { + return delegate.newReplacePartitions(); + } + + @Override + public DeleteFiles newDelete() { + return delegate.newDelete(); + } + + @Override + public ExpireSnapshots expireSnapshots() { + return delegate.expireSnapshots(); + } + + @Override + public ManageSnapshots manageSnapshots() { + return delegate.manageSnapshots(); + } + + @Override + public Transaction newTransaction() { + return delegate.newTransaction(); + } + + @Override + public StructLikeMap> partitionProperty() { + return delegate.partitionProperty(); + } + + @Override + public org.apache.amoro.op.UpdatePartitionProperties updatePartitionProperties( + Transaction transaction) { + return delegate.updatePartitionProperties(transaction); + } + } + + private static class FlinkTablePropertiesKeyedTable extends BasicKeyedTable + implements KeyedTable { + private static final long serialVersionUID = 1L; + + private final KeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesKeyedTable( + KeyedTable delegate, Map flinkTableProperties) { + super( + delegate.location(), + delegate.primaryKeySpec(), + new FlinkTablePropertiesBaseTable(delegate.baseTable(), flinkTableProperties), + new FlinkTablePropertiesChangeTable(delegate.changeTable(), flinkTableProperties)); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + } + + private static class FlinkTablePropertiesBaseTable extends BasicKeyedTable.BaseInternalTable { + private static final long serialVersionUID = 1L; + + private final UnkeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesBaseTable( + UnkeyedTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + } + + private static class FlinkTablePropertiesChangeTable extends BasicKeyedTable.ChangeInternalTable { + private static final long serialVersionUID = 1L; + + private final ChangeTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesChangeTable( + ChangeTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + + @Override + public org.apache.amoro.scan.ChangeTableIncrementalScan newScan() { + return delegate.newScan(); + } + } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java index 644ef1f807..ad0ce22694 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java @@ -19,7 +19,6 @@ package org.apache.amoro.flink.table; import org.apache.amoro.flink.interceptor.ProxyFactory; -import org.apache.amoro.flink.util.IcebergClassUtil; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; @@ -29,7 +28,6 @@ import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.FlinkInputSplit; -import org.apache.iceberg.flink.source.StreamingReaderOperator; public class UnkeyedInputFormatOperatorFactory extends AbstractStreamOperatorFactory implements YieldingOperatorFactory, @@ -52,8 +50,8 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { @Override public > O createStreamOperator( StreamOperatorParameters parameters) { - StreamingReaderOperator operator = - IcebergClassUtil.newStreamingReaderOperator( + UnkeyedStreamingReaderOperator operator = + new UnkeyedStreamingReaderOperator( factory.getInstance(), processingTimeService, mailboxExecutor); operator.setup( parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); @@ -62,6 +60,6 @@ public > O createStreamOperator( @Override public Class getStreamOperatorClass(ClassLoader classLoader) { - return StreamingReaderOperator.class; + return UnkeyedStreamingReaderOperator.class; } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java new file mode 100644 index 0000000000..6cca6c776f --- /dev/null +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.flink.table; + +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.state.JavaSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.function.ThrowingRunnable; +import org.apache.iceberg.flink.source.FlinkInputFormat; +import org.apache.iceberg.flink.source.FlinkInputSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; + +/** Minimal reader operator to avoid depending on Iceberg's non-public StreamingReaderOperator. */ +public class UnkeyedStreamingReaderOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(UnkeyedStreamingReaderOperator.class); + + private final MailboxExecutor executor; + private FlinkInputFormat format; + private transient SourceFunction.SourceContext sourceContext; + private transient ListState inputSplitsState; + private transient Queue splits; + private transient SplitState currentSplitState; + + public UnkeyedStreamingReaderOperator( + FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) { + this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null."); + this.processingTimeService = timeService; + this.executor = + Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null."); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + inputSplitsState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>())); + currentSplitState = SplitState.IDLE; + splits = Lists.newLinkedList(); + if (context.isRestored()) { + int taskIdx = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), taskIdx); + for (FlinkInputSplit split : inputSplitsState.get()) { + splits.add(split); + } + } + + sourceContext = + StreamSourceContexts.getSourceContext( + getOperatorConfig().getTimeCharacteristic(), + getProcessingTimeService(), + new Object(), + output, + getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), + -1L, + true); + enqueueProcessSplits(); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + inputSplitsState.clear(); + inputSplitsState.addAll(Lists.newArrayList(splits)); + } + + @Override + public void processElement(StreamRecord element) { + splits.add(element.getValue()); + enqueueProcessSplits(); + } + + private void enqueueProcessSplits() { + if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) { + currentSplitState = SplitState.RUNNING; + executor.execute( + (ThrowingRunnable) this::processSplits, getClass().getSimpleName()); + } + } + + private void processSplits() throws IOException { + FlinkInputSplit split = splits.poll(); + if (split == null) { + currentSplitState = SplitState.IDLE; + return; + } + + format.open(split); + try { + RowData next = null; + while (!format.reachedEnd()) { + next = format.nextRecord(next); + sourceContext.collect(next); + } + } finally { + currentSplitState = SplitState.IDLE; + format.close(); + } + enqueueProcessSplits(); + } + + @Override + public void processWatermark(Watermark mark) {} + + @Override + public void close() throws Exception { + super.close(); + if (format != null) { + format.close(); + format.closeInputFormat(); + format = null; + } + sourceContext = null; + } + + @Override + public void finish() throws Exception { + super.finish(); + output.close(); + if (sourceContext != null) { + sourceContext.emitWatermark(Watermark.MAX_WATERMARK); + sourceContext.close(); + sourceContext = null; + } + } + + private enum SplitState { + IDLE, + RUNNING + } +} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java deleted file mode 100644 index 6b181b510d..0000000000 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.flink.util; - -import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** A util class to handle the reflection operation of Flink class. */ -public class FlinkClassReflectionUtil { - - public static final Logger LOG = LoggerFactory.getLogger(FlinkClassReflectionUtil.class); - - public static Object getSplitLocalOutput(ReaderOutput readerOutput) { - if (readerOutput == null) { - return null; - } - try { - return ReflectionUtil.getField( - (Class) ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[2], - readerOutput, - "splitLocalOutputs"); - } catch (Exception e) { - LOG.warn("extract internal watermark error", e); - } - return null; - } - - public static void emitPeriodWatermark(@Nullable Object splitLocalOutput) { - if (splitLocalOutput == null) { - return; - } - try { - Method method = - ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[1].getDeclaredMethod( - "emitPeriodicWatermark"); - method.setAccessible(true); - method.invoke(splitLocalOutput); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - LOG.warn("no method found", e); - } - } -} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java index 8dcc3eb1bc..da6c7e816c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java @@ -20,31 +20,29 @@ import org.apache.amoro.flink.interceptor.ProxyFactory; import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.ScanContext; -import org.apache.iceberg.flink.source.StreamingReaderOperator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.util.ThreadPools; import java.lang.reflect.Constructor; -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.List; @@ -52,8 +50,6 @@ /** An util generates Apache Iceberg writer and committer operator w */ public class IcebergClassUtil { - private static final String ICEBERG_SCAN_CONTEXT_CLASS = - "org.apache.iceberg.flink.source.ScanContext"; private static final String ICEBERG_PARTITION_SELECTOR_CLASS = "org.apache.iceberg.flink.sink.PartitionKeySelector"; private static final String ICEBERG_FILE_COMMITTER_CLASS = @@ -66,7 +62,6 @@ public static KeySelector newPartitionKeySelector( try { Class clazz = forName(ICEBERG_PARTITION_SELECTOR_CLASS); Constructor c = clazz.getConstructor(PartitionSpec.class, Schema.class, RowType.class); - c.setAccessible(true); return (KeySelector) c.newInstance(spec, schema, flinkSchema); } catch (NoSuchMethodException | IllegalAccessException @@ -129,51 +124,22 @@ public static ProxyFactory getIcebergStreamWriterProxyFa new Object[] {fullTableName, taskWriterFactory}); } - public static StreamingReaderOperator newStreamingReaderOperator( - FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) { - try { - Constructor c = - StreamingReaderOperator.class.getDeclaredConstructor( - FlinkInputFormat.class, ProcessingTimeService.class, MailboxExecutor.class); - c.setAccessible(true); - return c.newInstance(format, timeService, mailboxExecutor); - } catch (IllegalAccessException - | NoSuchMethodException - | InvocationTargetException - | InstantiationException e) { - throw new RuntimeException(e); - } - } - - public static FlinkInputFormat getInputFormat(OneInputStreamOperatorFactory operatorFactory) { - try { - Class[] classes = StreamingReaderOperator.class.getDeclaredClasses(); - Class clazz = null; - for (Class c : classes) { - if ("OperatorFactory".equals(c.getSimpleName())) { - clazz = c; - break; - } - } - Field field = clazz.getDeclaredField("format"); - field.setAccessible(true); - return (FlinkInputFormat) (field.get(operatorFactory)); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - public static ProxyFactory getInputFormatProxyFactory( - OneInputStreamOperatorFactory operatorFactory, + TableLoader tableLoader, + Table table, AuthenticatedFileIO authenticatedFileIO, - Schema tableSchema) { - FlinkInputFormat inputFormat = getInputFormat(operatorFactory); - TableLoader tableLoader = - ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "tableLoader"); - FileIO io = ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "io"); - EncryptionManager encryption = - ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "encryption"); - Object context = ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "context"); + Schema tableSchema, + Schema projectedSchema, + ReadableConfig flinkConf, + Map properties, + List filters, + long limit, + Long startSnapshotId) { + FileIO io = table.io(); + EncryptionManager encryption = table.encryption(); + ScanContext context = + buildScanContext( + table, projectedSchema, flinkConf, properties, filters, limit, startSnapshotId); return ProxyUtil.getProxyFactory( FlinkInputFormat.class, @@ -184,6 +150,29 @@ public static ProxyFactory getInputFormatProxyFactory( new Object[] {tableLoader, tableSchema, io, encryption, context}); } + private static ScanContext buildScanContext( + Table table, + Schema projectedSchema, + ReadableConfig flinkConf, + Map properties, + List filters, + long limit, + Long startSnapshotId) { + ScanContext.Builder contextBuilder = + ScanContext.builder().resolveConfig(table, properties, flinkConf); + if (projectedSchema != null) { + contextBuilder.project(projectedSchema); + } + if (filters != null) { + contextBuilder.filters(filters); + } + contextBuilder.limit(limit); + if (startSnapshotId != null) { + contextBuilder.startSnapshotId(startSnapshotId); + } + return contextBuilder.build(); + } + private static Class forName(String className) { try { return Class.forName(className); @@ -193,22 +182,10 @@ private static Class forName(String className) { } public static SourceFunction getSourceFunction(AbstractUdfStreamOperator source) { - try { - Field field = AbstractUdfStreamOperator.class.getDeclaredField("userFunction"); - field.setAccessible(true); - return (SourceFunction) (field.get(source)); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } + return (SourceFunction) source.getUserFunction(); } public static void clean(StreamExecutionEnvironment env) { - try { - Field field = StreamExecutionEnvironment.class.getDeclaredField("transformations"); - field.setAccessible(true); - ((List) (field.get(env))).clear(); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } + env.getTransformations().clear(); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java index 38f0ea532e..6bf9bd7a2b 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java @@ -43,7 +43,6 @@ import org.apache.amoro.flink.util.CompatibleFlinkPropertyUtil; import org.apache.amoro.flink.util.IcebergClassUtil; import org.apache.amoro.flink.util.MixedFormatUtils; -import org.apache.amoro.flink.util.ProxyUtil; import org.apache.amoro.table.DistributionHashMode; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableProperties; @@ -435,10 +434,7 @@ public static OneInputStreamOperator createFileCommitter( return null; } tableLoader.switchLoadInternalTableForKeyedTable(MixedFormatUtils.isToBase(overwrite)); - return (OneInputStreamOperator) - ProxyUtil.getProxy( - IcebergClassUtil.newIcebergFilesCommitter( - tableLoader, overwrite, branch, spec, mixedTable.io()), - mixedTable.io()); + return IcebergClassUtil.newIcebergFilesCommitter( + tableLoader, overwrite, branch, spec, mixedTable.io()); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java index 2af22bc1c8..ef5d08a68b 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java @@ -29,7 +29,6 @@ import org.apache.amoro.flink.table.FlinkSource; import org.apache.amoro.flink.table.MixedFormatTableLoader; import org.apache.amoro.flink.util.DataUtil; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.CheckpointingMode; @@ -50,7 +49,6 @@ import org.apache.iceberg.io.WriteResult; import org.junit.Assert; import org.junit.Test; -import org.junit.experimental.categories.Category; import java.io.IOException; import java.time.LocalDateTime; @@ -65,7 +63,6 @@ import java.util.Optional; import java.util.Set; -@Category(FailsOnJava17.class) public class TestFlinkSource extends FlinkTestBase { protected static final FileFormat FILE_FORMAT = diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java index 78d48b7702..ecdf4c47e9 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java @@ -41,7 +41,6 @@ import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.table.UnkeyedTable; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.amoro.utils.TableFileUtil; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; @@ -87,7 +86,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +104,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -@Category(FailsOnJava17.class) public class TestMixedFormatSource extends TestRowDataReaderFunction implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(TestMixedFormatSource.class); private static final long serialVersionUID = 7418812854449034756L; diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java index 4fdee6b75c..05fc24eb23 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestKeyed.java @@ -38,7 +38,6 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.table.TableProperties; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.ApiExpression; @@ -58,7 +57,6 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.junit.runner.RunWith; @@ -79,7 +77,6 @@ import java.util.Set; @RunWith(Parameterized.class) -@Category(FailsOnJava17.class) public class TestKeyed extends FlinkTestBase { public static final Logger LOG = LoggerFactory.getLogger(TestKeyed.class); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java index 80b8af79d2..659f4e955f 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java @@ -26,7 +26,6 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.data.RowData; @@ -40,7 +39,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.ArrayList; @@ -50,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -@Category(FailsOnJava17.class) public class TestLookupSecondary extends CatalogITCaseBase implements FlinkTaskWriterBaseTest { private String db; diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java index 80e3c4898a..ccab05a761 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyed.java @@ -39,7 +39,6 @@ import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; @@ -58,7 +57,6 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -76,7 +74,6 @@ import java.util.Set; @RunWith(Parameterized.class) -@Category(FailsOnJava17.class) public class TestUnkeyed extends FlinkTestBase { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java index 2f762adf06..fcb092e3d6 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java @@ -28,7 +28,6 @@ import org.apache.amoro.hive.TestHMS; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; @@ -39,7 +38,6 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -51,7 +49,6 @@ import java.util.List; @RunWith(Parameterized.class) -@Category(FailsOnJava17.class) public class TestUnkeyedOverwrite extends FlinkTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(TestUnkeyedOverwrite.class); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java index ae0dbe8c77..35b789bb87 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java @@ -30,16 +30,8 @@ import org.apache.amoro.flink.util.TestUtil; import org.apache.amoro.table.KeyedTable; import org.apache.amoro.table.TableIdentifier; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.GenericRowData; @@ -57,8 +49,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.ArrayList; @@ -68,12 +58,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; public class TestWatermark extends FlinkTestBase { - public static final Logger LOG = LoggerFactory.getLogger(TestWatermark.class); - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private static final String DB = TableTestHelper.TEST_TABLE_ID.getDatabase(); @@ -147,17 +133,17 @@ public void testWatermark() throws Exception { "create table d (tt as cast(op_time as timestamp(3)), watermark for tt as tt) like %s", table); - Table source = getTableEnv().sqlQuery("select is_true from d"); - - WatermarkTestOperator op = new WatermarkTestOperator(); - getTableEnv() - .toRetractStream(source, RowData.class) - .transform("test watermark", TypeInformation.of(RowData.class), op); - getEnv().executeAsync("test watermark"); - - op.waitWatermark(); - - Assert.assertTrue(op.watermark > Long.MIN_VALUE); + // This query verifies that a table with watermark definition can still be consumed + // correctly. We intentionally avoid waiting on an async watermark callback here because + // that path depends on internal source/operator timing and can hang the test without + // revealing a user-visible regression. + TableResult result = exec("select is_true from d"); + CommonTestUtils.waitUntilJobManagerIsInitialized( + () -> result.getJobClient().get().getJobStatus().get()); + try (CloseableIterator iterator = result.collect()) { + Assert.assertEquals(Row.of(true), iterator.next()); + } + result.getJobClient().ifPresent(TestUtil::cancelJob); } @Test @@ -226,34 +212,4 @@ public void testSelectWatermarkField() throws Exception { expected.add(new Object[] {true, LocalDateTime.parse("2022-06-17T10:08:11")}); Assert.assertEquals(DataUtil.toRowSet(expected), actual); } - - public static class WatermarkTestOperator extends AbstractStreamOperator - implements OneInputStreamOperator, RowData> { - - private static final long serialVersionUID = 1L; - public long watermark; - private static final CompletableFuture waitWatermark = new CompletableFuture<>(); - - public WatermarkTestOperator() { - super(); - chainingStrategy = ChainingStrategy.ALWAYS; - } - - private void waitWatermark() throws InterruptedException, ExecutionException { - waitWatermark.get(); - } - - @Override - public void processElement(StreamRecord> element) throws Exception { - output.collect(element.asRecord()); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - LOG.info("processWatermark: {}", mark); - watermark = mark.getTimestamp(); - waitWatermark.complete(null); - super.processWatermark(mark); - } - } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java index f6cc6b0212..057765df1c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAdaptHiveWriter.java @@ -36,7 +36,6 @@ import org.apache.amoro.table.LocationKind; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.WriteOperationKind; -import org.apache.amoro.testutils.FailsOnJava17; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -55,7 +54,6 @@ import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -71,7 +69,6 @@ import java.util.stream.Collectors; @RunWith(Parameterized.class) -@Category(FailsOnJava17.class) public class TestAdaptHiveWriter extends TableTestBase { @ClassRule public static TestHMS TEST_HMS = new TestHMS(); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java index 889fd74e1b..7f7e1fd81c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java @@ -99,9 +99,12 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) boolean submitEmptySnapshots, Long restoredCheckpointId) throws Exception { - tableLoader.open(); - MixedTable mixedTable = tableLoader.loadMixedFormatTable(); - mixedTable.properties().put(SUBMIT_EMPTY_SNAPSHOTS.key(), String.valueOf(submitEmptySnapshots)); + HashMap extraProperties = new HashMap<>(); + extraProperties.put(SUBMIT_EMPTY_SNAPSHOTS.key(), String.valueOf(submitEmptySnapshots)); + MixedFormatTableLoader writerTableLoader = + tableLoader.copyWithFlinkTableProperties(extraProperties); + writerTableLoader.open(); + MixedTable mixedTable = writerTableLoader.loadMixedFormatTable(); MixedFormatFileWriter streamWriter = FlinkSink.createFileWriter( @@ -109,7 +112,7 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) null, false, (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(), - tableLoader); + writerTableLoader); TestOneInputStreamOperatorIntern harness = new TestOneInputStreamOperatorIntern<>( streamWriter, 1, 1, 0, restoredCheckpointId, new TestGlobalAggregateManager()); diff --git a/amoro-format-mixed/amoro-mixed-trino/pom.xml b/amoro-format-mixed/amoro-mixed-trino/pom.xml index 202d74a0b6..a91f845fe1 100644 --- a/amoro-format-mixed/amoro-mixed-trino/pom.xml +++ b/amoro-format-mixed/amoro-mixed-trino/pom.xml @@ -653,7 +653,6 @@ 17 - sun diff --git a/pom.xml b/pom.xml index 20d74e95ea..fb14d8c2c9 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 2.12.0 3.20.0 3.12.0 - 2.2.2 + 3.3.0 5.7.0 4.11.0 1.21.4 @@ -190,7 +190,7 @@ --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED - -XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${jvm.module.opens} + -XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${jvm.module.opens} ${jvm.module.exports} @@ -1900,7 +1900,6 @@ 17 17 - org.apache.amoro.testutils.FailsOnJava17 From a6b8b4a6784acf2e5dc2cdc8f7ff056b049c9400 Mon Sep 17 00:00:00 2001 From: xuba Date: Wed, 18 Mar 2026 00:32:06 +0800 Subject: [PATCH 08/10] Stabilize continuous optimizing tests on Java 17 --- amoro-common/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/amoro-common/pom.xml b/amoro-common/pom.xml index dd85842c4e..7dd5a42652 100644 --- a/amoro-common/pom.xml +++ b/amoro-common/pom.xml @@ -202,6 +202,14 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + + src/main/gen-java/** + + + org.apache.maven.plugins maven-surefire-plugin From 0f4a8b5a2fe1d38f826b0d2ab03607a33641754f Mon Sep 17 00:00:00 2001 From: xuba Date: Wed, 18 Mar 2026 01:26:53 +0800 Subject: [PATCH 09/10] Update TestAutomaticLogWriter to use truncated LocalDateTime for consistency in time calculations --- .../amoro/flink/write/TestAutomaticLogWriter.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java index 37150e3563..af93227027 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java @@ -80,6 +80,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -172,26 +173,27 @@ public void testHasCaughtUp() throws Exception { LocalDateTime.parse("2022-06-18 10:10:11", dtf) }); List catchUpExpects = new LinkedList<>(); + LocalDateTime catchUpBaseTime = LocalDateTime.now().truncatedTo(ChronoUnit.MICROS); catchUpExpects.add( new Object[] { 1000014, "d", - LocalDateTime.now().minusSeconds(3).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(3) + catchUpBaseTime.minusSeconds(3).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(3) }); catchUpExpects.add( new Object[] { 1000021, "d", - LocalDateTime.now().minusSeconds(2).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(2) + catchUpBaseTime.minusSeconds(2).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(2) }); catchUpExpects.add( new Object[] { 1000015, "e", - LocalDateTime.now().minusSeconds(1).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(1) + catchUpBaseTime.minusSeconds(1).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(1) }); expects.addAll(catchUpExpects); From 7907aae9db721e0369778dc2323abaec4353f82f Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 24 Mar 2026 18:00:19 +0800 Subject: [PATCH 10/10] Remove reflective invocation of emitPeriodicWatermark in MixedFormatSourceReader --- .../read/hybrid/reader/MixedFormatSourceReader.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java index 654100d31f..eea767ae8a 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java @@ -195,19 +195,6 @@ private void emitPeriodicWatermark(SourceOutput splitOutput) { return; } - try { - java.lang.reflect.Method method = - splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark"); - method.setAccessible(true); - method.invoke(splitOutput); - return; - } catch (ReflectiveOperationException e) { - LOGGER.debug( - "Failed to invoke emitPeriodicWatermark on split output {}, fallback to reader output", - splitOutput.getClass(), - e); - } - watermarkOutput.emitPeriodicWatermark(); } }