Skip to content

Commit

Permalink
Managed Iceberg dynamic destinations (apache#32565)
Browse files Browse the repository at this point in the history
* iceberg dynamic destinations

* update CHANGES

* spotless

* fix null error

* fix some tests

* fix some tests

* better row filter error message

* clarify keep-only-drop mutual exclusivity

* use constants for field names
  • Loading branch information
ahmedabu98 authored and reeba212 committed Dec 4, 2024
1 parent 273f8b4 commit 51c980b
Show file tree
Hide file tree
Showing 18 changed files with 652 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 1
}
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
## Highlights

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@
*
* // this filter will exclusively keep these fields and drop everything else
* List<String> fields = Arrays.asList("foo", "bar", "baz");
* RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields);
* RowFilter keepFilter = new RowFilter(beamSchema).keep(fields);
*
* // this filter will drop these fields
* RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields);
* RowFilter dropFilter = new RowFilter(beamSchema).drop(fields);
*
* // this filter will only output the contents of row field "my_record"
* String field = "my_record";
* RowFilter onlyFilter = new RowFilter(beamSchema).only(field);
*
* // produces a filtered row
* Row outputRow = keepingFilter.filter(row);
* Row outputRow = keepFilter.filter(row);
* }</pre>
*
* Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link
* #only(String)} for further details on what an output Row can look like.
* Check the documentation for {@link #keep(List)}, {@link #drop(List)}, and {@link #only(String)}
* for further details on what an output Row can look like.
*/
public class RowFilter implements Serializable {
private final Schema rowSchema;
Expand Down Expand Up @@ -103,7 +103,7 @@ public RowFilter(Schema rowSchema) {
* nested_2: xyz
* }</pre>
*/
public RowFilter keeping(List<String> fields) {
public RowFilter keep(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "keep");
validateSchemaContainsFields(rowSchema, fields, "keep");
Expand Down Expand Up @@ -132,7 +132,7 @@ public RowFilter keeping(List<String> fields) {
* bar: 456
* }</pre>
*/
public RowFilter dropping(List<String> fields) {
public RowFilter drop(List<String> fields) {
checkUnconfigured();
verifyNoNestedFields(fields, "drop");
validateSchemaContainsFields(rowSchema, fields, "drop");
Expand Down Expand Up @@ -168,6 +168,7 @@ public RowFilter dropping(List<String> fields) {
*/
public RowFilter only(String field) {
checkUnconfigured();
verifyNoNestedFields(Collections.singletonList(field), "only");
validateSchemaContainsFields(rowSchema, Collections.singletonList(field), "only");
Schema.Field rowField = rowSchema.getField(field);
Preconditions.checkArgument(
Expand All @@ -184,8 +185,8 @@ public RowFilter only(String field) {

/**
* Performs a filter operation (keep or drop) on the input {@link Row}. Must have already
* configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this
* {@link RowFilter}.
* configured a filter operation with {@link #drop(List)} or {@link #keep(List)} for this {@link
* RowFilter}.
*
* <p>If not yet configured, will simply return the same {@link Row}.
*/
Expand All @@ -196,9 +197,9 @@ public Row filter(Row row) {

Preconditions.checkState(
row.getSchema().assignableTo(rowSchema),
"Encountered Row with schema that is incompatible with this RowFilter's schema."
"Encountered Row with schema that is incompatible with this filter's schema."
+ "\nRow schema: %s"
+ "\nSchema used to initialize this RowFilter: %s",
+ "\nSchema used to initialize this filter: %s",
row.getSchema(),
rowSchema);

Expand All @@ -219,8 +220,7 @@ public Schema outputSchema() {
private void checkUnconfigured() {
Preconditions.checkState(
transformedSchema == null,
"This RowFilter has already been configured to filter to the following Schema: %s",
transformedSchema);
"Invalid filter configuration: Please set only one of 'keep', 'drop', or 'only'.");
}

/** Verifies that this selection contains no nested fields. */
Expand All @@ -233,9 +233,7 @@ private void verifyNoNestedFields(List<String> fields, String operation) {
}
if (!nestedFields.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"RowFilter does not support specifying nested fields to %s: %s",
operation, nestedFields));
String.format("'%s' does not support nested fields: %s", operation, nestedFields));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -118,16 +119,17 @@ public RowStringInterpolator(String template, Schema rowSchema) {
* Performs string interpolation on the template using values from the input {@link Row} and its
* windowing metadata.
*/
public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
public String interpolate(ValueInSingleWindow<Row> element) {
String interpolated = this.template;
for (String field : fieldsToReplace) {
Object val;
Instant timestamp = element.getTimestamp();
switch (field) {
case WINDOW:
val = window.toString();
val = element.getWindow().toString();
break;
case PANE_INDEX:
val = paneInfo.getIndex();
val = element.getPane().getIndex();
break;
case YYYY:
val = timestamp.getChronology().year().get(timestamp.getMillis());
Expand All @@ -139,7 +141,7 @@ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Inst
val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis());
break;
default:
val = MoreObjects.firstNonNull(getValue(row, field), "");
val = MoreObjects.firstNonNull(getValue(element.getValue(), field), "");
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,10 @@ public void testKeepSchemaFields() {
@Test
public void testDropNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("RowFilter does not support specifying nested fields to drop");
thrown.expectMessage("'drop' does not support nested fields");

new RowFilter(ROW_SCHEMA)
.dropping(
.drop(
Arrays.asList(
"bool",
"nullable_int",
Expand All @@ -270,10 +270,10 @@ public void testDropNestedFieldsFails() {
@Test
public void testKeepNestedFieldsFails() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("RowFilter does not support specifying nested fields to keep");
thrown.expectMessage("'keep' does not support nested fields");

new RowFilter(ROW_SCHEMA)
.keeping(
.keep(
Arrays.asList("str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Rule;
Expand Down Expand Up @@ -68,7 +69,9 @@ public void testInvalidRowThrowsHelpfulError() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'str'.");

interpolator.interpolate(invalidRow, null, null, null);
interpolator.interpolate(
ValueInSingleWindow.of(
invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

@Test
Expand All @@ -85,7 +88,9 @@ public void testInvalidRowThrowsHelpfulErrorForNestedFields() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'nested_int'.");

interpolator.interpolate(invalidRow, null, null, null);
interpolator.interpolate(
ValueInSingleWindow.of(
invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

@Test
Expand All @@ -105,7 +110,9 @@ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'.");

interpolator.interpolate(invalidRow, null, null, null);
interpolator.interpolate(
ValueInSingleWindow.of(
invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}

private static final Row ROW =
Expand Down Expand Up @@ -134,7 +141,9 @@ public void testTopLevelInterpolation() {
String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);
String output =
interpolator.interpolate(
ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar true, baz 123, xyz ", output);
}
Expand All @@ -144,7 +153,9 @@ public void testNestedLevelInterpolation() {
String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);
String output =
interpolator.interpolate(
ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar nested_str_value, baz 1.234", output);
}
Expand All @@ -155,7 +166,9 @@ public void testDoublyNestedInterpolation() {
"foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);
String output =
interpolator.interpolate(
ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));

assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output);
}
Expand All @@ -177,10 +190,11 @@ public void testInterpolateWindowingInformation() {

String output =
interpolator.interpolate(
ROW,
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0),
instant);
ValueInSingleWindow.of(
ROW,
instant,
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0)));
String expected =
String.format(
"str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

/**
* Assigns the destination metadata for each input record.
Expand All @@ -32,7 +39,7 @@
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {

private DynamicDestinations dynamicDestinations;
private final DynamicDestinations dynamicDestinations;

public AssignDestinations(DynamicDestinations dynamicDestinations) {
this.dynamicDestinations = dynamicDestinations;
Expand All @@ -41,23 +48,30 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema inputSchema = input.getSchema();
final Schema outputSchema =
Schema.builder()
.addRowField("data", inputSchema)
.addRowField("dest", dynamicDestinations.getMetadataSchema())
.addStringField(DEST)
.addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
public void processElement(@Element Row data, OutputReceiver<Row> out) {
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<Row> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);

out.output(
Row.withSchema(outputSchema)
.addValues(data, dynamicDestinations.assignDestinationMetadata(data))
.build());
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
}
}))
.setRowSchema(outputSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.catalog.TableIdentifier;

public interface DynamicDestinations extends Serializable {

Schema getMetadataSchema();
Schema getDataSchema();

Row assignDestinationMetadata(Row data);
Row getData(Row element);

IcebergDestination instantiateDestination(Row dest);
IcebergDestination instantiateDestination(String destination);

static DynamicDestinations singleTable(TableIdentifier tableId) {
return new OneTableDynamicDestinations(tableId);
String getTableStringIdentifier(ValueInSingleWindow<Row> element);

static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) {
return new OneTableDynamicDestinations(tableId, inputSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,21 @@ public IcebergWriteResult expand(PCollection<Row> input) {
DynamicDestinations destinations = getDynamicDestinations();
if (destinations == null) {
destinations =
DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
DynamicDestinations.singleTable(
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
// Assign destinations before re-windowing to global because
// user's dynamic destination may depend on windowing properties
PCollection<Row> assignedRows =
input.apply("Set Destination Metadata", new AssignDestinations(destinations));

if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
input =
input.apply(
assignedRows =
assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Expand All @@ -138,11 +144,9 @@ public IcebergWriteResult expand(PCollection<Row> input) {
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return input
.apply("Set Destination Metadata", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
return assignedRows.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

Expand Down
Loading

0 comments on commit 51c980b

Please sign in to comment.