Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add integration tests with RetrySettings enabled. #2275

Merged
merged 32 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
408f82e
Add integration tests with RetrySettings enabled.
Oct 13, 2023
ab069b5
Fix formatting
Oct 13, 2023
51f1f51
Add quota and non-quota e2e tests.
Oct 30, 2023
8c82567
Cleanup
Oct 30, 2023
d904fa6
Merge branch 'googleapis:main' into retry-tests
egreco12 Oct 31, 2023
9abfa88
Revert sample changes to keep this branch only for e2e tests
Oct 31, 2023
3bea4df
Remove additional retry-specific sample code
Oct 31, 2023
97614b1
Remove backoff-related retry settings for non quota tests
Oct 31, 2023
52cb39e
Update kokoro build to ignore retry tests until retry-specific Kokoro…
Oct 31, 2023
9e8506e
Run format
Oct 31, 2023
872acff
Wrap -Dtest args
Oct 31, 2023
610f032
Fix integration command
Oct 31, 2023
18f442c
Fix integration command
Oct 31, 2023
0bbc3ff
Remove -Dtest arg
Nov 1, 2023
f39a19d
Fix integration test to ignore retry tests
Nov 3, 2023
5401143
Use list instead of regex for ignoring retry tests when integration t…
Nov 3, 2023
5984f9f
Fix typo in integration test command
Nov 3, 2023
8f11c10
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 6, 2023
5e86a16
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 6, 2023
8a44d41
Fix ignore retry test settings
Nov 8, 2023
8432f80
Add debug logs to see why test fails in github
Nov 8, 2023
385d3ea
Remove log
Nov 8, 2023
1de97ce
Add additional retry-based logging
Nov 8, 2023
3e2ed81
Remove unused profile
Nov 8, 2023
a46d327
Add more debugging logs for connection worker test
Nov 8, 2023
4b0d1e1
rearrange builder order
Nov 8, 2023
fdf20d6
Remove debug log
Nov 8, 2023
d083ac0
Directly add streamwriter to list in connection worker pool test
Nov 8, 2023
b341de5
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 8, 2023
7e29b8e
Refactor retry tests into helper class
Nov 9, 2023
4803de7
Fix file headers
Nov 9, 2023
057cf8c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery.storage.v1.it;

import static org.junit.Assert.assertFalse;

import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteNonQuotaRetryTest {
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger LOG =
Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test";
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
GaoleMeng marked this conversation as resolved.
Show resolved Hide resolved

private static BigQueryWriteClient client;
private static BigQuery bigquery;

@BeforeClass
public static void beforeClass() throws IOException {
client = BigQueryWriteClient.create();

RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
bigquery = bigqueryHelper.getOptions().getService();
DatasetInfo datasetInfo =
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
bigquery.create(datasetInfo);
LOG.info("Created test dataset: " + DATASET);
TableInfo tableInfo = TableInfo.newBuilder(
TableId.of(DATASET, TABLE),
StandardTableDefinition.of(
Schema.of(
Field.newBuilder("foo", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())))
.build();
bigquery.create(tableInfo);
}

@AfterClass
public static void afterClass() {
if (client != null) {
client.close();
}

if (bigquery != null) {
RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
LOG.info("Deleted test dataset: " + DATASET);
}
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException,
DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setMaxAttempts(5)
.build();
String tableName = "CommittedRetry";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema schema = Schema.of(col1);
TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses =
new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
Assert.assertEquals(
allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch);
} catch (ExecutionException ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
throws IOException, InterruptedException,
DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
.build();
String tableName = "JsonTableDefaultStream";
TableFieldSchema TEST_STRING =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_str")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
Field.newBuilder(
"test_str", StandardSQLTypeName.STRING)
.build())))
.build();

bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses =
new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
assertFalse(allResponses.get(i).get().hasError());
} catch (Exception ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
}
}
Loading
Loading