diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..08db2b52e32b 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -741,6 +741,7 @@ Creates a catalog entry for a metadata.json file which already exists but does n |---------------|-----------|------|-------------| | `table` | ✔️ | string | Table which is to be registered | | `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier | +| `overwrite` | | boolean | Overwrite the table if it already exists (defaults to false) | !!! warning Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption. @@ -764,6 +765,15 @@ CALL spark_catalog.system.register_table( ); ``` +Register a new table as `db.tbl` to `spark_catalog` pointing to metadata.json file `path/to/metadata/file.json` and overwrite the existing table. +```sql +CALL spark_catalog.system.register_table( + table => 'db.tbl', + metadata_file => 'path/to/metadata/file.json', + overwrite => true +); +``` + ## Metadata information ### `ancestors_of` diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java index a06a67b7d612..5726e86c669e 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java @@ -19,12 +19,15 @@ package org.apache.iceberg.spark.extensions; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.atIndex; import java.util.List; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableUtil; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.spark.Spark3Util; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.ParseException; @@ -82,4 +85,45 @@ public void testRegisterTable() throws NoSuchTableException, ParseException { .as("Should have the right datafile count in the procedure result") .contains(originalFileCount, atIndex(2)); } + + @TestTemplate + public void testRegisterTableAlreadyExistsFails() throws Exception { + long numRows = 1000; + + sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName); + + Table table = Spark3Util.loadIcebergTable(spark, tableName); + String metadataJson = TableUtil.metadataFileLocation(table); + + sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson); + + assertThatThrownBy( + () -> + sql( + "CALL %s.system.register_table('%s', '%s')", + catalogName, targetName, metadataJson)) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining("Table already exists"); + } + + @TestTemplate + public void testRegisterTableWithOverwriteNotSupported() throws Exception { + long numRows = 1000; + + sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName); + Table table = Spark3Util.loadIcebergTable(spark, tableName); + String metadataJson = TableUtil.metadataFileLocation(table); + + sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson); + + assertThatThrownBy( + () -> + sql( + "CALL %s.system.register_table('%s', '%s', true)", + catalogName, targetName, metadataJson)) + // RESTException is for RESTCatalog; UnsupportedOperationException is for HiveCatalog and + // HadoopCatalog. + .isInstanceOfAny(UnsupportedOperationException.class, RESTException.class) + .hasMessageContaining("Registering tables with overwrite is not supported"); + } } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedureWithOverwriteSupport.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedureWithOverwriteSupport.java new file mode 100644 index 000000000000..3d5ca7050c56 --- /dev/null +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedureWithOverwriteSupport.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.TestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Currently, {@code registerTable} (with overwrite) is not overridden by {@code HiveCatalog} and + * {@code HadoopCatalog}, so they fall back to the default implementation in the {@code Catalog} + * interface which does not support overwrite. {@code RESTCatalog} delegates the call to a backend, + * but the backend used in this test environment also does not support it. Since none of the + * standard catalogs available in {@code ExtensionsTestBase} support overwrite for registration, + * this new test class was created with a custom catalog ({@link OverwriteSupportedCatalog}) to + * verify that the {@code overwrite} parameter is correctly passed through the Spark procedure. + */ +public class TestRegisterTableProcedureWithOverwriteSupport extends TestBase { + + @TempDir protected java.nio.file.Path temp; + + private String localTargetName = "register_table"; + private String sourceTableName = "source_table"; + private String customCatalogName; + + @AfterEach + public void dropTables() { + if (customCatalogName != null) { + sql("DROP TABLE IF EXISTS %s.default.%s", customCatalogName, sourceTableName); + sql("DROP TABLE IF EXISTS %s.default.%s", customCatalogName, localTargetName); + } + } + + @Test + public void testRegisterTableWithOverwrite() throws Exception { + customCatalogName = "custom_catalog"; + spark + .conf() + .set("spark.sql.catalog." + customCatalogName, "org.apache.iceberg.spark.SparkCatalog"); + spark + .conf() + .set( + "spark.sql.catalog." + customCatalogName + ".catalog-impl", + OverwriteSupportedCatalog.class.getName()); + spark + .conf() + .set( + "spark.sql.catalog." + customCatalogName + ".warehouse", + temp.resolve("warehouse").toString()); + + sql("USE %s", customCatalogName); + + // Create namespace in custom catalog + sql("CREATE NAMESPACE IF NOT EXISTS %s.default", customCatalogName); + + // Create source table in custom catalog + sql( + "CREATE TABLE %s.default.%s (id int, data string) using ICEBERG", + customCatalogName, sourceTableName); + + Table table = + Spark3Util.loadIcebergTable(spark, customCatalogName + ".default." + sourceTableName); + String metadataJson = TableUtil.metadataFileLocation(table); + + // Create target table in custom catalog so it exists + sql( + "CREATE TABLE %s.default.%s (id int, data string) using ICEBERG", + customCatalogName, localTargetName); + + // Call register_table with overwrite = true + sql( + "CALL %s.system.register_table(table => 'default.%s', metadata_file => '%s', overwrite => true)", + customCatalogName, localTargetName, metadataJson); + + // Verify that the table in custom catalog now has the content of the source table + List original = + sql("SELECT * FROM %s.default.%s", customCatalogName, sourceTableName); + List registered = + sql("SELECT * FROM %s.default.%s", customCatalogName, localTargetName); + assertThat(registered) + .as("Registered table rows should match original table rows") + .isEqualTo(original); + } + + public static class OverwriteSupportedCatalog extends InMemoryCatalog { + @Override + public Table registerTable( + TableIdentifier identifier, String metadataFileLocation, boolean overwrite) { + if (overwrite && tableExists(identifier)) { + dropTable(identifier, false); + } + return registerTable(identifier, metadataFileLocation); + } + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java index 9ba577ad7e24..c5cf5a11141c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java @@ -46,9 +46,11 @@ class RegisterTableProcedure extends BaseProcedure { requiredInParameter("table", DataTypes.StringType); private static final ProcedureParameter METADATA_FILE_PARAM = requiredInParameter("metadata_file", DataTypes.StringType); + private static final ProcedureParameter OVERWRITE_PARAM = + optionalInParameter("overwrite", DataTypes.BooleanType, "false"); private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM}; + new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM, OVERWRITE_PARAM}; private static final StructType OUTPUT_TYPE = new StructType( @@ -96,7 +98,8 @@ public Iterator call(InternalRow args) { "Cannot handle an empty argument metadata_file"); Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog(); - Table table = icebergCatalog.registerTable(tableName, metadataFile); + boolean overwrite = input.asBoolean(OVERWRITE_PARAM, false); + Table table = icebergCatalog.registerTable(tableName, metadataFile, overwrite); Long currentSnapshotId = null; Long totalDataFiles = null; Long totalRecords = null;