From eeef2ad0084b643b1f5b5eb998a3a2f3f1b9ab71 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 8 Jun 2021 22:53:31 -0700 Subject: [PATCH 1/2] AWS: add DynamoDb catalog --- .../aws/dynamodb/DynamoDbCatalogTest.java | 289 +++++++++ .../org/apache/iceberg/aws/AwsProperties.java | 22 + .../iceberg/aws/dynamodb/DynamoDbCatalog.java | 613 ++++++++++++++++++ .../aws/dynamodb/DynamoDbTableOperations.java | 215 ++++++ 4 files changed, 1139 insertions(+) create mode 100644 aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java create mode 100644 aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java create mode 100644 aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java new file mode 100644 index 000000000000..8d4273c1625f --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.dynamodb; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.aws.AwsClientFactories; +import org.apache.iceberg.aws.AwsClientFactory; +import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +public class DynamoDbCatalogTest { + + private static final ForkJoinPool POOL = new ForkJoinPool(16); + private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + private static String catalogTableName; + private static DynamoDbClient dynamo; + private static S3Client s3; + private static DynamoDbCatalog catalog; + private static String testBucket; + + @BeforeClass + public static void beforeClass() { + catalogTableName = genRandomName(); + AwsClientFactory clientFactory = AwsClientFactories.defaultFactory(); + dynamo = clientFactory.dynamo(); + s3 = clientFactory.s3(); + catalog = new DynamoDbCatalog(); + testBucket = AwsIntegTestUtil.testBucketName(); + catalog.initialize("test", ImmutableMap.of( + AwsProperties.DYNAMODB_TABLE_NAME, catalogTableName, + CatalogProperties.WAREHOUSE_LOCATION, "s3://" + testBucket + "/" + genRandomName())); + } + + @AfterClass + public static void afterClass() { + dynamo.deleteTable(DeleteTableRequest.builder().tableName(catalogTableName).build()); + } + + @Test + public void testCreateNamespace() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.namespacePrimaryKey(namespace)) + .build()); + Assert.assertTrue("namespace must exist", response.hasItem()); + Assert.assertEquals("namespace must be stored in DynamoDB", + namespace.toString(), response.item().get("namespace").s()); + + AssertHelpers.assertThrows("should not create duplicated namespace", + AlreadyExistsException.class, + "already exists", + () -> catalog.createNamespace(namespace)); + } + + @Test + public void testCreateNamespaceBadName() { + AssertHelpers.assertThrows("should not create namespace with empty level", + ValidationException.class, + "must not be empty", + () -> catalog.createNamespace(Namespace.of("a", "", "b"))); + + AssertHelpers.assertThrows("should not create namespace with dot in level", + ValidationException.class, + "must not contain dot", + () -> catalog.createNamespace(Namespace.of("a", "b.c"))); + } + + @Test + public void testListSubNamespaces() { + Namespace parent = Namespace.of(genRandomName()); + List namespaceList = IntStream.range(0, 3) + .mapToObj(i -> Namespace.of(parent.toString(), genRandomName())) + .collect(Collectors.toList()); + catalog.createNamespace(parent); + namespaceList.forEach(ns -> catalog.createNamespace(ns)); + Assert.assertEquals(4, catalog.listNamespaces(parent).size()); + } + + @Test + public void testNamespaceProperties() { + Namespace namespace = Namespace.of(genRandomName()); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + catalog.createNamespace(namespace, properties); + Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace)); + + properties.put("key3", "val3"); + properties.put("key2", "val2-1"); + catalog.setProperties(namespace, properties); + Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace)); + + properties.remove("key3"); + catalog.removeProperties(namespace, Sets.newHashSet("key3")); + Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace)); + } + + @Test + public void testCreateTable() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName()); + catalog.createTable(tableIdentifier, SCHEMA); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)) + .build()); + Assert.assertTrue("table must exist", response.hasItem()); + Assert.assertEquals("table must be stored in DynamoDB with table identifier as partition key", + tableIdentifier.toString(), response.item().get("identifier").s()); + Assert.assertEquals("table must be stored in DynamoDB with namespace as sort key", + namespace.toString(), response.item().get("namespace").s()); + + AssertHelpers.assertThrows("should not create duplicated table", + AlreadyExistsException.class, + "already exists", + () -> catalog.createTable(tableIdentifier, SCHEMA)); + } + + @Test + public void testCreateTableBadName() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + AssertHelpers.assertThrows("should not create table name with empty namespace", + ValidationException.class, + "Table namespace must not be empty", + () -> catalog.createTable(TableIdentifier.of(Namespace.empty(), "a"), SCHEMA)); + + AssertHelpers.assertThrows("should not create table name with dot", + ValidationException.class, + "must not contain dot", + () -> catalog.createTable(TableIdentifier.of(namespace, "a.b"), SCHEMA)); + } + + @Test + public void testListTable() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + List tableIdentifiers = IntStream.range(0, 3) + .mapToObj(i -> TableIdentifier.of(namespace, genRandomName())) + .collect(Collectors.toList()); + tableIdentifiers.forEach(id -> catalog.createTable(id, SCHEMA)); + Assert.assertEquals(3, catalog.listTables(namespace).size()); + } + + @Test + public void testDropTable() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName()); + catalog.createTable(tableIdentifier, SCHEMA); + String metadataLocation = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build()) + .item().get("p.metadata_location").s(); + catalog.dropTable(tableIdentifier, true); + Assert.assertFalse("table entry should not exist in dynamo", + dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build()) + .hasItem()); + AssertHelpers.assertThrows("metadata location should be deleted", + NoSuchKeyException.class, + () -> s3.headObject(HeadObjectRequest.builder() + .bucket(testBucket) + .key(metadataLocation.substring(testBucket.length() + 6)) // s3:// + end slash + .build())); + } + + @Test + public void testRenameTable() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + Namespace namespace2 = Namespace.of(genRandomName()); + catalog.createNamespace(namespace2); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName()); + catalog.createTable(tableIdentifier, SCHEMA); + TableIdentifier tableIdentifier2 = TableIdentifier.of(namespace2, genRandomName()); + + AssertHelpers.assertThrows("should not be able to rename a table not exist", + NoSuchTableException.class, + "does not exist", + () -> catalog.renameTable(TableIdentifier.of(namespace, "a"), tableIdentifier2)); + + AssertHelpers.assertThrows("should not be able to rename an existing table", + AlreadyExistsException.class, + "already exists", + () -> catalog.renameTable(tableIdentifier, tableIdentifier)); + + String metadataLocation = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build()) + .item().get("p.metadata_location").s(); + + catalog.renameTable(tableIdentifier, tableIdentifier2); + + String metadataLocation2 = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier2)).build()) + .item().get("p.metadata_location").s(); + + Assert.assertEquals("metadata location should be copied to new table entry", + metadataLocation, metadataLocation2); + } + + @Test + public void testUpdateTable() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName()); + catalog.createTable(tableIdentifier, SCHEMA); + Table table = catalog.loadTable(tableIdentifier); + table.updateSchema().addColumn("data", Types.StringType.get()).commit(); + table.refresh(); + Assert.assertEquals(2, table.schema().columns().size()); + } + + @Test + public void testConcurrentCommits() throws Exception { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName()); + catalog.createTable(tableIdentifier, SCHEMA); + Table table = catalog.loadTable(tableIdentifier); + POOL.submit(() -> IntStream.range(0, 16).parallel() + .forEach(i -> { + try { + table.updateSchema().addColumn(genRandomName(), Types.StringType.get()).commit(); + } catch (Exception e) { + // ignore + } + })).get(); + + Assert.assertEquals(2, table.schema().columns().size()); + } + + private static String genRandomName() { + return UUID.randomUUID().toString().replace("-", ""); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index ffc30c1f7aa7..0985559b8895 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Map; +import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.PropertyUtil; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -130,6 +131,12 @@ public class AwsProperties implements Serializable { */ public static final String S3FILEIO_ACL = "s3.acl"; + /** + * DynamoDB table name for {@link DynamoDbCatalog} + */ + public static final String DYNAMODB_TABLE_NAME = "dynamodb.table-name"; + public static final String DYNAMODB_TABLE_NAME_DEFAULT = "iceberg"; + /** * The implementation class of {@link AwsClientFactory} to customize AWS client configurations. * If set, all AWS clients will be initialized by the specified factory. @@ -180,6 +187,8 @@ public class AwsProperties implements Serializable { private String glueCatalogId; private boolean glueCatalogSkipArchive; + private String dynamoDbTableName; + public AwsProperties() { this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE; this.s3FileIoSseKey = null; @@ -193,6 +202,8 @@ public AwsProperties() { this.glueCatalogId = null; this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; + + this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT; } public AwsProperties(Map properties) { @@ -236,6 +247,9 @@ public AwsProperties(Map properties) { this.s3FileIoAcl = ObjectCannedACL.fromValue(aclType); Preconditions.checkArgument(s3FileIoAcl == null || !s3FileIoAcl.equals(ObjectCannedACL.UNKNOWN_TO_SDK_VERSION), "Cannot support S3 CannedACL " + aclType); + + this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, + DYNAMODB_TABLE_NAME_DEFAULT); } public String s3FileIoSseType() { @@ -317,4 +331,12 @@ public ObjectCannedACL s3FileIoAcl() { public void setS3FileIoAcl(ObjectCannedACL acl) { this.s3FileIoAcl = acl; } + + public String dynamoDbTableName() { + return dynamoDbTableName; + } + + public void setDynamoDbTableName(String name) { + this.dynamoDbTableName = name; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java new file mode 100644 index 000000000000..e4414d08d831 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.dynamodb; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.aws.AwsClientFactories; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.Delete; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.Projection; +import software.amazon.awssdk.services.dynamodb.model.ProjectionType; +import software.amazon.awssdk.services.dynamodb.model.Put; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; + +/** + * DynamoDB implementation of Iceberg catalog + */ +public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class); + private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5; + static final Joiner COMMA = Joiner.on(','); + + private static final String GSI_NAMESPACE_IDENTIFIER = "namespace-identifier"; + private static final String COL_IDENTIFIER = "identifier"; + private static final String COL_IDENTIFIER_NAMESPACE = "NAMESPACE"; + private static final String COL_NAMESPACE = "namespace"; + private static final String PROPERTY_COL_PREFIX = "p."; + private static final String PROPERTY_DEFAULT_LOCATION = "default_location"; + private static final String COL_CREATED_AT = "created_at"; + private static final String COL_UPDATED_AT = "updated_at"; + + // field used for optimistic locking + static final String COL_VERSION = "v"; + + private DynamoDbClient dynamo; + private Configuration hadoopConf; + private String catalogName; + private String warehousePath; + private AwsProperties awsProperties; + private FileIO fileIO; + + public DynamoDbCatalog() { + } + + @Override + public void initialize(String name, Map properties) { + initialize( + name, + properties.get(CatalogProperties.WAREHOUSE_LOCATION), + new AwsProperties(properties), + AwsClientFactories.from(properties).dynamo(), + initializeFileIO(properties)); + } + + @VisibleForTesting + void initialize(String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) { + this.catalogName = name; + this.awsProperties = properties; + this.warehousePath = cleanWarehousePath(path); + this.dynamo = client; + this.fileIO = io; + ensureCatalogTableExistsOrCreate(); + } + + @Override + public String name() { + return catalogName; + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + validateTableIdentifier(tableIdentifier); + return new DynamoDbTableOperations(dynamo, awsProperties, catalogName, fileIO, tableIdentifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + validateTableIdentifier(tableIdentifier); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(namespacePrimaryKey(tableIdentifier.namespace())) + .build()); + + if (!response.hasItem()) { + throw new NoSuchNamespaceException("Cannot find default warehouse location: namespace %s does not exist", + tableIdentifier.namespace()); + } + + String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION); + if (response.item().containsKey(defaultLocationCol)) { + return String.format("%s/%s", response.item().get(defaultLocationCol), tableIdentifier.name()); + } else { + return String.format("%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name()); + } + } + + @Override + public void createNamespace(Namespace namespace, Map metadata) { + validateNamespace(namespace); + Map values = namespacePrimaryKey(namespace); + setNewCatalogEntryMetadata(values); + metadata.forEach((key, value) -> values.put(toPropertyCol(key), AttributeValue.builder().s(value).build())); + + try { + dynamo.putItem(PutItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")") + .item(values) + .build()); + } catch (ConditionalCheckFailedException e) { + throw new AlreadyExistsException("Cannot create namespace %s: already exists", namespace); + } + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + validateNamespace(namespace); + List namespaces = Lists.newArrayList(); + Map lastEvaluatedKey = null; + String condition = COL_IDENTIFIER + " = :identifier"; + Map conditionValues = Maps.newHashMap(); + conditionValues.put(":identifier", AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build()); + if (!namespace.isEmpty()) { + condition += " AND " + "begins_with(" + COL_NAMESPACE + ",:ns)"; + conditionValues.put(":ns", AttributeValue.builder().s(namespace.toString()).build()); + } + + do { + QueryResponse response = dynamo.query(QueryRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .keyConditionExpression(condition) + .expressionAttributeValues(conditionValues) + .exclusiveStartKey(lastEvaluatedKey) + .build()); + + if (response.hasItems()) { + for (Map item : response.items()) { + String ns = item.get(COL_NAMESPACE).s(); + namespaces.add(Namespace.of(ns.split("\\."))); + } + } + + lastEvaluatedKey = response.lastEvaluatedKey(); + } while (!lastEvaluatedKey.isEmpty()); + + return namespaces; + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + validateNamespace(namespace); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(namespacePrimaryKey(namespace)) + .build()); + + if (!response.hasItem()) { + throw new NoSuchNamespaceException("Cannot find namespace %s", namespace); + } + + return response.item().entrySet().stream() + .filter(e -> isProperty(e.getKey())) + .collect(Collectors.toMap(e -> toPropertyKey(e.getKey()), e -> e.getValue().s())); + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + validateNamespace(namespace); + if (!listTables(namespace).isEmpty()) { + throw new NamespaceNotEmptyException("Cannot delete non-empty namespace %s", namespace); + } + + try { + dynamo.deleteItem(DeleteItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .key(namespacePrimaryKey(namespace)) + .conditionExpression("attribute_exists(" + namespace + ")") + .build()); + return true; + } catch (ConditionalCheckFailedException e) { + return false; + } + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) throws NoSuchNamespaceException { + List updateParts = Lists.newArrayList(); + Map attributeNames = Maps.newHashMap(); + Map attributeValues = Maps.newHashMap(); + int idx = 0; + for (Map.Entry property : properties.entrySet()) { + String attributeValue = ":v" + idx; + String attributeKey = "#k" + idx; + idx++; + updateParts.add(attributeKey + " = " + attributeValue); + attributeNames.put(attributeKey, toPropertyCol(property.getKey())); + attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build()); + } + + updateCatalogEntryMetadata(updateParts, attributeValues); + String updateExpression = "SET " + COMMA.join(updateParts); + return updateProperties(namespace, updateExpression, attributeValues, attributeNames); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) throws NoSuchNamespaceException { + List removeParts = Lists.newArrayList(properties.iterator()); + Map attributeNames = Maps.newHashMap(); + Map attributeValues = Maps.newHashMap(); + int idx = 0; + for (String property : properties) { + String attributeKey = "#k" + idx; + idx++; + removeParts.add(attributeKey); + attributeNames.put(attributeKey, toPropertyCol(property)); + } + + List updateParts = Lists.newArrayList(); + updateCatalogEntryMetadata(updateParts, attributeValues); + String updateExpression = "REMOVE " + COMMA.join(removeParts) + " SET " + COMMA.join(updateParts); + return updateProperties(namespace, updateExpression, attributeValues, attributeNames); + } + + @Override + public List listTables(Namespace namespace) { + List identifiers = Lists.newArrayList(); + Map lastEvaluatedKey; + String condition = COL_NAMESPACE + " = :ns"; + Map conditionValues = ImmutableMap.of( + ":ns", AttributeValue.builder().s(namespace.toString()).build()); + do { + QueryResponse response = dynamo.query(QueryRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .indexName(GSI_NAMESPACE_IDENTIFIER) + .keyConditionExpression(condition) + .expressionAttributeValues(conditionValues) + .build()); + + if (response.hasItems()) { + for (Map item : response.items()) { + String identifier = item.get(COL_IDENTIFIER).s(); + if (!COL_IDENTIFIER_NAMESPACE.equals(identifier)) { + identifiers.add(TableIdentifier.of(identifier.split("\\."))); + } + } + } + + lastEvaluatedKey = response.lastEvaluatedKey(); + } while (!lastEvaluatedKey.isEmpty()); + return identifiers; + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + Map key = tablePrimaryKey(identifier); + try { + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(key) + .build()); + + if (!response.hasItem()) { + throw new NoSuchTableException("Cannot find table %s to drop", identifier); + } + + TableOperations ops = newTableOps(identifier); + TableMetadata lastMetadata = ops.current(); + dynamo.deleteItem(DeleteItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .key(tablePrimaryKey(identifier)) + .conditionExpression(COL_VERSION + " = :v") + .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION))) + .build()); + LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier); + + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + LOG.info("Table {} data purged", identifier); + } + + LOG.info("Dropped table: {}", identifier); + return true; + } catch (ConditionalCheckFailedException e) { + LOG.error("Cannot complete drop table operation for {}: commit conflict", identifier, e); + return false; + } catch (Exception e) { + LOG.error("Cannot complete drop table operation for {}: unexpected exception", identifier, e); + throw e; + } + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + Map fromKey = tablePrimaryKey(from); + Map toKey = tablePrimaryKey(to); + + GetItemResponse fromResponse = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(fromKey) + .build()); + + if (!fromResponse.hasItem()) { + throw new NoSuchTableException("Cannot rename table %s to %s: %s does not exist", from, to, from); + } + + GetItemResponse toResponse = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(toKey) + .build()); + + if (toResponse.hasItem()) { + throw new AlreadyExistsException("Cannot rename table %s to %s: %s already exists", from, to, to); + } + + fromResponse.item().entrySet().stream() + .filter(e -> isProperty(e.getKey())) + .forEach(e -> toKey.put(e.getKey(), e.getValue())); + + setNewCatalogEntryMetadata(toKey); + + dynamo.transactWriteItems(TransactWriteItemsRequest.builder() + .transactItems( + TransactWriteItem.builder() + .delete(Delete.builder() + .tableName(awsProperties.dynamoDbTableName()) + .key(fromKey) + .conditionExpression(COL_VERSION + " = :v") + .expressionAttributeValues(ImmutableMap.of(":v", fromResponse.item().get(COL_VERSION))) + .build()) + .build(), + TransactWriteItem.builder() + .put(Put.builder() + .tableName(awsProperties.dynamoDbTableName()) + .item(toKey) + .conditionExpression("attribute_not_exists(" + COL_VERSION + ")") + .build()) + .build()) + .build()); + + LOG.info("Successfully renamed table from {} to {}", from, to); + } + + @Override + public void setConf(Configuration conf) { + hadoopConf = conf; + } + + @Override + public Configuration getConf() { + return hadoopConf; + } + + @Override + public void close() throws IOException { + dynamo.close(); + } + + /** + * The property used to set a default location for tables in a namespace. + * Call {@link #setProperties(Namespace, Map)} to set a path value using this property for a namespace, + * then all tables in the namespace will have default table root path under that given path. + * @return default location property key + */ + public static String defaultLocationProperty() { + return PROPERTY_DEFAULT_LOCATION; + } + + static String toPropertyCol(String propertyKey) { + return PROPERTY_COL_PREFIX + propertyKey; + } + + static boolean isProperty(String dynamoCol) { + return dynamoCol.startsWith(PROPERTY_COL_PREFIX); + } + + static String toPropertyKey(String propertyCol) { + return propertyCol.substring(PROPERTY_COL_PREFIX.length()); + } + + static Map namespacePrimaryKey(Namespace namespace) { + Map key = Maps.newHashMap(); + key.put(COL_IDENTIFIER, AttributeValue.builder().s(COL_IDENTIFIER_NAMESPACE).build()); + key.put(COL_NAMESPACE, AttributeValue.builder().s(namespace.toString()).build()); + return key; + } + + static Map tablePrimaryKey(TableIdentifier identifier) { + Map key = Maps.newHashMap(); + key.put(COL_IDENTIFIER, AttributeValue.builder().s(identifier.toString()).build()); + key.put(COL_NAMESPACE, AttributeValue.builder().s(identifier.namespace().toString()).build()); + return key; + } + + static void setNewCatalogEntryMetadata(Map values) { + String current = Long.toString(System.currentTimeMillis()); + values.put(COL_CREATED_AT, AttributeValue.builder().n(current).build()); + values.put(COL_UPDATED_AT, AttributeValue.builder().n(current).build()); + values.put(COL_VERSION, AttributeValue.builder().s(UUID.randomUUID().toString()).build()); + } + + static void updateCatalogEntryMetadata(List updateParts, Map attributeValues) { + updateParts.add(COL_UPDATED_AT + " = :uat"); + attributeValues.put(":uat", AttributeValue.builder().n(Long.toString(System.currentTimeMillis())).build()); + updateParts.add(COL_VERSION + " = :uv"); + attributeValues.put(":uv", AttributeValue.builder().s(UUID.randomUUID().toString()).build()); + } + + private FileIO initializeFileIO(Map properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new S3FileIO(); + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + private String cleanWarehousePath(String path) { + Preconditions.checkArgument(path != null && path.length() > 0, + "Cannot initialize DynamoDbCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.charAt(len - 1) == '/') { + return path.substring(0, len - 1); + } else { + return path; + } + } + + private void validateNamespace(Namespace namespace) { + for (String level : namespace.levels()) { + ValidationException.check(level != null && !level.isEmpty(), + "Namespace level must not be empty: %s", namespace); + ValidationException.check(!level.contains("."), + "Namespace level must not contain dot, but found %s in %s", level, namespace); + } + } + + private void validateTableIdentifier(TableIdentifier identifier) { + validateNamespace(identifier.namespace()); + ValidationException.check(identifier.hasNamespace(), + "Table namespace must not be empty: %s", identifier); + String tableName = identifier.name(); + ValidationException.check(!tableName.contains("."), + "Table name must not contain dot: %s", tableName); + } + + private boolean dynamoDbTableExists(String tableName) { + try { + dynamo.describeTable(DescribeTableRequest.builder() + .tableName(tableName) + .build()); + return true; + } catch (ResourceNotFoundException e) { + return false; + } + } + + private void ensureCatalogTableExistsOrCreate() { + + if (dynamoDbTableExists(awsProperties.dynamoDbTableName())) { + return; + } + + LOG.info("DynamoDb catalog table {} not found, trying to create", awsProperties.dynamoDbTableName()); + dynamo.createTable(CreateTableRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .keySchema( + KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.RANGE).build()) + .attributeDefinitions( + AttributeDefinition.builder().attributeName(COL_IDENTIFIER).attributeType(ScalarAttributeType.S).build(), + AttributeDefinition.builder().attributeName(COL_NAMESPACE).attributeType(ScalarAttributeType.S).build()) + .globalSecondaryIndexes(GlobalSecondaryIndex.builder() + .indexName(GSI_NAMESPACE_IDENTIFIER) + .keySchema( + KeySchemaElement.builder().attributeName(COL_NAMESPACE).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(COL_IDENTIFIER).keyType(KeyType.RANGE).build()) + .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build()) + .build()) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build()); + + Tasks.foreach(awsProperties.dynamoDbTableName()) + .retry(CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX) + .throwFailureWhenFinished() + .onlyRetryOn(IllegalStateException.class) + .run(this::checkTableActive); + } + + private void checkTableActive(String tableName) { + try { + DescribeTableResponse response = dynamo.describeTable(DescribeTableRequest.builder() + .tableName(tableName) + .build()); + TableStatus currentStatus = response.table().tableStatus(); + if (!currentStatus.equals(TableStatus.ACTIVE)) { + throw new IllegalStateException(String.format("Dynamo catalog table %s is not active, current status: %s", + tableName, currentStatus)); + } + } catch (ResourceNotFoundException e) { + throw new IllegalStateException(String.format("Cannot find Dynamo catalog table %s", tableName)); + } + } + + private boolean updateProperties(Namespace namespace, String updateExpression, + Map attributeValues, + Map attributeNames) { + validateNamespace(namespace); + Map key = namespacePrimaryKey(namespace); + try { + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(key) + .build()); + + if (!response.hasItem()) { + throw new NoSuchNamespaceException("Cannot find namespace %s", namespace); + } + + attributeValues.put(":v", response.item().get(COL_VERSION)); + dynamo.updateItem(UpdateItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .key(key) + .conditionExpression(COL_VERSION + " = :v") + .updateExpression(updateExpression) + .expressionAttributeValues(attributeValues) + .expressionAttributeNames(attributeNames) + .build()); + return true; + } catch (ConditionalCheckFailedException e) { + return false; + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java new file mode 100644 index 000000000000..81157e91b3b2 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.dynamodb; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; + +class DynamoDbTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(DynamoDbTableOperations.class); + + private final DynamoDbClient dynamo; + private final AwsProperties awsProperties; + private final TableIdentifier tableIdentifier; + private final String fullTableName; + private final FileIO fileIO; + + DynamoDbTableOperations( + DynamoDbClient dynamo, + AwsProperties awsProperties, + String catalogName, + FileIO fileIO, + TableIdentifier tableIdentifier) { + this.dynamo = dynamo; + this.awsProperties = awsProperties; + this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.fileIO = fileIO; + } + + @Override + protected String tableName() { + return fullTableName; + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected void doRefresh() { + String metadataLocation = null; + GetItemResponse table = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)) + .build()); + if (table.hasItem()) { + metadataLocation = getMetadataLocation(table); + } else { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("Cannot find table %s after refresh, " + + "maybe another process deleted it or revoked your access permission", tableName()); + } + } + + refreshFromMetadataLocation(metadataLocation); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + CommitStatus commitStatus = CommitStatus.FAILURE; + Map tableKey = DynamoDbCatalog.tablePrimaryKey(tableIdentifier); + try { + GetItemResponse table = dynamo.getItem(GetItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .consistentRead(true) + .key(tableKey) + .build()); + checkMetadataLocation(table, base); + Map properties = prepareProperties(table, newMetadataLocation); + persistTable(tableKey, table, properties); + commitStatus = CommitStatus.SUCCESS; + } catch (ConditionalCheckFailedException e) { + throw new CommitFailedException(e, "Cannot commit %s: concurrent update detected", tableName()); + } catch (RuntimeException persistFailure) { + LOG.error("Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.", + fullTableName, persistFailure); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw new CommitFailedException(persistFailure, + "Cannot commit %s due to unexpected exception", tableName()); + case UNKNOWN: + throw new CommitStateUnknownException(persistFailure); + } + } finally { + try { + if (commitStatus == CommitStatus.FAILURE) { + // if anything went wrong, clean up the uncommitted metadata file + io().deleteFile(newMetadataLocation); + } + } catch (RuntimeException e) { + LOG.error("Fail to cleanup metadata file at {}", newMetadataLocation, e); + throw e; + } + } + } + + private void checkMetadataLocation(GetItemResponse table, TableMetadata base) { + String dynamoMetadataLocation = table.hasItem() ? getMetadataLocation(table) : null; + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, dynamoMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s because base metadata location '%s' is not same as the current DynamoDb location '%s'", + tableName(), baseMetadataLocation, dynamoMetadataLocation); + } + } + + private String getMetadataLocation(GetItemResponse table) { + return table.item().get(DynamoDbCatalog.toPropertyCol(METADATA_LOCATION_PROP)).s(); + } + + private Map prepareProperties(GetItemResponse response, String newMetadataLocation) { + Map properties = response.hasItem() ? getProperties(response) : Maps.newHashMap(); + properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + properties.put(METADATA_LOCATION_PROP, newMetadataLocation); + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + return properties; + } + + private Map getProperties(GetItemResponse table) { + return table.item().entrySet().stream() + .filter(e -> DynamoDbCatalog.isProperty(e.getKey())) + .collect(Collectors.toMap(e -> DynamoDbCatalog.toPropertyKey(e.getKey()), e -> e.getValue().s())); + } + + void persistTable(Map tableKey, GetItemResponse table, Map parameters) { + if (table.hasItem()) { + LOG.debug("Committing existing DynamoDb catalog table: {}", tableName()); + List updateParts = Lists.newArrayList(); + Map attributeNames = Maps.newHashMap(); + Map attributeValues = Maps.newHashMap(); + int idx = 0; + for (Map.Entry property : parameters.entrySet()) { + String attributeValue = ":v" + idx; + String attributeKey = "#k" + idx; + idx++; + updateParts.add(attributeKey + " = " + attributeValue); + attributeNames.put(attributeKey, DynamoDbCatalog.toPropertyCol(property.getKey())); + attributeValues.put(attributeValue, AttributeValue.builder().s(property.getValue()).build()); + } + DynamoDbCatalog.updateCatalogEntryMetadata(updateParts, attributeValues); + String updateExpression = "SET " + DynamoDbCatalog.COMMA.join(updateParts); + attributeValues.put(":v", table.item().get(DynamoDbCatalog.COL_VERSION)); + dynamo.updateItem(UpdateItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .key(tableKey) + .conditionExpression(DynamoDbCatalog.COL_VERSION + " = :v") + .updateExpression(updateExpression) + .expressionAttributeValues(attributeValues) + .expressionAttributeNames(attributeNames) + .build()); + } else { + LOG.debug("Committing new DynamoDb catalog table: {}", tableName()); + Map values = Maps.newHashMap(tableKey); + parameters.forEach((k, v) -> values.put(DynamoDbCatalog.toPropertyCol(k), + AttributeValue.builder().s(v).build())); + DynamoDbCatalog.setNewCatalogEntryMetadata(values); + + dynamo.putItem(PutItemRequest.builder() + .tableName(awsProperties.dynamoDbTableName()) + .item(values) + .conditionExpression("attribute_not_exists(" + DynamoDbCatalog.COL_VERSION + ")") + .build()); + } + } +} From 8a7d4c07204ee34ac19800baeffc5b37ca90d56e Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 22 Jun 2021 01:48:41 -0700 Subject: [PATCH 2/2] fix spacing, add comment --- .../java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java index e4414d08d831..f9bef1514604 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -534,7 +534,6 @@ private boolean dynamoDbTableExists(String tableName) { } private void ensureCatalogTableExistsOrCreate() { - if (dynamoDbTableExists(awsProperties.dynamoDbTableName())) { return; } @@ -558,6 +557,7 @@ private void ensureCatalogTableExistsOrCreate() { .billingMode(BillingMode.PAY_PER_REQUEST) .build()); + // wait for the dynamo table to complete provisioning, which takes around 10 seconds Tasks.foreach(awsProperties.dynamoDbTableName()) .retry(CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX) .throwFailureWhenFinished()