Skip to content

Commit

Permalink
Address comments again
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Dec 11, 2023
1 parent a52e0d5 commit 3b44157
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 80 deletions.
62 changes: 37 additions & 25 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,32 @@ protected String defaultWarehouseLocation(TableIdentifier table) {

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
return client.listContents(namespace, Content.Type.ICEBERG_TABLE);
return client.listTables(namespace);
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
return dropContent(identifier, Content.Type.ICEBERG_TABLE);
TableReference tableReference = parseTableReference(identifier);
return client
.withReference(tableReference.getReference(), tableReference.getHash())
.dropTable(identifierWithoutTableReference(identifier, tableReference), false);
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
renameContent(from, to, Content.Type.ICEBERG_TABLE);
TableReference fromTableReference = parseTableReference(from);
TableReference toTableReference = parseTableReference(to);

validateReferenceForRename(fromTableReference, toTableReference, Content.Type.ICEBERG_TABLE);

TableIdentifier fromIdentifier =
NessieUtil.removeCatalogName(
identifierWithoutTableReference(from, fromTableReference), name());
TableIdentifier toIdentifier =
NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name());
client
.withReference(fromTableReference.getReference(), fromTableReference.getHash())
.renameTable(fromIdentifier, toIdentifier);
}

@Override
Expand Down Expand Up @@ -337,29 +352,36 @@ protected ViewOperations newViewOps(TableIdentifier identifier) {

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
return client.listContents(namespace, Content.Type.ICEBERG_VIEW);
return client.listViews(namespace);
}

@Override
public boolean dropView(TableIdentifier identifier) {
return dropContent(identifier, Content.Type.ICEBERG_VIEW);
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
renameContent(from, to, Content.Type.ICEBERG_VIEW);
}

private boolean dropContent(TableIdentifier identifier, Content.Type type) {
TableReference tableReference = parseTableReference(identifier);
return client
.withReference(tableReference.getReference(), tableReference.getHash())
.dropContent(identifierWithoutTableReference(identifier, tableReference), false, type);
.dropView(identifierWithoutTableReference(identifier, tableReference), false);
}

private void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) {
@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
TableReference fromTableReference = parseTableReference(from);
TableReference toTableReference = parseTableReference(to);

validateReferenceForRename(fromTableReference, toTableReference, Content.Type.ICEBERG_VIEW);

TableIdentifier fromIdentifier =
NessieUtil.removeCatalogName(
identifierWithoutTableReference(from, fromTableReference), name());
TableIdentifier toIdentifier =
NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name());
client
.withReference(fromTableReference.getReference(), fromTableReference.getHash())
.renameView(fromIdentifier, toIdentifier);
}

private void validateReferenceForRename(
TableReference fromTableReference, TableReference toTableReference, Content.Type type) {
String fromReference =
fromTableReference.hasReference()
? fromTableReference.getReference()
Expand All @@ -377,15 +399,5 @@ private void renameContent(TableIdentifier from, TableIdentifier to, Content.Typ
fromReference,
toTableReference.getName(),
toReference);

TableIdentifier fromIdentifier =
NessieUtil.removeCatalogName(
identifierWithoutTableReference(from, fromTableReference), name());
TableIdentifier toIdentifier =
NessieUtil.removeCatalogName(identifierWithoutTableReference(to, toTableReference), name());

client
.withReference(fromTableReference.getReference(), fromTableReference.getHash())
.renameContent(fromIdentifier, toIdentifier, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public List<TableIdentifier> listViews(Namespace namespace) {
}

/** Lists Iceberg table or view from the given namespace */
protected List<TableIdentifier> listContents(Namespace namespace, Content.Type type) {
private List<TableIdentifier> listContents(Namespace namespace, Content.Type type) {
try {
return withReference(api.getEntries()).get().getEntries().stream()
.filter(namespacePredicate(namespace))
Expand Down Expand Up @@ -424,7 +424,7 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
renameContent(from, to, Content.Type.ICEBERG_VIEW);
}

protected void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) {
private void renameContent(TableIdentifier from, TableIdentifier to, Content.Type type) {
getRef().checkMutable();

IcebergContent existingFromContent = fetchContent(from);
Expand Down Expand Up @@ -453,16 +453,18 @@ protected void renameContent(TableIdentifier from, TableIdentifier to, Content.T
contentType, from, to, getRef().getName()),
e);
} catch (BaseNessieClientServerException e) {
CommitFailedException commitFailedException =
new CommitFailedException(
e,
"Cannot rename %s '%s' to '%s': the current reference is not up to date.",
contentType,
from,
to);
Optional<RuntimeException> exception = Optional.empty();
if (e instanceof NessieConflictException) {
NessieUtil.handleExceptionsForCommits(e, getRef().getName(), type);
exception = NessieUtil.handleExceptionsForCommits(e, getRef().getName(), type);
}

throw new CommitFailedException(
e,
"Cannot rename %s '%s' to '%s': the current reference is not up to date.",
contentType,
from,
to);
throw exception.orElse(commitFailedException);
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
Expand Down Expand Up @@ -516,7 +518,7 @@ public boolean dropView(TableIdentifier identifier, boolean purge) {
return dropContent(identifier, purge, Content.Type.ICEBERG_VIEW);
}

protected boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) {
private boolean dropContent(TableIdentifier identifier, boolean purge, Content.Type type) {
getRef().checkMutable();

IcebergContent existingContent = fetchContent(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,13 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) {
failure = true;
}
NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE);

NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_TABLE)
.ifPresent(
exception -> {
throw exception;
});

} catch (NessieBadRequestException ex) {
failure = true;
throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_TABLE)
Expand Down
86 changes: 48 additions & 38 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,32 +215,40 @@ public static ViewMetadata loadViewMetadata(
.build();
}

static void handleExceptionsForCommits(Exception exception, String refName, Content.Type type) {
static Optional<RuntimeException> handleExceptionsForCommits(
Exception exception, String refName, Content.Type type) {
if (exception instanceof NessieConflictException) {
if (exception instanceof NessieReferenceConflictException) {
// Throws a specialized exception, if possible
NessieUtil.maybeThrowSpecializedException(
(NessieReferenceConflictException) exception, type);
Optional<RuntimeException> specializedException =
NessieUtil.maybeUseSpecializedException(
(NessieReferenceConflictException) exception, type);
if (specializedException.isPresent()) {
return specializedException;
}
}

throw new CommitFailedException(
exception,
"Cannot commit: Reference hash is out of date. Update the reference '%s' and try again",
refName);
return Optional.of(
new CommitFailedException(
exception,
"Cannot commit: Reference hash is out of date. Update the reference '%s' and try again",
refName));
}

if (exception instanceof NessieNotFoundException) {
throw new RuntimeException(
String.format("Cannot commit: Reference '%s' no longer exists", refName), exception);
return Optional.of(
new RuntimeException(
String.format("Cannot commit: Reference '%s' no longer exists", refName), exception));
}

if (exception instanceof HttpClientException) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
// details and all kinds of network devices can induce unexpected behavior. So better be
// safe than sorry.
throw new CommitStateUnknownException(exception);
return Optional.of(new CommitStateUnknownException(exception));
}
return Optional.empty();
}

static Optional<RuntimeException> handleBadRequestForCommit(
Expand Down Expand Up @@ -269,42 +277,44 @@ static Optional<RuntimeException> handleBadRequestForCommit(
return Optional.empty();
}

private static void maybeThrowSpecializedException(
private static Optional<RuntimeException> maybeUseSpecializedException(
NessieReferenceConflictException ex, Content.Type type) {
String contentType = contentTypeString(type);

NessieUtil.extractSingleConflict(
Optional<Conflict> singleConflict =
NessieUtil.extractSingleConflict(
ex,
EnumSet.of(
Conflict.ConflictType.NAMESPACE_ABSENT,
Conflict.ConflictType.NAMESPACE_NOT_EMPTY,
Conflict.ConflictType.KEY_DOES_NOT_EXIST,
Conflict.ConflictType.KEY_EXISTS))
.ifPresent(
conflict -> {
switch (conflict.conflictType()) {
case NAMESPACE_ABSENT:
throw new NoSuchNamespaceException(
ex, "Namespace does not exist: %s", conflict.key());
case NAMESPACE_NOT_EMPTY:
throw new NamespaceNotEmptyException(
ex, "Namespace not empty: %s", conflict.key());
case KEY_DOES_NOT_EXIST:
if (type == Content.Type.ICEBERG_VIEW) {
throw new NoSuchViewException(
ex, "%s does not exist: %s", contentType, conflict.key());
} else {
throw new NoSuchTableException(
ex, "%s does not exist: %s", contentType, conflict.key());
}
case KEY_EXISTS:
throw new AlreadyExistsException(
ex, "%s already exists: %s", contentType, conflict.key());
default:
// Explicit fall-through
break;
}
});
Conflict.ConflictType.KEY_EXISTS));
if (!singleConflict.isPresent()) {
return Optional.empty();
}

Conflict conflict = singleConflict.get();
switch (conflict.conflictType()) {
case NAMESPACE_ABSENT:
return Optional.of(
new NoSuchNamespaceException(ex, "Namespace does not exist: %s", conflict.key()));
case NAMESPACE_NOT_EMPTY:
return Optional.of(
new NamespaceNotEmptyException(ex, "Namespace not empty: %s", conflict.key()));
case KEY_DOES_NOT_EXIST:
if (type == Content.Type.ICEBERG_VIEW) {
return Optional.of(
new NoSuchViewException(ex, "%s does not exist: %s", contentType, conflict.key()));
} else {
return Optional.of(
new NoSuchTableException(ex, "%s does not exist: %s", contentType, conflict.key()));
}
case KEY_EXISTS:
return Optional.of(
new AlreadyExistsException(ex, "%s already exists: %s", contentType, conflict.key()));
default:
return Optional.empty();
}
}

static String contentTypeString(Content.Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
if (ex instanceof NessieConflictException || ex instanceof NessieNotFoundException) {
failure = true;
}
NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW);
NessieUtil.handleExceptionsForCommits(ex, client.refName(), Content.Type.ICEBERG_VIEW)
.ifPresent(
exception -> {
throw exception;
});
} catch (NessieBadRequestException ex) {
failure = true;
throw NessieUtil.handleBadRequestForCommit(client, key, Content.Type.ICEBERG_VIEW).orElse(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,9 @@ public void testRenameWithTableReference() throws NessieNotFoundException {
TableIdentifier toIdentifier =
TableIdentifier.of(VIEW_IDENTIFIER.namespace(), toTableReference.toString());

View viewBeforeRename = catalog.loadView(fromIdentifier);
catalog.renameView(fromIdentifier, toIdentifier);
Assertions.assertThat(catalog.viewExists(fromIdentifier)).isFalse();
Assertions.assertThat(catalog.viewExists(toIdentifier)).isTrue();
View viewAfterRename = catalog.loadView(toIdentifier);
Assertions.assertThat(viewBeforeRename.currentVersion().versionId())
.isEqualTo(viewAfterRename.currentVersion().versionId());

Assertions.assertThat(catalog.dropView(toIdentifier)).isTrue();

Expand Down

0 comments on commit 3b44157

Please sign in to comment.