Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ scalastyle-output.xml
!.idea/vcs.xml
!.idea/icon.png
.vscode
.claude
.metals
.bloop
.cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
Expand Down Expand Up @@ -390,9 +391,9 @@ private static CatalogManager buildCatalogManager(
SessionEnvironment environment) {

CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader);
ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader);
CatalogStoreFactory.Context catalogStoreFactoryContext =
TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader);
ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
CatalogStoreHolder catalogStore =
CatalogStoreHolder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
Expand Down Expand Up @@ -112,17 +113,11 @@ public static StreamTableEnvironment create(
new ResourceManager(settings.getConfiguration(), userClassLoader);
final ModuleManager moduleManager = new ModuleManager();

final CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context catalogStoreFactoryContext =
TableFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
final CatalogStore catalogStore =
settings.getCatalogStore() != null
? settings.getCatalogStore()
: catalogStoreFactory.createCatalogStore();
final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
ApiFactoryUtil.getOrCreateCatalogStore(
settings.getCatalogStore(), settings.getConfiguration(), userClassLoader);
final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
final CatalogStoreFactory catalogStoreFactory = catalogStoreResult.getCatalogStoreFactory();

final CatalogManager catalogManager =
CatalogManager.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,8 @@ public ClassLoader getUserClassLoader() {
}

@Internal
@Nullable
public CatalogStore getCatalogStore() {
return catalogStore;
public Optional<CatalogStore> getCatalogStore() {
return Optional.ofNullable(catalogStore);
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.flink.table.expressions.ModelReferenceExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
Expand Down Expand Up @@ -251,17 +252,11 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Executor executor = executorFactory.create(settings.getConfiguration());

final CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context context =
TableFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(context);
final CatalogStore catalogStore =
settings.getCatalogStore() != null
? settings.getCatalogStore()
: catalogStoreFactory.createCatalogStore();
final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
ApiFactoryUtil.getOrCreateCatalogStore(
settings.getCatalogStore(), settings.getConfiguration(), userClassLoader);
final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
final CatalogStoreFactory catalogStoreFactory = catalogStoreResult.getCatalogStoreFactory();

// use configuration to init table config
final TableConfig tableConfig = TableConfig.getDefault();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.CommonCatalogOptions;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

/** Utility for dealing with catalog store factories. */
@Internal
public class ApiFactoryUtil {

/** Result holder for catalog store and factory. */
@Internal
public static class CatalogStoreResult {
private final CatalogStore catalogStore;
@Nullable private final CatalogStoreFactory catalogStoreFactory;

public CatalogStoreResult(
CatalogStore catalogStore, @Nullable CatalogStoreFactory catalogStoreFactory) {
this.catalogStore = catalogStore;
this.catalogStoreFactory = catalogStoreFactory;
}

public CatalogStore getCatalogStore() {
return catalogStore;
}

@Nullable
public CatalogStoreFactory getCatalogStoreFactory() {
return catalogStoreFactory;
}
}

/**
* Gets or creates a {@link CatalogStore}. If a catalog store is provided in settings, it will
* be used directly. Otherwise, a new catalog store will be created using the factory.
*
* @param providedCatalogStore the catalog store from settings, if present
* @param configuration the configuration
* @param classLoader the user classloader
* @return a result containing the catalog store and factory (factory is null if store was
* provided)
*/
public static CatalogStoreResult getOrCreateCatalogStore(
Optional<CatalogStore> providedCatalogStore,
Configuration configuration,
ClassLoader classLoader) {
if (providedCatalogStore.isPresent()) {
return new CatalogStoreResult(providedCatalogStore.get(), null);
} else {
CatalogStoreFactory catalogStoreFactory =
findAndCreateCatalogStoreFactory(configuration, classLoader);
CatalogStoreFactory.Context catalogStoreFactoryContext =
buildCatalogStoreFactoryContext(configuration, classLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
CatalogStore catalogStore = catalogStoreFactory.createCatalogStore();
return new CatalogStoreResult(catalogStore, catalogStoreFactory);
}
}

/**
* Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and
* user classloader.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);

CatalogStoreFactory catalogStoreFactory =
FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier);

return catalogStoreFactory;
}

/**
* Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
String catalogStoreOptionPrefix =
CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + ".";
Map<String, String> options =
new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap();
CatalogStoreFactory.Context context =
new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader);

return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
Expand All @@ -42,10 +39,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */
/**
* Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}.
*
* @deprecated Use {@link FactoryUtil} instead.
*/
@Deprecated
@Internal
public class TableFactoryUtil {

Expand Down Expand Up @@ -174,50 +175,4 @@ public ClassLoader getUserClassLoader() {
}))
.collect(Collectors.toList());
}

/**
* Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and
* user classloader.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);

CatalogStoreFactory catalogStoreFactory =
FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier);

return catalogStoreFactory;
}

/**
* Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
String catalogStoreOptionPrefix =
CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + ".";
Map<String, String> options =
new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap();
CatalogStoreFactory.Context context =
new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader);

return context;
}
}
Loading