From 24255ac84b7e240cbabf598f30e286b66775b4bb Mon Sep 17 00:00:00 2001 From: Jeff Kinard Date: Tue, 27 Aug 2024 04:28:55 -0400 Subject: [PATCH] Add schema to SpannerIO read (#32008) * Add schema to SpannerIO read Signed-off-by: Jeffrey Kinard * fix spotless Signed-off-by: Jeffrey Kinard * fix spotless Signed-off-by: Jeffrey Kinard * address comments Signed-off-by: Jeffrey Kinard * spotless Signed-off-by: Jeffrey Kinard --------- Signed-off-by: Jeffrey Kinard --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 66 +++++++++++++++++- .../io/gcp/spanner/SpannerQuerySourceDef.java | 56 ++++++++++++++++ .../SpannerSchemaRetrievalException.java | 29 ++++++++ .../sdk/io/gcp/spanner/SpannerSourceDef.java | 36 ++++++++++ .../io/gcp/spanner/SpannerTableSourceDef.java | 63 +++++++++++++++++ .../beam/sdk/io/gcp/spanner/StructUtils.java | 67 +++++++++++++++++++ .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 26 +++++++ .../sdk/io/gcp/spanner/SpannerReadIT.java | 34 ++++++++++ .../sdk/io/gcp/spanner/StructUtilsTest.java | 66 ++++++++++++++++++ 9 files changed, 442 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerQuerySourceDef.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaRetrievalException.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSourceDef.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTableSourceDef.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index e5fd168df1ba3..0437e41459047 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -475,6 +475,14 @@ public static ReadAll readAll() { .build(); } + public static Read readWithSchema() { + return read() + .withBeamRowConverters( + TypeDescriptor.of(Struct.class), + StructUtils.structToBeamRow(), + StructUtils.structFromBeamRow()); + } + /** * Returns a transform that creates a batch transaction. By default, {@link * TimestampBound#strong()} transaction is created, to override this use {@link @@ -708,6 +716,12 @@ static ServiceCallMetric buildServiceCallMetricForReadOp( @AutoValue public abstract static class Read extends PTransform> { + interface ToBeamRowFunction + extends SerializableFunction> {} + + interface FromBeamRowFunction + extends SerializableFunction> {} + abstract SpannerConfig getSpannerConfig(); abstract ReadOperation getReadOperation(); @@ -720,6 +734,12 @@ public abstract static class Read extends PTransform abstract Boolean getBatching(); + abstract @Nullable TypeDescriptor getTypeDescriptor(); + + abstract @Nullable ToBeamRowFunction getToBeamRowFn(); + + abstract @Nullable FromBeamRowFunction getFromBeamRowFn(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -737,9 +757,26 @@ abstract static class Builder { abstract Builder setBatching(Boolean batching); + abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); + + abstract Builder setToBeamRowFn(ToBeamRowFunction toRowFn); + + abstract Builder setFromBeamRowFn(FromBeamRowFunction fromRowFn); + abstract Read build(); } + public Read withBeamRowConverters( + TypeDescriptor typeDescriptor, + ToBeamRowFunction toRowFn, + FromBeamRowFunction fromRowFn) { + return toBuilder() + .setTypeDescriptor(typeDescriptor) + .setToBeamRowFn(toRowFn) + .setFromBeamRowFn(fromRowFn) + .build(); + } + /** Specifies the Cloud Spanner configuration. */ public Read withSpannerConfig(SpannerConfig spannerConfig) { return toBuilder().setSpannerConfig(spannerConfig).build(); @@ -876,6 +913,14 @@ public Read withHighPriority() { return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH)); } + private SpannerSourceDef createSourceDef() { + if (getReadOperation().getQuery() != null) { + return SpannerQuerySourceDef.create(getSpannerConfig(), getReadOperation().getQuery()); + } + return SpannerTableSourceDef.create( + getSpannerConfig(), getReadOperation().getTable(), getReadOperation().getColumns()); + } + @Override public PCollection expand(PBegin input) { getSpannerConfig().validate(); @@ -905,13 +950,32 @@ public PCollection expand(PBegin input) { "SpannerIO.read() requires query OR table to set with withTable OR withQuery method."); } + final SpannerSourceDef sourceDef = createSourceDef(); + + Schema beamSchema = null; + if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) { + beamSchema = sourceDef.getBeamSchema(); + } + ReadAll readAll = readAll() .withSpannerConfig(getSpannerConfig()) .withTimestampBound(getTimestampBound()) .withBatching(getBatching()) .withTransaction(getTransaction()); - return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); + + PCollection rows = + input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); + + if (beamSchema != null) { + rows.setSchema( + beamSchema, + getTypeDescriptor(), + getToBeamRowFn().apply(beamSchema), + getFromBeamRowFn().apply(beamSchema)); + } + + return rows; } SerializableFunction getFormatFn() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerQuerySourceDef.java new file mode 100644 index 0000000000000..f9896b5e64f29 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerQuerySourceDef.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.apache.beam.sdk.io.gcp.spanner.StructUtils.structTypeToBeamRowSchema; + +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Statement; +import org.apache.beam.sdk.schemas.Schema; + +class SpannerQuerySourceDef implements SpannerSourceDef { + + private final SpannerConfig config; + private final Statement query; + + static SpannerQuerySourceDef create(SpannerConfig config, Statement query) { + return new SpannerQuerySourceDef(config, query); + } + + private SpannerQuerySourceDef(SpannerConfig config, Statement query) { + this.config = config; + this.query = query; + } + + /** {@inheritDoc} */ + @Override + public Schema getBeamSchema() { + Schema beamSchema; + try (SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(config)) { + try (ReadContext readContext = spannerAccessor.getDatabaseClient().singleUse()) { + ResultSet result = readContext.analyzeQuery(query, ReadContext.QueryAnalyzeMode.PLAN); + result.next(); + beamSchema = structTypeToBeamRowSchema(result.getMetadata().getRowType(), true); + } + } catch (Exception e) { + throw new SpannerSchemaRetrievalException("Exception while trying to retrieve schema", e); + } + return beamSchema; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaRetrievalException.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaRetrievalException.java new file mode 100644 index 0000000000000..6f1d272d09b4f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaRetrievalException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +/** Exception to signal that Spanner schema retrieval failed. */ +public class SpannerSchemaRetrievalException extends RuntimeException { + SpannerSchemaRetrievalException(String message, Throwable cause) { + super(message, cause); + } + + SpannerSchemaRetrievalException(String message) { + super(message); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSourceDef.java new file mode 100644 index 0000000000000..fe5e1333e6952 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSourceDef.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import java.io.Serializable; +import org.apache.beam.sdk.schemas.Schema; + +/** + * Represents a source used for {@link SpannerIO#read()}. Currently, this could be either a table or + * a query. Direct read sources are not yet supported. + */ +interface SpannerSourceDef extends Serializable { + + /** + * Extract the Beam {@link Schema} corresponding to this source. + * + * @return Beam schema of the source + * @throws SpannerSchemaRetrievalException if schema retrieval fails + */ + Schema getBeamSchema(); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTableSourceDef.java new file mode 100644 index 0000000000000..aca9e86c9750a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTableSourceDef.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.apache.beam.sdk.io.gcp.spanner.StructUtils.structTypeToBeamRowSchema; + +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import org.apache.beam.sdk.schemas.Schema; + +class SpannerTableSourceDef implements SpannerSourceDef { + + private final SpannerConfig config; + private final String table; + private final Iterable columns; + + static SpannerTableSourceDef create( + SpannerConfig config, String table, Iterable columns) { + return new SpannerTableSourceDef(config, table, columns); + } + + private SpannerTableSourceDef(SpannerConfig config, String table, Iterable columns) { + this.table = table; + this.config = config; + this.columns = columns; + } + + /** {@inheritDoc} */ + @Override + public Schema getBeamSchema() { + Schema beamSchema; + try (SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(config)) { + try (ReadContext readContext = spannerAccessor.getDatabaseClient().singleUse()) { + ResultSet result = readContext.read(table, KeySet.all(), columns, Options.limit(1)); + if (result.next()) { + beamSchema = structTypeToBeamRowSchema(result.getMetadata().getRowType(), true); + } else { + throw new SpannerSchemaRetrievalException("Cannot find Spanner table."); + } + } + } catch (Exception e) { + throw new SpannerSchemaRetrievalException("Exception while trying to retrieve schema", e); + } + return beamSchema; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java index 1f32bddba7b9c..6183ac9768f71 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java @@ -24,6 +24,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type; +import com.google.spanner.v1.StructType; import java.math.BigDecimal; import java.util.HashMap; import java.util.List; @@ -38,6 +39,20 @@ final class StructUtils { + private static final SpannerIO.Read.ToBeamRowFunction STRUCT_TO_BEAM_ROW_FUNCTION = + schema -> (Struct struct) -> structToBeamRow(struct, schema); + + public static SpannerIO.Read.ToBeamRowFunction structToBeamRow() { + return STRUCT_TO_BEAM_ROW_FUNCTION; + } + + private static final SpannerIO.Read.FromBeamRowFunction STRUCT_FROM_BEAM_ROW_FUNCTION = + ignored -> StructUtils::beamRowToStruct; + + public static SpannerIO.Read.FromBeamRowFunction structFromBeamRow() { + return STRUCT_FROM_BEAM_ROW_FUNCTION; + } + // It's not possible to pass nulls as values even with a field is nullable @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -52,6 +67,58 @@ public static Row structToBeamRow(Struct struct, Schema schema) { return Row.withSchema(schema).withFieldValues(structValues).build(); } + public static Schema structTypeToBeamRowSchema(StructType structType, boolean isRead) { + Schema.Builder beamSchema = Schema.builder(); + structType + .getFieldsList() + .forEach( + field -> { + Schema.FieldType fieldType; + try { + fieldType = convertSpannerTypeToBeamFieldType(field.getType()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Error processing struct to row: " + e.getMessage()); + } + // Treat reads from Spanner as Nullable and leave Null handling to Spanner + if (isRead) { + beamSchema.addNullableField(field.getName(), fieldType); + } else { + beamSchema.addField(field.getName(), fieldType); + } + }); + return beamSchema.build(); + } + + public static Schema.FieldType convertSpannerTypeToBeamFieldType( + com.google.spanner.v1.Type spannerType) { + switch (spannerType.getCode()) { + case BOOL: + return Schema.FieldType.BOOLEAN; + case BYTES: + return Schema.FieldType.BYTES; + case TIMESTAMP: + case DATE: + return Schema.FieldType.DATETIME; + case INT64: + return Schema.FieldType.INT64; + case FLOAT32: + return Schema.FieldType.FLOAT; + case FLOAT64: + return Schema.FieldType.DOUBLE; + case NUMERIC: + return Schema.FieldType.DECIMAL; + case ARRAY: + return Schema.FieldType.array( + convertSpannerTypeToBeamFieldType(spannerType.getArrayElementType())); + case STRUCT: + throw new IllegalArgumentException( + String.format("Unsupported type '%s'.", spannerType.getCode())); + default: + return Schema.FieldType.STRING; + } + } + public static Struct beamRowToStruct(Row row) { Struct.Builder structBuilder = Struct.newBuilder(); List fields = row.getSchema().getFields(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 697300d1c4135..3d4b7818c6516 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -17,9 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -62,6 +65,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -755,6 +759,28 @@ private void runReadAllPipeline(SpannerIO.ReadAll readAllTransform) { verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "ok", 3); } + @Test + public void runReadFailsToRetrieveSchema() { + PCollection spannerRows = + pipeline.apply( + SpannerIO.read() + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withTable(TABLE_ID) + .withColumns("id", "name")); + + Exception exception = assertThrows(IllegalStateException.class, spannerRows::getSchema); + checkMessage("Cannot call getSchema when there is no schema", exception.getMessage()); + } + + private void checkMessage(String substring, @Nullable String message) { + if (message != null) { + assertThat(message, containsString(substring)); + } else { + fail(); + } + } + private long getRequestMetricCount(HashMap baseLabels) { MonitoringInfoMetricName name = MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java index 8dff31cf4b773..a54813dfcad16 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.junit.Assert.assertEquals; + import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.BatchClient; import com.google.cloud.spanner.Database; @@ -38,6 +40,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -189,6 +192,37 @@ public void testRead() throws Exception { p.run().waitUntilFinish(); } + @Test + public void testReadWithSchema() throws Exception { + + SpannerConfig spannerConfig = createSpannerConfig(); + + PCollectionView tx = + p.apply( + "Create tx", + SpannerIO.createTransaction() + .withSpannerConfig(spannerConfig) + .withTimestampBound(TimestampBound.strong())); + + PCollection output = + p.apply( + "read db", + SpannerIO.readWithSchema() + .withSpannerConfig(spannerConfig) + .withTable(options.getTable()) + .withColumns("Key", "Value") + .withTransaction(tx)); + Schema schema = + Schema.of( + Schema.Field.nullable("Key", Schema.FieldType.INT64), + Schema.Field.nullable("Value", Schema.FieldType.STRING)); + assertEquals(output.getSchema(), schema); + + PAssert.thatSingleton(output.apply("Count rows", Count.globally())).isEqualTo(5L); + + p.run().waitUntilFinish(); + } + @Test @Ignore("https://github.com/apache/beam/issues/26208 Test stuck indefinitely") public void testReadWithDataBoost() throws Exception { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java index ca9eb025f7706..1cdf9afa7de1a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java @@ -30,6 +30,8 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.TypeCode; import java.math.BigDecimal; import java.util.List; import org.apache.beam.sdk.schemas.Schema; @@ -262,6 +264,70 @@ public void testBeamTypeToSpannerTypeTranslation() { beamTypeToSpannerType(Schema.FieldType.array(Schema.FieldType.INT64))); } + @Test + public void testStructTypeToBeamRowSchema() { + assertEquals( + StructUtils.structTypeToBeamRowSchema(createStructType(), true), createRowSchema()); + } + + @Test + public void testStructTypeToBeamRowSchemaFailsTypeNotSupported() { + StructType structTypeWithStruct = + createStructType() + .toBuilder() + .addFields(getFieldForTypeCode("f_struct", TypeCode.STRUCT)) + .build(); + + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> StructUtils.structTypeToBeamRowSchema(structTypeWithStruct, true)); + checkMessage( + "Error processing struct to row: Unsupported type 'STRUCT'.", exception.getMessage()); + } + + private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode) { + return StructType.Field.newBuilder() + .setName(name) + .setType(com.google.spanner.v1.Type.newBuilder().setCode(typeCode)) + .build(); + } + + private StructType createStructType() { + return StructType.newBuilder() + .addFields(getFieldForTypeCode("f_int64", TypeCode.INT64)) + .addFields(getFieldForTypeCode("f_float32", TypeCode.FLOAT32)) + .addFields(getFieldForTypeCode("f_float64", TypeCode.FLOAT64)) + .addFields(getFieldForTypeCode("f_string", TypeCode.STRING)) + .addFields(getFieldForTypeCode("f_bytes", TypeCode.BYTES)) + .addFields(getFieldForTypeCode("f_timestamp", TypeCode.TIMESTAMP)) + .addFields(getFieldForTypeCode("f_date", TypeCode.DATE)) + .addFields(getFieldForTypeCode("f_numeric", TypeCode.NUMERIC)) + .addFields( + StructType.Field.newBuilder() + .setName("f_array") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.ARRAY) + .setArrayElementType( + com.google.spanner.v1.Type.newBuilder().setCode(TypeCode.INT64))) + .build()) + .build(); + } + + private Schema createRowSchema() { + return Schema.of( + Schema.Field.nullable("f_int64", Schema.FieldType.INT64), + Schema.Field.nullable("f_float32", Schema.FieldType.FLOAT), + Schema.Field.nullable("f_float64", Schema.FieldType.DOUBLE), + Schema.Field.nullable("f_string", Schema.FieldType.STRING), + Schema.Field.nullable("f_bytes", Schema.FieldType.BYTES), + Schema.Field.nullable("f_timestamp", Schema.FieldType.DATETIME), + Schema.Field.nullable("f_date", Schema.FieldType.DATETIME), + Schema.Field.nullable("f_numeric", Schema.FieldType.DECIMAL), + Schema.Field.nullable("f_array", Schema.FieldType.array(Schema.FieldType.INT64))); + } + private Schema.Builder getSchemaTemplate() { return Schema.builder() .addNullableField("f_int64", Schema.FieldType.INT64)