diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 69b9c62ceea9..7c0ab785ae7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1251,6 +1251,8 @@ abstract Builder setBadRecordErrorHandler( abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); abstract Builder setProjectionPushdownApplied(boolean projectionPushdownApplied); + + abstract Builder setDirectReadPicosTimestampPrecision(TimestampPrecision precision); } abstract @Nullable ValueProvider getJsonTableRef(); @@ -1306,6 +1308,8 @@ abstract Builder setBadRecordErrorHandler( abstract boolean getProjectionPushdownApplied(); + abstract @Nullable TimestampPrecision getDirectReadPicosTimestampPrecision(); + /** * An enumeration type for the priority of a query. * @@ -1381,7 +1385,8 @@ private BigQueryStorageQuerySource createStorageQuerySource( getFormat(), getParseFn(), outputCoder, - getBigQueryServices()); + getBigQueryServices(), + getDirectReadPicosTimestampPrecision()); } private static final String QUERY_VALIDATION_FAILURE_ERROR = @@ -1525,7 +1530,12 @@ public PCollection expand(PBegin input) { if (selectedFields != null && selectedFields.isAccessible()) { tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get()); } - beamSchema = BigQueryUtils.fromTableSchema(tableSchema); + BigQueryUtils.SchemaConversionOptions.Builder builder = + BigQueryUtils.SchemaConversionOptions.builder(); + if (getDirectReadPicosTimestampPrecision() != null) { + builder.setPicosecondTimestampMapping(getDirectReadPicosTimestampPrecision()); + } + beamSchema = BigQueryUtils.fromTableSchema(tableSchema, builder.build()); } final Coder coder = inferCoder(p.getCoderRegistry()); @@ -1710,7 +1720,8 @@ private PCollection expandForDirectRead( getParseFn(), outputCoder, getBigQueryServices(), - getProjectionPushdownApplied()))); + getProjectionPushdownApplied(), + getDirectReadPicosTimestampPrecision()))); if (beamSchema != null) { rows.setSchema( beamSchema, @@ -1731,7 +1742,8 @@ private PCollection expandForDirectRead( getParseFn(), outputCoder, getBigQueryServices(), - getProjectionPushdownApplied()); + getProjectionPushdownApplied(), + getDirectReadPicosTimestampPrecision()); List> sources; try { // This splitting logic taken from the SDF implementation of Read @@ -2293,6 +2305,18 @@ public TypedRead withMethod(TypedRead.Method method) { return toBuilder().setMethod(method).build(); } + /** + * Sets the timestamp precision to request for TIMESTAMP(12) BigQuery columns when reading via + * the Storage Read API. + * + *
<p>
This option only affects precision of TIMESTAMP(12) column reads using {@link + * Method#DIRECT_READ}. If not set the BQ client will return microsecond precision by default. + */ + public TypedRead withDirectReadPicosTimestampPrecision( + TimestampPrecision timestampPrecision) { + return toBuilder().setDirectReadPicosTimestampPrecision(timestampPrecision).build(); + } + /** See {@link DataFormat}. */ public TypedRead withFormat(DataFormat format) { return toBuilder().setFormat(format).build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index d519ea4016ff..c2e891145acd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -109,6 +109,7 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator transform) { if (transform.getUseAvroLogicalTypes() != null) { fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes()); } + if (transform.getDirectReadPicosTimestampPrecision() != null) { + fieldValues.put( + "direct_read_picos_timestamp_precision", + toByteArray(transform.getDirectReadPicosTimestampPrecision())); + } fieldValues.put("projection_pushdown_applied", transform.getProjectionPushdownApplied()); fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter())); fieldValues.put( @@ -293,6 +299,13 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { if (formatBytes != null) { builder = builder.setFormat((DataFormat) fromByteArray(formatBytes)); } + byte[] timestampPrecisionBytes = + configRow.getBytes("direct_read_picos_timestamp_precision"); + if (timestampPrecisionBytes != null) { + builder = + builder.setDirectReadPicosTimestampPrecision( + (TimestampPrecision) fromByteArray(timestampPrecisionBytes)); + } Collection selectedFields = configRow.getArray("selected_fields"); if (selectedFields != null && !selectedFields.isEmpty()) { builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java index 07c3273c293c..064b9bebaf16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java @@ -40,6 +40,38 @@ /** A {@link org.apache.beam.sdk.io.Source} representing reading the results of a query. 
*/ class BigQueryStorageQuerySource extends BigQueryStorageSourceBase { + public static BigQueryStorageQuerySource create( + String stepUuid, + ValueProvider queryProvider, + Boolean flattenResults, + Boolean useLegacySql, + QueryPriority priority, + @Nullable String location, + @Nullable String queryTempDataset, + @Nullable String queryTempProject, + @Nullable String kmsKey, + @Nullable DataFormat format, + SerializableFunction parseFn, + Coder outputCoder, + BigQueryServices bqServices, + @Nullable TimestampPrecision picosTimestampPrecision) { + return new BigQueryStorageQuerySource<>( + stepUuid, + queryProvider, + flattenResults, + useLegacySql, + priority, + location, + queryTempDataset, + queryTempProject, + kmsKey, + format, + parseFn, + outputCoder, + bqServices, + picosTimestampPrecision); + } + public static BigQueryStorageQuerySource create( String stepUuid, ValueProvider queryProvider, @@ -67,7 +99,8 @@ public static BigQueryStorageQuerySource create( format, parseFn, outputCoder, - bqServices); + bqServices, + /*picosTimestampPrecision=*/ null); } public static BigQueryStorageQuerySource create( @@ -94,7 +127,8 @@ public static BigQueryStorageQuerySource create( null, parseFn, outputCoder, - bqServices); + bqServices, + /*picosTimestampPrecision=*/ null); } private final String stepUuid; @@ -123,8 +157,9 @@ private BigQueryStorageQuerySource( @Nullable DataFormat format, SerializableFunction parseFn, Coder outputCoder, - BigQueryServices bqServices) { - super(format, null, null, parseFn, outputCoder, bqServices); + BigQueryServices bqServices, + @Nullable TimestampPrecision picosTimestampPrecision) { + super(format, null, null, parseFn, outputCoder, bqServices, picosTimestampPrecision); this.stepUuid = checkNotNull(stepUuid, "stepUuid"); this.queryProvider = checkNotNull(queryProvider, "queryProvider"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index d0bc655b311a..45763c6ac14f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -22,6 +22,8 @@ import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions; +import com.google.cloud.bigquery.storage.v1.AvroSerializationOptions; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.DataFormat; import com.google.cloud.bigquery.storage.v1.ReadSession; @@ -69,6 +71,7 @@ abstract class BigQueryStorageSourceBase extends BoundedSource { protected final SerializableFunction parseFn; protected final Coder outputCoder; protected final BigQueryServices bqServices; + private final @Nullable TimestampPrecision picosTimestampPrecision; BigQueryStorageSourceBase( @Nullable DataFormat format, @@ -76,13 +79,15 @@ abstract class BigQueryStorageSourceBase extends BoundedSource { @Nullable ValueProvider rowRestrictionProvider, SerializableFunction parseFn, Coder outputCoder, - BigQueryServices bqServices) { + BigQueryServices bqServices, + 
@Nullable TimestampPrecision picosTimestampPrecision) { this.format = format; this.selectedFieldsProvider = selectedFieldsProvider; this.rowRestrictionProvider = rowRestrictionProvider; this.parseFn = checkNotNull(parseFn, "parseFn"); this.outputCoder = checkNotNull(outputCoder, "outputCoder"); this.bqServices = checkNotNull(bqServices, "bqServices"); + this.picosTimestampPrecision = picosTimestampPrecision; } /** @@ -131,11 +136,12 @@ public List> split( if (rowRestrictionProvider != null && rowRestrictionProvider.isAccessible()) { tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get()); } - readSessionBuilder.setReadOptions(tableReadOptionsBuilder); if (format != null) { readSessionBuilder.setDataFormat(format); + setPicosTimestampPrecision(tableReadOptionsBuilder, format); } + readSessionBuilder.setReadOptions(tableReadOptionsBuilder); // Setting the requested max stream count to 0, implies that the Read API backend will select // an appropriate number of streams for the Session to produce reasonable throughput. @@ -199,4 +205,61 @@ public List> split( public BoundedReader createReader(PipelineOptions options) throws IOException { throw new UnsupportedOperationException("BigQuery storage source must be split before reading"); } + + private void setPicosTimestampPrecision( + ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) { + if (picosTimestampPrecision == null) { + return; + } + + if (dataFormat == DataFormat.ARROW) { + setArrowTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision); + } else if (dataFormat == DataFormat.AVRO) { + setAvroTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision); + } + } + + private static void setArrowTimestampPrecision( + ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, + TimestampPrecision timestampPrecision) { + ArrowSerializationOptions.PicosTimestampPrecision precision; + switch (timestampPrecision) { + case MICROS: + precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS; + break; + case NANOS: + precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS; + break; + case PICOS: + precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS; + break; + default: + throw new IllegalArgumentException( + "Unsupported timestamp precision for Storage Read API: " + timestampPrecision); + } + tableReadOptionsBuilder.setArrowSerializationOptions( + ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision)); + } + + private static void setAvroTimestampPrecision( + ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, + TimestampPrecision timestampPrecision) { + AvroSerializationOptions.PicosTimestampPrecision precision; + switch (timestampPrecision) { + case MICROS: + precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS; + break; + case NANOS: + precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS; + break; + case PICOS: + precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS; + break; + default: + throw new IllegalArgumentException( + "Unsupported timestamp precision for Storage Read API: " + timestampPrecision); + } + tableReadOptionsBuilder.setAvroSerializationOptions( + AvroSerializationOptions.newBuilder().setPicosTimestampPrecision(precision)); + } } diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java index 909a2551b299..8b7240158dc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java @@ -65,7 +65,8 @@ public static BigQueryStorageTableSource create( parseFn, outputCoder, bqServices, - projectionPushdownApplied); + projectionPushdownApplied, + /*picosTimestampPrecision=*/ null); } public static BigQueryStorageTableSource create( @@ -83,7 +84,30 @@ public static BigQueryStorageTableSource create( parseFn, outputCoder, bqServices, - false); + /*projectionPushdownApplied=*/ false, + /*picosTimestampPrecision=*/ null); + } + + public static BigQueryStorageTableSource create( + ValueProvider tableRefProvider, + DataFormat format, + @Nullable ValueProvider> selectedFields, + @Nullable ValueProvider rowRestriction, + SerializableFunction parseFn, + Coder outputCoder, + BigQueryServices bqServices, + boolean projectionPushdownApplied, + @Nullable TimestampPrecision picosTimestampPrecision) { + return new BigQueryStorageTableSource<>( + tableRefProvider, + format, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + projectionPushdownApplied, + picosTimestampPrecision); } private BigQueryStorageTableSource( @@ -94,8 +118,16 @@ private BigQueryStorageTableSource( SerializableFunction parseFn, Coder outputCoder, BigQueryServices bqServices, - boolean projectionPushdownApplied) { - super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); + boolean projectionPushdownApplied, + @Nullable TimestampPrecision picosTimestampPrecision) { + super( + format, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + picosTimestampPrecision); this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider"); this.projectionPushdownApplied = projectionPushdownApplied; cachedTable = new AtomicReference<>(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 5b9e15f22b90..95f472f5c61b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -69,14 +71,18 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; import 
org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.util.Text; import org.apache.avro.Schema; @@ -444,6 +450,63 @@ public void testTableSourceInitialSplit_MaxSplitCount() throws Exception { asList( field("name", new ArrowType.Utf8()), field("number", new ArrowType.Int(64, true)))); + // --- MICROS --- + private static final TableSchema TABLE_SCHEMA_TIMESTAMP = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("ts") + .setType("TIMESTAMP") + .setMode("REQUIRED") + .setTimestampPrecision(12L))); + + private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_MICROS = + new org.apache.arrow.vector.types.pojo.Schema( + asList(field("ts", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")))); + + private static final String AVRO_SCHEMA_TS_MICROS_STRING = + "{\"namespace\": \"example.avro\"," + + " \"type\": \"record\"," + + " \"name\": \"RowRecord\"," + + " \"fields\": [" + + " {\"name\": \"ts\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}}" + + " ]}"; + + private static final Schema AVRO_SCHEMA_TS_MICROS = + new Schema.Parser().parse(AVRO_SCHEMA_TS_MICROS_STRING); + + // --- NANOS --- + private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_NANOS = + new org.apache.arrow.vector.types.pojo.Schema( + asList(field("ts", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")))); + + private static final String AVRO_SCHEMA_TS_NANOS_STRING = + "{\"namespace\": \"example.avro\"," + + " \"type\": \"record\"," + + " \"name\": \"RowRecord\"," + + " \"fields\": [" + + " {\"name\": \"ts\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}" + + " ]}"; + + private static final Schema AVRO_SCHEMA_TS_NANOS = + new Schema.Parser().parse(AVRO_SCHEMA_TS_NANOS_STRING); + + // --- PICOS (string) --- + private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_PICOS = + new org.apache.arrow.vector.types.pojo.Schema(asList(field("ts", new ArrowType.Utf8()))); + + private static final String AVRO_SCHEMA_TS_PICOS_STRING = + "{\"namespace\": \"example.avro\"," + + " \"type\": \"record\"," + + " \"name\": \"RowRecord\"," + + " \"fields\": [" + + " {\"name\": \"ts\", \"type\": \"string\"}" + + " ]}"; + + private static final Schema AVRO_SCHEMA_TS_PICOS = + new Schema.Parser().parse(AVRO_SCHEMA_TS_PICOS_STRING); + private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception { fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); @@ -2381,6 +2444,587 @@ public void testReadFromBigQueryAvroObjectsMutation() throws Exception { assertEquals(new Utf8("A"), rowA.get("name")); } + @Test + public void testTimestampPrecisionDefaultValue() { + BigQueryIO.TypedRead typedRead = + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + .from("foo.com:project:dataset.table"); + + assertNull(typedRead.getDirectReadPicosTimestampPrecision()); + } + + @Test + public void testwithDirectReadPicosTimestampPrecisionNanos() { + BigQueryIO.TypedRead typedRead = + 
BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + .from("foo.com:project:dataset.table") + .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS); + + assertEquals(TimestampPrecision.NANOS, typedRead.getDirectReadPicosTimestampPrecision()); + } + + @Test + public void testwithDirectReadPicosTimestampPrecisionPicos() { + BigQueryIO.TypedRead typedRead = + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + .from("foo.com:project:dataset.table") + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS); + + assertEquals(TimestampPrecision.PICOS, typedRead.getDirectReadPicosTimestampPrecision()); + } + + @Test + public void testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Arrow() + throws Exception { + fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); + TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); + Table table = new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA); + fakeDatasetService.createTable(table); + + CreateReadSessionRequest expectedRequest = + CreateReadSessionRequest.newBuilder() + .setParent("projects/project-id") + .setReadSession( + ReadSession.newBuilder() + .setTable("projects/foo.com:project/datasets/dataset/tables/table") + .setDataFormat(DataFormat.ARROW) + .setReadOptions( + ReadSession.TableReadOptions.newBuilder() + .setArrowSerializationOptions( + com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions + .newBuilder() + .setPicosTimestampPrecision( + com.google.cloud.bigquery.storage.v1 + .ArrowSerializationOptions.PicosTimestampPrecision + .TIMESTAMP_PRECISION_NANOS)))) + .setMaxStreamCount(10) + .build(); + + ReadSession.Builder builder = + ReadSession.newBuilder() + .setArrowSchema( + ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))) + .setDataFormat(DataFormat.ARROW); + for (int i = 0; i < 10; i++) { + builder.addStreams(ReadStream.newBuilder().setName("stream-" + i)); + } + + StorageClient fakeStorageClient = mock(StorageClient.class); + when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build()); + + BigQueryStorageTableSource tableSource = + BigQueryStorageTableSource.create( + ValueProvider.StaticValueProvider.of(tableRef), + DataFormat.ARROW, + null, /* selectedFields */ + null, /* rowRestriction */ + new TableRowParser(), + TableRowJsonCoder.of(), + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withStorageClient(fakeStorageClient), + false, /* projectionPushdownApplied */ + TimestampPrecision.NANOS); + + List> sources = tableSource.split(10L, options); + assertEquals(10L, sources.size()); + } + + private org.apache.arrow.vector.types.pojo.Schema getArrowSchemaTs(TimestampPrecision precision) { + switch (precision) { + case NANOS: + return ARROW_SCHEMA_TS_NANOS; + case PICOS: + return ARROW_SCHEMA_TS_PICOS; + case MICROS: + default: + return ARROW_SCHEMA_TS_MICROS; + } + } + + private Schema getAvroSchemaTs(TimestampPrecision precision) { + switch (precision) { + case NANOS: + return AVRO_SCHEMA_TS_NANOS; + case PICOS: + return AVRO_SCHEMA_TS_PICOS; + case MICROS: + default: + return AVRO_SCHEMA_TS_MICROS; + } + } + + private String getAvroSchemaStringTs(TimestampPrecision precision) { + switch (precision) { + case NANOS: + return AVRO_SCHEMA_TS_NANOS_STRING; + case PICOS: + return AVRO_SCHEMA_TS_PICOS_STRING; 
+ case MICROS: + default: + return AVRO_SCHEMA_TS_MICROS_STRING; + } + } + + /** + * Converts ISO timestamp strings to the appropriate format for the precision. - MICROS: Long + * (epoch microseconds) - NANOS: Long (epoch nanoseconds) - PICOS: String (formatted as + * "yyyy-MM-dd HH:mm:ss.SSSSSSSSSSSS UTC") + */ + private List convertInputsForPrecision( + List isoTimestamps, TimestampPrecision precision) { + return isoTimestamps.stream() + .map( + iso -> { + if (precision == TimestampPrecision.PICOS) { + // For PICOS, input IS the string (already formatted) + return iso; + } + java.time.Instant instant = java.time.Instant.parse(iso); + if (precision == TimestampPrecision.NANOS) { + return instant.getEpochSecond() * 1_000_000_000L + instant.getNano(); + } else { + // MICROS (default) + return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1000; + } + }) + .collect(Collectors.toList()); + } + + private ReadSession createTsReadSession( + DataFormat dataFormat, + org.apache.arrow.vector.types.pojo.Schema arrowSchema, + String avroSchemaString) { + ReadSession.Builder builder = + ReadSession.newBuilder() + .setName("readSessionName") + .addStreams(ReadStream.newBuilder().setName("streamName")) + .setDataFormat(dataFormat); + + if (dataFormat == DataFormat.ARROW) { + builder.setArrowSchema( + ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(arrowSchema)).build()); + } else { + builder.setAvroSchema(AvroSchema.newBuilder().setSchema(avroSchemaString).build()); + } + return builder.build(); + } + + private ReadRowsResponse createArrowTsResponse( + org.apache.arrow.vector.types.pojo.Schema arrowSchema, + TimestampPrecision precision, + List inputValues) { + ArrowRecordBatch serializedRecord; + try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)) { + schemaRoot.allocateNew(); + schemaRoot.setRowCount(inputValues.size()); + + switch (precision) { + case NANOS: + TimeStampNanoTZVector nanoVector = + (TimeStampNanoTZVector) schemaRoot.getFieldVectors().get(0); + for (int i = 0; i < inputValues.size(); i++) { + nanoVector.set(i, (Long) inputValues.get(i)); + } + break; + case PICOS: + VarCharVector stringVector = (VarCharVector) schemaRoot.getFieldVectors().get(0); + for (int i = 0; i < inputValues.size(); i++) { + stringVector.set(i, new Text((String) inputValues.get(i))); + } + break; + case MICROS: + default: + TimeStampMicroTZVector microVector = + (TimeStampMicroTZVector) schemaRoot.getFieldVectors().get(0); + for (int i = 0; i < inputValues.size(); i++) { + microVector.set(i, (Long) inputValues.get(i)); + } + break; + } + + VectorUnloader unLoader = new VectorUnloader(schemaRoot); + try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch records = + unLoader.getRecordBatch()) { + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(os)), records); + serializedRecord = + ArrowRecordBatch.newBuilder() + .setRowCount(records.getLength()) + .setSerializedRecordBatch(ByteString.copyFrom(os.toByteArray())) + .build(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return ReadRowsResponse.newBuilder() + .setArrowRecordBatch(serializedRecord) + .setRowCount(inputValues.size()) + .setStats( + StreamStats.newBuilder() + .setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0))) + .build(); + } + + private ReadRowsResponse createAvroTsResponse( + Schema avroSchema, TimestampPrecision precision, List inputValues) 
throws Exception { + List records = new ArrayList<>(); + for (Object value : inputValues) { + GenericRecord record = new Record(avroSchema); + record.put("ts", value); + records.add(record); + } + + GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null); + for (GenericRecord record : records) { + writer.write(record, binaryEncoder); + } + binaryEncoder.flush(); + + return ReadRowsResponse.newBuilder() + .setAvroRows( + AvroRows.newBuilder() + .setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray())) + .setRowCount(records.size())) + .setRowCount(records.size()) + .setStats( + StreamStats.newBuilder() + .setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0))) + .build(); + } + + private void runTimestampTest( + DataFormat dataFormat, + TimestampPrecision precision, + boolean useSchema, + List inputTimestamps, + List expectedOutputs) + throws Exception { + + TimestampPrecision effectivePrecision = + (precision != null) ? precision : TimestampPrecision.MICROS; + + fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); + TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); + Table table = + new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA_TIMESTAMP); + fakeDatasetService.createTable(table); + + org.apache.arrow.vector.types.pojo.Schema arrowSchema = getArrowSchemaTs(effectivePrecision); + Schema avroSchema = getAvroSchemaTs(effectivePrecision); + String avroSchemaString = getAvroSchemaStringTs(effectivePrecision); + + List inputValues = convertInputsForPrecision(inputTimestamps, effectivePrecision); + + ReadSession readSession = createTsReadSession(dataFormat, arrowSchema, avroSchemaString); + + List readRowsResponses; + if (dataFormat == DataFormat.ARROW) { + readRowsResponses = + Lists.newArrayList(createArrowTsResponse(arrowSchema, effectivePrecision, inputValues)); + } else { + readRowsResponses = + Lists.newArrayList(createAvroTsResponse(avroSchema, effectivePrecision, inputValues)); + } + + StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable()); + when(fakeStorageClient.createReadSession(any(CreateReadSessionRequest.class))) + .thenReturn(readSession); + when(fakeStorageClient.readRows(any(ReadRowsRequest.class), eq(""))) + .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); + + TypedRead read = + useSchema ? 
BigQueryIO.readTableRowsWithSchema() : BigQueryIO.readTableRows(); + + read = + read.from("foo.com:project:dataset.table") + .withMethod(Method.DIRECT_READ) + .withFormat(dataFormat) + .withTestServices( + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withStorageClient(fakeStorageClient)); + + if (precision != null) { + read = read.withDirectReadPicosTimestampPrecision(precision); + } + + PCollection output = p.apply(read); + + PAssert.that(output) + .satisfies( + rows -> { + List rowList = Lists.newArrayList(rows); + assertEquals(expectedOutputs.size(), rowList.size()); + + List actualTimestamps = + rowList.stream() + .map(r -> (String) r.get("ts")) + .sorted() + .collect(Collectors.toList()); + + List sortedExpected = + expectedOutputs.stream().sorted().collect(Collectors.toList()); + + assertEquals(sortedExpected, actualTimestamps); + return null; + }); + + p.run(); + } + + // ===== Avro + readTableRows ===== + + @Test + public void testReadTableRows_Avro_DefaultPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + null, + false, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 12:30:45.987654 UTC")); + } + + @Test + public void testReadTableRows_Avro_MicrosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.MICROS, + false, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 12:30:45.987654 UTC")); + } + + @Test + public void testReadTableRows_Avro_NanosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.NANOS, + false, + Arrays.asList("2024-01-01T00:00:00.123456789Z", "2024-06-15T12:30:45.987654321Z"), + Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 12:30:45.987654321 UTC")); + } + + @Test + public void testReadTableRows_Avro_PicosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.PICOS, + false, + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC"), + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC")); + } + + // ===== Avro + readTableRowsWithSchema ===== + + @Test + public void testReadTableRowsWithSchema_Avro_DefaultPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + null, + true, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 12:30:45.987654 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Avro_MicrosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.MICROS, + true, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 12:30:45.987654 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Avro_NanosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.NANOS, + true, + Arrays.asList("2024-01-01T00:00:00.123456789Z", "2024-06-15T12:30:45.987654321Z"), + Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 12:30:45.987654321 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Avro_PicosPrecision() throws Exception { + runTimestampTest( + DataFormat.AVRO, + TimestampPrecision.PICOS, + true, + Arrays.asList( + "2024-01-01 
00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC"), + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC")); + } + + // ===== Arrow + readTableRows ===== + + @Test + public void testReadTableRows_Arrow_DefaultPrecision() throws Exception { + // Avro records are always converted to beam Row and then to GenericRecord in + // ArrowConversion.java + // ArrowConversion.java is a generic utility to convert Arrow records and it does not take + // into account + // the BigQuery TableSchema to determine the appropriate beam type. Historically arrow + // microsecond timestamps + // are converted to FieldType.DATETIME, which maps to joda Instants, which only supports up + // to millisecond precision + // hence precision is lost. + runTimestampTest( + DataFormat.ARROW, + null, + false, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 UTC")); + } + + @Test + public void testReadTableRows_Arrow_MicrosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.MICROS, + false, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 UTC")); + } + + @Test + public void testReadTableRows_Arrow_NanosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.NANOS, + false, + Arrays.asList("2024-01-01T00:00:00.123456789Z", "2024-06-15T12:30:45.987654321Z"), + Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 12:30:45.987654321 UTC")); + } + + @Test + public void testReadTableRows_Arrow_PicosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.PICOS, + false, + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC"), + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC")); + } + + // ===== Arrow + readTableRowsWithSchema ===== + + @Test + public void testReadTableRowsWithSchema_Arrow_DefaultPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + null, + true, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Arrow_MicrosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.MICROS, + true, + Arrays.asList("2024-01-01T00:00:00.123456Z", "2024-06-15T12:30:45.987654Z"), + Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Arrow_NanosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.NANOS, + true, + Arrays.asList("2024-01-01T00:00:00.123456789Z", "2024-06-15T12:30:45.987654321Z"), + Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 12:30:45.987654321 UTC")); + } + + @Test + public void testReadTableRowsWithSchema_Arrow_PicosPrecision() throws Exception { + runTimestampTest( + DataFormat.ARROW, + TimestampPrecision.PICOS, + true, + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC"), + Arrays.asList( + "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 12:30:45.987654321098 UTC")); + } + + @Test + public void 
testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Avro() + throws Exception { + fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); + TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); + Table table = new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA); + fakeDatasetService.createTable(table); + + // Expected request should include AvroSerializationOptions with NANOS precision + CreateReadSessionRequest expectedRequest = + CreateReadSessionRequest.newBuilder() + .setParent("projects/project-id") + .setReadSession( + ReadSession.newBuilder() + .setTable("projects/foo.com:project/datasets/dataset/tables/table") + .setDataFormat(DataFormat.AVRO) + .setReadOptions( + ReadSession.TableReadOptions.newBuilder() + .setAvroSerializationOptions( + com.google.cloud.bigquery.storage.v1.AvroSerializationOptions + .newBuilder() + .setPicosTimestampPrecision( + com.google.cloud.bigquery.storage.v1 + .AvroSerializationOptions.PicosTimestampPrecision + .TIMESTAMP_PRECISION_NANOS)))) + .setMaxStreamCount(10) + .build(); + + ReadSession.Builder builder = + ReadSession.newBuilder() + .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) + .setDataFormat(DataFormat.AVRO); + for (int i = 0; i < 10; i++) { + builder.addStreams(ReadStream.newBuilder().setName("stream-" + i)); + } + + StorageClient fakeStorageClient = mock(StorageClient.class); + when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build()); + + BigQueryStorageTableSource tableSource = + BigQueryStorageTableSource.create( + ValueProvider.StaticValueProvider.of(tableRef), + DataFormat.AVRO, + null, /* selectedFields */ + null, /* rowRestriction */ + new TableRowParser(), + TableRowJsonCoder.of(), + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withStorageClient(fakeStorageClient), + false, /* projectionPushdownApplied */ + TimestampPrecision.NANOS); + + List> sources = tableSource.split(10L, options); + assertEquals(10L, sources.size()); + } + private static org.apache.arrow.vector.types.pojo.Field field( String name, boolean nullable, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 5b7b5d473190..de63120c93cc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -63,6 +63,8 @@ public class BigQueryIOTranslationTest { READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempProject", "query_temp_project"); READ_TRANSFORM_SCHEMA_MAPPING.put("getMethod", "method"); READ_TRANSFORM_SCHEMA_MAPPING.put("getFormat", "format"); + READ_TRANSFORM_SCHEMA_MAPPING.put( + "getDirectReadPicosTimestampPrecision", "direct_read_picos_timestamp_precision"); READ_TRANSFORM_SCHEMA_MAPPING.put("getSelectedFields", "selected_fields"); READ_TRANSFORM_SCHEMA_MAPPING.put("getRowRestriction", "row_restriction"); READ_TRANSFORM_SCHEMA_MAPPING.put("getCoder", "coder"); @@ -323,4 +325,24 @@ public void testWriteTransformRowIncludesAllFields() { .contains(fieldName)); }); } + + @Test + public void testReCreateReadTransformFromRowWithDirectReadPicosTimestampPrecision() { + BigQueryIO.TypedRead readTransform = + 
BigQueryIO.readTableRows() + .from("dummyproject:dummydataset.dummytable") + .withMethod(TypedRead.Method.DIRECT_READ) + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS); + + BigQueryIOTranslation.BigQueryIOReadTranslator translator = + new BigQueryIOTranslation.BigQueryIOReadTranslator(); + Row row = translator.toConfigRow(readTransform); + + BigQueryIO.TypedRead readTransformFromRow = + (BigQueryIO.TypedRead) + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals( + TimestampPrecision.PICOS, readTransformFromRow.getDirectReadPicosTimestampPrecision()); + } }
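Note for reviewers (not part of the patch): a minimal usage sketch of the option added above, assuming a placeholder project/dataset/table name and that TimestampPrecision lives alongside BigQueryIO in the same package. It requests nanosecond precision for TIMESTAMP(12) columns when reading through the Storage Read API.

// Usage sketch only; "my-project:my_dataset.events" is a placeholder, not part of this change.
// The import path for TimestampPrecision is assumed to match the BigQueryIO package.
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.TimestampPrecision;
import org.apache.beam.sdk.values.PCollection;

public class ReadPicosTimestampExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Request nanosecond precision for TIMESTAMP(12) columns read via DIRECT_READ.
    // Without withDirectReadPicosTimestampPrecision, the service returns microseconds by default.
    PCollection<TableRow> rows =
        p.apply(
            BigQueryIO.readTableRowsWithSchema()
                .from("my-project:my_dataset.events") // placeholder table
                .withMethod(Method.DIRECT_READ)
                .withFormat(DataFormat.ARROW)
                .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS));
    p.run().waitUntilFinish();
  }
}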