Skip to content

Commit

Permalink
fix(samples): removed env variables and buckets from creating bq. (#526)
Browse files Browse the repository at this point in the history
* Removed env variables and buckets from creating BQ.

* Fix: removed buckets from BQ import

* pr fix: imports.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* pr fix: fixed test.

* pr fix: added comment.

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dfirova and gcf-owl-bot[bot] authored Sep 16, 2022
1 parent e4170e8 commit eeb68fa
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.FileReader;
import java.io.IOException;
import java.util.stream.Collectors;
import product.setup.ProductsCreateBigqueryTable;

public class EventsCreateBigQueryTable {

Expand All @@ -35,10 +36,11 @@ public static void main(String[] args) throws IOException {
String validEventsTable = "events";
String invalidEventsTable = "events_some_invalid";
String eventsSchemaFilePath = "src/main/resources/events_schema.json";
// user_events.json and user_events_some_invalid.json are located in the resources folder
String validEventsSourceFile =
String.format("gs://%s/user_events.json", System.getenv("EVENTS_BUCKET_NAME"));
ProductsCreateBigqueryTable.class.getResource("/user_events.json").getPath();
String invalidEventsSourceFile =
String.format("gs://%s/user_events_some_invalid.json", System.getenv("EVENTS_BUCKET_NAME"));
ProductsCreateBigqueryTable.class.getResource("/user_events_some_invalid.json").getPath();

BufferedReader bufferedReader = new BufferedReader(new FileReader(eventsSchemaFilePath));
String jsonToString = bufferedReader.lines().collect(Collectors.joining());
Expand All @@ -48,8 +50,8 @@ public static void main(String[] args) throws IOException {

createBqDataset(dataset);
createBqTable(dataset, validEventsTable, eventsSchema);
uploadDataToBqTable(dataset, validEventsTable, validEventsSourceFile, eventsSchema);
uploadDataToBqTable(dataset, validEventsTable, validEventsSourceFile);
createBqTable(dataset, invalidEventsTable, eventsSchema);
uploadDataToBqTable(dataset, invalidEventsTable, invalidEventsSourceFile, eventsSchema);
uploadDataToBqTable(dataset, invalidEventsTable, invalidEventsSourceFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public static void main(String[] args) throws IOException {
String invalidProductsTable = "products_some_invalid";
String productSchemaFilePath = "src/main/resources/product_schema.json";
String validProductsSourceFile =
String.format("gs://%s/products.json", System.getenv("BUCKET_NAME"));
ProductsCreateBigqueryTable.class.getResource("/products.json").getPath();
String invalidProductsSourceFile =
String.format("gs://%s/products_some_invalid.json", System.getenv("BUCKET_NAME"));
ProductsCreateBigqueryTable.class.getResource("products_some_invalid.json").getPath();

BufferedReader bufferedReader = new BufferedReader(new FileReader(productSchemaFilePath));
String jsonToString = bufferedReader.lines().collect(Collectors.joining());
Expand All @@ -48,8 +48,8 @@ public static void main(String[] args) throws IOException {

createBqDataset(dataset);
createBqTable(dataset, validProductsTable, productSchema);
uploadDataToBqTable(dataset, validProductsTable, validProductsSourceFile, productSchema);
uploadDataToBqTable(dataset, validProductsTable, validProductsSourceFile);
createBqTable(dataset, invalidProductsTable, productSchema);
uploadDataToBqTable(dataset, invalidProductsTable, invalidProductsSourceFile, productSchema);
uploadDataToBqTable(dataset, invalidProductsTable, invalidProductsSourceFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.cloud.retail.v2.CreateProductRequest;
import com.google.cloud.retail.v2.DeleteProductRequest;
import com.google.cloud.retail.v2.FulfillmentInfo;
Expand Down Expand Up @@ -71,6 +72,8 @@
import com.google.protobuf.Int32Value;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
Expand Down Expand Up @@ -349,27 +352,37 @@ public static void createBqTable(String datasetName, String tableName, Schema sc
}
}

public static void uploadDataToBqTable(
String datasetName, String tableName, String sourceUri, Schema schema) {
public static void uploadDataToBqTable(String datasetName, String tableName, String sourceUri) {
try {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of(datasetName, tableName);
LoadJobConfiguration loadConfig =
LoadJobConfiguration.newBuilder(tableId, sourceUri)

WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId)
.setFormatOptions(FormatOptions.json())
.setSchema(schema)
.build();
Job job = bigquery.create(JobInfo.of(loadConfig));
job = job.waitFor();

String jobName = "jobId_" + UUID.randomUUID();
JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
OutputStream stream = Channels.newOutputStream(writer)) {
Files.copy(Paths.get(sourceUri), stream);
}

Job job = bigquery.getJob(jobId);
Job completedJob = job.waitFor();
if (job.isDone()) {
System.out.printf("Json from GCS successfully loaded in a table '%s'.%n", tableName);
System.out.printf("Json successfully loaded in a table '%s'.%n", tableName);
} else {
System.out.println(
"BigQuery was unable to load into the table due to an error:"
+ job.getStatus().getError());
}
} catch (BigQueryException | InterruptedException e) {
System.out.printf("Column not added during load append: %s%n", e.getMessage());
} catch (IOException e) {
System.out.printf("Error copying file: %s%n", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void setUp() throws IOException, InterruptedException, ExecutionException
public void testAddFulfillment() {
String outputResult = bout.toString();

assertThat(outputResult).contains("Add fulfilment places with current date");
assertThat(outputResult).contains("Add fulfilment places");
assertThat(outputResult).contains("Add fulfillment places, wait 45 seconds");
}

Expand Down

0 comments on commit eeb68fa

Please sign in to comment.