diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index bbdc3a3910ef8..3f63c0c9975f2 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 2 } diff --git a/CHANGES.md b/CHANGES.md index 364b1a5fbdef5..fcb02d1d996af 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) ## New Features / Improvements diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 5b63803a52d02..d12f8914a3388 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -34,6 +34,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; @@ -106,6 +107,14 @@ *
 * <p>Additional configuration options are provided in the `Pre-filtering Options` section below,
 * for Iceberg writes.
 *
+ * <h3>Creating Tables</h3>
+ *
+ * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
+ * create one with the data's schema.
+ *
+ * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
+ * Some implementations may not support creating a table using the Iceberg API.
+ *
 * <h3>Beam Rows</h3>
 *
 * <p>
Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s. diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 055c8882b72cb..396db7c20f360 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; @@ -41,7 +43,13 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. @@ -66,6 +74,7 @@ * #getSerializableDataFiles()}. */ class RecordWriterManager implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); /** * Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per * partition and manages them in a {@link Cache}. @@ -79,8 +88,8 @@ class DestinationState { private final PartitionKey partitionKey; private final Table table; private final String stateToken = UUID.randomUUID().toString(); + final Cache writers; private final List dataFiles = Lists.newArrayList(); - @VisibleForTesting final Cache writers; @VisibleForTesting final Map writerCounts = Maps.newHashMap(); DestinationState(IcebergDestination icebergDestination, Table table) { @@ -186,6 +195,8 @@ private RecordWriter createWriter(PartitionKey partitionKey) { private final Map, List> totalSerializableDataFiles = Maps.newHashMap(); + private static final Cache TABLE_CACHE = + CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); private boolean isClosed = false; @@ -196,6 +207,40 @@ private RecordWriter createWriter(PartitionKey partitionKey) { this.maxNumWriters = maxNumWriters; } + /** + * Returns an Iceberg {@link Table}. + * + *
 <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
+ * attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
+ * create it, inferring the table schema from the record schema.
+ *
+ * <p>
Note that this is a best-effort operation that depends on the {@link Catalog} + * implementation. Although it is expected, some implementations may not support creating a table + * using the Iceberg API. + */ + private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) { + @Nullable Table table = TABLE_CACHE.getIfPresent(identifier); + if (table == null) { + try { + table = catalog.loadTable(identifier); + } catch (NoSuchTableException e) { + try { + org.apache.iceberg.Schema tableSchema = + IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + // TODO(ahmedabu98): support creating a table with a specified partition spec + table = catalog.createTable(identifier, tableSchema); + LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema); + } catch (AlreadyExistsException alreadyExistsException) { + // handle race condition where workers are concurrently creating the same table. + // if running into already exists exception, we perform one last load + table = catalog.loadTable(identifier); + } + } + TABLE_CACHE.put(identifier, table); + } + return table; + } + /** * Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the * record. @@ -208,7 +253,16 @@ public boolean write(WindowedValue icebergDestination, Row r destinations.computeIfAbsent( icebergDestination, destination -> { - Table table = catalog.loadTable(destination.getValue().getTableIdentifier()); + TableIdentifier identifier = destination.getValue().getTableIdentifier(); + Table table; + try { + table = + TABLE_CACHE.get( + identifier, () -> getOrCreateTable(identifier, row.getSchema())); + } catch (ExecutionException e) { + throw new RuntimeException( + "Error while fetching or creating table: " + identifier, e); + } return new DestinationState(destination.getValue(), table); }); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 84f2146275f03..a5c034ac901ad 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.Serializable; @@ -290,14 +291,16 @@ public void testRead() throws Exception { */ @Test public void testWrite() { - Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); - // Write with Beam + // Expect the sink to create the table Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); + Table table = catalog.loadTable(TableIdentifier.parse(tableId)); + assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); + // Read back and check records are correct List returnedRecords = readRecords(table); assertThat( @@ -434,22 +437,23 @@ private void writeToDynamicDestinations( Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); - PartitionSpec partitionSpec = null; + TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a"); + TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + 
"_1_b"); + TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c"); + TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d"); + TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e"); + // the sink doesn't support creating partitioned tables yet, + // so we need to create it manually for this test case if (partitioning) { Preconditions.checkState(filterOp == null || !filterOp.equals("only")); - partitionSpec = + PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build(); + catalog.createTable(tableIdentifier0, tableSchema, partitionSpec); + catalog.createTable(tableIdentifier1, tableSchema, partitionSpec); + catalog.createTable(tableIdentifier2, tableSchema, partitionSpec); + catalog.createTable(tableIdentifier3, tableSchema, partitionSpec); + catalog.createTable(tableIdentifier4, tableSchema, partitionSpec); } - Table table0 = - catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec); - Table table1 = - catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec); - Table table2 = - catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec); - Table table3 = - catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec); - Table table4 = - catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec); // Write with Beam PCollection input; @@ -467,6 +471,16 @@ private void writeToDynamicDestinations( input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig)); pipeline.run().waitUntilFinish(); + Table table0 = catalog.loadTable(tableIdentifier0); + Table table1 = catalog.loadTable(tableIdentifier1); + Table table2 = catalog.loadTable(tableIdentifier2); + Table table3 = catalog.loadTable(tableIdentifier3); + Table table4 = catalog.loadTable(tableIdentifier4); + + for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) { + assertTrue(t.schema().sameSchema(tableSchema)); + } + // Read back and check records are correct List> returnedRecords = Arrays.asList( diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index e62c22be79685..87a543a439ec0 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -80,9 +80,6 @@ public void testSimpleAppend() throws Exception { TableIdentifier tableId = TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); - // Create a table and add records to it. 
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); - Map catalogProps = ImmutableMap.builder() .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) @@ -104,6 +101,7 @@ public void testSimpleAppend() throws Exception { testPipeline.run().waitUntilFinish(); LOG.info("Done running pipeline"); + Table table = warehouse.loadTable(tableId); List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); @@ -117,11 +115,6 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception { final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt); final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt); - // Create a table and add records to it. - Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA); - Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA); - Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA); - Map catalogProps = ImmutableMap.builder() .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) @@ -177,6 +170,10 @@ public IcebergDestination instantiateDestination(String dest) { testPipeline.run().waitUntilFinish(); LOG.info("Done running pipeline"); + Table table1 = warehouse.loadTable(table1Id); + Table table2 = warehouse.loadTable(table2Id); + Table table3 = warehouse.loadTable(table3Id); + List writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build()); List writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build()); List writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build()); @@ -320,9 +317,6 @@ public void testStreamingWrite() { TableIdentifier.of( "default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16)); - // Create a table and add records to it. - Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); - Map catalogProps = ImmutableMap.builder() .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) @@ -365,6 +359,8 @@ public void testStreamingWrite() { PAssert.that(snapshots).containsInAnyOrder(2L); testPipeline.run().waitUntilFinish(); + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 3196d303239f3..47dc9aa425dd8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -95,11 +95,6 @@ public void testBuildTransformWithRow() { public void testSimpleAppend() { String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); - TableIdentifier tableId = TableIdentifier.parse(identifier); - - // Create a table and add records to it. 
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); - Map properties = new HashMap<>(); properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); properties.put("warehouse", warehouse.location); @@ -129,6 +124,9 @@ public void testSimpleAppend() { testPipeline.run().waitUntilFinish(); + TableIdentifier tableId = TableIdentifier.parse(identifier); + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); @@ -137,7 +135,6 @@ public void testSimpleAppend() { @Test public void testWriteUsingManagedTransform() { String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); - Table table = warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA); String yamlConfig = String.format( @@ -161,6 +158,7 @@ public void testWriteUsingManagedTransform() { testPipeline.run().waitUntilFinish(); + Table table = warehouse.loadTable(TableIdentifier.parse(identifier)); List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); } @@ -261,9 +259,6 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema()); - Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema); - Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema); - Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema); TestStream stream = TestStream.create(beamSchema) @@ -301,6 +296,9 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo testPipeline.run().waitUntilFinish(); + Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0)); + Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1)); + Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2)); List table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build()); List table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build()); List table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index ad4fc6b382d4c..1e1c84d31de91 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -149,4 +149,8 @@ public Table createTable( someTableHasBeenCreated = true; return catalog.createTable(tableId, schema, partitionSpec); } + + public Table loadTable(TableIdentifier tableId) { + return catalog.loadTable(tableId); + } }