diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 6815d978f88b5..f4120a013ed65 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -126,7 +126,7 @@ PendingJobManager addPendingJob( } void waitForDone() throws Exception { - LOG.info("Waiting for jobs to complete."); + LOG.debug("Waiting for BigQuery jobs to complete."); Sleeper sleeper = Sleeper.DEFAULT; while (!pendingJobs.isEmpty()) { List retryJobs = Lists.newArrayList(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 3ac5140f73ff8..f54522c2ca59b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -430,8 +430,11 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u WriteStream writeStream = Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName); if (writeStream != null && writeStream.hasTableSchema()) { - currentSchema.set(writeStream.getTableSchema()); + TableSchema updatedFromStream = writeStream.getTableSchema(); + currentSchema.set(updatedFromStream); updated.set(true); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream); } } return null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cf7de067e1536..efcf87eac7a32 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -818,6 +818,8 @@ public void process( newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false)); APPEND_CLIENTS.invalidate(element.getKey()); APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned); updatedSchema.write(newSchema.get()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index fdd37b079c610..c6a7d32e24864 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -202,6 +202,7 @@ public void processElement( if (firstPaneCreateDisposition == CreateDisposition.CREATE_NEVER) { tableSchema = null; } else if (jsonSchemas.containsKey(destination)) { + // tableSchema for the destination stored in cache (jsonSchemas) tableSchema = BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class); } else { @@ -215,6 +216,7 @@ public void processElement( firstPaneCreateDisposition, dynamicDestinations, destination); + LOG.debug("Fetched TableSchema for table {}:\n\t{}", destination, tableSchema); jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema)); }