// --- build.gradle additions for the new :iceberg-nessie module ---
// NOTE(review): the original hunk had a stray "@Memoized boolean isVersionFileExists()"
// fragment spliced after the project block; it is unrelated to this change and is dropped.

plugins {
  id 'nebula.dependency-recommender' version '9.0.2'
  // Nessie Gradle plugin: provides the quarkusAppRunner configuration and the
  // quarkus-start/quarkus-stop tasks used to run a Nessie test server.
  // Keep this version in sync with the org.projectnessie:nessie-quarkus dependency below.
  id 'org.projectnessie' version '0.2.1'
}

project(':iceberg-nessie') {
  apply plugin: 'org.projectnessie'

  java {
    // Optional "clientSupport" feature so downstream consumers can opt in to the
    // Quarkus REST client dependencies without forcing them on everyone.
    registerFeature('clientSupport') {
      usingSourceSet(sourceSets.main)
    }
  }

  dependencies {
    compile project(':iceberg-core')
    compile project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    compile "org.projectnessie:nessie-client"
    // Nessie server used by the integration tests; version must match the plugin above.
    quarkusAppRunnerConfig "org.projectnessie:nessie-quarkus:0.2.1"
    clientSupportImplementation("io.quarkus:quarkus-rest-client") {
      exclude group: "org.jboss.logging", module: "commons-logging-jboss-logging"
    }
    clientSupportImplementation("io.quarkus:quarkus-resteasy-jackson") {
      exclude group: "org.codehaus.mojo", module: "animal-sniffer-annotations"
    }

    compileOnly("org.apache.hadoop:hadoop-common") {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.apache.httpcomponents', module: 'httpclient'
      exclude group: 'org.apache.httpcomponents', module: 'httpcore'
      exclude group: 'commons-httpclient', module: 'commons-httpclient'
      exclude group: "com.sun.jersey", module: 'jersey-core'
      exclude group: "com.sun.jersey", module: 'jersey-json'
      exclude group: "com.sun.jersey", module: 'jersey-server'
    }

    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
    testCompile(project(":iceberg-nessie")) {
      capabilities {
        requireCapability("org.apache.iceberg:iceberg-nessie-client-support")
      }
    }
  }

  quarkusAppRunnerProperties {
    // Port 0 => pick a free port; the chosen port is exposed to tests via the
    // quarkus.http.test-port system property (read in BaseTestIceberg).
    props = ["quarkus.http.test-port": 0]
  }

  // Test classes must be compiled before the server starts, and the server must be
  // up for the duration of the test task.
  tasks.getByName("quarkus-start").dependsOn("compileTestJava")
  tasks.test.dependsOn("quarkus-start").finalizedBy("quarkus-stop")
}
file('version.txt').exists() diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java new file mode 100644 index 000000000000..8a80effc0d2f --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.api.TreeApi; +import com.dremio.nessie.client.NessieClient; +import com.dremio.nessie.error.BaseNessieClientServerException; +import com.dremio.nessie.error.NessieConflictException; +import com.dremio.nessie.error.NessieNotFoundException; +import com.dremio.nessie.model.Contents; +import com.dremio.nessie.model.IcebergTable; +import com.dremio.nessie.model.ImmutableDelete; +import com.dremio.nessie.model.ImmutableOperations; +import com.dremio.nessie.model.ImmutablePut; +import com.dremio.nessie.model.Operations; +import com.dremio.nessie.model.Reference; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Nessie implementation of Iceberg Catalog. + * + *

+ * A note on namespaces: Nessie namespaces are implicit and do not need to be explicitly created or deleted. + * The create and delete namespace methods are no-ops for the NessieCatalog. One can still list namespaces that have + * objects stored in them to assist with namespace-centric catalog exploration. + *

+ */ +public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable, SupportsNamespaces, Configurable { + private static final Logger logger = LoggerFactory.getLogger(NessieCatalog.class); + private static final Joiner SLASH = Joiner.on("/"); + private NessieClient client; + private String warehouseLocation; + private Configuration config; + private UpdateableReference reference; + private String name; + private FileIO fileIO; + + public NessieCatalog() { + } + + @Override + public void initialize(String inputName, Map options) { + String fileIOImpl = options.get(CatalogProperties.FILE_IO_IMPL); + this.fileIO = fileIOImpl == null ? new HadoopFileIO(config) : CatalogUtil.loadFileIO(fileIOImpl, options, config); + this.name = inputName == null ? "nessie" : inputName; + // remove nessie prefix + final Function removePrefix = x -> x.replace("nessie.", ""); + + this.client = NessieClient.withConfig(x -> options.get(removePrefix.apply(x))); + + this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION); + if (warehouseLocation == null) { + throw new IllegalStateException("Parameter warehouse not set, nessie can't store data."); + } + final String requestedRef = options.get(removePrefix.apply(NessieClient.CONF_NESSIE_REF)); + this.reference = loadReference(requestedRef); + } + + @Override + public void close() { + client.close(); + } + + @Override + public String name() { + return name; + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + TableReference pti = TableReference.parse(tableIdentifier); + UpdateableReference newReference = this.reference; + if (pti.reference() != null) { + newReference = loadReference(pti.reference()); + } + return new NessieTableOperations(NessieUtil.toKey(pti.tableIdentifier()), newReference, client, fileIO); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier table) { + if (table.hasNamespace()) { + return SLASH.join(warehouseLocation, 
table.namespace().toString(), table.name()); + } + return SLASH.join(warehouseLocation, table.name()); + } + + @Override + public List listTables(Namespace namespace) { + return tableStream(namespace).collect(Collectors.toList()); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + reference.checkMutable(); + + IcebergTable existingTable = table(identifier); + if (existingTable == null) { + return false; + } + + // We try to drop the table. Simple retry after ref update. + boolean threw = true; + try { + Tasks.foreach(identifier) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .run(this::dropTableInner, BaseNessieClientServerException.class); + threw = false; + } catch (NessieConflictException e) { + logger.error("Cannot drop table: failed after retry (update ref and retry)", e); + } catch (NessieNotFoundException e) { + logger.error("Cannot drop table: ref is no longer valid.", e); + } catch (BaseNessieClientServerException e) { + logger.error("Cannot drop table: unknown error", e); + } + return !threw; + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier toOriginal) { + reference.checkMutable(); + + TableIdentifier to = NessieUtil.removeCatalogName(toOriginal, name()); + + IcebergTable existingFromTable = table(from); + if (existingFromTable == null) { + throw new NoSuchTableException("table %s doesn't exists", from.name()); + } + IcebergTable existingToTable = table(to); + if (existingToTable != null) { + throw new AlreadyExistsException("table %s already exists", to.name()); + } + + Operations contents = ImmutableOperations.builder() + .addOperations(ImmutablePut.builder().key(NessieUtil.toKey(to)).contents(existingFromTable).build(), + ImmutableDelete.builder().key(NessieUtil.toKey(from)).build()) + .build(); + + try { + Tasks.foreach(contents) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .run(c -> { + 
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(), + "iceberg rename table", c); + refresh(); + }, BaseNessieClientServerException.class); + + } catch (NessieNotFoundException e) { + // important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the + // another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to + // a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user + // and removed by another. + throw new RuntimeException("Failed to drop table as ref is no longer valid.", e); + } catch (BaseNessieClientServerException e) { + throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date."); + } + } + + /** + * creating namespaces in nessie is implicit, therefore this is a no-op. Metadata is ignored. + * + * @param namespace a multi-part namespace + * @param metadata a string Map of properties for the given namespace + */ + @Override + public void createNamespace(Namespace namespace, Map metadata) { + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return tableStream(namespace) + .map(TableIdentifier::namespace) + .filter(n -> !n.isEmpty()) + .distinct() + .collect(Collectors.toList()); + } + + /** + * namespace metadata is not supported in Nessie and we return an empty map. + * + * @param namespace a namespace. {@link Namespace} + * @return an empty map + */ + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + return ImmutableMap.of(); + } + + /** + * Namespaces in Nessie are implicit and deleting them results in a no-op. + * + * @param namespace a namespace. {@link Namespace} + * @return always false. 
+ */ + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + return false; + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot set namespace properties " + namespace + " : setProperties is not supported"); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot remove properties " + namespace + " : removeProperties is not supported"); + } + + @Override + public void setConf(Configuration conf) { + this.config = conf; + } + + @Override + public Configuration getConf() { + return config; + } + + TreeApi getTreeApi() { + return client.getTreeApi(); + } + + public void refresh() throws NessieNotFoundException { + reference.refresh(); + } + + public String currentHash() { + return reference.getHash(); + } + + String currentRefName() { + return reference.getName(); + } + + private IcebergTable table(TableIdentifier tableIdentifier) { + try { + Contents table = client.getContentsApi().getContents(NessieUtil.toKey(tableIdentifier), reference.getHash()); + return table.unwrap(IcebergTable.class).orElse(null); + } catch (NessieNotFoundException e) { + return null; + } + } + + private UpdateableReference loadReference(String requestedRef) { + try { + Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch() + : client.getTreeApi().getReferenceByName(requestedRef); + return new UpdateableReference(ref, client.getTreeApi()); + } catch (NessieNotFoundException ex) { + if (requestedRef != null) { + throw new IllegalArgumentException(String.format("Nessie ref '%s' does not exist. " + + "This ref must exist before creating a NessieCatalog.", requestedRef), ex); + } + + throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch." 
+ + "Either configure an alternative ref via %s or create the default branch on the server.", + NessieClient.CONF_NESSIE_REF), ex); + } + } + + + public void dropTableInner(TableIdentifier identifier) throws NessieConflictException, NessieNotFoundException { + try { + client.getContentsApi().deleteContents(NessieUtil.toKey(identifier), + reference.getAsBranch().getName(), + reference.getHash(), + String.format("delete table %s", identifier)); + + } finally { + // TODO: fix this so we don't depend on it in tests. and move refresh into catch clause. + refresh(); + } + } + + private Stream tableStream(Namespace namespace) { + try { + return client.getTreeApi() + .getEntries(reference.getHash()) + .getEntries() + .stream() + .filter(NessieUtil.namespacePredicate(namespace)) + .map(NessieUtil::toIdentifier); + } catch (NessieNotFoundException ex) { + throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref. %s", reference.getName()); + } + } +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java new file mode 100644 index 000000000000..03fad6749fad --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.client.NessieClient; +import com.dremio.nessie.error.NessieConflictException; +import com.dremio.nessie.error.NessieNotFoundException; +import com.dremio.nessie.model.Contents; +import com.dremio.nessie.model.ContentsKey; +import com.dremio.nessie.model.IcebergTable; +import com.dremio.nessie.model.ImmutableIcebergTable; +import java.util.Map; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; + +/** + * Nessie implementation of Iceberg TableOperations. + */ +public class NessieTableOperations extends BaseMetastoreTableOperations { + + private final NessieClient client; + private final ContentsKey key; + private UpdateableReference reference; + private IcebergTable table; + private FileIO fileIO; + + /** + * Create a nessie table operations given a table identifier. 
+ */ + public NessieTableOperations( + ContentsKey key, + UpdateableReference reference, + NessieClient client, + FileIO fileIO) { + this.key = key; + this.reference = reference; + this.client = client; + this.fileIO = fileIO; + } + + @Override + protected void doRefresh() { + try { + reference.refresh(); + } catch (NessieNotFoundException e) { + throw new RuntimeException("Failed to refresh as ref is no longer valid.", e); + } + String metadataLocation = null; + try { + Contents contents = client.getContentsApi().getContents(key, reference.getHash()); + this.table = contents.unwrap(IcebergTable.class) + .orElseThrow(() -> + new IllegalStateException("Cannot refresh iceberg table: " + + String.format("Nessie points to a non-Iceberg object for path: %s.", key))); + metadataLocation = table.getMetadataLocation(); + } catch (NessieNotFoundException ex) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException(ex, "No such table %s", key); + } + } + refreshFromMetadataLocation(metadataLocation, 2); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + reference.checkMutable(); + + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + + boolean threw = true; + try { + IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build(); + client.getContentsApi().setContents(key, + reference.getAsBranch().getName(), + reference.getHash(), + String.format("iceberg commit%s", applicationId()), + newTable); + threw = false; + } catch (NessieConflictException ex) { + throw new CommitFailedException(ex, "Commit failed: Reference hash is out of date. 
" + + "Update the reference %s and try again", reference.getName()); + } catch (NessieNotFoundException ex) { + throw new RuntimeException(String.format("Commit failed: Reference %s no longer exist", reference.getName()), ex); + } finally { + if (threw) { + io().deleteFile(newMetadataLocation); + } + } + } + + @Override + public FileIO io() { + return fileIO; + } + + /** + * try and get a Spark application id if one exists. + * + *

+ * We haven't figured out a general way to pass commit messages through to the Nessie committer yet. + * This is hacky but gets the job done until we can have a more complete commit/audit log. + *

+ */ + private String applicationId() { + String appId = null; + TableMetadata current = current(); + if (current != null) { + Snapshot snapshot = current.currentSnapshot(); + if (snapshot != null) { + Map summary = snapshot.summary(); + appId = summary.get("spark.app.id"); + } + + } + return appId == null ? "" : ("\nspark.app.id= " + appId); + } + +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java new file mode 100644 index 000000000000..f850054573ce --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.model.ContentsKey; +import com.dremio.nessie.model.EntriesResponse; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Predicate; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +public final class NessieUtil { + + private NessieUtil() { + + } + + static Predicate namespacePredicate(Namespace ns) { + // TODO: filter to just iceberg tables. 
+ if (ns == null) { + return e -> true; + } + + final List namespace = Arrays.asList(ns.levels()); + Predicate predicate = e -> { + List names = e.getName().getElements(); + + if (names.size() <= namespace.size()) { + return false; + } + + return namespace.equals(names.subList(0, namespace.size())); + }; + return predicate; + } + + static TableIdentifier toIdentifier(EntriesResponse.Entry entry) { + List elements = entry.getName().getElements(); + return TableIdentifier.of(elements.toArray(new String[elements.size()])); + } + + static TableIdentifier removeCatalogName(TableIdentifier to, String name) { + + String[] levels = to.namespace().levels(); + // check if the identifier includes the catalog name and remove it + if (levels.length >= 2 && name.equalsIgnoreCase(to.namespace().level(0))) { + Namespace trimmedNamespace = Namespace.of(Arrays.copyOfRange(levels, 1, levels.length)); + return TableIdentifier.of(trimmedNamespace, to.name()); + } + + // return the original unmodified + return to; + } + + static ContentsKey toKey(TableIdentifier tableIdentifier) { + List identifiers = new ArrayList<>(); + if (tableIdentifier.hasNamespace()) { + identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels())); + } + identifiers.add(tableIdentifier.name()); + + ContentsKey key = new ContentsKey(identifiers); + return key; + } + +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java new file mode 100644 index 000000000000..e7ba853f0b96 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import java.time.Instant; +import org.apache.iceberg.catalog.TableIdentifier; + +public class TableReference { + + private final TableIdentifier tableIdentifier; + private final Instant timestamp; + private final String reference; + + /** + * Container class to specify a TableIdentifier on a specific Reference or at an Instant in time. + */ + public TableReference(TableIdentifier tableIdentifier, Instant timestamp, String reference) { + this.tableIdentifier = tableIdentifier; + this.timestamp = timestamp; + this.reference = reference; + } + + public TableIdentifier tableIdentifier() { + return tableIdentifier; + } + + public Instant timestamp() { + return timestamp; + } + + public String reference() { + return reference; + } + + /** + * Convert dataset read/write options to a table and ref/hash. + */ + public static TableReference parse(TableIdentifier path) { + TableReference pti = parse(path.name()); + return new TableReference(TableIdentifier.of(path.namespace(), pti.tableIdentifier().name()), + pti.timestamp(), + pti.reference()); + } + + /** + * Convert dataset read/write options to a table and ref/hash. 
+ */ + public static TableReference parse(String path) { + // I am assuming tables can't have @ or # symbols + if (path.split("@").length > 2) { + throw new IllegalArgumentException(String.format("Can only reference one branch in %s", path)); + } + if (path.split("#").length > 2) { + throw new IllegalArgumentException(String.format("Can only reference one timestamp in %s", path)); + } + + if (path.contains("@") && path.contains("#")) { + throw new IllegalArgumentException("Invalid table name:" + + " # is not allowed (reference by timestamp is not supported)"); + } + + if (path.contains("@")) { + String[] tableRef = path.split("@"); + TableIdentifier identifier = TableIdentifier.parse(tableRef[0]); + return new TableReference(identifier, null, tableRef[1]); + } + + if (path.contains("#")) { + throw new IllegalArgumentException("Invalid table name:" + + " # is not allowed (reference by timestamp is not supported)"); + } + + TableIdentifier identifier = TableIdentifier.parse(path); + + return new TableReference(identifier, null, null); + } +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java new file mode 100644 index 000000000000..133f787fb892 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.api.TreeApi; +import com.dremio.nessie.error.NessieNotFoundException; +import com.dremio.nessie.model.Branch; +import com.dremio.nessie.model.Hash; +import com.dremio.nessie.model.Reference; + +class UpdateableReference { + + private Reference reference; + private final TreeApi client; + + UpdateableReference(Reference reference, TreeApi client) { + this.reference = reference; + this.client = client; + } + + public boolean refresh() throws NessieNotFoundException { + if (reference instanceof Hash) { + return false; + } + Reference oldReference = reference; + reference = client.getReferenceByName(reference.getName()); + return !oldReference.equals(reference); + } + + public boolean isBranch() { + return reference instanceof Branch; + } + + public UpdateableReference copy() { + return new UpdateableReference(reference, client); + } + + public String getHash() { + return reference.getHash(); + } + + public Branch getAsBranch() { + if (!isBranch()) { + throw new IllegalArgumentException("Reference is not a branch"); + } + return (Branch) reference; + } + + public void checkMutable() { + if (!isBranch()) { + throw new IllegalArgumentException("You can only mutate tables when using a branch."); + } + } + + public String getName() { + return reference.getName(); + } +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java new file mode 100644 index 000000000000..743af4915c27 --- /dev/null 
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.api.ContentsApi; +import com.dremio.nessie.api.TreeApi; +import com.dremio.nessie.client.NessieClient; +import com.dremio.nessie.error.NessieConflictException; +import com.dremio.nessie.error.NessieNotFoundException; +import com.dremio.nessie.model.Branch; +import com.dremio.nessie.model.Reference; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.StructType; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public abstract class BaseTestIceberg { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseTestIceberg.class); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + protected NessieCatalog catalog; + protected NessieClient client; + protected TreeApi tree; + protected ContentsApi contents; + protected Configuration hadoopConfig; + protected final String branch; + private String path; + + public BaseTestIceberg(String branch) { + this.branch = branch; + } + + private void resetData() throws NessieConflictException, NessieNotFoundException { + for (Reference r : tree.getAllReferences()) { + if (r instanceof Branch) { + tree.deleteBranch(r.getName(), r.getHash()); + } else { + tree.deleteTag(r.getName(), r.getHash()); + } + } + tree.createReference(Branch.of("main", null)); + } + + @Before + public void beforeEach() throws IOException { + String port = System.getProperty("quarkus.http.test-port", "19120"); + path = String.format("http://localhost:%s/api/v1", port); + this.client = NessieClient.none(path); + tree = client.getTreeApi(); + contents = client.getContentsApi(); + + resetData(); + + try { + tree.createReference(Branch.of(branch, null)); + } catch (Exception e) { + // ignore, already created. 
Cant run this in BeforeAll as quarkus hasn't disabled auth + } + + hadoopConfig = new Configuration(); + catalog = initCatalog(branch); + } + + NessieCatalog initCatalog(String ref) { + NessieCatalog newCatalog = new NessieCatalog(); + newCatalog.setConf(hadoopConfig); + newCatalog.initialize("nessie", ImmutableMap.of("ref", ref, + "url", path, + "auth_type", "NONE", + CatalogProperties.WAREHOUSE_LOCATION, temp.getRoot().toURI().toString() + )); + return newCatalog; + } + + protected Table createTable(TableIdentifier tableIdentifier, int count) { + try { + return catalog.createTable(tableIdentifier, schema(count)); + } catch (Throwable t) { + LOGGER.error("unable to do create " + tableIdentifier.toString(), t); + throw t; + } + } + + protected void createTable(TableIdentifier tableIdentifier) { + Schema schema = new Schema(StructType.of(required(1, "id", LongType.get())) + .fields()); + catalog.createTable(tableIdentifier, schema).location(); + } + + protected static Schema schema(int count) { + List fields = new ArrayList<>(); + for (int i = 0; i < count; i++) { + fields.add(required(i, "id" + i, Types.LongType.get())); + } + return new Schema(Types.StructType.of(fields).fields()); + } + + void createBranch(String name, String hash) throws NessieNotFoundException, NessieConflictException { + tree.createReference(Branch.of(name, hash)); + } + + @After + public void afterEach() throws Exception { + catalog.close(); + client.close(); + catalog = null; + client = null; + hadoopConfig = null; + } + + static String metadataLocation(NessieCatalog catalog, TableIdentifier tableIdentifier) { + Table table = catalog.loadTable(tableIdentifier); + BaseTable baseTable = (BaseTable) table; + TableOperations ops = baseTable.operations(); + NessieTableOperations icebergOps = (NessieTableOperations) ops; + return icebergOps.currentMetadataLocation(); + } + +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java new file mode 100644 index 000000000000..432bbef63f8f --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.nessie; + +import com.dremio.nessie.error.NessieConflictException; +import com.dremio.nessie.error.NessieNotFoundException; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestBranchVisibility extends BaseTestIceberg { + + + private final TableIdentifier tableIdentifier1 = TableIdentifier.of("test-ns", "table1"); + private final TableIdentifier tableIdentifier2 = TableIdentifier.of("test-ns", "table2"); + private NessieCatalog testCatalog; + private int schemaCounter = 1; + + public TestBranchVisibility() { + super("main"); + } + + + @Before + public void before() throws NessieNotFoundException, NessieConflictException { + createTable(tableIdentifier1, 1); // table 1 + createTable(tableIdentifier2, 1); // table 2 + catalog.refresh(); + createBranch("test", catalog.currentHash()); + testCatalog = initCatalog("test"); + } + + @After + public void after() throws NessieNotFoundException, NessieConflictException { + catalog.dropTable(tableIdentifier1); + catalog.dropTable(tableIdentifier2); + catalog.refresh(); + catalog.getTreeApi().deleteBranch("test", + catalog.getTreeApi().getReferenceByName("test").getHash()); + testCatalog = null; + } + + @Test + public void testBranchNoChange() { + testCatalogEquality(catalog, testCatalog, true, true); + } + + @Test + public void testUpdateCatalogs() { + // ensure catalogs can't see each others updates + updateSchema(catalog, tableIdentifier1); + + testCatalogEquality(catalog, testCatalog, false, true); + + String initialMetadataLocation = metadataLocation(testCatalog, tableIdentifier2); + updateSchema(testCatalog, tableIdentifier2); + + testCatalogEquality(catalog, testCatalog, false, false); + + // points to the previous metadata location + Assert.assertEquals(initialMetadataLocation, metadataLocation(catalog, tableIdentifier2)); + } + 
+ @Test + public void testCatalogOnReference() throws NessieNotFoundException { + updateSchema(catalog, tableIdentifier1); + updateSchema(testCatalog, tableIdentifier2); + String mainHash = tree.getReferenceByName("main").getHash(); + + // catalog created with ref points to same catalog as above + NessieCatalog refCatalog = initCatalog("test"); + testCatalogEquality(refCatalog, testCatalog, true, true); + + // catalog created with hash points to same catalog as above + NessieCatalog refHashCatalog = initCatalog(mainHash); + testCatalogEquality(refHashCatalog, catalog, true, true); + } + + @Test + public void testCatalogWithTableNames() throws NessieNotFoundException { + updateSchema(testCatalog, tableIdentifier2); + String mainHash = tree.getReferenceByName("main").getHash(); + + // asking for table@branch gives expected regardless of catalog + Assert.assertEquals(metadataLocation(catalog, TableIdentifier.of("test-ns", "table1@test")), + metadataLocation(testCatalog, tableIdentifier1)); + + // asking for table@branch#hash gives expected regardless of catalog + Assert.assertEquals(metadataLocation(catalog, TableIdentifier.of("test-ns", "table1@" + mainHash)), + metadataLocation(testCatalog, tableIdentifier1)); + } + + @Test + public void testConcurrentChanges() throws NessieNotFoundException { + NessieCatalog emptyTestCatalog = initCatalog("test"); + updateSchema(testCatalog, tableIdentifier1); + // Updating table with out of date hash. We expect this to succeed because of retry despite the conflict. 
+ updateSchema(emptyTestCatalog, tableIdentifier1); + } + + private void updateSchema(NessieCatalog catalog, TableIdentifier identifier) { + catalog.loadTable(identifier).updateSchema().addColumn("id" + schemaCounter++, Types.LongType.get()).commit(); + } + + private void testCatalogEquality(NessieCatalog catalog, + NessieCatalog compareCatalog, + boolean table1Equal, + boolean table2Equal) { + String testTable1 = metadataLocation(compareCatalog, tableIdentifier1); + String table1 = metadataLocation(catalog, tableIdentifier1); + String testTable2 = metadataLocation(compareCatalog, tableIdentifier2); + String table2 = metadataLocation(catalog, tableIdentifier2); + + String msg1 = String.format("Table %s on ref %s should%s equal table %s on ref %s", tableIdentifier1.name(), + tableIdentifier2.name(), table1Equal ? "" : " not", catalog.currentRefName(), testCatalog.currentRefName()); + Assert.assertEquals(msg1, table1Equal, table1.equals(testTable1)); + String msg2 = String.format("Table %s on ref %s should%s equal table %s on ref %s", tableIdentifier1.name(), + tableIdentifier2.name(), table1Equal ? "" : " not", catalog.currentRefName(), testCatalog.currentRefName()); + Assert.assertEquals(msg2, table2Equal, table2.equals(testTable2)); + } + +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java new file mode 100644 index 000000000000..f4b136dd2ee9 --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import java.util.List; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestNamespace extends BaseTestIceberg { + private static final String BRANCH = "test-namespace"; + + public TestNamespace() { + super(BRANCH); + } + + @Test + public void testListNamespaces() { + createTable(TableIdentifier.parse("a.b.c.t1")); + createTable(TableIdentifier.parse("a.b.t2")); + createTable(TableIdentifier.parse("a.t3")); + createTable(TableIdentifier.parse("b.c.t4")); + createTable(TableIdentifier.parse("b.t5")); + createTable(TableIdentifier.parse("t6")); + + List tables = catalog.listTables(Namespace.of("a", "b", "c")); + assertEquals(1, tables.size()); + tables = catalog.listTables(Namespace.of("a", "b")); + assertEquals(2, tables.size()); + tables = catalog.listTables(Namespace.of("a")); + assertEquals(3, tables.size()); + tables = catalog.listTables(null); + assertEquals(6, tables.size()); + + List namespaces = catalog.listNamespaces(); + assertEquals(5, namespaces.size()); + namespaces = catalog.listNamespaces(Namespace.of("a")); + assertEquals(3, namespaces.size()); + namespaces = catalog.listNamespaces(Namespace.of("a", "b")); + assertEquals(2, namespaces.size()); + namespaces = catalog.listNamespaces(Namespace.of("b")); + assertEquals(2, namespaces.size()); + } +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java new file mode 100644 index 000000000000..4b63bdea2515 --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.nessie; + + +import com.dremio.nessie.error.NessieConflictException; +import com.dremio.nessie.error.NessieNotFoundException; +import com.dremio.nessie.model.Branch; +import com.dremio.nessie.model.ContentsKey; +import com.dremio.nessie.model.IcebergTable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Files; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.iceberg.TableMetadataParser.getFileExtension; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + + +public class TestNessieTable extends BaseTestIceberg { + + private static final String BRANCH = "iceberg-table-test"; + + private static final String DB_NAME = "db"; + private static final String TABLE_NAME = "tbl"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME); + private static final ContentsKey KEY = ContentsKey.of(DB_NAME, TABLE_NAME); + private static final Schema 
schema = new Schema(Types.StructType.of( + required(1, "id", Types.LongType.get())).fields()); + private static final Schema altered = new Schema(Types.StructType.of( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.LongType.get())).fields()); + + private Path tableLocation; + + public TestNessieTable() { + super(BRANCH); + } + + @Before + public void beforeEach() throws IOException { + super.beforeEach(); + this.tableLocation = new Path(catalog.createTable(TABLE_IDENTIFIER, schema).location()); + } + + @After + public void afterEach() throws Exception { + // drop the table data + if (tableLocation != null) { + tableLocation.getFileSystem(hadoopConfig).delete(tableLocation, true); + catalog.refresh(); + catalog.dropTable(TABLE_IDENTIFIER, false); + } + + super.afterEach(); + } + + private com.dremio.nessie.model.IcebergTable getTable(ContentsKey key) throws NessieNotFoundException { + return client.getContentsApi() + .getContents(key, BRANCH) + .unwrap(IcebergTable.class).get(); + } + + @Test + public void testCreate() throws NessieNotFoundException, IOException { + // Table should be created in iceberg + // Table should be renamed in iceberg + String tableName = TABLE_IDENTIFIER.name(); + Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER); + // add a column + icebergTable.updateSchema().addColumn("mother", Types.LongType.get()).commit(); + IcebergTable table = getTable(KEY); + // check parameters are in expected state + Assert.assertEquals(getTableLocation(tableName), + (temp.getRoot().toURI().toString() + DB_NAME + "/" + + tableName).replace("//", + "/")); + + // Only 1 snapshotFile Should exist and no manifests should exist + Assert.assertEquals(2, metadataVersionFiles(tableName).size()); + Assert.assertEquals(0, manifestFiles(tableName).size()); + } + + @Test + public void testRename() { + String renamedTableName = "rename_table_name"; + TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(), + 
renamedTableName); + + Table original = catalog.loadTable(TABLE_IDENTIFIER); + + catalog.renameTable(TABLE_IDENTIFIER, renameTableIdentifier); + Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER)); + Assert.assertTrue(catalog.tableExists(renameTableIdentifier)); + + Table renamed = catalog.loadTable(renameTableIdentifier); + + Assert.assertEquals(original.schema().asStruct(), renamed.schema().asStruct()); + Assert.assertEquals(original.spec(), renamed.spec()); + Assert.assertEquals(original.location(), renamed.location()); + Assert.assertEquals(original.currentSnapshot(), renamed.currentSnapshot()); + + Assert.assertTrue(catalog.dropTable(renameTableIdentifier)); + } + + @Test + public void testDrop() { + Assert.assertTrue(catalog.tableExists(TABLE_IDENTIFIER)); + Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER)); + Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER)); + } + + @Test + public void testDropWithoutPurgeLeavesTableData() throws IOException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + + + String fileLocation = addRecordsToFile(table, "file"); + + DataFile file = DataFiles.builder(table.spec()) + .withRecordCount(3) + .withPath(fileLocation) + .withFileSizeInBytes(Files.localInput(fileLocation).getLength()) + .build(); + + table.newAppend().appendFile(file).commit(); + + String manifestListLocation = + table.currentSnapshot().manifestListLocation().replace("file:", ""); + + Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER, false)); + Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER)); + + Assert.assertTrue(new File(fileLocation).exists()); + Assert.assertTrue(new File(manifestListLocation).exists()); + } + + @Test + public void testDropTable() throws IOException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + + GenericRecordBuilder recordBuilder = + new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test")); + List records = new ArrayList<>(); + records.add(recordBuilder.set("id", 1L).build()); + 
records.add(recordBuilder.set("id", 2L).build()); + records.add(recordBuilder.set("id", 3L).build()); + + String location1 = addRecordsToFile(table, "file1"); + String location2 = addRecordsToFile(table, "file2"); + + DataFile file1 = DataFiles.builder(table.spec()) + .withRecordCount(3) + .withPath(location1) + .withFileSizeInBytes(Files.localInput(location2).getLength()) + .build(); + + DataFile file2 = DataFiles.builder(table.spec()) + .withRecordCount(3) + .withPath(location2) + .withFileSizeInBytes(Files.localInput(location1).getLength()) + .build(); + + // add both data files + table.newAppend().appendFile(file1).appendFile(file2).commit(); + + // delete file2 + table.newDelete().deleteFile(file2.path()).commit(); + + String manifestListLocation = + table.currentSnapshot().manifestListLocation().replace("file:", ""); + + List manifests = table.currentSnapshot().allManifests(); + + Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER)); + Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER)); + + Assert.assertTrue(new File(location1).exists()); + Assert.assertTrue(new File(location2).exists()); + Assert.assertTrue(new File(manifestListLocation).exists()); + for (ManifestFile manifest : manifests) { + Assert.assertTrue(new File(manifest.path().replace("file:", "")).exists()); + } + Assert.assertTrue(new File( + ((HasTableOperations) table).operations() + .current() + .metadataFileLocation() + .replace("file:", "")) + .exists()); + } + + @Test + public void testExistingTableUpdate() { + Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER); + // add a column + icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit(); + + icebergTable = catalog.loadTable(TABLE_IDENTIFIER); + + // Only 2 snapshotFile Should exist and no manifests should exist + Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size()); + Assert.assertEquals(0, manifestFiles(TABLE_NAME).size()); + Assert.assertEquals(altered.asStruct(), 
icebergTable.schema().asStruct()); + + } + + @Test + public void testFailure() throws NessieNotFoundException, NessieConflictException { + Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER); + Branch branch = (Branch) client.getTreeApi().getReferenceByName(BRANCH); + + IcebergTable table = client.getContentsApi().getContents(KEY, BRANCH).unwrap(IcebergTable.class).get(); + + client.getContentsApi().setContents(KEY, branch.getName(), branch.getHash(), "", + IcebergTable.of("dummytable.metadata.json")); + + AssertHelpers.assertThrows("Update schema fails with conflict exception, ref not up to date", + CommitFailedException.class, + () -> icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit()); + } + + @Test + public void testListTables() { + List tableIdents = catalog.listTables(TABLE_IDENTIFIER.namespace()); + List expectedIdents = tableIdents.stream() + .filter(t -> t.namespace() + .level(0) + .equals(DB_NAME) && + t.name().equals(TABLE_NAME)) + .collect(Collectors.toList()); + + Assert.assertEquals(1, expectedIdents.size()); + Assert.assertTrue(catalog.tableExists(TABLE_IDENTIFIER)); + } + + private String getTableBasePath(String tableName) { + String databasePath = temp.getRoot().toString() + "/" + DB_NAME; + return Paths.get(databasePath, tableName).toAbsolutePath().toString(); + } + + protected Path getTableLocationPath(String tableName) { + return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()); + } + + protected String getTableLocation(String tableName) { + return getTableLocationPath(tableName).toString(); + } + + private String metadataLocation(String tableName) { + return Paths.get(getTableBasePath(tableName), "metadata").toString(); + } + + private List metadataFiles(String tableName) { + return Arrays.stream(new File(metadataLocation(tableName)).listFiles()) + .map(File::getAbsolutePath) + .collect(Collectors.toList()); + } + + protected List metadataVersionFiles(String tableName) { + return 
filterByExtension(tableName, getFileExtension(TableMetadataParser.Codec.NONE)); + } + + protected List manifestFiles(String tableName) { + return filterByExtension(tableName, ".avro"); + } + + private List filterByExtension(String tableName, String extension) { + return metadataFiles(tableName) + .stream() + .filter(f -> f.endsWith(extension)) + .collect(Collectors.toList()); + } + + private static String addRecordsToFile(Table table, String filename) throws IOException { + GenericRecordBuilder recordBuilder = + new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test")); + List records = new ArrayList<>(); + records.add(recordBuilder.set("id", 1L).build()); + records.add(recordBuilder.set("id", 2L).build()); + records.add(recordBuilder.set("id", 3L).build()); + + String fileLocation = table.location().replace("file:", "") + + String.format("/data/%s.avro", filename); + try (FileAppender writer = Avro.write(Files.localOutput(fileLocation)) + .schema(schema) + .named("test") + .build()) { + for (GenericData.Record rec : records) { + writer.add(rec); + } + } + return fileLocation; + } +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestTableReference.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestTableReference.java new file mode 100644 index 000000000000..5d0ab61134d6 --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestTableReference.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import org.apache.iceberg.AssertHelpers; +import org.junit.Assert; +import org.junit.Test; + +public class TestTableReference { + + + @Test + public void noMarkings() { + String path = "foo"; + TableReference pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + } + + @Test + public void branchOnly() { + String path = "foo@bar"; + TableReference pti = TableReference.parse(path); + Assert.assertEquals("foo", pti.tableIdentifier().name()); + Assert.assertEquals("bar", pti.reference()); + Assert.assertNull(pti.timestamp()); + } + + @Test + public void timestampOnly() { + String path = "foo#baz"; + AssertHelpers.assertThrows("TableIdentifier is not parsable", + IllegalArgumentException.class, + "Invalid table name: # is not allowed (reference by timestamp is not supported)", () -> + TableReference.parse(path)); + } + + @Test + public void branchAndTimestamp() { + String path = "foo@bar#baz"; + AssertHelpers.assertThrows("TableIdentifier is not parsable", + IllegalArgumentException.class, + "Invalid table name: # is not allowed (reference by timestamp is not supported)", () -> + TableReference.parse(path)); + } + + @Test + public void twoBranches() { + String path = "foo@bar@boo"; + AssertHelpers.assertThrows("TableIdentifier is not parsable", + IllegalArgumentException.class, + "Can only reference one branch in", () -> + TableReference.parse(path)); + } + + @Test + public void twoTimestamps() 
{ + String path = "foo#baz#baa"; + AssertHelpers.assertThrows("TableIdentifier is not parsable", + IllegalArgumentException.class, + "Can only reference one timestamp in", () -> + TableReference.parse(path)); + } + + @Test + public void strangeCharacters() { + String branch = "bar"; + String path = "/%"; + TableReference pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + pti = TableReference.parse(path + "@" + branch); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertEquals(branch, pti.reference()); + Assert.assertNull(pti.timestamp()); + path = "&&"; + pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + pti = TableReference.parse(path + "@" + branch); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertEquals(branch, pti.reference()); + Assert.assertNull(pti.timestamp()); + } + + @Test + public void doubleByte() { + String branch = "bar"; + String path = "/%国"; + TableReference pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + pti = TableReference.parse(path + "@" + branch); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertEquals(branch, pti.reference()); + Assert.assertNull(pti.timestamp()); + path = "国.国"; + pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().toString()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + pti = TableReference.parse(path + "@" + branch); + Assert.assertEquals(path, pti.tableIdentifier().toString()); + Assert.assertEquals(branch, pti.reference()); + Assert.assertNull(pti.timestamp()); + } + + @Test + public void whitespace() { + String branch = "bar 
"; + String path = "foo "; + TableReference pti = TableReference.parse(path); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertNull(pti.reference()); + Assert.assertNull(pti.timestamp()); + pti = TableReference.parse(path + "@" + branch); + Assert.assertEquals(path, pti.tableIdentifier().name()); + Assert.assertEquals(branch, pti.reference()); + Assert.assertNull(pti.timestamp()); + } +} diff --git a/settings.gradle b/settings.gradle index a5708c8e408f..037bdf802385 100644 --- a/settings.gradle +++ b/settings.gradle @@ -37,6 +37,7 @@ include 'spark3-extensions' include 'spark3-runtime' include 'pig' include 'hive-metastore' +include 'nessie' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -57,6 +58,7 @@ project(':spark3-extensions').name = 'iceberg-spark3-extensions' project(':spark3-runtime').name = 'iceberg-spark3-runtime' project(':pig').name = 'iceberg-pig' project(':hive-metastore').name = 'iceberg-hive-metastore' +project(':nessie').name = 'iceberg-nessie' if (JavaVersion.current() == JavaVersion.VERSION_1_8) { include 'spark2' diff --git a/versions.props b/versions.props index 6e748b705234..65a8741025b9 100644 --- a/versions.props +++ b/versions.props @@ -19,6 +19,9 @@ org.apache.arrow:arrow-memory-netty = 2.0.0 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 software.amazon.awssdk:* = 2.15.7 org.scala-lang:scala-library = 2.12.10 +org.projectnessie:* = 0.2.1 +javax.ws.rs:javax.ws.rs-api = 2.1.1 +io.quarkus:* = 1.9.1.Final # test deps junit:junit = 4.12