BigQuery Storage Write API SchemaTransform wrapper for Python SDK #24783

Closed
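The changes below add a Java BigQueryStorageWriteApiSchemaTransformProvider so that BigQuery's Storage Write API can be exposed as a portable SchemaTransform, which is what the Python SDK wrapper builds on. For orientation, here is a rough sketch of how the provider is exercised from Java, mirroring the updated provider test further down; the `from(...).buildTransform()` chain, the table spec, and the `pipeline`/`inputRows`/`schema` placeholders are illustrative assumptions, not part of this diff.

```java
// Minimal sketch (assumed API surface): configure the transform, apply it to a
// PCollectionRowTuple tagged "input", and read both failure outputs.
// pipeline, inputRows, and schema are placeholders supplied by the caller.
BigQueryStorageWriteApiSchemaTransformConfiguration config =
    BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
        .setTable("project:dataset.table")
        .build();

PCollection<Row> rows = pipeline.apply(Create.of(inputRows).withRowSchema(schema));

PCollectionRowTuple result =
    PCollectionRowTuple.of("input", rows)
        .apply(
            new BigQueryStorageWriteApiSchemaTransformProvider().from(config).buildTransform());

// Rows that could not be written, without and with the accompanying error message.
PCollection<Row> failedRows = result.get("failed_rows");
PCollection<Row> failedRowsWithErrors = result.get("failed_rows_with_errors");
```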
build.gradle.kts (11 changes: 0 additions & 11 deletions)
@@ -450,16 +450,7 @@ tasks.register("pythonFormatterPreCommit") {

tasks.register("python37PostCommit") {
dependsOn(":sdks:python:test-suites:dataflow:py37:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py37:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py37:directRunnerIT")
dependsOn(":sdks:python:test-suites:direct:py37:hdfsIntegrationTest")
dependsOn(":sdks:python:test-suites:direct:py37:azureIntegrationTest")
dependsOn(":sdks:python:test-suites:direct:py37:mongodbioIT")
dependsOn(":sdks:python:test-suites:portable:py37:postCommitPy37")
dependsOn(":sdks:python:test-suites:dataflow:py37:spannerioIT")
dependsOn(":sdks:python:test-suites:direct:py37:spannerioIT")
dependsOn(":sdks:python:test-suites:portable:py37:xlangSpannerIOIT")
dependsOn(":sdks:python:test-suites:direct:py37:inferencePostCommitIT")
}

tasks.register("python38PostCommit") {
@@ -483,8 +474,6 @@ tasks.register("python39PostCommit") {

tasks.register("python310PostCommit") {
dependsOn(":sdks:python:test-suites:dataflow:py310:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py310:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py310:hdfsIntegrationTest")
dependsOn(":sdks:python:test-suites:portable:py310:postCommitPy310")
}

BeamRowToStorageApiProto.java
@@ -118,7 +118,7 @@ public class BeamRowToStorageApiProto {
.put(
SqlTypes.DATETIME.getIdentifier(),
(logicalType, value) ->
CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value))
.put(
SqlTypes.TIMESTAMP.getIdentifier(),
(logicalType, value) -> (ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value)))
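The DATETIME mapping now packs values at microsecond rather than second precision, so sub-second components survive the write path. A small illustration of the round trip, assuming the CivilTimeEncoder methods referenced in this diff accept and return java.time.LocalDateTime:

```java
// Packed-micros encoding keeps the fractional seconds that the previous
// encodePacked64DatetimeSeconds(...) call would have dropped.
LocalDateTime dt = LocalDateTime.parse("2000-01-02T00:00:00.123456");
long packed = CivilTimeEncoder.encodePacked64DatetimeMicros(dt);
LocalDateTime roundTripped = CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packed);
// roundTripped.equals(dt) holds, including the .123456 fraction.
```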
BigQueryUtils.java
@@ -66,6 +66,7 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -385,7 +386,7 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
FieldType type = schemaField.getType();

TableFieldSchema field = new TableFieldSchema().setName(schemaField.getName());
if (schemaField.getDescription() != null && !"".equals(schemaField.getDescription())) {
if (!Strings.isNullOrEmpty(schemaField.getDescription())) {
field.setDescription(schemaField.getDescription());
}

@@ -512,7 +513,7 @@ public static TableRow convertGenericRecordToTableRow(
return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
}

/** Convert a BigQuery TableRow to a Beam Row. */
/** Convert a Beam Row to a BigQuery TableRow. */
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
@@ -686,6 +687,14 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
if (JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
// Handle the case where the datetime value is encoded as packed micros
try {
Long value = Long.parseLong(jsonBQString);
return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value);
} catch (NumberFormatException e) {
// This means value is not represented by a number, so we swallow and handle it as a
// String
}
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(jsonBQString);
BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -22,6 +22,7 @@

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,7 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -74,7 +75,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(5);
private static final String INPUT_ROWS_TAG = "input";
private static final String OUTPUT_ERRORS_TAG = "errors";
private static final String FAILED_ROWS_TAG = "failed_rows";
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "failed_rows_with_errors";

@Override
protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
@@ -99,7 +101,7 @@ public List<String> inputCollectionNames() {

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_ERRORS_TAG);
return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG);
}

/** Configuration for writing to BigQuery with Storage Write API. */
@@ -130,17 +132,19 @@ public void validate() {

// validate create and write dispositions
if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
checkArgument(
CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()) != null,
checkNotNull(
CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()),
invalidConfigMessage
+ "Invalid create disposition was specified. Available dispositions are: ",
+ "Invalid create disposition (%s) was specified. Available dispositions are: %s",
this.getCreateDisposition(),
CREATE_DISPOSITIONS.keySet());
}
if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
checkNotNull(
WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()),
invalidConfigMessage
+ "Invalid write disposition was specified. Available dispositions are: ",
+ "Invalid write disposition (%s) was specified. Available dispositions are: %s",
this.getWriteDisposition(),
WRITE_DISPOSITIONS.keySet());
}
}
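With the stricter checkNotNull validation above, a configuration only passes if its dispositions (compared case-insensitively) are keys of the CREATE_DISPOSITIONS / WRITE_DISPOSITIONS maps. A hypothetical configuration that should validate, with builder setter names assumed to mirror the getters referenced in this diff:

```java
// Hypothetical valid configuration; "CREATE_IF_NEEDED" and "WRITE_APPEND" are the
// usual BigQueryIO disposition names and are assumed to be keys of the maps above.
BigQueryStorageWriteApiSchemaTransformConfiguration config =
    BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
        .setTable("project:dataset.table")
        .setCreateDisposition("create_if_needed") // matched after toUpperCase()
        .setWriteDisposition("WRITE_APPEND")
        .build();
config.validate(); // fails with the formatted message above if a disposition is unknown
```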
@@ -229,7 +233,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);

BigQueryIO.Write<Row> write = createStorageWriteApiTransform();

if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
write =
@@ -240,30 +243,45 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
? DEFAULT_TRIGGERING_FREQUENCY
: Duration.standardSeconds(triggeringFrequency));
}

WriteResult result = inputRows.apply(write);

Schema rowSchema = inputRows.getSchema();
Schema errorSchema =
Schema.of(
Field.of("failed_row", FieldType.STRING),
Field.of("failed_row", FieldType.row(rowSchema)),
Field.of("error_message", FieldType.STRING));

// Errors consisting of failed rows along with their error message
PCollection<Row> errorRows =
// Failed rows
PCollection<Row> failedRows =
result
.getFailedStorageApiInserts()
.apply(
"Construct Failed Rows",
MapElements.into(TypeDescriptors.rows())
.via(
(storageError) ->
BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())))
.setRowSchema(rowSchema);

// Failed rows along with their corresponding error messages
PCollection<Row> failedRowsWithErrors =
result
.getFailedStorageApiInserts()
.apply(
"Extract Errors",
MapElements.into(TypeDescriptor.of(Row.class))
"Construct Failed Rows and Errors",
MapElements.into(TypeDescriptors.rows())
.via(
(storageError) ->
Row.withSchema(errorSchema)
.withFieldValue("error_message", storageError.getErrorMessage())
.withFieldValue("failed_row", storageError.getRow().toString())
.withFieldValue(
"failed_row",
BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))
.build()))
.setRowSchema(errorSchema);

return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorRows);
return PCollectionRowTuple.of(FAILED_ROWS_TAG, failedRows)
.and(FAILED_ROWS_WITH_ERRORS_TAG, failedRowsWithErrors);
}

BigQueryIO.Write<Row> createStorageWriteApiTransform() {
@@ -283,13 +301,13 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform() {
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
configuration.getCreateDisposition());
configuration.getCreateDisposition().toUpperCase());
write = write.withCreateDisposition(createDisposition);
}
if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
configuration.getWriteDisposition());
configuration.getWriteDisposition().toUpperCase());
write = write.withWriteDisposition(writeDisposition);
}
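Note that in the expand() changes above, the "failed_rows_with_errors" output now carries each failed element as a structured Row under failed_row alongside its error_message, instead of a stringified row. A sketch of consuming that output downstream, where failedRowsWithErrors stands for the PCollection obtained from the transform's result tuple:

```java
// Each error row exposes the original input row plus the error message.
PCollection<String> formattedErrors =
    failedRowsWithErrors.apply(
        "Format failed rows",
        MapElements.into(TypeDescriptors.strings())
            .via(
                err -> {
                  Row failedRow = err.getRow("failed_row"); // original schema preserved
                  String message = err.getString("error_message");
                  return message + ": " + failedRow;
                }));
```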

BeamRowToStorageApiProtoTest.java
@@ -258,7 +258,7 @@ public class BeamRowToStorageApiProtoTest {
BASE_ROW.getLogicalTypeValue("sqlTimeValue", LocalTime.class)))
.put(
"sqldatetimevalue",
CivilTimeEncoder.encodePacked64DatetimeSeconds(
CivilTimeEncoder.encodePacked64DatetimeMicros(
BASE_ROW.getLogicalTypeValue("sqlDatetimeValue", LocalDateTime.class)))
.put(
"sqltimestampvalue",
BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -17,13 +17,17 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
Expand All @@ -34,6 +38,7 @@
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
@@ -70,12 +75,12 @@ public class BigQueryStorageWriteApiSchemaTransformProviderTest {
Row.withSchema(SCHEMA)
.withFieldValue("name", "b")
.withFieldValue("number", 2L)
.withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
.withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00.123"))
.build(),
Row.withSchema(SCHEMA)
.withFieldValue("name", "c")
.withFieldValue("number", 3L)
.withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
.withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00.123456"))
.build());

@Rule public final transient TestPipeline p = TestPipeline.create();
@@ -107,7 +112,7 @@ public void testInvalidConfig() {
}

public PCollectionRowTuple runWithConfig(
BigQueryStorageWriteApiSchemaTransformConfiguration config) {
BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> rows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();

@@ -118,25 +123,82 @@ public PCollectionRowTuple runWithConfig(
writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
String tag = provider.inputCollectionNames().get(0);

PCollection<Row> rows = p.apply(Create.of(ROWS).withRowSchema(SCHEMA));
PCollection<Row> rowPc = p.apply(Create.of(rows).withRowSchema(SCHEMA));

PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
PCollectionRowTuple input = PCollectionRowTuple.of(tag, rowPc);
PCollectionRowTuple result = input.apply(writeRowTupleTransform);

return result;
}

public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
if (expectedRows.size() != actualRows.size()) {
return false;
}
for (int i = 0; i < expectedRows.size(); i++) {
TableRow actualRow = actualRows.get(i);
Row expectedRow = expectedRows.get(Integer.parseInt(actualRow.get("number").toString()) - 1);

if (!expectedRow.getValue("name").equals(actualRow.get("name"))
|| !expectedRow
.getValue("number")
.equals(Long.parseLong(actualRow.get("number").toString()))) {
return false;
}
}
return true;
}

@Test
public void testSimpleWrite() throws Exception {
String tableSpec = "project:dataset.simple_write";
BigQueryStorageWriteApiSchemaTransformConfiguration config =
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();

runWithConfig(config);
runWithConfig(config, ROWS);
p.run().waitUntilFinish();

assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
assertTrue(
rowsEquals(ROWS, fakeDatasetService.getAllRows("project", "dataset", "simple_write")));
}

@Test
public void testFailedRows() throws Exception {
String tableSpec = "project:dataset.write_with_fail";
BigQueryStorageWriteApiSchemaTransformConfiguration config =
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();

String failValue = "fail_me";

List<Row> expectedSuccessfulRows = new ArrayList<>(ROWS);
List<Row> expectedFailedRows = new ArrayList<>();
for (long l = 1L; l <= 3L; l++) {
expectedFailedRows.add(
Row.withSchema(SCHEMA)
.withFieldValue("name", failValue)
.withFieldValue("number", l)
.withFieldValue("dt", LocalDateTime.parse("2020-01-01T00:00:00.09"))
.build());
}

List<Row> totalRows = new ArrayList<>(expectedSuccessfulRows);
totalRows.addAll(expectedFailedRows);

Function<TableRow, Boolean> shouldFailRow =
(Function<TableRow, Boolean> & Serializable) tr -> tr.get("name").equals(failValue);
fakeDatasetService.setShouldFailRow(shouldFailRow);

PCollectionRowTuple result = runWithConfig(config, totalRows);
PCollection<Row> failedRows = result.get("failed_rows");

PAssert.that(failedRows).containsInAnyOrder(expectedFailedRows);
p.run().waitUntilFinish();

assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
assertEquals(
ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size());
assertTrue(
rowsEquals(
expectedSuccessfulRows,
fakeDatasetService.getAllRows("project", "dataset", "write_with_fail")));
}
}