Skip to content

Commit

Permalink
Add debug logs for updated schema in BigQueryIO (#28189)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored Aug 30, 2023
1 parent 100c06a commit e5ed7aa
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ PendingJobManager addPendingJob(
}

void waitForDone() throws Exception {
LOG.info("Waiting for jobs to complete.");
LOG.debug("Waiting for BigQuery jobs to complete.");
Sleeper sleeper = Sleeper.DEFAULT;
while (!pendingJobs.isEmpty()) {
List<JobInfo> retryJobs = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,11 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
WriteStream writeStream =
Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
currentSchema.set(writeStream.getTableSchema());
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ public void process(
newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false));
APPEND_CLIENTS.invalidate(element.getKey());
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned);
updatedSchema.write(newSchema.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public void processElement(
if (firstPaneCreateDisposition == CreateDisposition.CREATE_NEVER) {
tableSchema = null;
} else if (jsonSchemas.containsKey(destination)) {
// tableSchema for the destination stored in cache (jsonSchemas)
tableSchema =
BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class);
} else {
Expand All @@ -215,6 +216,7 @@ public void processElement(
firstPaneCreateDisposition,
dynamicDestinations,
destination);
LOG.debug("Fetched TableSchema for table {}:\n\t{}", destination, tableSchema);
jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
}

Expand Down

0 comments on commit e5ed7aa

Please sign in to comment.