Skip to content

Commit f81d8ad

Browse files
author
Jack Ye
authored
AWS: add DynamoDb catalog (#2688)
* AWS: add DynamoDb catalog * fix spacing, add comment
1 parent 8d16bad commit f81d8ad

File tree

4 files changed

+1139
-0
lines changed

4 files changed

+1139
-0
lines changed
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.aws.dynamodb;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.UUID;
25+
import java.util.concurrent.ForkJoinPool;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.IntStream;
28+
import org.apache.iceberg.AssertHelpers;
29+
import org.apache.iceberg.CatalogProperties;
30+
import org.apache.iceberg.Schema;
31+
import org.apache.iceberg.Table;
32+
import org.apache.iceberg.aws.AwsClientFactories;
33+
import org.apache.iceberg.aws.AwsClientFactory;
34+
import org.apache.iceberg.aws.AwsIntegTestUtil;
35+
import org.apache.iceberg.aws.AwsProperties;
36+
import org.apache.iceberg.catalog.Namespace;
37+
import org.apache.iceberg.catalog.TableIdentifier;
38+
import org.apache.iceberg.exceptions.AlreadyExistsException;
39+
import org.apache.iceberg.exceptions.NoSuchTableException;
40+
import org.apache.iceberg.exceptions.ValidationException;
41+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
42+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
43+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
44+
import org.apache.iceberg.types.Types;
45+
import org.junit.AfterClass;
46+
import org.junit.Assert;
47+
import org.junit.BeforeClass;
48+
import org.junit.Test;
49+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
50+
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
51+
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
52+
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
53+
import software.amazon.awssdk.services.s3.S3Client;
54+
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
55+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
56+
57+
public class DynamoDbCatalogTest {
58+
59+
private static final ForkJoinPool POOL = new ForkJoinPool(16);
60+
private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
61+
62+
private static String catalogTableName;
63+
private static DynamoDbClient dynamo;
64+
private static S3Client s3;
65+
private static DynamoDbCatalog catalog;
66+
private static String testBucket;
67+
68+
@BeforeClass
69+
public static void beforeClass() {
70+
catalogTableName = genRandomName();
71+
AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
72+
dynamo = clientFactory.dynamo();
73+
s3 = clientFactory.s3();
74+
catalog = new DynamoDbCatalog();
75+
testBucket = AwsIntegTestUtil.testBucketName();
76+
catalog.initialize("test", ImmutableMap.of(
77+
AwsProperties.DYNAMODB_TABLE_NAME, catalogTableName,
78+
CatalogProperties.WAREHOUSE_LOCATION, "s3://" + testBucket + "/" + genRandomName()));
79+
}
80+
81+
@AfterClass
82+
public static void afterClass() {
83+
dynamo.deleteTable(DeleteTableRequest.builder().tableName(catalogTableName).build());
84+
}
85+
86+
@Test
87+
public void testCreateNamespace() {
88+
Namespace namespace = Namespace.of(genRandomName());
89+
catalog.createNamespace(namespace);
90+
GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
91+
.tableName(catalogTableName)
92+
.key(DynamoDbCatalog.namespacePrimaryKey(namespace))
93+
.build());
94+
Assert.assertTrue("namespace must exist", response.hasItem());
95+
Assert.assertEquals("namespace must be stored in DynamoDB",
96+
namespace.toString(), response.item().get("namespace").s());
97+
98+
AssertHelpers.assertThrows("should not create duplicated namespace",
99+
AlreadyExistsException.class,
100+
"already exists",
101+
() -> catalog.createNamespace(namespace));
102+
}
103+
104+
@Test
105+
public void testCreateNamespaceBadName() {
106+
AssertHelpers.assertThrows("should not create namespace with empty level",
107+
ValidationException.class,
108+
"must not be empty",
109+
() -> catalog.createNamespace(Namespace.of("a", "", "b")));
110+
111+
AssertHelpers.assertThrows("should not create namespace with dot in level",
112+
ValidationException.class,
113+
"must not contain dot",
114+
() -> catalog.createNamespace(Namespace.of("a", "b.c")));
115+
}
116+
117+
@Test
118+
public void testListSubNamespaces() {
119+
Namespace parent = Namespace.of(genRandomName());
120+
List<Namespace> namespaceList = IntStream.range(0, 3)
121+
.mapToObj(i -> Namespace.of(parent.toString(), genRandomName()))
122+
.collect(Collectors.toList());
123+
catalog.createNamespace(parent);
124+
namespaceList.forEach(ns -> catalog.createNamespace(ns));
125+
Assert.assertEquals(4, catalog.listNamespaces(parent).size());
126+
}
127+
128+
@Test
129+
public void testNamespaceProperties() {
130+
Namespace namespace = Namespace.of(genRandomName());
131+
Map<String, String> properties = Maps.newHashMap();
132+
properties.put("key1", "val1");
133+
properties.put("key2", "val2");
134+
catalog.createNamespace(namespace, properties);
135+
Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
136+
137+
properties.put("key3", "val3");
138+
properties.put("key2", "val2-1");
139+
catalog.setProperties(namespace, properties);
140+
Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
141+
142+
properties.remove("key3");
143+
catalog.removeProperties(namespace, Sets.newHashSet("key3"));
144+
Assert.assertEquals(properties, catalog.loadNamespaceMetadata(namespace));
145+
}
146+
147+
@Test
148+
public void testCreateTable() {
149+
Namespace namespace = Namespace.of(genRandomName());
150+
catalog.createNamespace(namespace);
151+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
152+
catalog.createTable(tableIdentifier, SCHEMA);
153+
GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
154+
.tableName(catalogTableName)
155+
.key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier))
156+
.build());
157+
Assert.assertTrue("table must exist", response.hasItem());
158+
Assert.assertEquals("table must be stored in DynamoDB with table identifier as partition key",
159+
tableIdentifier.toString(), response.item().get("identifier").s());
160+
Assert.assertEquals("table must be stored in DynamoDB with namespace as sort key",
161+
namespace.toString(), response.item().get("namespace").s());
162+
163+
AssertHelpers.assertThrows("should not create duplicated table",
164+
AlreadyExistsException.class,
165+
"already exists",
166+
() -> catalog.createTable(tableIdentifier, SCHEMA));
167+
}
168+
169+
@Test
170+
public void testCreateTableBadName() {
171+
Namespace namespace = Namespace.of(genRandomName());
172+
catalog.createNamespace(namespace);
173+
AssertHelpers.assertThrows("should not create table name with empty namespace",
174+
ValidationException.class,
175+
"Table namespace must not be empty",
176+
() -> catalog.createTable(TableIdentifier.of(Namespace.empty(), "a"), SCHEMA));
177+
178+
AssertHelpers.assertThrows("should not create table name with dot",
179+
ValidationException.class,
180+
"must not contain dot",
181+
() -> catalog.createTable(TableIdentifier.of(namespace, "a.b"), SCHEMA));
182+
}
183+
184+
@Test
185+
public void testListTable() {
186+
Namespace namespace = Namespace.of(genRandomName());
187+
catalog.createNamespace(namespace);
188+
List<TableIdentifier> tableIdentifiers = IntStream.range(0, 3)
189+
.mapToObj(i -> TableIdentifier.of(namespace, genRandomName()))
190+
.collect(Collectors.toList());
191+
tableIdentifiers.forEach(id -> catalog.createTable(id, SCHEMA));
192+
Assert.assertEquals(3, catalog.listTables(namespace).size());
193+
}
194+
195+
@Test
196+
public void testDropTable() {
197+
Namespace namespace = Namespace.of(genRandomName());
198+
catalog.createNamespace(namespace);
199+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
200+
catalog.createTable(tableIdentifier, SCHEMA);
201+
String metadataLocation = dynamo.getItem(GetItemRequest.builder()
202+
.tableName(catalogTableName)
203+
.key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
204+
.item().get("p.metadata_location").s();
205+
catalog.dropTable(tableIdentifier, true);
206+
Assert.assertFalse("table entry should not exist in dynamo",
207+
dynamo.getItem(GetItemRequest.builder()
208+
.tableName(catalogTableName)
209+
.key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
210+
.hasItem());
211+
AssertHelpers.assertThrows("metadata location should be deleted",
212+
NoSuchKeyException.class,
213+
() -> s3.headObject(HeadObjectRequest.builder()
214+
.bucket(testBucket)
215+
.key(metadataLocation.substring(testBucket.length() + 6)) // s3:// + end slash
216+
.build()));
217+
}
218+
219+
@Test
220+
public void testRenameTable() {
221+
Namespace namespace = Namespace.of(genRandomName());
222+
catalog.createNamespace(namespace);
223+
Namespace namespace2 = Namespace.of(genRandomName());
224+
catalog.createNamespace(namespace2);
225+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
226+
catalog.createTable(tableIdentifier, SCHEMA);
227+
TableIdentifier tableIdentifier2 = TableIdentifier.of(namespace2, genRandomName());
228+
229+
AssertHelpers.assertThrows("should not be able to rename a table not exist",
230+
NoSuchTableException.class,
231+
"does not exist",
232+
() -> catalog.renameTable(TableIdentifier.of(namespace, "a"), tableIdentifier2));
233+
234+
AssertHelpers.assertThrows("should not be able to rename an existing table",
235+
AlreadyExistsException.class,
236+
"already exists",
237+
() -> catalog.renameTable(tableIdentifier, tableIdentifier));
238+
239+
String metadataLocation = dynamo.getItem(GetItemRequest.builder()
240+
.tableName(catalogTableName)
241+
.key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier)).build())
242+
.item().get("p.metadata_location").s();
243+
244+
catalog.renameTable(tableIdentifier, tableIdentifier2);
245+
246+
String metadataLocation2 = dynamo.getItem(GetItemRequest.builder()
247+
.tableName(catalogTableName)
248+
.key(DynamoDbCatalog.tablePrimaryKey(tableIdentifier2)).build())
249+
.item().get("p.metadata_location").s();
250+
251+
Assert.assertEquals("metadata location should be copied to new table entry",
252+
metadataLocation, metadataLocation2);
253+
}
254+
255+
@Test
256+
public void testUpdateTable() {
257+
Namespace namespace = Namespace.of(genRandomName());
258+
catalog.createNamespace(namespace);
259+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
260+
catalog.createTable(tableIdentifier, SCHEMA);
261+
Table table = catalog.loadTable(tableIdentifier);
262+
table.updateSchema().addColumn("data", Types.StringType.get()).commit();
263+
table.refresh();
264+
Assert.assertEquals(2, table.schema().columns().size());
265+
}
266+
267+
@Test
268+
public void testConcurrentCommits() throws Exception {
269+
Namespace namespace = Namespace.of(genRandomName());
270+
catalog.createNamespace(namespace);
271+
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, genRandomName());
272+
catalog.createTable(tableIdentifier, SCHEMA);
273+
Table table = catalog.loadTable(tableIdentifier);
274+
POOL.submit(() -> IntStream.range(0, 16).parallel()
275+
.forEach(i -> {
276+
try {
277+
table.updateSchema().addColumn(genRandomName(), Types.StringType.get()).commit();
278+
} catch (Exception e) {
279+
// ignore
280+
}
281+
})).get();
282+
283+
Assert.assertEquals(2, table.schema().columns().size());
284+
}
285+
286+
private static String genRandomName() {
287+
return UUID.randomUUID().toString().replace("-", "");
288+
}
289+
}

aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.Serializable;
2323
import java.util.Map;
24+
import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
2425
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2526
import org.apache.iceberg.util.PropertyUtil;
2627
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -130,6 +131,12 @@ public class AwsProperties implements Serializable {
130131
*/
131132
public static final String S3FILEIO_ACL = "s3.acl";
132133

134+
/**
135+
* DynamoDB table name for {@link DynamoDbCatalog}
136+
*/
137+
public static final String DYNAMODB_TABLE_NAME = "dynamodb.table-name";
138+
public static final String DYNAMODB_TABLE_NAME_DEFAULT = "iceberg";
139+
133140
/**
134141
* The implementation class of {@link AwsClientFactory} to customize AWS client configurations.
135142
* If set, all AWS clients will be initialized by the specified factory.
@@ -180,6 +187,8 @@ public class AwsProperties implements Serializable {
180187
private String glueCatalogId;
181188
private boolean glueCatalogSkipArchive;
182189

190+
private String dynamoDbTableName;
191+
183192
public AwsProperties() {
184193
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
185194
this.s3FileIoSseKey = null;
@@ -193,6 +202,8 @@ public AwsProperties() {
193202

194203
this.glueCatalogId = null;
195204
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
205+
206+
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
196207
}
197208

198209
public AwsProperties(Map<String, String> properties) {
@@ -236,6 +247,9 @@ public AwsProperties(Map<String, String> properties) {
236247
this.s3FileIoAcl = ObjectCannedACL.fromValue(aclType);
237248
Preconditions.checkArgument(s3FileIoAcl == null || !s3FileIoAcl.equals(ObjectCannedACL.UNKNOWN_TO_SDK_VERSION),
238249
"Cannot support S3 CannedACL " + aclType);
250+
251+
this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME,
252+
DYNAMODB_TABLE_NAME_DEFAULT);
239253
}
240254

241255
public String s3FileIoSseType() {
@@ -317,4 +331,12 @@ public ObjectCannedACL s3FileIoAcl() {
317331
public void setS3FileIoAcl(ObjectCannedACL acl) {
318332
this.s3FileIoAcl = acl;
319333
}
334+
335+
/**
 * Returns the DynamoDB table name held by this instance.
 *
 * @return the value of the {@code dynamoDbTableName} field
 */
public String dynamoDbTableName() {
336+
return dynamoDbTableName;
337+
}
338+
339+
/**
 * Replaces the DynamoDB table name held by this instance.
 *
 * @param name the new value for the {@code dynamoDbTableName} field; stored as given, without validation
 */
public void setDynamoDbTableName(String name) {
340+
this.dynamoDbTableName = name;
341+
}
320342
}

0 commit comments

Comments
 (0)