Skip to content

Commit

Permalink
Core: Support registerTable with REST session catalog (#6512)
Browse files Browse the repository at this point in the history
  • Loading branch information
krvikash authored Jul 4, 2023
1 parent b8102d5 commit f3826bd
Show file tree
Hide file tree
Showing 13 changed files with 443 additions and 35 deletions.
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
Expand Down Expand Up @@ -223,6 +224,21 @@ public static LoadTableResponse createTable(
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}

public static LoadTableResponse registerTable(
Catalog catalog, Namespace namespace, RegisterTableRequest request) {
request.validate();

TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
Table table = catalog.registerTable(identifier, request.metadataLocation());
if (table instanceof BaseTable) {
return LoadTableResponse.builder()
.withTableMetadata(((BaseTable) table).operations().current())
.build();
}

throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}

public static void dropTable(Catalog catalog, TableIdentifier ident) {
boolean dropped = catalog.dropTable(ident, false);
if (!dropped) {
Expand Down
28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequestParser;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;
import org.apache.iceberg.rest.requests.UpdateRequirementParser;
Expand Down Expand Up @@ -93,7 +96,12 @@ public static void registerAll(ObjectMapper mapper) {
.addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer())
.addDeserializer(CommitTransactionRequest.class, new CommitTransactionRequestDeserializer())
.addSerializer(UpdateTableRequest.class, new UpdateTableRequestSerializer())
.addDeserializer(UpdateTableRequest.class, new UpdateTableRequestDeserializer());
.addDeserializer(UpdateTableRequest.class, new UpdateTableRequestDeserializer())
.addSerializer(RegisterTableRequest.class, new RegisterTableRequestSerializer<>())
.addDeserializer(RegisterTableRequest.class, new RegisterTableRequestDeserializer<>())
.addSerializer(ImmutableRegisterTableRequest.class, new RegisterTableRequestSerializer<>())
.addDeserializer(
ImmutableRegisterTableRequest.class, new RegisterTableRequestDeserializer<>());
mapper.registerModule(module);
}

Expand Down Expand Up @@ -353,4 +361,22 @@ public UpdateTableRequest deserialize(JsonParser p, DeserializationContext conte
return UpdateTableRequestParser.fromJson(jsonNode);
}
}

public static class RegisterTableRequestSerializer<T extends RegisterTableRequest>
extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
RegisterTableRequestParser.toJson(request, gen);
}
}

public static class RegisterTableRequestDeserializer<T extends RegisterTableRequest>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) RegisterTableRequestParser.fromJson(jsonNode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
Expand Down Expand Up @@ -404,7 +406,40 @@ public void invalidateTable(SessionContext context, TableIdentifier ident) {}
@Override
public Table registerTable(
SessionContext context, TableIdentifier ident, String metadataFileLocation) {
throw new UnsupportedOperationException("Register table is not supported");
checkIdentifierIsValid(ident);

Preconditions.checkArgument(
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Invalid metadata file location: %s",
metadataFileLocation);

RegisterTableRequest request =
ImmutableRegisterTableRequest.builder()
.name(ident.name())
.metadataLocation(metadataFileLocation)
.build();

LoadTableResponse response =
client.post(
paths.register(ident.namespace()),
request,
LoadTableResponse.class,
headers(context),
ErrorHandlers.tableErrorHandler());

AuthSession session = tableSession(response.config(), session(context));
RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(ident),
session::headers,
tableFileIO(context, response.config()),
response.tableMetadata());

trackFileIO(ops);

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers));
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public String table(TableIdentifier ident) {
RESTUtil.encodeString(ident.name()));
}

public String register(Namespace ns) {
return SLASH.join("v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), "register");
}

public String rename() {
return SLASH.join("v1", prefix, "tables", "rename");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

@Value.Immutable
public interface RegisterTableRequest extends RESTRequest {

String name();

String metadataLocation();

@Override
default void validate() {
// nothing to validate as it's not possible to create an invalid instance
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;

public class RegisterTableRequestParser {

private static final String NAME = "name";
private static final String METADATA_LOCATION = "metadata-location";

private RegisterTableRequestParser() {}

public static String toJson(RegisterTableRequest request) {
return toJson(request, false);
}

public static String toJson(RegisterTableRequest request, boolean pretty) {
return JsonUtil.generate(gen -> toJson(request, gen), pretty);
}

public static void toJson(RegisterTableRequest request, JsonGenerator gen) throws IOException {
Preconditions.checkArgument(null != request, "Invalid register table request: null");

gen.writeStartObject();

gen.writeStringField(NAME, request.name());
gen.writeStringField(METADATA_LOCATION, request.metadataLocation());

gen.writeEndObject();
}

public static RegisterTableRequest fromJson(String json) {
return JsonUtil.parse(json, RegisterTableRequestParser::fromJson);
}

public static RegisterTableRequest fromJson(JsonNode json) {
Preconditions.checkArgument(
null != json, "Cannot parse register table request from null object");

String name = JsonUtil.getString(NAME, json);
String metadataLocation = JsonUtil.getString(METADATA_LOCATION, json);

return ImmutableRegisterTableRequest.builder()
.name(name)
.metadataLocation(metadataLocation)
.build();
}
}
86 changes: 86 additions & 0 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
Expand Down Expand Up @@ -2615,6 +2616,91 @@ public void tableCreationWithoutNamespace() {
.hasMessageContaining("Namespace does not exist: non-existing");
}

@Test
public void testRegisterTable() {
C catalog = catalog();

if (requiresNamespaceCreate()) {
catalog.createNamespace(TABLE.namespace());
}

Map<String, String> properties =
ImmutableMap.of("user", "someone", "created-at", "2023-01-15T00:00:01");
Table originalTable =
catalog
.buildTable(TABLE, SCHEMA)
.withPartitionSpec(SPEC)
.withSortOrder(WRITE_ORDER)
.withProperties(properties)
.create();

originalTable.newFastAppend().appendFile(FILE_A).commit();
originalTable.newFastAppend().appendFile(FILE_B).commit();
originalTable.newDelete().deleteFile(FILE_A).commit();
originalTable.newFastAppend().appendFile(FILE_C).commit();

TableOperations ops = ((BaseTable) originalTable).operations();
String metadataLocation = ops.current().metadataFileLocation();

catalog.dropTable(TABLE, false /* do not purge */);

Table registeredTable = catalog.registerTable(TABLE, metadataLocation);

Assertions.assertThat(registeredTable).isNotNull();
Assertions.assertThat(catalog.tableExists(TABLE)).as("Table must exist").isTrue();
Assertions.assertThat(registeredTable.properties())
.as("Props must match")
.containsAllEntriesOf(properties);
Assertions.assertThat(registeredTable.schema().asStruct())
.as("Schema must match")
.isEqualTo(originalTable.schema().asStruct());
Assertions.assertThat(registeredTable.specs())
.as("Specs must match")
.isEqualTo(originalTable.specs());
Assertions.assertThat(registeredTable.sortOrders())
.as("Sort orders must match")
.isEqualTo(originalTable.sortOrders());
Assertions.assertThat(registeredTable.currentSnapshot())
.as("Current snapshot must match")
.isEqualTo(originalTable.currentSnapshot());
Assertions.assertThat(registeredTable.snapshots())
.as("Snapshots must match")
.isEqualTo(originalTable.snapshots());
Assertions.assertThat(registeredTable.history())
.as("History must match")
.isEqualTo(originalTable.history());

TestHelpers.assertSameSchemaMap(registeredTable.schemas(), originalTable.schemas());
assertFiles(registeredTable, FILE_B, FILE_C);

registeredTable.newFastAppend().appendFile(FILE_A).commit();
assertFiles(registeredTable, FILE_B, FILE_C, FILE_A);

Assertions.assertThat(catalog.loadTable(TABLE)).isNotNull();
Assertions.assertThat(catalog.dropTable(TABLE)).isTrue();
Assertions.assertThat(catalog.tableExists(TABLE)).isFalse();
}

@Test
public void testRegisterExistingTable() {
C catalog = catalog();

TableIdentifier identifier = TableIdentifier.of("a", "t1");

if (requiresNamespaceCreate()) {
catalog.createNamespace(identifier.namespace());
}

catalog.createTable(identifier, SCHEMA);
Table table = catalog.loadTable(identifier);
TableOperations ops = ((BaseTable) table).operations();
String metadataLocation = ops.current().metadataFileLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

private static void assertEmpty(String context, Catalog catalog, Namespace ns) {
try {
Assertions.assertThat(catalog.listTables(ns)).as(context).isEmpty();
Expand Down
33 changes: 0 additions & 33 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
Expand Down Expand Up @@ -751,37 +749,6 @@ public void testConversions() {
assertThat(JdbcUtil.stringToNamespace(nsString)).isEqualTo(ns);
}

@Test
public void testRegisterTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
catalog.dropTable(identifier, false);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Table registeredTable = catalog.registerTable(identifier, metadataLocation);
Assertions.assertThat(registeredTable).isNotNull();
TestHelpers.assertSerializedAndLoadedMetadata(registeringTable, registeredTable);
String expectedMetadataLocation =
((HasTableOperations) registeredTable).operations().current().metadataFileLocation();
Assertions.assertThat(metadataLocation).isEqualTo(expectedMetadataLocation);
Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testRegisterExistingTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
JdbcCatalog catalogWithCustomReporter =
Expand Down
Loading

0 comments on commit f3826bd

Please sign in to comment.