Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(sample): update WriteToDefaultStream sample to match best practices #1631

Merged
merged 2 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:25.1.0')
implementation platform('com.google.cloud:libraries-bom:25.2.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_default]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

Expand All @@ -45,36 +55,155 @@ public static void runWriteToDefaultStream()

public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(datasetName, tableName);
TableName parentTable = TableName.of(projectId, datasetName, tableName);
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
// Write two batches to the stream, each with 10 JSON records. A writer should be used for as
// much writes as possible. Creating a writer for just one write is an antipattern.
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
jsonArr.put(record);

DataWriter writer = new DataWriter();
// One time initialization for the worker.
writer.initialize(parentTable);

// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
jsonArr.put(record);
}

writer.append(new AppendContext(jsonArr, 0));
}

// Final cleanup for the stream during worker teardown.
writer.cleanup();
System.out.println("Appended records successfully.");
}

private static class AppendContext {

JSONArray data;
int retryCount = 0;

AppendContext(JSONArray data, int retryCount) {
this.data = data;
this.retryCount = retryCount;
}
}

private static class DataWriter {

private static final int MAX_RETRY_COUNT = 2;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
ImmutableList.of(Code.INTERNAL, Code.ABORTED, Code.CANCELLED);

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
private JsonStreamWriter streamWriter;

@GuardedBy("lock")
private RuntimeException error = null;

public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Retrive table schema information.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(parentTable.getDataset(), parentTable.getTable());
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter = JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build();
}

public void append(AppendContext appendContext)
throws DescriptorValidationException, IOException {
synchronized (this.lock) {
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
}
System.out.println("Appended records successfully.");
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e.toString());
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
ApiFutures.addCallback(
future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

// Increase the count of in-flight requests.
inflightRequestCount.register();
}

public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();

// Close the connection to the server.
streamWriter.close();

// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
}

static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

private final DataWriter parent;
private final AppendContext appendContext;

public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
this.parent = parent;
this.appendContext = appendContext;
}

public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
done();
}

public void onFailure(Throwable throwable) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
// Mark the existing attempt as done since it's being retried.
done();
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
}
}

synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable);
done();
}

private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Expand Down