From 23ab654b196aa4a04ac82fb797c4950761e33694 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Tue, 17 Oct 2023 10:53:33 +0530 Subject: [PATCH 1/6] Nessie: Support views for NessieCatalog --- .../apache/iceberg/nessie/NessieCatalog.java | 103 ++++-- .../iceberg/nessie/NessieIcebergClient.java | 229 +++++++++--- .../iceberg/nessie/NessieTableOperations.java | 94 ++--- .../org/apache/iceberg/nessie/NessieUtil.java | 124 +++++++ .../iceberg/nessie/NessieViewOperations.java | 137 +++++++ .../iceberg/nessie/UpdateableReference.java | 3 +- .../iceberg/nessie/BaseTestIceberg.java | 57 +++ .../iceberg/nessie/TestBranchVisibility.java | 77 +++- .../iceberg/nessie/TestNessieTable.java | 32 +- .../apache/iceberg/nessie/TestNessieView.java | 337 ++++++++++++++++++ .../iceberg/nessie/TestNessieViewCatalog.java | 217 +++++++++++ 11 files changed, 1229 insertions(+), 181 deletions(-) create mode 100644 nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java create mode 100644 nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java create mode 100644 nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 5a8d2c157236..584cdfa4f846 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableOperations; @@ -41,19 +40,22 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; 
+import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.ViewOperations; import org.projectnessie.client.NessieClientBuilder; import org.projectnessie.client.NessieConfigConstants; import org.projectnessie.client.api.NessieApiV1; import org.projectnessie.client.api.NessieApiV2; import org.projectnessie.client.config.NessieClientConfigSource; import org.projectnessie.client.config.NessieClientConfigSources; +import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.TableReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Nessie implementation of Iceberg Catalog. */ -public class NessieCatalog extends BaseMetastoreCatalog +public class NessieCatalog extends BaseMetastoreViewCatalog implements AutoCloseable, SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class); @@ -203,8 +205,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()), client.withReference(tr.getReference(), tr.getHash()), - fileIO, - catalogOptions); + fileIO); } @Override @@ -231,41 +232,17 @@ protected String defaultWarehouseLocation(TableIdentifier table) { @Override public List listTables(Namespace namespace) { - return client.listTables(namespace); + return client.listContents(namespace, Content.Type.ICEBERG_TABLE); } @Override public boolean dropTable(TableIdentifier identifier, boolean purge) { - TableReference tableReference = parseTableReference(identifier); - return client - .withReference(tableReference.getReference(), tableReference.getHash()) - .dropTable(identifierWithoutTableReference(identifier, tableReference), purge); + return dropContent(identifier, Content.Type.ICEBERG_TABLE); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { - TableReference fromTableReference = 
parseTableReference(from); - TableReference toTableReference = parseTableReference(to); - String fromReference = - fromTableReference.hasReference() - ? fromTableReference.getReference() - : client.getRef().getName(); - String toReference = - toTableReference.hasReference() - ? toTableReference.getReference() - : client.getRef().getName(); - Preconditions.checkArgument( - fromReference.equalsIgnoreCase(toReference), - "from: %s and to: %s reference name must be same", - fromReference, - toReference); - - client - .withReference(fromTableReference.getReference(), fromTableReference.getHash()) - .renameTable( - identifierWithoutTableReference(from, fromTableReference), - NessieUtil.removeCatalogName( - identifierWithoutTableReference(to, toTableReference), name())); + renameContent(from, to, Content.Type.ICEBERG_TABLE); } @Override @@ -347,4 +324,68 @@ private TableIdentifier identifierWithoutTableReference( protected Map properties() { return catalogOptions; } + + @Override + protected ViewOperations newViewOps(TableIdentifier identifier) { + TableReference tr = parseTableReference(identifier); + return new NessieViewOperations( + ContentKey.of( + org.projectnessie.model.Namespace.of(identifier.namespace().levels()), tr.getName()), + client.withReference(tr.getReference(), tr.getHash()), + fileIO); + } + + @Override + public List listViews(Namespace namespace) { + return client.listContents(namespace, Content.Type.ICEBERG_VIEW); + } + + @Override + public boolean dropView(TableIdentifier identifier) { + return dropContent(identifier, Content.Type.ICEBERG_VIEW); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + renameContent(from, to, Content.Type.ICEBERG_VIEW); + } + + private boolean dropContent(TableIdentifier identifier, Content.Type type) { + TableReference tableReference = parseTableReference(identifier); + return client + .withReference(tableReference.getReference(), tableReference.getHash()) + 
.dropContent(identifierWithoutTableReference(identifier, tableReference), false, type); + } + + private void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { + TableReference fromTableReference = parseTableReference(from); + TableReference toTableReference = parseTableReference(to); + String fromReference = + fromTableReference.hasReference() + ? fromTableReference.getReference() + : client.getRef().getName(); + String toReference = + toTableReference.hasReference() + ? toTableReference.getReference() + : client.getRef().getName(); + Preconditions.checkArgument( + fromReference.equalsIgnoreCase(toReference), + "Cannot rename %s '%s' on reference '%s' to '%s' on reference '%s':" + + " source and target references must be the same.", + NessieUtil.contentTypeString(type).toLowerCase(), + fromTableReference.getName(), + fromReference, + toTableReference.getName(), + toReference); + + TableIdentifier fromIdentifier = + NessieUtil.removeCatalogName( + identifierWithoutTableReference(from, fromTableReference), name()); + TableIdentifier toIdentifier = + NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name()); + + client + .withReference(fromTableReference.getReference(), fromTableReference.getHash()) + .renameContent(fromIdentifier, toIdentifier, type); + } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index 4cbbe4a562c1..c1fde38f5883 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -40,9 +40,11 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import 
org.apache.iceberg.relocated.com.google.common.base.Suppliers; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.view.ViewMetadata; import org.projectnessie.client.NessieConfigConstants; import org.projectnessie.client.api.CommitMultipleOperationsBuilder; import org.projectnessie.client.api.GetContentBuilder; @@ -56,13 +58,17 @@ import org.projectnessie.error.NessieReferenceConflictException; import org.projectnessie.error.NessieReferenceNotFoundException; import org.projectnessie.model.Branch; +import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.EntriesResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.IcebergView; import org.projectnessie.model.ImmutableCommitMeta; import org.projectnessie.model.ImmutableIcebergTable; +import org.projectnessie.model.ImmutableIcebergView; import org.projectnessie.model.Operation; import org.projectnessie.model.Reference; import org.projectnessie.model.Tag; @@ -141,16 +147,26 @@ private UpdateableReference loadReference(String requestedRef, String hash) { } } + /** @deprecated will be removed after 1.5.0; use listContents() instead */ + @Deprecated public List listTables(Namespace namespace) { + return listContents(namespace, Content.Type.ICEBERG_TABLE); + } + + /** Lists Iceberg table or view from the given namespace */ + public List listContents(Namespace namespace, Content.Type type) { try { return withReference(api.getEntries()).get().getEntries().stream() .filter(namespacePredicate(namespace)) - .filter(e -> Content.Type.ICEBERG_TABLE == e.getType()) + .filter(e -> type.equals(e.getType())) .map(this::toIdentifier) .collect(Collectors.toList()); } catch (NessieNotFoundException ex) { throw new NoSuchNamespaceException( 
- ex, "Unable to list tables due to missing ref '%s'", getRef().getName()); + ex, + "Unable to list %ss due to missing ref '%s'", + NessieUtil.contentTypeString(type).toLowerCase(), + getRef().getName()); } } @@ -176,11 +192,11 @@ private TableIdentifier toIdentifier(EntriesResponse.Entry entry) { return TableIdentifier.of(elements.toArray(new String[elements.size()])); } - public IcebergTable table(TableIdentifier tableIdentifier) { + public IcebergContent fetchContent(TableIdentifier tableIdentifier) { try { ContentKey key = NessieUtil.toKey(tableIdentifier); - Content table = withReference(api.getContent().key(key)).get().get(key); - return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null; + Content content = withReference(api.getContent().key(key)).get().get(key); + return content != null ? content.unwrap(IcebergContent.class).orElse(null) : null; } catch (NessieNotFoundException e) { return null; } @@ -398,23 +414,27 @@ namespace, getRef().getName()), } } + /** @deprecated will be removed after 1.5.0; use renameContent() instead */ + @Deprecated public void renameTable(TableIdentifier from, TableIdentifier to) { + renameContent(from, to, Content.Type.ICEBERG_TABLE); + } + + public void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { getRef().checkMutable(); - IcebergTable existingFromTable = table(from); - if (existingFromTable == null) { - throw new NoSuchTableException("Table does not exist: %s", from.name()); - } - IcebergTable existingToTable = table(to); - if (existingToTable != null) { - throw new AlreadyExistsException("Table already exists: %s", to.name()); - } + IcebergContent existingFromContent = fetchContent(from); + validateFromContentForRename(from, type, existingFromContent); + IcebergContent existingToContent = fetchContent(to); + validateToContentForRename(from, to, existingToContent); + + String contentType = NessieUtil.contentTypeString(type).toLowerCase(); try { commitRetry( - 
String.format("Iceberg rename table from '%s' to '%s'", from, to), + String.format("Iceberg rename %s from '%s' to '%s'", contentType, from, to), Operation.Delete.of(NessieUtil.toKey(from)), - Operation.Put.of(NessieUtil.toKey(to), existingFromTable)); + Operation.Put.of(NessieUtil.toKey(to), existingFromContent)); } catch (NessieNotFoundException e) { // important note: the NotFoundException refers to the ref only. If a table was not found it // would imply that the @@ -425,15 +445,20 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { // and removed by another. throw new RuntimeException( String.format( - "Cannot rename table '%s' to '%s': ref '%s' no longer exists.", - from.name(), to.name(), getRef().getName()), + "Cannot rename %s '%s' to '%s': ref '%s' no longer exists.", + contentType, from, to, getRef().getName()), e); } catch (BaseNessieClientServerException e) { + if (e instanceof NessieConflictException) { + NessieUtil.handleExceptionsForCommits(e, getRef().getName(), type); + } + throw new CommitFailedException( e, - "Cannot rename table '%s' to '%s': the current reference is not up to date.", - from.name(), - to.name()); + "Cannot rename %s '%s' to '%s': the current reference is not up to date.", + contentType, + from, + to); } catch (HttpClientException ex) { // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant // to catch all kinds of network errors (e.g. connection reset). Network code implementation @@ -448,19 +473,67 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { // behavior. So better be safe than sorry. } + private static void validateToContentForRename( + TableIdentifier from, TableIdentifier to, IcebergContent existingToContent) { + if (existingToContent != null) { + if (existingToContent.getType() == Content.Type.ICEBERG_VIEW) { + throw new AlreadyExistsException("Cannot rename %s to %s. 
View already exists", from, to); + } else if (existingToContent.getType() == Content.Type.ICEBERG_TABLE) { + throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); + } else { + throw new AlreadyExistsException( + "Cannot rename %s to %s. Another content with same name already exists", from, to); + } + } + } + + private static void validateFromContentForRename( + TableIdentifier from, Content.Type type, IcebergContent existingFromContent) { + if (existingFromContent == null) { + if (type == Content.Type.ICEBERG_VIEW) { + throw new NoSuchViewException("View does not exist: %s", from); + } else if (type == Content.Type.ICEBERG_TABLE) { + throw new NoSuchTableException("Table does not exist: %s", from); + } else { + throw new RuntimeException("Cannot perform rename for content type: " + type); + } + } else if (existingFromContent.getType() != type) { + throw new RuntimeException( + String.format("content type of from identifier %s should be of %s", from, type)); + } + } + + /** @deprecated will be removed after 1.5.0; use dropContent() instead */ + @Deprecated public boolean dropTable(TableIdentifier identifier, boolean purge) { + return dropContent(identifier, purge, Content.Type.ICEBERG_TABLE); + } + + public boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) { getRef().checkMutable(); - IcebergTable existingTable = table(identifier); - if (existingTable == null) { + IcebergContent existingContent = fetchContent(identifier); + if (existingContent == null) { return false; } + if (existingContent.getType() != type) { + throw new RuntimeException( + String.format( + "Cannot drop %s: not matching with the type `%s`", + identifier, NessieUtil.contentTypeString(type))); + } + + String contentType = NessieUtil.contentTypeString(type).toLowerCase(); + if (purge) { - LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString()); + LOG.info( + "Purging data for {} {} was set to 
true but is ignored", + contentType, + identifier.toString()); } - // We try to drop the table. Simple retry after ref update. + // We try to drop the content. Simple retry after ref update. try { commitRetry( String.format("Iceberg delete table %s", identifier), @@ -468,13 +541,14 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { return true; } catch (NessieConflictException e) { LOG.error( - "Cannot drop table: failed after retry (update ref '{}' and retry)", + "Cannot drop {}: failed after retry (update ref '{}' and retry)", + contentType, getRef().getName(), e); } catch (NessieNotFoundException e) { - LOG.error("Cannot drop table: ref '{}' is no longer valid.", getRef().getName(), e); + LOG.error("Cannot drop {}: ref '{}' is no longer valid.", contentType, getRef().getName(), e); } catch (BaseNessieClientServerException e) { - LOG.error("Cannot drop table: unknown error", e); + LOG.error("Cannot drop {}: unknown error", contentType, e); } return false; } @@ -499,23 +573,17 @@ public void commitTable( String contentId, ContentKey key) throws NessieConflictException, NessieNotFoundException { - UpdateableReference updateableReference = getRef(); - - updateableReference.checkMutable(); - - Branch current = (Branch) updateableReference.getReference(); - Branch expectedHead = current; - if (base != null) { - String metadataCommitId = - base.property(NessieTableOperations.NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash()); - if (metadataCommitId != null) { - expectedHead = Branch.of(expectedHead.getName(), metadataCommitId); - } - } Snapshot snapshot = metadata.currentSnapshot(); long snapshotId = snapshot != null ? 
snapshot.snapshotId() : -1L; + ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder(); + builder.message(buildCommitMsg(base, metadata, key.toString())); + if (isSnapshotOperation(base, metadata)) { + builder.putProperties("iceberg.operation", snapshot.operation()); + } + CommitMeta commitMeta = NessieUtil.catalogOptions(builder, catalogOptions).build(); + ImmutableIcebergTable.Builder newTableBuilder = ImmutableIcebergTable.builder(); IcebergTable newTable = newTableBuilder @@ -527,22 +595,74 @@ public void commitTable( .metadataLocation(newMetadataLocation) .build(); + Map properties = base != null ? base.properties() : null; + commitContent(key, newTable, properties, commitMeta); + } + + public void commitView( + ViewMetadata base, + ViewMetadata metadata, + String newMetadataLocation, + String contentId, + ContentKey key) + throws NessieConflictException, NessieNotFoundException { + + long versionId = metadata.currentVersion().versionId(); + ImmutableIcebergView.Builder newViewBuilder = ImmutableIcebergView.builder(); + IcebergView newView = + newViewBuilder + .id(contentId) + .versionId(versionId) + .schemaId(metadata.currentSchemaId()) + .metadataLocation(newMetadataLocation) + // Only view metadata location need to be tracked from Nessie. + // Other information can be extracted by parsing the view metadata file. + .sqlText("-") + .dialect("-") + .build(); + + ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder(); + builder.message(buildCommitMsg(base, metadata, key.toString())); + builder.putProperties("iceberg.operation", metadata.currentVersion().operation()); + CommitMeta commitMeta = NessieUtil.catalogOptions(builder, catalogOptions).build(); + + Map properties = base != null ? 
base.properties() : null; + commitContent(key, newView, properties, commitMeta); + } + + private void commitContent( + ContentKey key, + IcebergContent newContent, + Map properties, + CommitMeta commitMeta) + throws NessieNotFoundException, NessieConflictException { + UpdateableReference updateableReference = getRef(); + + updateableReference.checkMutable(); + + Branch current = (Branch) updateableReference.getReference(); + Branch expectedHead = current; + if (properties != null) { + String metadataCommitId = + properties.getOrDefault( + NessieTableOperations.NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash()); + if (metadataCommitId != null) { + expectedHead = Branch.of(expectedHead.getName(), metadataCommitId); + } + } + LOG.debug( "Committing '{}' against '{}', current is '{}': {}", key, expectedHead, current.getHash(), - newTable); - ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder(); - builder.message(buildCommitMsg(base, metadata, key.toString())); - if (isSnapshotOperation(base, metadata)) { - builder.putProperties("iceberg.operation", snapshot.operation()); - } + newContent); + Branch branch = getApi() .commitMultipleOperations() - .operation(Operation.Put.of(key, newTable)) - .commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build()) + .operation(Operation.Put.of(key, newContent)) + .commitMeta(commitMeta) .branch(expectedHead) .commit(); LOG.info( @@ -576,11 +696,20 @@ private String buildCommitMsg(TableMetadata base, TableMetadata metadata, String return String.format( "Iceberg %s against %s", metadata.currentSnapshot().operation(), tableName); } else if (base != null && metadata.currentSchemaId() != base.currentSchemaId()) { - return String.format("Iceberg schema change against %s", tableName); + return String.format("Iceberg schema change against table %s", tableName); } else if (base == null) { return String.format("Iceberg table created/registered with name %s", tableName); } - return String.format("Iceberg commit 
against %s", tableName); + return String.format("Iceberg commit against table %s", tableName); + } + + private String buildCommitMsg(ViewMetadata base, ViewMetadata metadata, String viewName) { + String operation = metadata.currentVersion().operation(); + if (base != null && !metadata.currentSchemaId().equals(base.currentSchemaId())) { + return String.format( + "Iceberg schema change against view %s for the operation %s", viewName, operation); + } + return String.format("Iceberg view %sd with name %s", operation, viewName); } public String refName() { diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index a5d7e7b21428..a395dc846f97 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -18,26 +18,20 @@ */ package org.apache.iceberg.nessie; -import java.util.EnumSet; -import java.util.Map; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.exceptions.CommitStateUnknownException; -import org.apache.iceberg.exceptions.NamespaceNotEmptyException; -import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.error.NessieBadRequestException; import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieNotFoundException; -import org.projectnessie.error.NessieReferenceConflictException; -import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import 
org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.IcebergView; import org.projectnessie.model.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,18 +53,12 @@ public class NessieTableOperations extends BaseMetastoreTableOperations { private final ContentKey key; private IcebergTable table; private final FileIO fileIO; - private final Map catalogOptions; /** Create a nessie table operations given a table identifier. */ - NessieTableOperations( - ContentKey key, - NessieIcebergClient client, - FileIO fileIO, - Map catalogOptions) { + NessieTableOperations(ContentKey key, NessieIcebergClient client, FileIO fileIO) { this.key = key; this.client = client; this.fileIO = fileIO; - this.catalogOptions = catalogOptions; } @Override @@ -85,7 +73,7 @@ protected void doRefresh() { } catch (NessieNotFoundException e) { throw new RuntimeException( String.format( - "Failed to refresh as ref '%s' " + "is no longer valid.", client.getRef().getName()), + "Failed to refresh as ref '%s' is no longer valid.", client.getRef().getName()), e); } String metadataLocation = null; @@ -102,12 +90,17 @@ protected void doRefresh() { content .unwrap(IcebergTable.class) .orElseThrow( - () -> - new IllegalStateException( - String.format( - "Cannot refresh iceberg table: " - + "Nessie points to a non-Iceberg object for path: %s.", - key))); + () -> { + if (content instanceof IcebergView) { + return new AlreadyExistsException( + "View with same name already exists: %s", key); + } else { + return new AlreadyExistsException( + "Cannot refresh Iceberg table: " + + "Nessie points to a non-Iceberg object for path: %s.", + key); + } + }); metadataLocation = table.getMetadataLocation(); } } catch (NessieNotFoundException ex) { @@ -133,32 +126,19 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, 
metadata); - String refName = client.refName(); boolean failure = false; try { String contentId = table == null ? null : table.getId(); client.commitTable(base, metadata, newMetadataLocation, contentId, key); - } catch (NessieConflictException ex) { - failure = true; - if (ex instanceof NessieReferenceConflictException) { - // Throws a specialized exception, if possible - maybeThrowSpecializedException((NessieReferenceConflictException) ex); + } catch (NessieConflictException | NessieNotFoundException | HttpClientException ex) { + if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) { + failure = true; } - throw new CommitFailedException( - ex, - "Cannot commit: Reference hash is out of date. " - + "Update the reference '%s' and try again", - refName); - } catch (HttpClientException ex) { - // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant - // to catch all kinds of network errors (e.g. connection reset). Network code implementation - // details and all kinds of network devices can induce unexpected behavior. So better be - // safe than sorry. 
- throw new CommitStateUnknownException(ex); - } catch (NessieNotFoundException ex) { + NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE); + } catch (NessieBadRequestException ex) { failure = true; - throw new RuntimeException( - String.format("Cannot commit: Reference '%s' no longer exists", refName), ex); + NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE); + throw ex; } finally { if (failure) { io().deleteFile(newMetadataLocation); @@ -166,36 +146,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } } - private static void maybeThrowSpecializedException(NessieReferenceConflictException ex) { - NessieUtil.extractSingleConflict( - ex, - EnumSet.of( - Conflict.ConflictType.NAMESPACE_ABSENT, - Conflict.ConflictType.NAMESPACE_NOT_EMPTY, - Conflict.ConflictType.KEY_DOES_NOT_EXIST, - Conflict.ConflictType.KEY_EXISTS)) - .ifPresent( - conflict -> { - switch (conflict.conflictType()) { - case NAMESPACE_ABSENT: - throw new NoSuchNamespaceException( - ex, "Namespace does not exist: %s", conflict.key()); - case NAMESPACE_NOT_EMPTY: - throw new NamespaceNotEmptyException( - ex, "Namespace not empty: %s", conflict.key()); - case KEY_DOES_NOT_EXIST: - throw new NoSuchTableException( - ex, "Table or view does not exist: %s", conflict.key()); - case KEY_EXISTS: - throw new AlreadyExistsException( - ex, "Table or view already exists: %s", conflict.key()); - default: - // Explicit fall-through - break; - } - }); - } - @Override public FileIO io() { return fileIO; diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index 3c3b0afd64d3..7b4d08243f17 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import 
java.util.List; import java.util.Map; import java.util.Optional; @@ -31,13 +32,25 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.ViewMetadata; +import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceConflictException; import org.projectnessie.error.ReferenceConflicts; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Conflict; +import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; import org.projectnessie.model.ImmutableCommitMeta; @@ -190,4 +203,115 @@ public static Optional extractSingleConflict( Conflict conflict = conflicts.get(0); return Optional.of(conflict); } + + public static ViewMetadata loadViewMetadata( + ViewMetadata metadata, String metadataLocation, Reference reference) { + Map newProperties = Maps.newHashMap(metadata.properties()); + newProperties.put(NessieTableOperations.NESSIE_COMMIT_ID_PROPERTY, reference.getHash()); + + return ViewMetadata.buildFrom( + ViewMetadata.buildFrom(metadata).setProperties(newProperties).build()) + 
.setMetadataLocation(metadataLocation) + .build(); + } + + static void handleExceptionsForCommits(Exception exception, String refName, Content.Type type) { + if (exception instanceof NessieConflictException) { + if (exception instanceof NessieReferenceConflictException) { + // Throws a specialized exception, if possible + NessieUtil.maybeThrowSpecializedException( + (NessieReferenceConflictException) exception, type); + } + + throw new CommitFailedException( + exception, + "Cannot commit: Reference hash is out of date. Update the reference '%s' and try again", + refName); + } + + if (exception instanceof NessieNotFoundException) { + throw new RuntimeException( + String.format("Cannot commit: Reference '%s' no longer exists", refName), exception); + } + + if (exception instanceof HttpClientException) { + // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant + // to catch all kinds of network errors (e.g. connection reset). Network code implementation + // details and all kinds of network devices can induce unexpected behavior. So better be + // safe than sorry. + throw new CommitStateUnknownException(exception); + } + } + + static void handleBadRequestForCommit( + NessieIcebergClient client, ContentKey key, Content.Type type) { + Content.Type anotherType = + type == Content.Type.ICEBERG_TABLE ? 
Content.Type.ICEBERG_VIEW : Content.Type.ICEBERG_TABLE;
+    try {
+      Content content =
+          client.getApi().getContent().key(key).reference(client.getReference()).get().get(key);
+      if (content != null) {
+        if (content.getType().equals(anotherType)) {
+          throw new AlreadyExistsException(
+              "%s with same name already exists: %s in %s",
+              NessieUtil.contentTypeString(anotherType), key, client.getReference());
+        } else if (!content.getType().equals(type)) {
+          throw new AlreadyExistsException(
+              "Another content with same name already exists: %s in %s",
+              key, client.getReference());
+        }
+      }
+    } catch (NessieNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Rethrows a Nessie reference conflict as the matching Iceberg exception, with {@code ex} as
+   * its cause. Returns normally when {@code extractSingleConflict} yields no conflict, so the
+   * caller can fall back to its own handling.
+   *
+   * @param ex the conflict reported by Nessie
+   * @param type the content type being operated on; supplies the noun used in messages and
+   *     selects the view or table flavor of the "does not exist" exception
+   * @throws NoSuchNamespaceException for {@code NAMESPACE_ABSENT}
+   * @throws NamespaceNotEmptyException for {@code NAMESPACE_NOT_EMPTY}
+   * @throws NoSuchViewException for {@code KEY_DOES_NOT_EXIST} when {@code type} is a view
+   * @throws NoSuchTableException for {@code KEY_DOES_NOT_EXIST} with any other {@code type}
+   * @throws AlreadyExistsException for {@code KEY_EXISTS}
+   * @throws IllegalArgumentException if {@code type} is not supported by {@link
+   *     #contentTypeString(Content.Type)}
+   */
+  private static void maybeThrowSpecializedException(
+      NessieReferenceConflictException ex, Content.Type type) {
+    String contentType = contentTypeString(type);
+
+    NessieUtil.extractSingleConflict(
+            ex,
+            EnumSet.of(
+                Conflict.ConflictType.NAMESPACE_ABSENT,
+                Conflict.ConflictType.NAMESPACE_NOT_EMPTY,
+                Conflict.ConflictType.KEY_DOES_NOT_EXIST,
+                Conflict.ConflictType.KEY_EXISTS))
+        .ifPresent(
+            conflict -> {
+              switch (conflict.conflictType()) {
+                case NAMESPACE_ABSENT:
+                  throw new NoSuchNamespaceException(
+                      ex, "Namespace does not exist: %s", conflict.key());
+                case NAMESPACE_NOT_EMPTY:
+                  throw new NamespaceNotEmptyException(
+                      ex, "Namespace not empty: %s", conflict.key());
+                case KEY_DOES_NOT_EXIST:
+                  // Compared with equals(), like every other Content.Type check in this class.
+                  if (type.equals(Content.Type.ICEBERG_VIEW)) {
+                    throw new NoSuchViewException(
+                        ex, "%s does not exist: %s", contentType, conflict.key());
+                  } else {
+                    throw new NoSuchTableException(
+                        ex, "%s does not exist: %s", contentType, conflict.key());
+                  }
+                case KEY_EXISTS:
+                  throw new AlreadyExistsException(
+                      ex, "%s already exists: %s", contentType, conflict.key());
+                default:
+                  // Nothing specialized to throw for this conflict type; just leave the lambda.
+                  break;
+              }
+            });
+  }
+
+  /**
+   * Returns the noun used in exception messages for a Nessie content type: "View", "Table" or
+   * "Namespace".
+   *
+   * @throws IllegalArgumentException if {@code type} is none of the three supported types
+   */
+  static String contentTypeString(Content.Type type) {
+    if (type.equals(Content.Type.ICEBERG_VIEW)) {
+      return "View";
+    } else if (type.equals(Content.Type.ICEBERG_TABLE)) {
+      return "Table";
+    } else if (type.equals(Content.Type.NAMESPACE)) {
+      return "Namespace";
+    }
+    throw new IllegalArgumentException("Unsupported Nessie content type " + type.name());
+  }
 }
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java new file mode 100644 index 000000000000..c86d11ecc8a7 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+package org.apache.iceberg.nessie;
+
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.error.NessieBadRequestException;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Content;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.IcebergView;
+import org.projectnessie.model.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * View operations backed by Nessie: the view's current metadata location is read from the {@link
+ * IcebergView} content stored under a {@link ContentKey}, and new metadata is committed through
+ * {@link NessieIcebergClient}.
+ */
+public class NessieViewOperations extends BaseViewOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NessieViewOperations.class);
+
+  private final NessieIcebergClient client;
+  private final ContentKey key;
+  private final FileIO fileIO;
+  // Content last read from Nessie by doRefresh(); stays null until a refresh finds the view.
+  private IcebergView icebergView;
+
+  NessieViewOperations(ContentKey key, NessieIcebergClient client, FileIO fileIO) {
+    this.key = key;
+    this.client = client;
+    this.fileIO = fileIO;
+  }
+
+  /**
+   * Refreshes the client's reference, looks up {@code key} on it and reloads the view metadata
+   * that the Nessie content points to.
+   *
+   * @throws NoSuchViewException if the key is absent in Nessie although this instance already
+   *     holds a metadata location
+   * @throws AlreadyExistsException if the key resolves to content that is not an Iceberg view
+   * @throws RuntimeException if the client's reference can no longer be refreshed
+   */
+  @Override
+  public void doRefresh() {
+    try {
+      client.refresh();
+    } catch (NessieNotFoundException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to refresh as ref '%s' is no longer valid.", client.getRef().getName()),
+          e);
+    }
+    String metadataLocation = null;
+    Reference reference = client.getRef().getReference();
+    try {
+      Content content = client.getApi().getContent().key(key).reference(reference).get().get(key);
+      LOG.debug("Content '{}' at '{}': {}", key, reference, content);
+      if (content == null) {
+        // A missing key is only an error once metadata has been loaded before; otherwise the
+        // refresh simply proceeds with a null metadata location.
+        if (currentMetadataLocation() != null) {
+          throw new NoSuchViewException("View does not exist: %s in %s", key, reference);
+        }
+      } else {
+        this.icebergView =
+            content
+                .unwrap(IcebergView.class)
+                .orElseThrow(
+                    () -> {
+                      if (content instanceof IcebergTable) {
+                        return new AlreadyExistsException(
+                            "Table with same name already exists: %s in %s", key, reference);
+                      } else {
+                        return new AlreadyExistsException(
+                            "Cannot refresh Iceberg view: Nessie points to a non-Iceberg object for path: %s in %s",
+                            key, reference);
+                      }
+                    });
+        metadataLocation = icebergView.getMetadataLocation();
+      }
+    } catch (NessieNotFoundException ex) {
+      if (currentMetadataLocation() != null) {
+        // Keep the Nessie error as the cause so the failed lookup stays visible in the trace.
+        throw new NoSuchViewException(ex, "View does not exist: %s in %s", key, reference);
+      }
+    }
+    // NOTE(review): 'null' and '2' are presumably the retry predicate and the retry count of
+    // BaseViewOperations#refreshFromMetadataLocation - confirm against that signature.
+    refreshFromMetadataLocation(
+        metadataLocation,
+        null,
+        2,
+        location ->
+            NessieUtil.loadViewMetadata(
+                ViewMetadataParser.read(io().newInputFile(location)), location, reference));
+  }
+
+  /**
+   * Writes the new metadata file (if required) and commits its location to Nessie.
+   *
+   * <p>The freshly written metadata file is deleted again when the commit fails with a conflict,
+   * a not-found error or a bad request. It is kept for {@link HttpClientException} - presumably
+   * because the commit may still have been applied on the server; confirm.
+   *
+   * @param base the metadata the commit is based on
+   * @param metadata the metadata to commit
+   */
+  @Override
+  public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+    String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+
+    boolean failure = false;
+    try {
+      // No content id is available until a refresh has found the view.
+      String contentId = icebergView == null ? null : icebergView.getId();
+      client.commitView(base, metadata, newMetadataLocation, contentId, key);
+    } catch (NessieConflictException | NessieNotFoundException | HttpClientException ex) {
+      if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) {
+        failure = true;
+      }
+      NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW);
+    } catch (NessieBadRequestException ex) {
+      failure = true;
+      NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_VIEW);
+      throw ex;
+    } finally {
+      if (failure) {
+        io().deleteFile(newMetadataLocation);
+      }
+    }
+  }
+
+  /** Returns the string form of the Nessie content key as the view name. */
+  @Override
+  protected String viewName() {
+    return key.toString();
+  }
+
+  /** Returns the {@link FileIO} handed to the constructor. */
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+}
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java index 28ef7fe7c22b..09702675fdc2 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java @@ -62,7 +62,8 @@ public Reference getReference() { public void checkMutable() { Preconditions.checkArgument( - mutable, "You can only mutate tables when using a branch without a hash or timestamp."); + mutable, + "You can only mutate tables/views when using a branch without a hash or timestamp."); } public String getName() { diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java index 69dae7c21f54..287b8a0ac313 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java @@ -18,14 +18,19 @@ */ package org.apache.iceberg.nessie; +import static org.apache.iceberg.TableMetadataParser.getFileExtension; import static org.apache.iceberg.types.Types.NestedField.required; +import
java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
@@ -35,6 +40,7 @@
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.Namespace;
@@ -46,6 +52,8 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.View;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -180,6 +188,40 @@ protected Table createTable(TableIdentifier tableIdentifier, Schema schema) {
     return catalog.createTable(tableIdentifier, schema);
   }
 
+  /** Creates a view with a single required {@code id} column of type long. */
+  protected View createView(NessieCatalog nessieCatalog, TableIdentifier tableIdentifier) {
+    Schema schema = new Schema(StructType.of(required(1, "id", LongType.get())).fields());
+    return createView(nessieCatalog, tableIdentifier, schema);
+  }
+
+  /**
+   * Builds and creates a view with the given schema through {@code nessieCatalog}.
+   *
+   * <p>NOTE(review): missing namespaces are created via the one-argument overload, which goes
+   * through the {@code catalog} field rather than {@code nessieCatalog}; confirm this is meant.
+   */
+  protected View createView(
+      NessieCatalog nessieCatalog, TableIdentifier tableIdentifier, Schema schema) {
+    createMissingNamespaces(tableIdentifier);
+    return nessieCatalog
+        .buildView(tableIdentifier)
+        .withSchema(schema)
+        .withDefaultNamespace(tableIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+  }
+
+  /** Replaces the view with a single required {@code age} int column and a trino query. */
+  protected View replaceView(NessieCatalog nessieCatalog, TableIdentifier identifier) {
+    Schema schema = new Schema(StructType.of(required(2, "age", Types.IntegerType.get())).fields());
+    return nessieCatalog
+        .buildView(identifier)
+        .withSchema(schema)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery("trino", "select age from ns.tbl")
+        .replace();
+  }
+
   protected void createMissingNamespaces(TableIdentifier tableIdentifier) {
     createMissingNamespaces(catalog, tableIdentifier);
   }
@@ -247,6 +289,11 @@ static String metadataLocation(NessieCatalog catalog, TableIdentifier tableIdent
     return icebergOps.currentMetadataLocation();
   }
 
+  /** Loads the view and returns the location of its current metadata file. */
+  static String viewMetadataLocation(NessieCatalog catalog, TableIdentifier identifier) {
+    return ((BaseView) catalog.loadView(identifier)).operations().current().metadataFileLocation();
+  }
+
   static String writeRecordsToFile(
       Table table, Schema schema, String filename, List<Record> records) throws IOException {
     String fileLocation =
@@ -267,4 +314,26 @@ static DataFile makeDataFile(Table icebergTable, String fileLocation) {
         .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
         .build();
   }
+
+  /** Lists the metadata-directory files that have the {@code Codec.NONE} metadata extension. */
+  protected static List<String> metadataVersionFiles(String tablePath) {
+    return filterByExtension(tablePath, getFileExtension(TableMetadataParser.Codec.NONE));
+  }
+
+  /** Lists the metadata-directory files whose absolute path ends with {@code extension}. */
+  protected static List<String> filterByExtension(String tablePath, String extension) {
+    return metadataFiles(tablePath).stream()
+        .filter(f -> f.endsWith(extension))
+        .collect(Collectors.toList());
+  }
+
+  /** Returns the absolute paths of the entries directly under {@code tablePath/metadata}. */
+  @SuppressWarnings(
+      "RegexpSinglelineJava") // respecting this rule requires a lot more lines of code
+  private static List<String> metadataFiles(String tablePath) {
+    return Arrays.stream(
+            Objects.requireNonNull(new File((tablePath + "/" + "metadata")).listFiles()))
+        .map(File::getAbsolutePath)
+        .collect(Collectors.toList());
+  }
 }
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java index 31a6d57c9c02..f38be059360b 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java +++
b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
@@ -38,6 +38,8 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.View;
 import org.assertj.core.api.AbstractStringAssert;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.InstanceOfAssertFactories;
@@ -481,7 +483,8 @@ public void testWithRefAndHash() throws NessieConflictException, NessieNotFoundE
     // updates should not be possible
     Assertions.assertThatThrownBy(() -> catalogAtHash2.createTable(identifier, schema))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("You can only mutate tables when using a branch without a hash or timestamp.");
+        .hasMessage(
+            "You can only mutate tables/views when using a branch without a hash or timestamp.");
     Assertions.assertThat(catalogAtHash2.listTables(namespaceAB)).isEmpty();
 
     // updates should be still possible here
@@ -521,4 +524,76 @@ public void testDifferentTableSameName() throws NessieConflictException, NessieN
 
     Assertions.assertThat(table1.location()).isNotEqualTo(table2.location());
   }
+
+  @Test
+  public void testViewMetadataLocation() throws Exception {
+    String branch1 = "branch-1";
+    String branch2 = "branch-2";
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("test-ns", "view1");
+    createView(catalog, viewIdentifier);
+
+    createBranch(branch1, catalog.currentHash());
+    // replace the view on branch1; the local 'catalog' below shadows the field from here on
+    NessieCatalog catalog = initCatalog(branch1);
+    String metadataLocationOfCommit1 =
+        ((BaseView) replaceView(catalog, viewIdentifier))
+            .operations()
+            .current()
+            .metadataFileLocation();
+
+    createBranch(branch2, catalog.currentHash(), branch1);
+    // replace the view once more, now through a catalog on branch2
+    catalog = initCatalog(branch2);
+    String metadataLocationOfCommit2 =
+        ((BaseView) replaceView(catalog, viewIdentifier))
+            .operations()
+            .current()
+            .metadataFileLocation();
+
+    Assertions.assertThat(metadataLocationOfCommit2)
+        .isNotNull()
+        .isNotEqualTo(metadataLocationOfCommit1);
+
+    catalog = initCatalog(branch1);
+    // reload the view through a fresh catalog on branch1
+    BaseView view = (BaseView) catalog.loadView(viewIdentifier);
+    // branch1 must not have picked up the metadata location committed on branch2
+    Assertions.assertThat(view.operations().current().metadataFileLocation())
+        .isNotNull()
+        .isNotEqualTo(metadataLocationOfCommit2);
+
+    catalog.dropView(viewIdentifier);
+  }
+
+  @Test
+  public void testDifferentViewSameName() throws NessieConflictException, NessieNotFoundException {
+    String branch1 = "branch1";
+    String branch2 = "branch2";
+    createBranch(branch1);
+    createBranch(branch2);
+    Schema schema1 =
+        new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+    Schema schema2 =
+        new Schema(
+            Types.StructType.of(
+                    required(1, "file_count", Types.IntegerType.get()),
+                    required(2, "record_count", Types.LongType.get()))
+                .fields());
+
+    TableIdentifier identifier = TableIdentifier.of("db", "view1");
+
+    NessieCatalog nessieCatalog = initCatalog(branch1);
+
+    createMissingNamespaces(nessieCatalog, identifier);
+    View view1 = createView(nessieCatalog, identifier, schema1);
+    Assertions.assertThat(view1.schema().asStruct()).isEqualTo(schema1.asStruct());
+
+    nessieCatalog = initCatalog(branch2);
+    createMissingNamespaces(nessieCatalog, identifier);
+    View view2 = createView(nessieCatalog, identifier, schema2);
+    Assertions.assertThat(view2.schema().asStruct()).isEqualTo(schema2.asStruct());
+
+    Assertions.assertThat(view1.location()).isNotEqualTo(view2.location());
+  }
 }
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java index 028f178b2dd7..b2eedc468bd4 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java +++
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.nessie; -import static org.apache.iceberg.TableMetadataParser.getFileExtension; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -28,10 +27,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,7 +42,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -309,7 +305,8 @@ public void testRenameWithTableReferenceInvalidCase() throws NessieNotFoundExcep Assertions.assertThatThrownBy(() -> catalog.renameTable(fromIdentifier, toIdentifier)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("from: Something and to: iceberg-table-test reference name must be same"); + .hasMessage( + "Cannot rename table 'tbl' on reference 'Something' to 'rename_table_name' on reference 'iceberg-table-test': source and target references must be the same."); fromTableReference = ImmutableTableReference.builder() @@ -328,7 +325,8 @@ public void testRenameWithTableReferenceInvalidCase() throws NessieNotFoundExcep Assertions.assertThatThrownBy(() -> catalog.renameTable(fromIdentifierNew, toIdentifierNew)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("from: iceberg-table-test and to: Something reference name must be same"); + .hasMessage( + "Cannot rename table 'tbl' on reference 'iceberg-table-test' to 'rename_table_name' on reference 'Something': source and target references must be the same."); } 
private void verifyCommitMetadata() throws NessieNotFoundException { @@ -487,7 +485,8 @@ public void testRegisterTableFailureScenarios() Assertions.assertThatThrownBy( () -> catalog.registerTable(tagIdentifier, "file:" + metadataVersionFiles.get(0))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("You can only mutate tables when using a branch without a hash or timestamp."); + .hasMessage( + "You can only mutate tables/views when using a branch without a hash or timestamp."); // Case 4: non-null metadata path with null metadata location Assertions.assertThatThrownBy( () -> @@ -670,29 +669,10 @@ private String getTableBasePath(String tableName) { return temp.toUri() + DB_NAME + "/" + tableName; } - @SuppressWarnings( - "RegexpSinglelineJava") // respecting this rule requires a lot more lines of code - private List metadataFiles(String tablePath) { - return Arrays.stream( - Objects.requireNonNull(new File((tablePath + "/" + "metadata")).listFiles())) - .map(File::getAbsolutePath) - .collect(Collectors.toList()); - } - - protected List metadataVersionFiles(String tablePath) { - return filterByExtension(tablePath, getFileExtension(TableMetadataParser.Codec.NONE)); - } - protected List manifestFiles(String tablePath) { return filterByExtension(tablePath, ".avro"); } - private List filterByExtension(String tablePath, String extension) { - return metadataFiles(tablePath).stream() - .filter(f -> f.endsWith(extension)) - .collect(Collectors.toList()); - } - private static String addRecordsToFile(Table table, String filename) throws IOException { GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test")); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java new file mode 100644 index 000000000000..ea721e937f4c --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java @@ -0,0 +1,337 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.nessie;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.View;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.client.ext.NessieClientFactory;
+import org.projectnessie.client.ext.NessieClientUri;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.IcebergView;
+import org.projectnessie.model.ImmutableTableReference;
+import org.projectnessie.model.LogResponse.LogEntry;
+
+/**
+ * View tests against {@link NessieCatalog} on the {@code iceberg-view-test} branch. Every test
+ * starts with the view {@code db.view} already created by {@link #beforeEach}.
+ */
+public class TestNessieView extends BaseTestIceberg {
+
+  private static final String BRANCH = "iceberg-view-test";
+
+  private static final String DB_NAME = "db";
+  private static final String VIEW_NAME = "view";
+  private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(DB_NAME, VIEW_NAME);
+  private static final ContentKey KEY = ContentKey.of(DB_NAME, VIEW_NAME);
+  private static final Schema SCHEMA =
+      new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+  private static final Schema ALTERED =
+      new Schema(
+          Types.StructType.of(
+                  required(1, "id", Types.LongType.get()),
+                  optional(2, "data", Types.LongType.get()))
+              .fields());
+
+  // Location of the test view with the "file:" prefix removed, so it can be walked as a path.
+  private String viewLocation;
+
+  public TestNessieView() {
+    super(BRANCH);
+  }
+
+  /** Creates the test view and remembers its location for cleanup and file assertions. */
+  @Override
+  @BeforeEach
+  public void beforeEach(NessieClientFactory clientFactory, @NessieClientUri URI nessieUri)
+      throws IOException {
+    super.beforeEach(clientFactory, nessieUri);
+    this.viewLocation =
+        createView(catalog, VIEW_IDENTIFIER, SCHEMA).location().replaceFirst("file:", "");
+  }
+
+  /** Deletes the files under the view location, drops the view, then runs the base cleanup. */
+  @Override
+  @AfterEach
+  public void afterEach() throws Exception {
+    // drop the view data
+    if (viewLocation != null) {
+      // Reverse order visits children before their parent directory.
+      try (Stream<Path> walk = Files.walk(Paths.get(viewLocation))) {
+        walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+      }
+      catalog.dropView(VIEW_IDENTIFIER);
+    }
+
+    super.afterEach();
+  }
+
+  /** Reads the Nessie view content for {@code key} on the test branch. */
+  private IcebergView getView(ContentKey key) throws NessieNotFoundException {
+    return getView(BRANCH, key);
+  }
+
+  /** Reads the Nessie view content for {@code key} on the reference named {@code ref}. */
+  private IcebergView getView(String ref, ContentKey key) throws NessieNotFoundException {
+    return api.getContent().key(key).refName(ref).get().get(key).unwrap(IcebergView.class).get();
+  }
+
+  /**
+   * Verifies that replacing the view version on the test branch moves the Nessie content there,
+   * while a branch created earlier keeps pointing at the old metadata location.
+   */
+  @Test
+  public void verifyStateMovesForDML() throws Exception {
+    // 1. initialize view
+    View icebergView = catalog.loadView(VIEW_IDENTIFIER);
+    icebergView
+        .replaceVersion()
+        .withQuery("spark", "some query")
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(VIEW_IDENTIFIER.namespace())
+        .commit();
+
+    // 2. create 2nd branch
+    String testCaseBranch = "verify-global-moving";
+    api.createReference()
+        .sourceRefName(BRANCH)
+        .reference(Branch.of(testCaseBranch, catalog.currentHash()))
+        .create();
+    IcebergView contentInitialMain = getView(BRANCH, KEY);
+    IcebergView contentInitialBranch = getView(testCaseBranch, KEY);
+    View viewInitialMain = catalog.loadView(VIEW_IDENTIFIER);
+
+    // verify view-metadata-location + version-id
+    Assertions.assertThat(contentInitialMain)
+        .as("global-contents + snapshot-id equal on both branches in Nessie")
+        .isEqualTo(contentInitialBranch);
+    Assertions.assertThat(viewInitialMain.currentVersion()).isNotNull();
+
+    // 3. modify view in "main" branch
+
+    icebergView
+        .replaceVersion()
+        .withQuery("trino", "some other query")
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(VIEW_IDENTIFIER.namespace())
+        .commit();
+
+    IcebergView contentsAfter1Main = getView(KEY);
+    IcebergView contentsAfter1Branch = getView(testCaseBranch, KEY);
+    View viewAfter1Main = catalog.loadView(VIEW_IDENTIFIER);
+
+    // --> the metadata-location must have moved on BRANCH only, not on the 2nd branch
+    // verify view-metadata-location
+    Assertions.assertThat(contentInitialMain.getMetadataLocation())
+        .describedAs("metadata-location must change on %s", BRANCH)
+        .isNotEqualTo(contentsAfter1Main.getMetadataLocation());
+    Assertions.assertThat(contentInitialBranch.getMetadataLocation())
+        .describedAs("metadata-location must not change on %s", testCaseBranch)
+        .isEqualTo(contentsAfter1Branch.getMetadataLocation());
+    Assertions.assertThat(contentsAfter1Main)
+        .extracting(IcebergView::getSchemaId)
+        .describedAs("schema ID must be same across branches")
+        .isEqualTo(contentsAfter1Branch.getSchemaId());
+    // verify updates
+    Assertions.assertThat(
+            ((SQLViewRepresentation) viewAfter1Main.currentVersion().representations().get(0))
+                .dialect())
+        .isEqualTo("trino");
+
+    // 4. modify view in "main" branch again
+
+    icebergView
+        .replaceVersion()
+        .withQuery("flink", "some query")
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(VIEW_IDENTIFIER.namespace())
+        .commit();
+
+    IcebergView contentsAfter2Main = getView(KEY);
+    IcebergView contentsAfter2Branch = getView(testCaseBranch, KEY);
+    View viewAfter2Main = catalog.loadView(VIEW_IDENTIFIER);
+
+    // --> again, only BRANCH may have moved
+    // verify view-metadata-location
+    Assertions.assertThat(contentsAfter2Main.getMetadataLocation())
+        .describedAs("metadata-location must change on %s", BRANCH)
+        .isNotEqualTo(contentsAfter1Main.getMetadataLocation());
+    Assertions.assertThat(contentsAfter2Branch.getMetadataLocation())
+        .describedAs("on-reference-state must not change on %s", testCaseBranch)
+        .isEqualTo(contentsAfter1Branch.getMetadataLocation());
+    Assertions.assertThat(
+            ((SQLViewRepresentation) viewAfter2Main.currentVersion().representations().get(0))
+                .dialect())
+        .isEqualTo("flink");
+  }
+
+  /** Replaces the view version once and expects two metadata files plus valid commit metadata. */
+  @Test
+  public void testUpdate() throws IOException {
+    String viewName = VIEW_IDENTIFIER.name();
+    View icebergView = catalog.loadView(VIEW_IDENTIFIER);
+    // add a column
+    icebergView
+        .replaceVersion()
+        .withQuery("spark", "some query")
+        .withSchema(ALTERED)
+        .withDefaultNamespace(VIEW_IDENTIFIER.namespace())
+        .commit();
+
+    getView(KEY); // sanity, check view exists
+    // check parameters are in expected state
+    // NOTE(review): 'expected' is built with the same expression as getViewBasePath(), so this
+    // assertion cannot fail; confirm what it is meant to pin.
+    String expected = temp.toUri() + DB_NAME + "/" + viewName;
+    Assertions.assertThat(getViewBasePath(viewName)).isEqualTo(expected);
+
+    Assertions.assertThat(metadataVersionFiles(viewLocation)).isNotNull().hasSize(2);
+
+    verifyCommitMetadata();
+  }
+
+  /** Renames the view using identifiers that both carry the current reference name. */
+  @Test
+  public void testRenameWithTableReference() throws NessieNotFoundException {
+    String renamedViewName = "rename_view_name";
+    TableIdentifier renameViewIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), renamedViewName);
+
+    ImmutableTableReference fromTableReference =
+        ImmutableTableReference.builder()
+            .reference(catalog.currentRefName())
+            .name(VIEW_IDENTIFIER.name())
+            .build();
+    ImmutableTableReference toTableReference =
+        ImmutableTableReference.builder()
+            .reference(catalog.currentRefName())
+            .name(renameViewIdentifier.name())
+            .build();
+    TableIdentifier fromIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), fromTableReference.toString());
+    TableIdentifier toIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString());
+
+    catalog.renameView(fromIdentifier, toIdentifier);
+    Assertions.assertThat(catalog.viewExists(fromIdentifier)).isFalse();
+    Assertions.assertThat(catalog.viewExists(toIdentifier)).isTrue();
+
+    Assertions.assertThat(catalog.dropView(toIdentifier)).isTrue();
+
+    verifyCommitMetadata();
+  }
+
+  /** Expects a rename to be rejected when source and target name different references. */
+  @Test
+  public void testRenameWithTableReferenceInvalidCase() {
+    String renamedViewName = "rename_view_name";
+    TableIdentifier renameViewIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), renamedViewName);
+
+    ImmutableTableReference fromTableReference =
+        ImmutableTableReference.builder()
+            .reference("Something")
+            .name(VIEW_IDENTIFIER.name())
+            .build();
+    ImmutableTableReference toTableReference =
+        ImmutableTableReference.builder()
+            .reference(catalog.currentRefName())
+            .name(renameViewIdentifier.name())
+            .build();
+    TableIdentifier fromIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), fromTableReference.toString());
+    TableIdentifier toIdentifier =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString());
+
+    Assertions.assertThatThrownBy(() -> catalog.renameView(fromIdentifier, toIdentifier))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot rename view 'view' on reference 'Something' to 'rename_view_name' on reference 'iceberg-view-test': source and target references must be the same.");
+
+    fromTableReference =
+        ImmutableTableReference.builder()
+            .reference(catalog.currentRefName())
+            .name(VIEW_IDENTIFIER.name())
+            .build();
+    toTableReference =
+        ImmutableTableReference.builder()
+            .reference("Something")
+            .name(renameViewIdentifier.name())
+            .build();
+    TableIdentifier fromIdentifierNew =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), fromTableReference.toString());
+    TableIdentifier toIdentifierNew =
+        TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString());
+
+    Assertions.assertThatThrownBy(() -> catalog.renameView(fromIdentifierNew, toIdentifierNew))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot rename view 'view' on reference 'iceberg-view-test' to 'rename_view_name' on reference 'Something': source and target references must be the same.");
+  }
+
+  /**
+   * Checks author, application type and message prefix of every commit on the test branch,
+   * skipping commits whose message starts with "create namespace ".
+   */
+  private void verifyCommitMetadata() throws NessieNotFoundException {
+    // check that the author is properly set
+    List<LogEntry> log = api.getCommitLog().refName(BRANCH).get().getLogEntries();
+    Assertions.assertThat(log)
+        .isNotNull()
+        .isNotEmpty()
+        .filteredOn(e -> !e.getCommitMeta().getMessage().startsWith("create namespace "))
+        .allSatisfy(
+            logEntry -> {
+              CommitMeta commit = logEntry.getCommitMeta();
+              Assertions.assertThat(commit.getAuthor()).isNotNull().isNotEmpty();
+              Assertions.assertThat(commit.getAuthor()).isEqualTo(System.getProperty("user.name"));
+              Assertions.assertThat(commit.getProperties().get(NessieUtil.APPLICATION_TYPE))
+                  .isEqualTo("iceberg");
+              Assertions.assertThat(commit.getMessage()).startsWith("Iceberg");
+            });
+  }
+
+  /** Drops the view twice: the first drop must succeed, the second must report false. */
+  @Test
+  public void testDrop() throws NessieNotFoundException {
+    Assertions.assertThat(catalog.viewExists(VIEW_IDENTIFIER)).isTrue();
+    Assertions.assertThat(catalog.dropView(VIEW_IDENTIFIER)).isTrue();
+    Assertions.assertThat(catalog.viewExists(VIEW_IDENTIFIER)).isFalse();
+    Assertions.assertThat(catalog.dropView(VIEW_IDENTIFIER)).isFalse();
+    verifyCommitMetadata();
+  }
+
+  /** Expects both the pre-created view and a newly created one in the namespace listing. */
+  @Test
+  public void testListviews() {
+    TableIdentifier newIdentifier = TableIdentifier.of(DB_NAME, "newView");
+    createView(catalog, newIdentifier, SCHEMA);
+
+    List<TableIdentifier> tableIdents = catalog.listViews(VIEW_IDENTIFIER.namespace());
+    List<TableIdentifier> expectedIdents =
+        tableIdents.stream()
+            .filter(t -> t.equals(newIdentifier) || t.equals(VIEW_IDENTIFIER))
+            .collect(Collectors.toList());
+    Assertions.assertThat(expectedIdents).hasSize(2);
+    Assertions.assertThat(catalog.viewExists(VIEW_IDENTIFIER)).isTrue();
+    Assertions.assertThat(catalog.viewExists(newIdentifier)).isTrue();
+  }
+
+  /** Returns the warehouse URI followed by {@code db/<viewName>}. */
+  private String getViewBasePath(String viewName) {
+    return temp.toUri() + DB_NAME + "/" + viewName;
+  }
+}
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java new file mode 100644 index 000000000000..d5ba68eb7454 --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.nessie;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.apache.iceberg.view.ViewMetadata;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.ext.NessieApiVersion;
+import org.projectnessie.client.ext.NessieApiVersions;
+import org.projectnessie.client.ext.NessieClientFactory;
+import org.projectnessie.client.ext.NessieClientUri;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.jaxrs.ext.NessieJaxRsExtension;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.Reference;
+import org.projectnessie.model.Tag;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+import org.projectnessie.versioned.storage.inmemory.InmemoryBackendTestFactory;
+import org.projectnessie.versioned.storage.testextension.NessieBackend;
+import org.projectnessie.versioned.storage.testextension.NessiePersist;
+import org.projectnessie.versioned.storage.testextension.PersistExtension;
+
+/**
+ * Runs the shared {@link ViewCatalogTests} suite against a {@link NessieCatalog} on the {@code
+ * main} reference of a Nessie server started by {@link NessieJaxRsExtension}.
+ */
+@ExtendWith(PersistExtension.class)
+@NessieBackend(InmemoryBackendTestFactory.class)
+@NessieApiVersions // test all versions
+public class TestNessieViewCatalog extends ViewCatalogTests<NessieCatalog> {
+
+  @NessiePersist static Persist persist;
+
+  @RegisterExtension
+  static NessieJaxRsExtension server = NessieJaxRsExtension.jaxRsExtension(() -> persist);
+
+  @TempDir public Path temp;
+
+  private NessieCatalog catalog;
+  private NessieApiV1 api;
+  private NessieApiVersion apiVersion;
+  private Configuration hadoopConfig;
+  // Hash of the default branch before the test ran; resetData() moves the branch back to it.
+  private String initialHashOfDefaultBranch;
+  private String uri;
+
+  /** Creates the API client, records the default branch hash and opens a catalog on "main". */
+  @BeforeEach
+  public void setUp(NessieClientFactory clientFactory, @NessieClientUri URI nessieUri)
+      throws NessieNotFoundException {
+    api = clientFactory.make();
+    apiVersion = clientFactory.apiVersion();
+    initialHashOfDefaultBranch = api.getDefaultBranch().getHash();
+    uri = nessieUri.toASCIIString();
+    hadoopConfig = new Configuration();
+    catalog = initNessieCatalog("main");
+  }
+
+  /** Resets the Nessie references, then closes the catalog and the API client. */
+  @AfterEach
+  public void afterEach() throws IOException {
+    resetData();
+    try {
+      if (catalog != null) {
+        catalog.close();
+      }
+      api.close();
+    } finally {
+      catalog = null;
+      api = null;
+      hadoopConfig = null;
+    }
+  }
+
+  /**
+   * Deletes every branch except the default one and every tag, then assigns the default branch
+   * back to the hash recorded in {@link #setUp}.
+   */
+  private void resetData() throws NessieConflictException, NessieNotFoundException {
+    Branch defaultBranch = api.getDefaultBranch();
+    for (Reference r : api.getAllReferences().get().getReferences()) {
+      if (r instanceof Branch && !r.getName().equals(defaultBranch.getName())) {
+        api.deleteBranch().branch((Branch) r).delete();
+      }
+      if (r instanceof Tag) {
+        api.deleteTag().tag((Tag) r).delete();
+      }
+    }
+    api.assignBranch()
+        .assignTo(Branch.of(defaultBranch.getName(), initialHashOfDefaultBranch))
+        .branch(defaultBranch)
+        .assign();
+  }
+
+  /** Builds and initializes a catalog on {@code ref} that uses {@code temp} as its warehouse. */
+  private NessieCatalog initNessieCatalog(String ref) {
+    NessieCatalog newCatalog = new NessieCatalog();
+    newCatalog.setConf(hadoopConfig);
+    ImmutableMap<String, String> options =
+        ImmutableMap.of(
+            "ref",
+            ref,
+            CatalogProperties.URI,
+            uri,
+            CatalogProperties.WAREHOUSE_LOCATION,
+            temp.toUri().toString(),
+            "client-api-version",
+            apiVersion == NessieApiVersion.V2 ? "2" : "1");
+    newCatalog.initialize("nessie", options);
+    return newCatalog;
+  }
+
+  @Override
+  protected NessieCatalog catalog() {
+    return catalog;
+  }
+
+  @Override
+  protected Catalog tableCatalog() {
+    return catalog;
+  }
+
+  @Override
+  protected boolean requiresNamespaceCreate() {
+    return true;
+  }
+
+  // Overriding the below rename view testcases to exclude checking same view metadata after rename.
+  // Nessie adds extra properties (like commit id) on every operation. Hence, view metadata will not
+  // be same after rename.
+  // @Test is repeated on each override: method annotations are not inherited, so without it
+  // JUnit Jupiter would silently stop running these tests.
+  @Override
+  @Test
+  public void renameView() {
+    TableIdentifier from = TableIdentifier.of("ns", "view");
+    TableIdentifier to = TableIdentifier.of("ns", "renamedView");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+    }
+
+    assertThat(catalog().viewExists(from)).as("View should not exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(from)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(from.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+    ViewMetadata original = ((BaseView) view).operations().current();
+    assertThat(original.metadataFileLocation()).isNotNull();
+
+    catalog().renameView(from, to);
+
+    assertThat(catalog().viewExists(from)).as("View should not exist with old name").isFalse();
+    assertThat(catalog().viewExists(to)).as("View should exist with new name").isTrue();
+
+    assertThat(catalog().dropView(from)).isFalse();
+    assertThat(catalog().dropView(to)).isTrue();
+    assertThat(catalog().viewExists(to)).as("View should not exist").isFalse();
+  }
+
+  @Override
+  @Test
+  public void renameViewUsingDifferentNamespace() {
+    TableIdentifier from = TableIdentifier.of("ns", "view");
+    TableIdentifier to = TableIdentifier.of("other_ns", "renamedView");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+
catalog().createNamespace(to.namespace()); + } + + assertThat(catalog().viewExists(from)).as("View should not exist").isFalse(); + + catalog() + .buildView(from) + .withSchema(SCHEMA) + .withDefaultNamespace(from.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(catalog().viewExists(from)).as("View should exist").isTrue(); + + catalog().renameView(from, to); + + assertThat(catalog().viewExists(from)).as("View should not exist with old name").isFalse(); + assertThat(catalog().viewExists(to)).as("View should exist with new name").isTrue(); + + assertThat(catalog().dropView(from)).isFalse(); + assertThat(catalog().dropView(to)).isTrue(); + assertThat(catalog().viewExists(to)).as("View should not exist").isFalse(); + } +} From a52e0d5dc7bdda5e752d4e1d0d026b9e4679ece9 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 8 Dec 2023 15:20:31 +0530 Subject: [PATCH 2/6] Address comments --- .../iceberg/nessie/NessieIcebergClient.java | 27 +++++++----- .../iceberg/nessie/NessieTableOperations.java | 4 +- .../org/apache/iceberg/nessie/NessieUtil.java | 19 +++++---- .../iceberg/nessie/NessieViewOperations.java | 3 +- .../iceberg/nessie/BaseTestIceberg.java | 7 ++-- .../iceberg/nessie/TestNessieTable.java | 9 ++-- .../apache/iceberg/nessie/TestNessieView.java | 41 +++++++++++-------- .../iceberg/nessie/TestNessieViewCatalog.java | 5 ++- 8 files changed, 66 insertions(+), 49 deletions(-) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index c1fde38f5883..fd5adabe36af 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -147,14 +147,16 @@ private UpdateableReference loadReference(String requestedRef, String hash) { } } - /** @deprecated will be removed after 1.5.0; use listContents() instead */ - @Deprecated public List 
listTables(Namespace namespace) { return listContents(namespace, Content.Type.ICEBERG_TABLE); } + public List listViews(Namespace namespace) { + return listContents(namespace, Content.Type.ICEBERG_VIEW); + } + /** Lists Iceberg table or view from the given namespace */ - public List listContents(Namespace namespace, Content.Type type) { + protected List listContents(Namespace namespace, Content.Type type) { try { return withReference(api.getEntries()).get().getEntries().stream() .filter(namespacePredicate(namespace)) @@ -414,13 +416,15 @@ namespace, getRef().getName()), } } - /** @deprecated will be removed after 1.5.0; use renameContent() instead */ - @Deprecated public void renameTable(TableIdentifier from, TableIdentifier to) { renameContent(from, to, Content.Type.ICEBERG_TABLE); } - public void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { + public void renameView(TableIdentifier from, TableIdentifier to) { + renameContent(from, to, Content.Type.ICEBERG_VIEW); + } + + protected void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { getRef().checkMutable(); IcebergContent existingFromContent = fetchContent(from); @@ -482,7 +486,8 @@ private static void validateToContentForRename( throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); } else { throw new AlreadyExistsException( - "Cannot rename %s to %s. Another content with same name already exists", from, to); + "Cannot rename %s to %s. 
Another content of type %s with same name already exists", + from, to, existingToContent.getType()); } } } @@ -503,13 +508,15 @@ private static void validateFromContentForRename( } } - /** @deprecated will be removed after 1.5.0; use dropContent() instead */ - @Deprecated public boolean dropTable(TableIdentifier identifier, boolean purge) { return dropContent(identifier, purge, Content.Type.ICEBERG_TABLE); } - public boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) { + public boolean dropView(TableIdentifier identifier, boolean purge) { + return dropContent(identifier, purge, Content.Type.ICEBERG_VIEW); + } + + protected boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) { getRef().checkMutable(); IcebergContent existingContent = fetchContent(identifier); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index a395dc846f97..ab633471284f 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -137,8 +137,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE); } catch (NessieBadRequestException ex) { failure = true; - NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE); - throw ex; + throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE) + .orElse(ex); } finally { if (failure) { io().deleteFile(newMetadataLocation); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index 7b4d08243f17..0c5f5e358edf 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ 
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -243,7 +243,7 @@ static void handleExceptionsForCommits(Exception exception, String refName, Cont } } - static void handleBadRequestForCommit( + static Optional handleBadRequestForCommit( NessieIcebergClient client, ContentKey key, Content.Type type) { Content.Type anotherType = type == Content.Type.ICEBERG_TABLE ? Content.Type.ICEBERG_VIEW : Content.Type.ICEBERG_TABLE; @@ -252,18 +252,21 @@ static void handleBadRequestForCommit( client.getApi().getContent().key(key).reference(client.getReference()).get().get(key); if (content != null) { if (content.getType().equals(anotherType)) { - throw new AlreadyExistsException( - "%s with same name already exists: %s in %s", - NessieUtil.contentTypeString(anotherType), key, client.getReference()); + return Optional.of( + new AlreadyExistsException( + "%s with same name already exists: %s in %s", + NessieUtil.contentTypeString(anotherType), key, client.getReference())); } else if (!content.getType().equals(type)) { - throw new AlreadyExistsException( - "Another content with same name already exists: %s in %s", - key, client.getReference()); + return Optional.of( + new AlreadyExistsException( + "Another content with same name already exists: %s in %s", + key, client.getReference())); } } } catch (NessieNotFoundException e) { - throw new RuntimeException(e); + return Optional.of(new RuntimeException(e)); } + return Optional.empty(); } private static void maybeThrowSpecializedException( diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java index c86d11ecc8a7..a348e28fb66f 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -116,8 +116,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { NessieUtil.handleExceptionsForCommits(ex, 
client.refName(), Content.Type.ICEBERG_VIEW); } catch (NessieBadRequestException ex) { failure = true; - NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_VIEW); - throw ex; + throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_VIEW).orElse(ex); } finally { if (failure) { io().deleteFile(newMetadataLocation); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java index 287b8a0ac313..4cb768f2aa64 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java @@ -51,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.LongType; -import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.view.BaseView; import org.apache.iceberg.view.View; import org.junit.jupiter.api.AfterEach; @@ -179,7 +178,7 @@ protected Table createTable(TableIdentifier tableIdentifier, int count) { protected void createTable(TableIdentifier tableIdentifier) { createMissingNamespaces(tableIdentifier); - Schema schema = new Schema(StructType.of(required(1, "id", LongType.get())).fields()); + Schema schema = new Schema(required(1, "id", LongType.get())); catalog.createTable(tableIdentifier, schema).location(); } @@ -189,7 +188,7 @@ protected Table createTable(TableIdentifier tableIdentifier, Schema schema) { } protected View createView(NessieCatalog nessieCatalog, TableIdentifier tableIdentifier) { - Schema schema = new Schema(StructType.of(required(1, "id", LongType.get())).fields()); + Schema schema = new Schema(required(1, "id", LongType.get())); return createView(nessieCatalog, tableIdentifier, schema); } @@ -205,7 +204,7 @@ protected View createView( } protected View replaceView(NessieCatalog nessieCatalog, TableIdentifier identifier) { - Schema 
schema = new Schema(StructType.of(required(2, "age", Types.IntegerType.get())).fields()); + Schema schema = new Schema(required(2, "age", Types.IntegerType.get())); return nessieCatalog .buildView(identifier) .withSchema(schema) diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java index b2eedc468bd4..25016100e00b 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java @@ -69,6 +69,7 @@ import org.projectnessie.model.ImmutableTableReference; import org.projectnessie.model.LogResponse.LogEntry; import org.projectnessie.model.Operation; +import org.projectnessie.model.TableReference; import org.projectnessie.model.Tag; public class TestNessieTable extends BaseTestIceberg { @@ -249,12 +250,12 @@ public void testRenameWithTableReference() throws NessieNotFoundException { TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(), renamedTableName); - ImmutableTableReference fromTableReference = + TableReference fromTableReference = ImmutableTableReference.builder() .reference(catalog.currentRefName()) .name(TABLE_IDENTIFIER.name()) .build(); - ImmutableTableReference toTableReference = + TableReference toTableReference = ImmutableTableReference.builder() .reference(catalog.currentRefName()) .name(renameTableIdentifier.name()) @@ -288,12 +289,12 @@ public void testRenameWithTableReferenceInvalidCase() throws NessieNotFoundExcep TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(), renamedTableName); - ImmutableTableReference fromTableReference = + TableReference fromTableReference = ImmutableTableReference.builder() .reference("Something") .name(TABLE_IDENTIFIER.name()) .build(); - ImmutableTableReference toTableReference = + TableReference toTableReference = ImmutableTableReference.builder() 
.reference(catalog.currentRefName()) .name(renameTableIdentifier.name()) diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java index ea721e937f4c..e134f2d888e6 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java @@ -29,7 +29,6 @@ import java.nio.file.Paths; import java.util.Comparator; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; @@ -49,6 +48,7 @@ import org.projectnessie.model.IcebergView; import org.projectnessie.model.ImmutableTableReference; import org.projectnessie.model.LogResponse.LogEntry; +import org.projectnessie.model.TableReference; public class TestNessieView extends BaseTestIceberg { @@ -130,10 +130,9 @@ public void verifyStateMovesForDML() throws Exception { Assertions.assertThat(contentInitialMain) .as("global-contents + snapshot-id equal on both branches in Nessie") .isEqualTo(contentInitialBranch); - Assertions.assertThat(viewInitialMain.currentVersion()).isNotNull(); + Assertions.assertThat(viewInitialMain.currentVersion().versionId()).isEqualTo(2); // 3. 
modify view in "main" branch - icebergView .replaceVersion() .withQuery("trino", "some other query") @@ -158,6 +157,7 @@ public void verifyStateMovesForDML() throws Exception { .describedAs("schema ID must be same across branches") .isEqualTo(contentsAfter1Branch.getSchemaId()); // verify updates + Assertions.assertThat(viewAfter1Main.currentVersion().versionId()).isEqualTo(3); Assertions.assertThat( ((SQLViewRepresentation) viewAfter1Main.currentVersion().representations().get(0)) .dialect()) @@ -178,12 +178,15 @@ public void verifyStateMovesForDML() throws Exception { // --> assert getValue() against both branches returns the updated metadata-location // verify view-metadata-location + Assertions.assertThat(contentsAfter2Main.getVersionId()).isEqualTo(4); Assertions.assertThat(contentsAfter2Main.getMetadataLocation()) .describedAs("metadata-location must change on %s", BRANCH) .isNotEqualTo(contentsAfter1Main.getMetadataLocation()); + Assertions.assertThat(contentsAfter1Main.getVersionId()).isEqualTo(3); Assertions.assertThat(contentsAfter2Branch.getMetadataLocation()) .describedAs("on-reference-state must not change on %s", testCaseBranch) .isEqualTo(contentsAfter1Branch.getMetadataLocation()); + Assertions.assertThat(viewAfter2Main.currentVersion().versionId()).isEqualTo(4); Assertions.assertThat( ((SQLViewRepresentation) viewAfter2Main.currentVersion().representations().get(0)) .dialect()) @@ -218,12 +221,12 @@ public void testRenameWithTableReference() throws NessieNotFoundException { TableIdentifier renameViewIdentifier = TableIdentifier.of(VIEW_IDENTIFIER.namespace(), renamedViewName); - ImmutableTableReference fromTableReference = + TableReference fromTableReference = ImmutableTableReference.builder() .reference(catalog.currentRefName()) .name(VIEW_IDENTIFIER.name()) .build(); - ImmutableTableReference toTableReference = + TableReference toTableReference = ImmutableTableReference.builder() .reference(catalog.currentRefName()) 
.name(renameViewIdentifier.name()) @@ -233,9 +236,13 @@ public void testRenameWithTableReference() throws NessieNotFoundException { TableIdentifier toIdentifier = TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString()); + View viewBeforeRename = catalog.loadView(fromIdentifier); catalog.renameView(fromIdentifier, toIdentifier); Assertions.assertThat(catalog.viewExists(fromIdentifier)).isFalse(); Assertions.assertThat(catalog.viewExists(toIdentifier)).isTrue(); + View viewAfterRename = catalog.loadView(toIdentifier); + Assertions.assertThat(viewBeforeRename.currentVersion().versionId()) + .isEqualTo(viewAfterRename.currentVersion().versionId()); Assertions.assertThat(catalog.dropView(toIdentifier)).isTrue(); @@ -248,12 +255,12 @@ public void testRenameWithTableReferenceInvalidCase() { TableIdentifier renameViewIdentifier = TableIdentifier.of(VIEW_IDENTIFIER.namespace(), renamedViewName); - ImmutableTableReference fromTableReference = + TableReference fromTableReference = ImmutableTableReference.builder() .reference("Something") .name(VIEW_IDENTIFIER.name()) .build(); - ImmutableTableReference toTableReference = + TableReference toTableReference = ImmutableTableReference.builder() .reference(catalog.currentRefName()) .name(renameViewIdentifier.name()) @@ -299,10 +306,12 @@ private void verifyCommitMetadata() throws NessieNotFoundException { .allSatisfy( logEntry -> { CommitMeta commit = logEntry.getCommitMeta(); - Assertions.assertThat(commit.getAuthor()).isNotNull().isNotEmpty(); - Assertions.assertThat(commit.getAuthor()).isEqualTo(System.getProperty("user.name")); - Assertions.assertThat(commit.getProperties().get(NessieUtil.APPLICATION_TYPE)) - .isEqualTo("iceberg"); + Assertions.assertThat(commit.getAuthor()) + .isNotNull() + .isNotEmpty() + .isEqualTo(System.getProperty("user.name")); + Assertions.assertThat(commit.getProperties()) + .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg"); 
Assertions.assertThat(commit.getMessage()).startsWith("Iceberg"); }); } @@ -317,16 +326,12 @@ public void testDrop() throws NessieNotFoundException { } @Test - public void testListviews() { + public void testListViews() { TableIdentifier newIdentifier = TableIdentifier.of(DB_NAME, "newView"); createView(catalog, newIdentifier, SCHEMA); - List tableIdents = catalog.listViews(VIEW_IDENTIFIER.namespace()); - List expectedIdents = - tableIdents.stream() - .filter(t -> t.equals(newIdentifier) || t.equals(VIEW_IDENTIFIER)) - .collect(Collectors.toList()); - Assertions.assertThat(expectedIdents).hasSize(2); + List viewIdents = catalog.listViews(VIEW_IDENTIFIER.namespace()); + Assertions.assertThat(viewIdents).contains(VIEW_IDENTIFIER, newIdentifier); Assertions.assertThat(catalog.viewExists(VIEW_IDENTIFIER)).isTrue(); Assertions.assertThat(catalog.viewExists(newIdentifier)).isTrue(); } diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java index d5ba68eb7454..005e700f431b 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieViewCatalog.java @@ -34,6 +34,7 @@ import org.apache.iceberg.view.ViewMetadata; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -64,7 +65,7 @@ public class TestNessieViewCatalog extends ViewCatalogTests { @RegisterExtension static NessieJaxRsExtension server = NessieJaxRsExtension.jaxRsExtension(() -> persist); - @TempDir public Path temp; + @TempDir private Path temp; private NessieCatalog catalog; private NessieApiV1 api; @@ -151,6 +152,7 @@ protected boolean requiresNamespaceCreate() { // Nessie adds extra properties (like commit 
id) on every operation. Hence, view metadata will not // be same after rename. @Override + @Test public void renameView() { TableIdentifier from = TableIdentifier.of("ns", "view"); TableIdentifier to = TableIdentifier.of("ns", "renamedView"); @@ -185,6 +187,7 @@ public void renameView() { } @Override + @Test public void renameViewUsingDifferentNamespace() { TableIdentifier from = TableIdentifier.of("ns", "view"); TableIdentifier to = TableIdentifier.of("other_ns", "renamedView"); From 3b44157498837f0f35b24632c51af1705e222ccd Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 8 Dec 2023 17:22:23 +0530 Subject: [PATCH 3/6] Address comments again --- .../apache/iceberg/nessie/NessieCatalog.java | 62 +++++++------ .../iceberg/nessie/NessieIcebergClient.java | 24 +++--- .../iceberg/nessie/NessieTableOperations.java | 8 +- .../org/apache/iceberg/nessie/NessieUtil.java | 86 +++++++++++-------- .../iceberg/nessie/NessieViewOperations.java | 6 +- .../apache/iceberg/nessie/TestNessieView.java | 4 - 6 files changed, 110 insertions(+), 80 deletions(-) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 584cdfa4f846..ea5571bb12cc 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -232,17 +232,32 @@ protected String defaultWarehouseLocation(TableIdentifier table) { @Override public List listTables(Namespace namespace) { - return client.listContents(namespace, Content.Type.ICEBERG_TABLE); + return client.listTables(namespace); } @Override public boolean dropTable(TableIdentifier identifier, boolean purge) { - return dropContent(identifier, Content.Type.ICEBERG_TABLE); + TableReference tableReference = parseTableReference(identifier); + return client + .withReference(tableReference.getReference(), tableReference.getHash()) + 
.dropTable(identifierWithoutTableReference(identifier, tableReference), false); } @Override public void renameTable(TableIdentifier from, TableIdentifier to) { - renameContent(from, to, Content.Type.ICEBERG_TABLE); + TableReference fromTableReference = parseTableReference(from); + TableReference toTableReference = parseTableReference(to); + + validateReferenceForRename(fromTableReference, toTableReference, Content.Type.ICEBERG_TABLE); + + TableIdentifier fromIdentifier = + NessieUtil.removeCatalogName( + identifierWithoutTableReference(from, fromTableReference), name()); + TableIdentifier toIdentifier = + NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name()); + client + .withReference(fromTableReference.getReference(), fromTableReference.getHash()) + .renameTable(fromIdentifier, toIdentifier); } @Override @@ -337,29 +352,36 @@ protected ViewOperations newViewOps(TableIdentifier identifier) { @Override public List listViews(Namespace namespace) { - return client.listContents(namespace, Content.Type.ICEBERG_VIEW); + return client.listViews(namespace); } @Override public boolean dropView(TableIdentifier identifier) { - return dropContent(identifier, Content.Type.ICEBERG_VIEW); - } - - @Override - public void renameView(TableIdentifier from, TableIdentifier to) { - renameContent(from, to, Content.Type.ICEBERG_VIEW); - } - - private boolean dropContent(TableIdentifier identifier, Content.Type type) { TableReference tableReference = parseTableReference(identifier); return client .withReference(tableReference.getReference(), tableReference.getHash()) - .dropContent(identifierWithoutTableReference(identifier, tableReference), false, type); + .dropView(identifierWithoutTableReference(identifier, tableReference), false); } - private void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { TableReference fromTableReference = 
parseTableReference(from); TableReference toTableReference = parseTableReference(to); + + validateReferenceForRename(fromTableReference, toTableReference, Content.Type.ICEBERG_VIEW); + + TableIdentifier fromIdentifier = + NessieUtil.removeCatalogName( + identifierWithoutTableReference(from, fromTableReference), name()); + TableIdentifier toIdentifier = + NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name()); + client + .withReference(fromTableReference.getReference(), fromTableReference.getHash()) + .renameView(fromIdentifier, toIdentifier); + } + + private void validateReferenceForRename( + TableReference fromTableReference, TableReference toTableReference, Content.Type type) { String fromReference = fromTableReference.hasReference() ? fromTableReference.getReference() @@ -377,15 +399,5 @@ private void renameContent(TableIdentifier from, TableIdentifier to, Content.Typ fromReference, toTableReference.getName(), toReference); - - TableIdentifier fromIdentifier = - NessieUtil.removeCatalogName( - identifierWithoutTableReference(from, fromTableReference), name()); - TableIdentifier toIdentifier = - NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name()); - - client - .withReference(fromTableReference.getReference(), fromTableReference.getHash()) - .renameContent(fromIdentifier, toIdentifier, type); } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index fd5adabe36af..8956dcea0c54 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -156,7 +156,7 @@ public List listViews(Namespace namespace) { } /** Lists Iceberg table or view from the given namespace */ - protected List listContents(Namespace namespace, Content.Type type) { + private List listContents(Namespace namespace, 
Content.Type type) { try { return withReference(api.getEntries()).get().getEntries().stream() .filter(namespacePredicate(namespace)) @@ -424,7 +424,7 @@ public void renameView(TableIdentifier from, TableIdentifier to) { renameContent(from, to, Content.Type.ICEBERG_VIEW); } - protected void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { + private void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) { getRef().checkMutable(); IcebergContent existingFromContent = fetchContent(from); @@ -453,16 +453,18 @@ protected void renameContent(TableIdentifier from, TableIdentifier to, Content.T contentType, from, to, getRef().getName()), e); } catch (BaseNessieClientServerException e) { + CommitFailedException commitFailedException = + new CommitFailedException( + e, + "Cannot rename %s '%s' to '%s': the current reference is not up to date.", + contentType, + from, + to); + Optional exception = Optional.empty(); if (e instanceof NessieConflictException) { - NessieUtil.handleExceptionsForCommits(e, getRef().getName(), type); + exception = NessieUtil.handleExceptionsForCommits(e, getRef().getName(), type); } - - throw new CommitFailedException( - e, - "Cannot rename %s '%s' to '%s': the current reference is not up to date.", - contentType, - from, - to); + throw exception.orElse(commitFailedException); } catch (HttpClientException ex) { // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant // to catch all kinds of network errors (e.g. connection reset). 
Network code implementation @@ -516,7 +518,7 @@ public boolean dropView(TableIdentifier identifier, boolean purge) { return dropContent(identifier, purge, Content.Type.ICEBERG_VIEW); } - protected boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) { + private boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) { getRef().checkMutable(); IcebergContent existingContent = fetchContent(identifier); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index ab633471284f..32ec41887939 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -134,7 +134,13 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) { failure = true; } - NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE); + + NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE) + .ifPresent( + exception -> { + throw exception; + }); + } catch (NessieBadRequestException ex) { failure = true; throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index 0c5f5e358edf..8f4e704b7fef 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -215,23 +215,30 @@ public static ViewMetadata loadViewMetadata( .build(); } - static void handleExceptionsForCommits(Exception exception, String refName, Content.Type type) { + static Optional handleExceptionsForCommits( + Exception exception, String refName, Content.Type type) 
{ if (exception instanceof NessieConflictException) { if (exception instanceof NessieReferenceConflictException) { // Throws a specialized exception, if possible - NessieUtil.maybeThrowSpecializedException( - (NessieReferenceConflictException) exception, type); + Optional specializedException = + NessieUtil.maybeUseSpecializedException( + (NessieReferenceConflictException) exception, type); + if (specializedException.isPresent()) { + return specializedException; + } } - throw new CommitFailedException( - exception, - "Cannot commit: Reference hash is out of date. Update the reference '%s' and try again", - refName); + return Optional.of( + new CommitFailedException( + exception, + "Cannot commit: Reference hash is out of date. Update the reference '%s' and try again", + refName)); } if (exception instanceof NessieNotFoundException) { - throw new RuntimeException( - String.format("Cannot commit: Reference '%s' no longer exists", refName), exception); + return Optional.of( + new RuntimeException( + String.format("Cannot commit: Reference '%s' no longer exists", refName), exception)); } if (exception instanceof HttpClientException) { @@ -239,8 +246,9 @@ static void handleExceptionsForCommits(Exception exception, String refName, Cont // to catch all kinds of network errors (e.g. connection reset). Network code implementation // details and all kinds of network devices can induce unexpected behavior. So better be // safe than sorry. 
- throw new CommitStateUnknownException(exception); + return Optional.of(new CommitStateUnknownException(exception)); } + return Optional.empty(); } static Optional handleBadRequestForCommit( @@ -269,42 +277,44 @@ static Optional handleBadRequestForCommit( return Optional.empty(); } - private static void maybeThrowSpecializedException( + private static Optional maybeUseSpecializedException( NessieReferenceConflictException ex, Content.Type type) { String contentType = contentTypeString(type); - NessieUtil.extractSingleConflict( + Optional singleConflict = + NessieUtil.extractSingleConflict( ex, EnumSet.of( Conflict.ConflictType.NAMESPACE_ABSENT, Conflict.ConflictType.NAMESPACE_NOT_EMPTY, Conflict.ConflictType.KEY_DOES_NOT_EXIST, - Conflict.ConflictType.KEY_EXISTS)) - .ifPresent( - conflict -> { - switch (conflict.conflictType()) { - case NAMESPACE_ABSENT: - throw new NoSuchNamespaceException( - ex, "Namespace does not exist: %s", conflict.key()); - case NAMESPACE_NOT_EMPTY: - throw new NamespaceNotEmptyException( - ex, "Namespace not empty: %s", conflict.key()); - case KEY_DOES_NOT_EXIST: - if (type == Content.Type.ICEBERG_VIEW) { - throw new NoSuchViewException( - ex, "%s does not exist: %s", contentType, conflict.key()); - } else { - throw new NoSuchTableException( - ex, "%s does not exist: %s", contentType, conflict.key()); - } - case KEY_EXISTS: - throw new AlreadyExistsException( - ex, "%s already exists: %s", contentType, conflict.key()); - default: - // Explicit fall-through - break; - } - }); + Conflict.ConflictType.KEY_EXISTS)); + if (!singleConflict.isPresent()) { + return Optional.empty(); + } + + Conflict conflict = singleConflict.get(); + switch (conflict.conflictType()) { + case NAMESPACE_ABSENT: + return Optional.of( + new NoSuchNamespaceException(ex, "Namespace does not exist: %s", conflict.key())); + case NAMESPACE_NOT_EMPTY: + return Optional.of( + new NamespaceNotEmptyException(ex, "Namespace not empty: %s", conflict.key())); + case 
KEY_DOES_NOT_EXIST: + if (type == Content.Type.ICEBERG_VIEW) { + return Optional.of( + new NoSuchViewException(ex, "%s does not exist: %s", contentType, conflict.key())); + } else { + return Optional.of( + new NoSuchTableException(ex, "%s does not exist: %s", contentType, conflict.key())); + } + case KEY_EXISTS: + return Optional.of( + new AlreadyExistsException(ex, "%s already exists: %s", contentType, conflict.key())); + default: + return Optional.empty(); + } } static String contentTypeString(Content.Type type) { diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java index a348e28fb66f..3e8781280f5b 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -113,7 +113,11 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) { failure = true; } - NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW); + NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW) + .ifPresent( + exception -> { + throw exception; + }); } catch (NessieBadRequestException ex) { failure = true; throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_VIEW).orElse(ex); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java index e134f2d888e6..656363ff072b 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieView.java @@ -236,13 +236,9 @@ public void testRenameWithTableReference() throws NessieNotFoundException { TableIdentifier toIdentifier = TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString()); 
- View viewBeforeRename = catalog.loadView(fromIdentifier); catalog.renameView(fromIdentifier, toIdentifier); Assertions.assertThat(catalog.viewExists(fromIdentifier)).isFalse(); Assertions.assertThat(catalog.viewExists(toIdentifier)).isTrue(); - View viewAfterRename = catalog.loadView(toIdentifier); - Assertions.assertThat(viewBeforeRename.currentVersion().versionId()) - .isEqualTo(viewAfterRename.currentVersion().versionId()); Assertions.assertThat(catalog.dropView(toIdentifier)).isTrue(); From 029dc6c9624cbe76d47d19b2bd65c3b0608640c8 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 11 Dec 2023 13:57:51 +0530 Subject: [PATCH 4/6] Fix test failures --- .../iceberg/nessie/NessieTableOperations.java | 18 ++---------------- .../iceberg/nessie/NessieViewOperations.java | 15 ++------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index 32ec41887939..6ce3e1b763ca 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -21,17 +21,16 @@ import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; -import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.projectnessie.client.http.HttpClientException; import org.projectnessie.error.NessieBadRequestException; import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; -import 
org.projectnessie.model.IcebergView; import org.projectnessie.model.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,18 +88,7 @@ protected void doRefresh() { this.table = content .unwrap(IcebergTable.class) - .orElseThrow( - () -> { - if (content instanceof IcebergView) { - return new AlreadyExistsException( - "View with same name already exists: %s", key); - } else { - return new AlreadyExistsException( - "Cannot refresh Iceberg table: " - + "Nessie points to a non-Iceberg object for path: %s.", - key); - } - }); + .orElseThrow(() -> new NessieContentNotFoundException(key, reference.getName())); metadataLocation = table.getMetadataLocation(); } } catch (NessieNotFoundException ex) { @@ -134,13 +122,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) { failure = true; } - NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE) .ifPresent( exception -> { throw exception; }); - } catch (NessieBadRequestException ex) { failure = true; throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java index 3e8781280f5b..3974ecbac454 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.nessie; -import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.view.BaseViewOperations; @@ -27,10 +26,10 @@ import org.projectnessie.client.http.HttpClientException; import org.projectnessie.error.NessieBadRequestException; import 
org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.IcebergTable; import org.projectnessie.model.IcebergView; import org.projectnessie.model.Reference; import org.slf4j.Logger; @@ -74,17 +73,7 @@ public void doRefresh() { this.icebergView = content .unwrap(IcebergView.class) - .orElseThrow( - () -> { - if (content instanceof IcebergTable) { - return new AlreadyExistsException( - "Table with same name already exists: %s in %s", key, reference); - } else { - return new AlreadyExistsException( - "Cannot refresh Iceberg view: Nessie points to a non-Iceberg object for path: %s in %s", - key, reference); - } - }); + .orElseThrow(() -> new NessieContentNotFoundException(key, reference.getName())); metadataLocation = icebergView.getMetadataLocation(); } } catch (NessieNotFoundException ex) { From 589e428b67754e49d570ef454ac472820dba743f Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 11 Dec 2023 19:51:54 +0530 Subject: [PATCH 5/6] Address toLowerCase() --- .../main/java/org/apache/iceberg/nessie/NessieCatalog.java | 3 ++- .../org/apache/iceberg/nessie/NessieIcebergClient.java | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index ea5571bb12cc..70c632738a7b 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -394,7 +395,7 @@ private void validateReferenceForRename( fromReference.equalsIgnoreCase(toReference), "Cannot rename 
%s '%s' on reference '%s' to '%s' on reference '%s':" + " source and target references must be the same.", - NessieUtil.contentTypeString(type).toLowerCase(), + NessieUtil.contentTypeString(type).toLowerCase(Locale.ENGLISH), fromTableReference.getName(), fromReference, toTableReference.getName(), diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index 8956dcea0c54..5d5b4ce69e6c 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -167,7 +168,7 @@ private List listContents(Namespace namespace, Content.Type typ throw new NoSuchNamespaceException( ex, "Unable to list %ss due to missing ref '%s'", - NessieUtil.contentTypeString(type).toLowerCase(), + NessieUtil.contentTypeString(type).toLowerCase(Locale.ENGLISH), getRef().getName()); } } @@ -433,7 +434,7 @@ private void renameContent(TableIdentifier from, TableIdentifier to, Content.Typ IcebergContent existingToContent = fetchContent(to); validateToContentForRename(from, to, existingToContent); - String contentType = NessieUtil.contentTypeString(type).toLowerCase(); + String contentType = NessieUtil.contentTypeString(type).toLowerCase(Locale.ENGLISH); try { commitRetry( String.format("Iceberg rename %s from '%s' to '%s'", contentType, from, to), @@ -533,7 +534,7 @@ private boolean dropContent(TableIdentifier identifier, boolean purge, Content.T identifier, NessieUtil.contentTypeString(type))); } - String contentType = NessieUtil.contentTypeString(type).toLowerCase(); + String contentType = NessieUtil.contentTypeString(type).toLowerCase(Locale.ENGLISH); if (purge) { LOG.info( From 
5981bad276a4ed35b41e2fa6fb837b5fc0856a73 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 11 Dec 2023 21:45:31 +0530 Subject: [PATCH 6/6] Handle comments --- .../src/main/java/org/apache/iceberg/nessie/NessieCatalog.java | 2 +- .../java/org/apache/iceberg/nessie/NessieIcebergClient.java | 2 +- nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java | 3 +++ .../java/org/apache/iceberg/nessie/NessieViewOperations.java | 1 + 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 70c632738a7b..13a7d70cc4b2 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -241,7 +241,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { TableReference tableReference = parseTableReference(identifier); return client .withReference(tableReference.getReference(), tableReference.getHash()) - .dropTable(identifierWithoutTableReference(identifier, tableReference), false); + .dropTable(identifierWithoutTableReference(identifier, tableReference), purge); } @Override diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index 5d5b4ce69e6c..5b5d4b194e44 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -546,7 +546,7 @@ private boolean dropContent(TableIdentifier identifier, boolean purge, Content.T // We try to drop the content. Simple retry after ref update. 
try { commitRetry( - String.format("Iceberg delete table %s", identifier), + String.format("Iceberg delete %s %s", contentType, identifier), Operation.Delete.of(NessieUtil.toKey(identifier))); return true; } catch (NessieConflictException e) { diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index 8f4e704b7fef..1d2cc9ccb111 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -248,6 +248,7 @@ static Optional handleExceptionsForCommits( // safe than sorry. return Optional.of(new CommitStateUnknownException(exception)); } + return Optional.empty(); } @@ -274,6 +275,7 @@ static Optional handleBadRequestForCommit( } catch (NessieNotFoundException e) { return Optional.of(new RuntimeException(e)); } + return Optional.empty(); } @@ -325,6 +327,7 @@ static String contentTypeString(Content.Type type) { } else if (type.equals(Content.Type.NAMESPACE)) { return "Namespace"; } + throw new IllegalArgumentException("Unsupported Nessie content type " + type.name()); } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java index 3974ecbac454..e1d46550358c 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -102,6 +102,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) { failure = true; } + NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW) .ifPresent( exception -> {