BigQueryIO.java
@@ -1251,6 +1251,8 @@ abstract Builder<T> setBadRecordErrorHandler(
abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);

+ abstract Builder<T> setDirectReadPicosTimestampPrecision(TimestampPrecision precision);
}

abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -1306,6 +1308,8 @@ abstract Builder<T> setBadRecordErrorHandler(

abstract boolean getProjectionPushdownApplied();

+ abstract @Nullable TimestampPrecision getDirectReadPicosTimestampPrecision();

/**
* An enumeration type for the priority of a query.
*
@@ -1381,7 +1385,8 @@ private BigQueryStorageQuerySource<T> createStorageQuerySource(
getFormat(),
getParseFn(),
outputCoder,
- getBigQueryServices());
+ getBigQueryServices(),
+ getDirectReadPicosTimestampPrecision());
}

private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -1525,7 +1530,12 @@ public PCollection<T> expand(PBegin input) {
if (selectedFields != null && selectedFields.isAccessible()) {
tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());
}
- beamSchema = BigQueryUtils.fromTableSchema(tableSchema);
+ BigQueryUtils.SchemaConversionOptions.Builder builder =
+ BigQueryUtils.SchemaConversionOptions.builder();
+ if (getDirectReadPicosTimestampPrecision() != null) {
+ builder.setPicosecondTimestampMapping(getDirectReadPicosTimestampPrecision());
+ }
+ beamSchema = BigQueryUtils.fromTableSchema(tableSchema, builder.build());
}

final Coder<T> coder = inferCoder(p.getCoderRegistry());
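In effect, when a precision is configured the direct-read Beam schema is now derived as below (a sketch assembled from the calls in this hunk; `tableSchema` is the trimmed table schema from above, and `TimestampPrecision.NANOS` stands in for whatever value the user configured):

    // Request nanosecond mapping for TIMESTAMP(12) columns during schema conversion.
    BigQueryUtils.SchemaConversionOptions options =
        BigQueryUtils.SchemaConversionOptions.builder()
            .setPicosecondTimestampMapping(TimestampPrecision.NANOS)
            .build();
    Schema beamSchema = BigQueryUtils.fromTableSchema(tableSchema, options);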
@@ -1710,7 +1720,8 @@ private PCollection<T> expandForDirectRead(
getParseFn(),
outputCoder,
getBigQueryServices(),
- getProjectionPushdownApplied())));
+ getProjectionPushdownApplied(),
+ getDirectReadPicosTimestampPrecision())));
if (beamSchema != null) {
rows.setSchema(
beamSchema,
@@ -1731,7 +1742,8 @@ private PCollection<T> expandForDirectRead(
getParseFn(),
outputCoder,
getBigQueryServices(),
- getProjectionPushdownApplied());
+ getProjectionPushdownApplied(),
+ getDirectReadPicosTimestampPrecision());
List<? extends BoundedSource<T>> sources;
try {
// This splitting logic taken from the SDF implementation of Read
@@ -2293,6 +2305,18 @@ public TypedRead<T> withMethod(TypedRead.Method method) {
return toBuilder().setMethod(method).build();
}

+ /**
+ * Sets the timestamp precision to request for TIMESTAMP(12) BigQuery columns when reading via
+ * the Storage Read API.
+ *
+ * <p>This option only affects the precision of TIMESTAMP(12) column reads using {@link
+ * Method#DIRECT_READ}. If not set, the BQ client returns microsecond precision by default.
+ */
+ public TypedRead<T> withDirectReadPicosTimestampPrecision(
+ TimestampPrecision timestampPrecision) {
+ return toBuilder().setDirectReadPicosTimestampPrecision(timestampPrecision).build();
+ }

/** See {@link DataFormat}. */
public TypedRead<T> withFormat(DataFormat format) {
return toBuilder().setFormat(format).build();
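End to end, the new knob is used like this (a usage sketch; the table name and the surrounding pipeline are hypothetical, imports elided):

    // Read a table with a TIMESTAMP(12) column at nanosecond precision via the Storage Read API.
    PCollection<TableRow> rows =
        pipeline.apply(
            BigQueryIO.readTableRows()
                .from("my-project:my_dataset.events") // hypothetical table
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS));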
BigQueryIOTranslation.java
@@ -109,6 +109,7 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<Type
.addNullableBooleanField("projection_pushdown_applied")
.addNullableByteArrayField("bad_record_router")
.addNullableByteArrayField("bad_record_error_handler")
.addNullableByteArrayField("direct_read_picos_timestamp_precision")
.build();

public static final String BIGQUERY_READ_TRANSFORM_URN =
@@ -195,6 +196,11 @@ public Row toConfigRow(TypedRead<?> transform) {
if (transform.getUseAvroLogicalTypes() != null) {
fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes());
}
+ if (transform.getDirectReadPicosTimestampPrecision() != null) {
+ fieldValues.put(
+ "direct_read_picos_timestamp_precision",
+ toByteArray(transform.getDirectReadPicosTimestampPrecision()));
+ }
fieldValues.put("projection_pushdown_applied", transform.getProjectionPushdownApplied());
fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter()));
fieldValues.put(
@@ -293,6 +299,13 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (formatBytes != null) {
builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
}
+ byte[] timestampPrecisionBytes =
+ configRow.getBytes("direct_read_picos_timestamp_precision");
+ if (timestampPrecisionBytes != null) {
+ builder =
+ builder.setDirectReadPicosTimestampPrecision(
+ (TimestampPrecision) fromByteArray(timestampPrecisionBytes));
+ }
Collection<String> selectedFields = configRow.getArray("selected_fields");
if (selectedFields != null && !selectedFields.isEmpty()) {
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
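The new config-row field can be exercised with a quick round-trip (a test-style sketch assuming same-package access to the translator; not part of this diff):

    BigQueryIOTranslation.BigQueryIOReadTranslator translator =
        new BigQueryIOTranslation.BigQueryIOReadTranslator();
    TypedRead<TableRow> read =
        BigQueryIO.readTableRows()
            .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
            .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS);
    Row configRow = translator.toConfigRow(read);
    TypedRead<?> restored = translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
    // restored.getDirectReadPicosTimestampPrecision() should be TimestampPrecision.NANOS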
BigQueryStorageQuerySource.java
@@ -40,6 +40,38 @@
/** A {@link org.apache.beam.sdk.io.Source} representing reading the results of a query. */
class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {

+ public static <T> BigQueryStorageQuerySource<T> create(
+ String stepUuid,
+ ValueProvider<String> queryProvider,
+ Boolean flattenResults,
+ Boolean useLegacySql,
+ QueryPriority priority,
+ @Nullable String location,
+ @Nullable String queryTempDataset,
+ @Nullable String queryTempProject,
+ @Nullable String kmsKey,
+ @Nullable DataFormat format,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ Coder<T> outputCoder,
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ return new BigQueryStorageQuerySource<>(
+ stepUuid,
+ queryProvider,
+ flattenResults,
+ useLegacySql,
+ priority,
+ location,
+ queryTempDataset,
+ queryTempProject,
+ kmsKey,
+ format,
+ parseFn,
+ outputCoder,
+ bqServices,
+ picosTimestampPrecision);
+ }

public static <T> BigQueryStorageQuerySource<T> create(
String stepUuid,
ValueProvider<String> queryProvider,
@@ -67,7 +99,8 @@ public static <T> BigQueryStorageQuerySource<T> create(
format,
parseFn,
outputCoder,
- bqServices);
+ bqServices,
+ /*picosTimestampPrecision=*/ null);
}

public static <T> BigQueryStorageQuerySource<T> create(
@@ -94,7 +127,8 @@ public static <T> BigQueryStorageQuerySource<T> create(
null,
parseFn,
outputCoder,
- bqServices);
+ bqServices,
+ /*picosTimestampPrecision=*/ null);
}

private final String stepUuid;
@@ -123,8 +157,9 @@ private BigQueryStorageQuerySource(
@Nullable DataFormat format,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
- BigQueryServices bqServices) {
- super(format, null, null, parseFn, outputCoder, bqServices);
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ super(format, null, null, parseFn, outputCoder, bqServices, picosTimestampPrecision);
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.queryProvider = checkNotNull(queryProvider, "queryProvider");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
BigQueryStorageSourceBase.java
@@ -22,6 +22,8 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
+ import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
+ import com.google.cloud.bigquery.storage.v1.AvroSerializationOptions;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
@@ -69,20 +71,23 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
protected final SerializableFunction<SchemaAndRecord, T> parseFn;
protected final Coder<T> outputCoder;
protected final BigQueryServices bqServices;
+ private final @Nullable TimestampPrecision picosTimestampPrecision;

BigQueryStorageSourceBase(
@Nullable DataFormat format,
@Nullable ValueProvider<List<String>> selectedFieldsProvider,
@Nullable ValueProvider<String> rowRestrictionProvider,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
- BigQueryServices bqServices) {
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
this.format = format;
this.selectedFieldsProvider = selectedFieldsProvider;
this.rowRestrictionProvider = rowRestrictionProvider;
this.parseFn = checkNotNull(parseFn, "parseFn");
this.outputCoder = checkNotNull(outputCoder, "outputCoder");
this.bqServices = checkNotNull(bqServices, "bqServices");
+ this.picosTimestampPrecision = picosTimestampPrecision;
}

/**
@@ -131,11 +136,12 @@ public List<BigQueryStorageStreamSource<T>> split(
if (rowRestrictionProvider != null && rowRestrictionProvider.isAccessible()) {
tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
}
- readSessionBuilder.setReadOptions(tableReadOptionsBuilder);

if (format != null) {
readSessionBuilder.setDataFormat(format);
+ setPicosTimestampPrecision(tableReadOptionsBuilder, format);
}
+ readSessionBuilder.setReadOptions(tableReadOptionsBuilder);

// Setting the requested max stream count to 0 implies that the Read API backend will select
// an appropriate number of streams for the Session to produce reasonable throughput.
@@ -199,4 +205,61 @@ public List<BigQueryStorageStreamSource<T>> split(
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
}

+ private void setPicosTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
+ if (picosTimestampPrecision == null) {
+ return;
+ }

+ if (dataFormat == DataFormat.ARROW) {
+ setArrowTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
+ } else if (dataFormat == DataFormat.AVRO) {
+ setAvroTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
+ }
+ }

+ private static void setArrowTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+ TimestampPrecision timestampPrecision) {
+ ArrowSerializationOptions.PicosTimestampPrecision precision;
+ switch (timestampPrecision) {
+ case MICROS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+ break;
+ case NANOS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+ break;
+ case PICOS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
+ }
+ tableReadOptionsBuilder.setArrowSerializationOptions(
+ ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+ }

+ private static void setAvroTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+ TimestampPrecision timestampPrecision) {
+ AvroSerializationOptions.PicosTimestampPrecision precision;
+ switch (timestampPrecision) {
+ case MICROS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+ break;
+ case NANOS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+ break;
+ case PICOS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
+ }
+ tableReadOptionsBuilder.setAvroSerializationOptions(
+ AvroSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+ }
}
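For Arrow, the net effect on the read session looks like this (an equivalent sketch of what split() now builds when NANOS is configured; the Avro path is symmetric via AvroSerializationOptions):

    ReadSession.Builder readSessionBuilder = ReadSession.newBuilder();
    ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
        ReadSession.TableReadOptions.newBuilder();
    readSessionBuilder.setDataFormat(DataFormat.ARROW);
    tableReadOptionsBuilder.setArrowSerializationOptions(
        ArrowSerializationOptions.newBuilder()
            .setPicosTimestampPrecision(
                ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS));
    // setReadOptions is called only after the serialization options are attached,
    // which is why the diff moves it below the format check.
    readSessionBuilder.setReadOptions(tableReadOptionsBuilder);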
BigQueryStorageTableSource.java
@@ -65,7 +65,8 @@ public static <T> BigQueryStorageTableSource<T> create(
parseFn,
outputCoder,
bqServices,
- projectionPushdownApplied);
+ projectionPushdownApplied,
+ /*picosTimestampPrecision=*/ null);
}

public static <T> BigQueryStorageTableSource<T> create(
@@ -83,7 +84,30 @@ public static <T> BigQueryStorageTableSource<T> create(
parseFn,
outputCoder,
bqServices,
- false);
+ /*projectionPushdownApplied=*/ false,
+ /*picosTimestampPrecision=*/ null);
}

+ public static <T> BigQueryStorageTableSource<T> create(
+ ValueProvider<TableReference> tableRefProvider,
+ DataFormat format,
+ @Nullable ValueProvider<List<String>> selectedFields,
+ @Nullable ValueProvider<String> rowRestriction,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ Coder<T> outputCoder,
+ BigQueryServices bqServices,
+ boolean projectionPushdownApplied,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ return new BigQueryStorageTableSource<>(
+ tableRefProvider,
+ format,
+ selectedFields,
+ rowRestriction,
+ parseFn,
+ outputCoder,
+ bqServices,
+ projectionPushdownApplied,
+ picosTimestampPrecision);
+ }

private BigQueryStorageTableSource(
@@ -94,8 +118,16 @@ private BigQueryStorageTableSource(
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices,
- boolean projectionPushdownApplied) {
- super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
+ boolean projectionPushdownApplied,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ super(
+ format,
+ selectedFields,
+ rowRestriction,
+ parseFn,
+ outputCoder,
+ bqServices,
+ picosTimestampPrecision);
this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider");
this.projectionPushdownApplied = projectionPushdownApplied;
cachedTable = new AtomicReference<>();