Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.dynamodb;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

/**
 * Tests for {@link DynamoDbCatalog} that talk to the DynamoDB and S3 clients returned by
 * {@link AwsClientFactories#defaultFactory()}.
 * <p>
 * One catalog, pointing at a randomly named DynamoDB table and a random warehouse prefix inside the test bucket,
 * is initialized for the whole class; the DynamoDB table is deleted again in {@link #afterClass()}. Each test
 * works on its own randomly named namespaces and tables.
 */
public class DynamoDbCatalogTest {

  /** Number of schema updates raced against each other in {@link #testConcurrentCommits()}. */
  private static final int CONCURRENT_COMMITS = 16;
  /** Item attribute the tests read to find a table's current metadata file location. */
  private static final String METADATA_LOCATION_ATTRIBUTE = "p.metadata_location";
  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));

  private static String catalogTableName;
  private static DynamoDbClient dynamo;
  private static S3Client s3;
  private static DynamoDbCatalog catalog;
  private static String testBucket;

  /**
   * Builds the AWS clients and initializes the catalog under test with a random table name and warehouse path.
   */
  @BeforeClass
  public static void beforeClass() {
    catalogTableName = genRandomName();
    AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
    dynamo = clientFactory.dynamo();
    s3 = clientFactory.s3();
    catalog = new DynamoDbCatalog();
    testBucket = AwsIntegTestUtil.testBucketName();
    catalog.initialize("test", ImmutableMap.of(
        AwsProperties.DYNAMODB_TABLE_NAME, catalogTableName,
        CatalogProperties.WAREHOUSE_LOCATION, bucketPrefix() + genRandomName()));
  }

  /**
   * Deletes the catalog's DynamoDB table and releases the clients created in {@link #beforeClass()}.
   */
  @AfterClass
  public static void afterClass() {
    try {
      dynamo.deleteTable(DeleteTableRequest.builder().tableName(catalogTableName).build());
    } finally {
      // close the clients even when the table deletion fails, so their resources are not leaked
      dynamo.close();
      s3.close();
    }
  }

  @Test
  public void testCreateNamespace() {
    Namespace namespace = createRandomNamespace();
    GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
        .tableName(catalogTableName)
        .key(DynamoDbCatalog.namespacePrimaryKey(namespace))
        .build());
    Assert.assertTrue("namespace must exist", response.hasItem());
    Assert.assertEquals("namespace must be stored in DynamoDB",
        namespace.toString(), response.item().get("namespace").s());

    AssertHelpers.assertThrows("should not create duplicated namespace",
        AlreadyExistsException.class,
        "already exists",
        () -> catalog.createNamespace(namespace));
  }

  @Test
  public void testCreateNamespaceBadName() {
    AssertHelpers.assertThrows("should not create namespace with empty level",
        ValidationException.class,
        "must not be empty",
        () -> catalog.createNamespace(Namespace.of("a", "", "b")));

    AssertHelpers.assertThrows("should not create namespace with dot in level",
        ValidationException.class,
        "must not contain dot",
        () -> catalog.createNamespace(Namespace.of("a", "b.c")));
  }

  @Test
  public void testListSubNamespaces() {
    Namespace parent = Namespace.of(genRandomName());
    List<Namespace> namespaceList = IntStream.range(0, 3)
        .mapToObj(i -> Namespace.of(parent.toString(), genRandomName()))
        .collect(Collectors.toList());
    catalog.createNamespace(parent);
    namespaceList.forEach(ns -> catalog.createNamespace(ns));
    // the expected count is the three children plus one more entry
    // NOTE(review): presumably the extra entry is the parent itself; confirm against DynamoDbCatalog
    Assert.assertEquals(4, catalog.listNamespaces(parent).size());
  }

  @Test
  public void testNamespaceProperties() {
    Namespace namespace = Namespace.of(genRandomName());
    Map<String, String> properties = Maps.newHashMap();
    properties.put("key1", "val1");
    properties.put("key2", "val2");
    catalog.createNamespace(namespace, properties);
    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));

    // add one key and overwrite an existing one
    properties.put("key3", "val3");
    properties.put("key2", "val2-1");
    catalog.setProperties(namespace, properties);
    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));

    properties.remove("key3");
    catalog.removeProperties(namespace, Sets.newHashSet("key3"));
    Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
  }

  @Test
  public void testCreateTable() {
    Namespace namespace = createRandomNamespace();
    TableIdentifier tableIdentifier = createRandomTable(namespace);
    GetItemResponse response = getTableItem(tableIdentifier);
    Assert.assertTrue("table must exist", response.hasItem());
    Assert.assertEquals("table must be stored in DynamoDB with table identifier as partition key",
        tableIdentifier.toString(), response.item().get("identifier").s());
    Assert.assertEquals("table must be stored in DynamoDB with namespace as sort key",
        namespace.toString(), response.item().get("namespace").s());

    AssertHelpers.assertThrows("should not create duplicated table",
        AlreadyExistsException.class,
        "already exists",
        () -> catalog.createTable(tableIdentifier, SCHEMA));
  }

  @Test
  public void testCreateTableBadName() {
    Namespace namespace = createRandomNamespace();
    AssertHelpers.assertThrows("should not create table name with empty namespace",
        ValidationException.class,
        "Table namespace must not be empty",
        () -> catalog.createTable(TableIdentifier.of(Namespace.empty(), "a"), SCHEMA));

    AssertHelpers.assertThrows("should not create table name with dot",
        ValidationException.class,
        "must not contain dot",
        () -> catalog.createTable(TableIdentifier.of(namespace, "a.b"), SCHEMA));
  }

  @Test
  public void testListTable() {
    Namespace namespace = createRandomNamespace();
    List<TableIdentifier> tableIdentifiers = IntStream.range(0, 3)
        .mapToObj(i -> TableIdentifier.of(namespace, genRandomName()))
        .collect(Collectors.toList());
    tableIdentifiers.forEach(id -> catalog.createTable(id, SCHEMA));
    Assert.assertEquals(3, catalog.listTables(namespace).size());
  }

  @Test
  public void testDropTable() {
    Namespace namespace = createRandomNamespace();
    TableIdentifier tableIdentifier = createRandomTable(namespace);
    String metadataLocation = metadataLocation(tableIdentifier);
    catalog.dropTable(tableIdentifier, true);
    Assert.assertFalse("table entry should not exist in dynamo", getTableItem(tableIdentifier).hasItem());

    // derive the object key from the real prefix instead of a hard-coded length, and make sure the prefix
    // really matches so the missing-key check below cannot pass on a wrongly cut key
    String prefix = bucketPrefix();
    Assert.assertTrue("metadata location should be inside the test bucket", metadataLocation.startsWith(prefix));
    HeadObjectRequest headMetadata = HeadObjectRequest.builder()
        .bucket(testBucket)
        .key(metadataLocation.substring(prefix.length()))
        .build();
    AssertHelpers.assertThrows("metadata location should be deleted",
        NoSuchKeyException.class,
        () -> s3.headObject(headMetadata));
  }

  @Test
  public void testRenameTable() {
    Namespace namespace = createRandomNamespace();
    Namespace namespace2 = createRandomNamespace();
    TableIdentifier tableIdentifier = createRandomTable(namespace);
    // rename target; intentionally not created
    TableIdentifier tableIdentifier2 = TableIdentifier.of(namespace2, genRandomName());

    AssertHelpers.assertThrows("should not be able to rename a table not exist",
        NoSuchTableException.class,
        "does not exist",
        () -> catalog.renameTable(TableIdentifier.of(namespace, "a"), tableIdentifier2));

    AssertHelpers.assertThrows("should not be able to rename an existing table",
        AlreadyExistsException.class,
        "already exists",
        () -> catalog.renameTable(tableIdentifier, tableIdentifier));

    String metadataLocation = metadataLocation(tableIdentifier);

    catalog.renameTable(tableIdentifier, tableIdentifier2);

    String metadataLocation2 = metadataLocation(tableIdentifier2);

    Assert.assertEquals("metadata location should be copied to new table entry",
        metadataLocation, metadataLocation2);
  }

  @Test
  public void testUpdateTable() {
    Namespace namespace = createRandomNamespace();
    TableIdentifier tableIdentifier = createRandomTable(namespace);
    Table table = catalog.loadTable(tableIdentifier);
    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
    table.refresh();
    Assert.assertEquals(2, table.schema().columns().size());
  }

  @Test
  public void testConcurrentCommits() throws Exception {
    Namespace namespace = createRandomNamespace();
    TableIdentifier tableIdentifier = createRandomTable(namespace);
    Table table = catalog.loadTable(tableIdentifier);

    // the pool is only needed here, so it is created and shut down locally instead of living in a static field
    ForkJoinPool pool = new ForkJoinPool(CONCURRENT_COMMITS);
    try {
      pool.submit(() -> IntStream.range(0, CONCURRENT_COMMITS).parallel()
          .forEach(i -> {
            try {
              table.updateSchema().addColumn(genRandomName(), Types.StringType.get()).commit();
            } catch (Exception ignored) {
              // failed commits are deliberately ignored; only the resulting schema is asserted below
            }
          })).get();
    } finally {
      pool.shutdown();
    }

    Assert.assertEquals(2, table.schema().columns().size());
  }

  /** Creates a namespace with a random single-level name in the catalog under test and returns it. */
  private static Namespace createRandomNamespace() {
    Namespace namespace = Namespace.of(genRandomName());
    catalog.createNamespace(namespace);
    return namespace;
  }

  /** Creates a randomly named table with {@link #SCHEMA} in the given namespace and returns its identifier. */
  private static TableIdentifier createRandomTable(Namespace namespace) {
    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
    catalog.createTable(tableIdentifier, SCHEMA);
    return tableIdentifier;
  }

  /** Reads the DynamoDB item stored under the primary key of the given table identifier. */
  private static GetItemResponse getTableItem(TableIdentifier tableIdentifier) {
    return dynamo.getItem(GetItemRequest.builder()
        .tableName(catalogTableName)
        .key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
        .build());
  }

  /** Returns the metadata location recorded in the DynamoDB item of the given table. */
  private static String metadataLocation(TableIdentifier tableIdentifier) {
    return getTableItem(tableIdentifier).item().get(METADATA_LOCATION_ATTRIBUTE).s();
  }

  /** Returns the {@code s3://<bucket>/} prefix shared by every object location in the test bucket. */
  private static String bucketPrefix() {
    return "s3://" + testBucket + "/";
  }

  /** Returns a random name made of 32 hex characters (a UUID without dashes). */
  private static String genRandomName() {
    return UUID.randomUUID().toString().replace("-", "");
  }
}
22 changes: 22 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
Expand Down Expand Up @@ -130,6 +131,12 @@ public class AwsProperties implements Serializable {
*/
public static final String S3FILEIO_ACL = "s3.acl";

/**
 * Property key for the name of the DynamoDB table used by {@link DynamoDbCatalog}.
 */
public static final String DYNAMODB_TABLE_NAME = "dynamodb.table-name";
/**
 * Table name used when {@link #DYNAMODB_TABLE_NAME} is not present in the properties.
 */
public static final String DYNAMODB_TABLE_NAME_DEFAULT = "iceberg";

/**
* The implementation class of {@link AwsClientFactory} to customize AWS client configurations.
* If set, all AWS clients will be initialized by the specified factory.
Expand Down Expand Up @@ -180,6 +187,8 @@ public class AwsProperties implements Serializable {
private String glueCatalogId;
private boolean glueCatalogSkipArchive;

// DynamoDB table name; set from DYNAMODB_TABLE_NAME, or DYNAMODB_TABLE_NAME_DEFAULT when that property is absent
private String dynamoDbTableName;

public AwsProperties() {
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
Expand All @@ -193,6 +202,8 @@ public AwsProperties() {

this.glueCatalogId = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;

this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
}

public AwsProperties(Map<String, String> properties) {
Expand Down Expand Up @@ -236,6 +247,9 @@ public AwsProperties(Map<String, String> properties) {
this.s3FileIoAcl = ObjectCannedACL.fromValue(aclType);
Preconditions.checkArgument(s3FileIoAcl == null || !s3FileIoAcl.equals(ObjectCannedACL.UNKNOWN_TO_SDK_VERSION),
"Cannot support S3 CannedACL " + aclType);

this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
DYNAMODB_TABLE_NAME_DEFAULT);
}

public String s3FileIoSseType() {
Expand Down Expand Up @@ -317,4 +331,12 @@ public ObjectCannedACL s3FileIoAcl() {
public void setS3FileIoAcl(ObjectCannedACL acl) {
this.s3FileIoAcl = acl;
}

/**
 * Returns the configured DynamoDB table name.
 *
 * @return the value read from {@link #DYNAMODB_TABLE_NAME}, {@link #DYNAMODB_TABLE_NAME_DEFAULT} if the
 *     property was not supplied, or whatever was last passed to {@link #setDynamoDbTableName(String)}
 */
public String dynamoDbTableName() {
return dynamoDbTableName;
}

/**
 * Overrides the DynamoDB table name held by this properties object.
 *
 * @param name the table name to store; stored as given, without validation
 */
public void setDynamoDbTableName(String name) {
this.dynamoDbTableName = name;
}
}
Loading