Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ Creates a catalog entry for a metadata.json file which already exists but does n
|---------------|-----------|------|-------------|
| `table` | ✔️ | string | Table which is to be registered |
| `metadata_file`| ✔️ | string | Metadata file which is to be registered as a new catalog identifier |
| `overwrite`   |           | boolean | Whether to overwrite an existing table with the same identifier (defaults to `false`) |

!!! warning
Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption.
Expand All @@ -764,6 +765,15 @@ CALL spark_catalog.system.register_table(
);
```

Register the metadata file `path/to/metadata/file.json` as table `db.tbl` in `spark_catalog`, overwriting the table if it already exists:
```sql
CALL spark_catalog.system.register_table(
table => 'db.tbl',
metadata_file => 'path/to/metadata/file.json',
overwrite => true
);
```

## Metadata information

### `ancestors_of`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.atIndex;

import java.util.List;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
Expand Down Expand Up @@ -82,4 +85,45 @@ public void testRegisterTable() throws NoSuchTableException, ParseException {
.as("Should have the right datafile count in the procedure result")
.contains(originalFileCount, atIndex(2));
}

@TestTemplate
public void testRegisterTableAlreadyExistsFails() throws Exception {
long numRows = 1000;

sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);

Table table = Spark3Util.loadIcebergTable(spark, tableName);
String metadataJson = TableUtil.metadataFileLocation(table);

sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson);

assertThatThrownBy(
() ->
sql(
"CALL %s.system.register_table('%s', '%s')",
catalogName, targetName, metadataJson))
.isInstanceOf(AlreadyExistsException.class)
.hasMessageContaining("Table already exists");
}

@TestTemplate
public void testRegisterTableWithOverwriteNotSupported() throws Exception {
long numRows = 1000;

sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
Table table = Spark3Util.loadIcebergTable(spark, tableName);
String metadataJson = TableUtil.metadataFileLocation(table);

sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metadataJson);

assertThatThrownBy(
() ->
sql(
"CALL %s.system.register_table('%s', '%s', true)",
catalogName, targetName, metadataJson))
// RESTException is for RESTCatalog; UnsupportedOperationException is for HiveCatalog and
// HadoopCatalog.
.isInstanceOfAny(UnsupportedOperationException.class, RESTException.class)
.hasMessageContaining("Registering tables with overwrite is not supported");
}
Comment thread
huaxingao marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Currently, {@code registerTable} (with overwrite) is not overridden by {@code HiveCatalog} and
 * {@code HadoopCatalog}, so they fall back to the default implementation in the {@code Catalog}
 * interface which does not support overwrite. {@code RESTCatalog} delegates the call to a backend,
 * but the backend used in this test environment also does not support it. Since none of the
 * standard catalogs available in {@code ExtensionsTestBase} support overwrite for registration,
 * this test class registers a custom catalog ({@link OverwriteSupportedCatalog}) to verify that
 * the {@code overwrite} parameter is correctly passed through the Spark procedure.
 */
public class TestRegisterTableProcedureWithOverwriteSupport extends TestBase {

  @TempDir protected Path temp;

  // Fixed table names within the custom catalog's "default" namespace
  private final String localTargetName = "register_table";
  private final String sourceTableName = "source_table";
  // Assigned inside the test; non-null signals that cleanup is needed in dropTables()
  private String customCatalogName;

  /** Drops the tables created by the test, if the custom catalog was configured. */
  @AfterEach
  public void dropTables() {
    if (customCatalogName != null) {
      sql("DROP TABLE IF EXISTS %s.default.%s", customCatalogName, sourceTableName);
      sql("DROP TABLE IF EXISTS %s.default.%s", customCatalogName, localTargetName);
    }
  }

  @Test
  public void testRegisterTableWithOverwrite() throws Exception {
    customCatalogName = "custom_catalog";
    String catalogPrefix = "spark.sql.catalog." + customCatalogName;

    // Register a SparkCatalog backed by OverwriteSupportedCatalog at runtime
    spark.conf().set(catalogPrefix, "org.apache.iceberg.spark.SparkCatalog");
    spark.conf().set(catalogPrefix + ".catalog-impl", OverwriteSupportedCatalog.class.getName());
    spark.conf().set(catalogPrefix + ".warehouse", temp.resolve("warehouse").toString());

    // NOTE(review): this switches the session's current catalog and is never restored —
    // confirm this cannot leak into other tests sharing the Spark session.
    sql("USE %s", customCatalogName);

    // Create namespace in custom catalog
    sql("CREATE NAMESPACE IF NOT EXISTS %s.default", customCatalogName);

    // Create source table in custom catalog
    sql(
        "CREATE TABLE %s.default.%s (id int, data string) using ICEBERG",
        customCatalogName, sourceTableName);

    Table table =
        Spark3Util.loadIcebergTable(spark, customCatalogName + ".default." + sourceTableName);
    String metadataJson = TableUtil.metadataFileLocation(table);

    // Create the target table up front so register_table hits the "already exists" path
    // and must actually exercise the overwrite branch
    sql(
        "CREATE TABLE %s.default.%s (id int, data string) using ICEBERG",
        customCatalogName, localTargetName);

    // Call register_table with overwrite = true
    sql(
        "CALL %s.system.register_table(table => 'default.%s', metadata_file => '%s', overwrite => true)",
        customCatalogName, localTargetName, metadataJson);

    // Verify that the target table now exposes the content of the source table
    List<Object[]> original =
        sql("SELECT * FROM %s.default.%s", customCatalogName, sourceTableName);
    List<Object[]> registered =
        sql("SELECT * FROM %s.default.%s", customCatalogName, localTargetName);
    assertThat(registered)
        .as("Registered table rows should match original table rows")
        .isEqualTo(original);
  }

  /**
   * In-memory catalog that supports {@code registerTable} with {@code overwrite}: when overwrite
   * is requested and the table already exists, it is dropped (without purging data files) before
   * the metadata file is registered under the same identifier.
   */
  public static class OverwriteSupportedCatalog extends InMemoryCatalog {
    @Override
    public Table registerTable(
        TableIdentifier identifier, String metadataFileLocation, boolean overwrite) {
      if (overwrite && tableExists(identifier)) {
        dropTable(identifier, false /* do not purge data files */);
      }
      return registerTable(identifier, metadataFileLocation);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ class RegisterTableProcedure extends BaseProcedure {
requiredInParameter("table", DataTypes.StringType);
private static final ProcedureParameter METADATA_FILE_PARAM =
requiredInParameter("metadata_file", DataTypes.StringType);
private static final ProcedureParameter OVERWRITE_PARAM =
optionalInParameter("overwrite", DataTypes.BooleanType, "false");
Comment thread
dbjnbnrj marked this conversation as resolved.

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};
new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM, OVERWRITE_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
Expand Down Expand Up @@ -96,7 +98,8 @@ public Iterator<Scan> call(InternalRow args) {
"Cannot handle an empty argument metadata_file");

Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
Table table = icebergCatalog.registerTable(tableName, metadataFile);
boolean overwrite = input.asBoolean(OVERWRITE_PARAM, false);
Table table = icebergCatalog.registerTable(tableName, metadataFile, overwrite);
Long currentSnapshotId = null;
Long totalDataFiles = null;
Long totalRecords = null;
Expand Down
Loading