diff --git a/.gitignore b/.gitignore index 292b9e2844ba3..d3741826bcf33 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ scalastyle-output.xml !.idea/vcs.xml !.idea/icon.png .vscode +.claude .metals .bloop .cursor diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 12bfc2d8e5a87..b5463d44267fe 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogStoreHolder; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -390,9 +391,9 @@ private static CatalogManager buildCatalogManager( SessionEnvironment environment) { CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader); + ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader); CatalogStoreFactory.Context catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader); + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader); catalogStoreFactory.open(catalogStoreFactoryContext); CatalogStoreHolder catalogStore = CatalogStoreHolder.newBuilder() diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 2bba1d76d696c..810546ed6a6a5 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -46,6 +46,7 @@ import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -112,17 +113,11 @@ public static StreamTableEnvironment create( new ResourceManager(settings.getConfiguration(), userClassLoader); final ModuleManager moduleManager = new ModuleManager(); - final CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory( - settings.getConfiguration(), userClassLoader); - final CatalogStoreFactory.Context catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext( - settings.getConfiguration(), userClassLoader); - catalogStoreFactory.open(catalogStoreFactoryContext); - final CatalogStore catalogStore = - settings.getCatalogStore() != null - ? settings.getCatalogStore() - : catalogStoreFactory.createCatalogStore(); + final ApiFactoryUtil.CatalogStoreResult catalogStoreResult = + ApiFactoryUtil.getOrCreateCatalogStore( + settings.getCatalogStore(), settings.getConfiguration(), userClassLoader); + final CatalogStore catalogStore = catalogStoreResult.getCatalogStore(); + final CatalogStoreFactory catalogStoreFactory = catalogStoreResult.getCatalogStoreFactory(); final CatalogManager catalogManager = CatalogManager.newBuilder() diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index 67fcfcec46309..e08704a9b99b0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -144,9 +144,8 @@ public ClassLoader getUserClassLoader() { } @Internal - @Nullable - public CatalogStore getCatalogStore() { - return catalogStore; + public Optional getCatalogStore() { + return Optional.ofNullable(catalogStore); } @Internal diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6a40fa9c2384d..388b4f969dbf1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -83,6 +83,7 @@ import org.apache.flink.table.expressions.ModelReferenceExpression; import org.apache.flink.table.expressions.TableReferenceExpression; import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; @@ -251,17 +252,11 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); final Executor executor = executorFactory.create(settings.getConfiguration()); - final CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory( - settings.getConfiguration(), userClassLoader); - final CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext( - settings.getConfiguration(), userClassLoader); - catalogStoreFactory.open(context); - final CatalogStore catalogStore = - settings.getCatalogStore() != null - ? settings.getCatalogStore() - : catalogStoreFactory.createCatalogStore(); + final ApiFactoryUtil.CatalogStoreResult catalogStoreResult = + ApiFactoryUtil.getOrCreateCatalogStore( + settings.getCatalogStore(), settings.getConfiguration(), userClassLoader); + final CatalogStore catalogStore = catalogStoreResult.getCatalogStore(); + final CatalogStoreFactory catalogStoreFactory = catalogStoreResult.getCatalogStoreFactory(); // use configuration to init table config final TableConfig tableConfig = TableConfig.getDefault(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java new file mode 100644 index 0000000000000..cb3b9189d4666 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; + +/** Utility for dealing with catalog store factories. */ +@Internal +public class ApiFactoryUtil { + + /** Result holder for catalog store and factory. */ + @Internal + public static class CatalogStoreResult { + private final CatalogStore catalogStore; + @Nullable private final CatalogStoreFactory catalogStoreFactory; + + public CatalogStoreResult( + CatalogStore catalogStore, @Nullable CatalogStoreFactory catalogStoreFactory) { + this.catalogStore = catalogStore; + this.catalogStoreFactory = catalogStoreFactory; + } + + public CatalogStore getCatalogStore() { + return catalogStore; + } + + @Nullable + public CatalogStoreFactory getCatalogStoreFactory() { + return catalogStoreFactory; + } + } + + /** + * Gets or creates a {@link CatalogStore}. If a catalog store is provided in settings, it will + * be used directly. Otherwise, a new catalog store will be created using the factory. + * + * @param providedCatalogStore the catalog store from settings, if present + * @param configuration the configuration + * @param classLoader the user classloader + * @return a result containing the catalog store and factory (factory is null if store was + * provided) + */ + public static CatalogStoreResult getOrCreateCatalogStore( + Optional providedCatalogStore, + Configuration configuration, + ClassLoader classLoader) { + if (providedCatalogStore.isPresent()) { + return new CatalogStoreResult(providedCatalogStore.get(), null); + } else { + CatalogStoreFactory catalogStoreFactory = + findAndCreateCatalogStoreFactory(configuration, classLoader); + CatalogStoreFactory.Context catalogStoreFactoryContext = + buildCatalogStoreFactoryContext(configuration, classLoader); + catalogStoreFactory.open(catalogStoreFactoryContext); + CatalogStore catalogStore = catalogStoreFactory.createCatalogStore(); + return new CatalogStoreResult(catalogStore, catalogStoreFactory); + } + } + + /** + * Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and + * user classloader. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static CatalogStoreFactory findAndCreateCatalogStoreFactory( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); + + CatalogStoreFactory catalogStoreFactory = + FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier); + + return catalogStoreFactory; + } + + /** + * Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); + String catalogStoreOptionPrefix = + CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + "."; + Map options = + new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap(); + CatalogStoreFactory.Context context = + new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader); + + return context; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index ad681b5439441..09e3951fe4053 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -19,14 +19,11 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.listener.CatalogModificationListener; @@ -42,10 +39,14 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -/** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */ +/** + * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. + * + * @deprecated Use {@link FactoryUtil} instead. + */ +@Deprecated @Internal public class TableFactoryUtil { @@ -174,50 +175,4 @@ public ClassLoader getUserClassLoader() { })) .collect(Collectors.toList()); } - - /** - * Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and - * user classloader. - * - *

The configuration format should be as follows: - * - *

{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }
- */ - public static CatalogStoreFactory findAndCreateCatalogStoreFactory( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); - - CatalogStoreFactory catalogStoreFactory = - FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier); - - return catalogStoreFactory; - } - - /** - * Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}. - * - *

The configuration format should be as follows: - * - *

{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }
- */ - public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); - String catalogStoreOptionPrefix = - CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + "."; - Map options = - new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap(); - CatalogStoreFactory.Context context = - new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader); - - return context; - } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java new file mode 100644 index 0000000000000..c8e1908291b85 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.FileCatalogStoreFactory; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ApiFactoryUtil}. */ +class ApiFactoryUtilTest { + + @ParameterizedTest(name = "kind={0}, expectedFactory={1}") + @MethodSource("catalogStoreFactoryTestParameters") + void testFindAndCreateCatalogStoreFactory(String kind, Class expectedFactoryClass) { + Configuration configuration = new Configuration(); + if (kind != null) { + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, kind); + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory factory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(expectedFactoryClass); + } + + @Test + void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", tempFolder.getAbsolutePath()); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.file.option2", "value2"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of( + "path", tempFolder.getAbsolutePath(), + "option1", "value1", + "option2", "value2")); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithGenericInMemory() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + configuration.setString("table.catalog-store.generic_in_memory.option1", "value1"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf(Map.of("option1", "value1")); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithoutOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).isEmpty(); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", "/test/path"); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.other.irrelevant", "should-not-appear"); + configuration.setString("other.config.key", "should-not-appear"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("path", "/test/path", "option1", "value1")); + } + + private static Stream catalogStoreFactoryTestParameters() { + return Stream.of( + Arguments.of("generic_in_memory", GenericInMemoryCatalogStoreFactory.class), + Arguments.of("file", FileCatalogStoreFactory.class), + Arguments.of(null, GenericInMemoryCatalogStoreFactory.class)); + } +} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 7e86c5bf256cd..c8cc4e99463d2 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -28,7 +28,7 @@ import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.factories.{PlannerFactoryUtil, TableFactoryUtil} +import org.apache.flink.table.factories.{ApiFactoryUtil, PlannerFactoryUtil, TableFactoryUtil} import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper} import org.apache.flink.table.legacy.sources.TableSource import org.apache.flink.table.module.ModuleManager @@ -250,14 +250,19 @@ object StreamTableEnvironmentImpl { val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader) val moduleManager = new ModuleManager - val catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(settings.getConfiguration, userClassLoader) - val catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, userClassLoader) - catalogStoreFactory.open(catalogStoreFactoryContext) - val catalogStore = - if (settings.getCatalogStore != null) settings.getCatalogStore - else catalogStoreFactory.createCatalogStore() + val (catalogStore, catalogStoreFactory) = + if (settings.getCatalogStore.isPresent) { + (settings.getCatalogStore.get(), null) + } else { + val factory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory( + settings.getConfiguration, + userClassLoader) + val factoryContext = + ApiFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, userClassLoader) + factory.open(factoryContext) + (factory.createCatalogStore(), factory) + } val catalogManager = CatalogManager.newBuilder .classLoader(userClassLoader)