Seamlessly propagate schema changes made after the pipeline starts running for the Spanner Change Streams to BigQuery template #1730

Merged: 5 commits, Sep 4, 2024
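In short, the change lets the pipeline refresh its cached Spanner schema from the change records themselves: non-DELETE mods can trigger a schema update check, the BigQuery destination schema is derived from the TableRow being written, and the Storage Write API path turns on automatic schema updates. The snippet below is a minimal, self-contained sketch of that caching idea only; the class and method names in it are invented for illustration, while the template's real implementation goes through SchemaUpdateUtils.updateStoredSchemaIfNeeded and TrackedSpannerTable, as shown in the diff.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Illustration only: cache each table's known columns and re-read the schema whenever an
// incoming change record mentions a table or column the cache has not seen yet.
final class SchemaCacheSketch {
  private Map<String, Set<String>> columnsByTable = new HashMap<>();
  private final Supplier<Map<String, Set<String>>> informationSchemaReader;

  SchemaCacheSketch(Supplier<Map<String, Set<String>>> informationSchemaReader) {
    this.informationSchemaReader = informationSchemaReader;
  }

  // Returns the (possibly refreshed) column set for the given table.
  Set<String> columnsFor(String table, Set<String> columnsSeenInRecord) {
    Set<String> known = columnsByTable.get(table);
    if (known == null || !known.containsAll(columnsSeenInRecord)) {
      // Unknown table or newly added column: refresh the cache instead of failing the record.
      columnsByTable = informationSchemaReader.get();
      known = columnsByTable.getOrDefault(table, new HashSet<>());
    }
    return known;
  }
}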
@@ -145,7 +145,7 @@ private synchronized Dataset getDatasetIfExists(String datasetId) throws Illegal
* @return the table, if it exists.
* @throws IllegalStateException if the given table name does not exist in the dataset.
*/
private synchronized Table getTableIfExists(String tableId) throws IllegalStateException {
public synchronized Table getTableIfExists(String tableId) throws IllegalStateException {
checkHasDataset();
Table table = dataset.get(tableId);
if (table == null) {
v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java
@@ -24,17 +24,13 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerTable;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerChangeStreamsUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
@@ -53,39 +49,18 @@

private static final Logger LOG = LoggerFactory.getLogger(BigQueryDynamicDestinations.class);

private final Map<String, TrackedSpannerTable> spannerTableByName;
private final String bigQueryProject, bigQueryDataset, bigQueryTableTemplate;
private final Boolean useStorageWriteApi;
private final ImmutableSet<String> ignoreFields;

public static BigQueryDynamicDestinations of(
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions) {
Dialect dialect = getDialect(bigQueryDynamicDestinationsOptions.getSpannerConfig());
try (SpannerAccessor spannerAccessor =
SpannerAccessor.getOrCreate(bigQueryDynamicDestinationsOptions.getSpannerConfig())) {
Map<String, TrackedSpannerTable> spannerTableByName =
new SpannerChangeStreamsUtils(
spannerAccessor.getDatabaseClient(),
bigQueryDynamicDestinationsOptions.getChangeStreamName(),
dialect)
.getSpannerTableByName();
return new BigQueryDynamicDestinations(
bigQueryDynamicDestinationsOptions, spannerTableByName);
} catch (RuntimeException e) {
String errorMessage =
String.format(
"Caught exception when getting BigQueryDynamicDestinations, message: %s,"
+ " cause: %s",
Optional.ofNullable(e.getMessage()), e.getCause());
LOG.error(errorMessage);
throw new RuntimeException(errorMessage, e);
}
return new BigQueryDynamicDestinations(bigQueryDynamicDestinationsOptions);

}

private BigQueryDynamicDestinations(
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions,
Map<String, TrackedSpannerTable> spannerTableByName) {
this.spannerTableByName = spannerTableByName;
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions) {

this.ignoreFields = bigQueryDynamicDestinationsOptions.getIgnoreFields();
this.bigQueryProject = bigQueryDynamicDestinationsOptions.getBigQueryProject();
this.bigQueryDataset = bigQueryDynamicDestinationsOptions.getBigQueryDataset();
@@ -118,10 +93,8 @@
@Override
public TableSchema getSchema(KV<TableId, TableRow> destination) {
TableRow tableRow = destination.getValue();
String spannerTableName =
(String) tableRow.get(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME);
TrackedSpannerTable spannerTable = spannerTableByName.get(spannerTableName);
List<TableFieldSchema> fields = getFields(spannerTable);
// Get List<TableFieldSchema> for both user columns and metadata columns.
List<TableFieldSchema> fields = getFields(tableRow);

List<TableFieldSchema> filteredFields = new ArrayList<>();
for (TableFieldSchema field : fields) {
if (!ignoreFields.contains(field.getName())) {
@@ -132,9 +105,12 @@
return new TableSchema().setFields(filteredFields);
}

private List<TableFieldSchema> getFields(TrackedSpannerTable spannerTable) {
// Returns List<TableFieldSchema> for user columns and metadata columns based on the parameter
// TableRow.
private List<TableFieldSchema> getFields(TableRow tableRow) {
// Add all user data fields (excluding metadata fields stored in metadataColumns).
List<TableFieldSchema> fields =
SpannerToBigQueryUtils.spannerColumnsToBigQueryIOFields(spannerTable.getAllColumns());
SpannerToBigQueryUtils.tableRowColumnsToBigQueryIOFields(tableRow, this.useStorageWriteApi);

// Add all metadata fields.
String requiredMode = Field.Mode.REQUIRED.name();
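The key change in BigQueryDynamicDestinations is that getSchema() now derives the destination fields from the TableRow itself (getFields(tableRow)) rather than from a schema snapshot captured when the pipeline started. The helper below is a hypothetical stand-in for SpannerToBigQueryUtils.tableRowColumnsToBigQueryIOFields, written only to show the shape of that derivation; the class name and the STRING placeholder type are assumptions, while the "_type_" companion-entry convention comes from the main pipeline diff further down.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not the template's actual helper: build the field list from whatever
// columns the TableRow carries, so the destination schema follows the data.
final class TableRowSchemaSketch {
  static List<TableFieldSchema> fieldsFromTableRow(TableRow row, Set<String> ignoreFields) {
    List<TableFieldSchema> fields = new ArrayList<>();
    for (String column : row.keySet()) {
      // Skip ignored columns and the intermediate "_type_<column>" companion entries.
      if (ignoreFields.contains(column) || column.startsWith("_type_")) {
        continue;
      }
      // The real helper maps Spanner types to BigQuery types; STRING is only a placeholder.
      fields.add(new TableFieldSchema().setName(column).setType("STRING").setMode("NULLABLE"));
    }
    return fields;
  }
}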
v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java
@@ -38,6 +38,7 @@
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerColumn;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerTable;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SchemaUpdateUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerChangeStreamsUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
@@ -132,6 +133,7 @@
private transient CallContextConfigurator callContextConfigurator;
private transient boolean seenException;
private Boolean useStorageWriteApi;
private Dialect dialect;

public FailsafeModJsonToTableRowFn(
SpannerConfig spannerConfig,
Expand All @@ -146,6 +148,7 @@
this.transformDeadLetterOut = transformDeadLetterOut;
this.ignoreFields = ignoreFields;
this.useStorageWriteApi = useStorageWriteApi;
this.dialect = getDialect(spannerConfig);

}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
@@ -169,7 +172,6 @@
public void setUp() {
seenException = false;
try {
Dialect dialect = getDialect(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
spannerTableByName =
new SpannerChangeStreamsUtils(
Expand All @@ -181,7 +183,7 @@
"Caught exception when setting up FailsafeModJsonToTableRowFn, message: %s,"
+ " cause: %s",
Optional.ofNullable(e.getMessage()), e.getCause()));
seenException = true;
throw new RuntimeException(e);

}
setUpCallContextConfigurator();
}
@@ -252,6 +254,19 @@
}
String spannerTableName = mod.getTableName();
TrackedSpannerTable spannerTable;
com.google.cloud.Timestamp spannerCommitTimestamp =
com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
mod.getCommitTimestampSeconds(), mod.getCommitTimestampNanos());

// Detect schema updates (newly added tables/columns) from mod and propagate changes into
// spannerTableByName which stores schema information by table name.
// Not able to get schema update from DELETE mods as they have empty newValuesJson.
if (mod.getModType() != ModType.DELETE) {
spannerTableByName =
SchemaUpdateUtils.updateStoredSchemaIfNeeded(
spannerAccessor, spannerChangeStream, dialect, mod, spannerTableByName);
}

try {
spannerTable = checkStateNotNull(spannerTableByName.get(spannerTableName));

@@ -262,9 +277,6 @@
LOG.error(errorMessage);
throw new RuntimeException(errorMessage, e);
}
com.google.cloud.Timestamp spannerCommitTimestamp =
com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
mod.getCommitTimestampSeconds(), mod.getCommitTimestampNanos());

// Set metadata fields of the tableRow.
TableRow tableRow = new TableRow();
@@ -277,32 +289,17 @@
useStorageWriteApi);
JSONObject keysJsonObject = new JSONObject(mod.getKeysJson());
// Set Spanner key columns of the tableRow.
for (TrackedSpannerColumn spannerColumn : spannerTable.getPkColumns()) {
String spannerColumnName = spannerColumn.getName();
if (keysJsonObject.has(spannerColumnName)) {
tableRow.set(spannerColumnName, keysJsonObject.get(spannerColumnName));
} else {
String errorMessage =
String.format(
"Caught exception when setting key column of the tableRow: Cannot find value"
+ " for key column %s",
spannerColumnName);
LOG.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
}

// For "DELETE" mod, we only need to set the key columns.
if (mod.getModType() == ModType.DELETE) {
return tableRow;
}
SpannerToBigQueryUtils.addSpannerPkColumnsToTableRow(
keysJsonObject, spannerTable.getPkColumns(), tableRow);

// Set non-key columns of the tableRow.
SpannerToBigQueryUtils.addSpannerNonPkColumnsToTableRow(
mod.getNewValuesJson(), spannerTable.getNonPkColumns(), tableRow);
mod.getNewValuesJson(), spannerTable.getNonPkColumns(), tableRow, mod.getModType());

// For "INSERT" mod, we can get all columns from mod.
if (mod.getModType() == ModType.INSERT) {
// For "DELETE" mod, we only set the key columns. For all non-key columns, we already
// populated "null".
if (mod.getModType() == ModType.INSERT || mod.getModType() == ModType.DELETE) {
return tableRow;
}

@@ -383,8 +380,7 @@
return tableRow;
}

// Do a Spanner read to retrieve full row. The schema change is currently not supported. so we
// assume the schema isn't changed while the pipeline is running,
// Do a Spanner read to retrieve full row. Schema can change while the pipeline is running.
private void readSpannerRow(
String spannerTableName,
com.google.cloud.spanner.Key key,
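The transformer changes above boil down to per-mod-type handling: key columns are always set, non-key columns are filled from newValuesJson (empty for DELETE, which therefore also cannot reveal schema changes), INSERT and DELETE rows are complete at that point, and UPDATE rows are back-filled by a Spanner read of the full row. A simplified, self-contained sketch of that branching, with mod types as plain strings and the key/new-value JSON already parsed into maps (both assumptions made only for brevity):

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;

// Simplified sketch of the mod-type handling; the template works on Mod/ModType objects
// and parses the key and new-value JSON itself.
final class ModRowSketch {
  static TableRow toTableRow(
      String modType, Map<String, Object> keyColumns, Map<String, Object> newValues) {
    TableRow row = new TableRow();
    keyColumns.forEach(row::set);   // key columns are always present
    newValues.forEach(row::set);    // empty for DELETE, so only key columns are populated
    if (modType.equals("INSERT") || modType.equals("DELETE")) {
      return row;                   // nothing more can, or needs to, be read
    }
    // For UPDATE, columns missing from newValues are back-filled by reading the full row
    // from Spanner (readSpannerRow in the diff), since the schema may have changed meanwhile.
    return row;
  }
}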
v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java
@@ -25,6 +25,7 @@
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
@@ -35,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -209,7 +211,8 @@
.equals(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME)) {
throw new IllegalArgumentException(
String.format(
"bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved for the Cloud Spanner table name.",
"bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved"
+ " for the Cloud Spanner table name.",
BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME));
}

@@ -386,19 +389,39 @@
.setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
.setUseStorageWriteApi(options.getUseStorageWriteApi())
.build();
WriteResult writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
WriteResult writeResult;
if (!options.getUseStorageWriteApi()) {
writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

Check warning on line 406 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java#L399-L406

Added lines #L399 - L406 were not covered by tests
} else {
writeResult =

Check warning on line 408 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java#L408

Added line #L408 was not covered by tests
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.ignoreUnknownValues()
.withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
// STORAGE_API_AT_LEAST_ONCE.
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

}

PCollection<String> transformDlqJson =
tableRowTuple
Expand Down Expand Up @@ -471,6 +494,8 @@
for (String rowKey : rowKeys) {
if (metadataFields.contains(rowKey)) {
cleanTableRow.remove(rowKey);
} else if (rowKeys.contains("_type_" + rowKey)) {
cleanTableRow.remove("_type_" + rowKey);

}
}

@@ -496,6 +521,7 @@
input.isLastRecordInTransactionInPartition(),
input.getRecordSequence(),
input.getTableName(),
input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
input.getModType(),
input.getValueCaptureType(),
input.getNumberOfRecordsInTransaction(),
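Two smaller points from the main pipeline diff are worth calling out. First, the Storage Write API branch pairs .withAutoSchemaUpdate(true) with .ignoreUnknownValues(): per the inline comment, auto schema update is only supported on the Storage Write API / at-least-once paths, and unknown fields have to be tolerated while the destination table catches up with newly added columns. Second, removeIntermediateMetadataFields now also drops the "_type_<column>" companion entries before rows reach BigQuery; a hedged reconstruction of that helper follows (the surrounding class and the metadataFields set are assumed, only the loop body is taken from the diff):

import com.google.api.services.bigquery.model.TableRow;
import java.util.HashSet;
import java.util.Set;

// Reconstruction for illustration; the template's actual method may differ outside the loop.
final class MetadataCleanupSketch {
  static TableRow removeIntermediateMetadataFields(TableRow tableRow, Set<String> metadataFields) {
    TableRow cleanTableRow = tableRow.clone();
    Set<String> rowKeys = new HashSet<>(tableRow.keySet());
    for (String rowKey : rowKeys) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);             // internal metadata never reaches BigQuery
      } else if (rowKeys.contains("_type_" + rowKey)) {
        cleanTableRow.remove("_type_" + rowKey);  // drop the companion type entry as well
      }
    }
    return cleanTableRow;
  }
}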