Skip to content

Commit

Permalink
Merge pull request #297 from GoogleCloudPlatform/tswast-bq-refactor
Browse files Browse the repository at this point in the history
Refactor BigQuery samples.
  • Loading branch information
tswast authored Aug 17, 2016
2 parents a9ba257 + a9451b5 commit b54ad45
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 374 deletions.
9 changes: 4 additions & 5 deletions bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
<version>${project.oauth.version}</version>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
<version>${project.http.version}</version>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client-jetty</artifactId>
<version>${project.oauth.version}</version>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand All @@ -48,6 +48,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -59,8 +60,6 @@
</dependencies>

<properties>
<project.http.version>1.21.0</project.http.version>
<project.oauth.version>1.21.0</project.oauth.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.bigquery.samples;


import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
Expand All @@ -28,11 +26,10 @@
import java.util.Iterator;
import java.util.Scanner;


/**
* Example of authorizing with BigQuery and reading from a public dataset.
*/
public class AsyncQuerySample extends BigqueryUtils {
public class AsyncQuerySample {
// [START main]
/**
* Prompts for all the parameters required to make a query.
Expand All @@ -41,23 +38,20 @@ public class AsyncQuerySample extends BigqueryUtils {
* @throws IOException IOException
* @throws InterruptedException InterruptedException
*/
public static void main(final String[] args)
throws IOException, InterruptedException {
public static void main(final String[] args) throws IOException, InterruptedException {
Scanner scanner = new Scanner(System.in);
System.out.println("Enter your project id: ");
String projectId = scanner.nextLine();
System.out.println("Enter your query string: ");
String queryString = scanner.nextLine();
System.out.println("Run query in batch mode? [true|false] ");
boolean batch = Boolean.valueOf(scanner.nextLine());
System.out.println("Enter how often to check if your job is complete "
+ "(milliseconds): ");
System.out.println("Enter how often to check if your job is complete " + "(milliseconds): ");
long waitTime = scanner.nextLong();
scanner.close();
Iterator<GetQueryResultsResponse> pages = run(projectId, queryString,
batch, waitTime);
Iterator<GetQueryResultsResponse> pages = run(projectId, queryString, batch, waitTime);
while (pages.hasNext()) {
printRows(pages.next().getRows(), System.out);
BigQueryUtils.printRows(pages.next().getRows(), System.out);
}
}
// [END main]
Expand All @@ -70,58 +64,53 @@ public static void main(final String[] args)
* @param queryString Query we want to run against BigQuery
* @param batch True if you want to batch the queries
* @param waitTime How long to wait before retries
* @return An interator to the result of your pages
* @return An iterator to the result of your pages
* @throws IOException Thrown if there's an IOException
* @throws InterruptedException Thrown if there's an Interrupted Exception
*/
public static Iterator<GetQueryResultsResponse> run(final String projectId,
final String queryString,
final boolean batch,
final long waitTime)
public static Iterator<GetQueryResultsResponse> run(
final String projectId, final String queryString, final boolean batch, final long waitTime)
throws IOException, InterruptedException {

Bigquery bigquery = BigqueryServiceFactory.getService();
Bigquery bigquery = BigQueryServiceFactory.getService();

Job query = asyncQuery(bigquery, projectId, queryString, batch);
Bigquery.Jobs.Get getRequest = bigquery.jobs().get(
projectId, query.getJobReference().getJobId());
Bigquery.Jobs.Get getRequest =
bigquery.jobs().get(projectId, query.getJobReference().getJobId());

//Poll every waitTime milliseconds,
//retrying at most retries times if there are errors
pollJob(getRequest, waitTime);
// Poll every waitTime milliseconds,
// retrying at most retries times if there are errors
BigQueryUtils.pollJob(getRequest, waitTime);

GetQueryResults resultsRequest = bigquery.jobs().getQueryResults(
projectId, query.getJobReference().getJobId());
GetQueryResults resultsRequest =
bigquery.jobs().getQueryResults(projectId, query.getJobReference().getJobId());

return getPages(resultsRequest);
return BigQueryUtils.getPages(resultsRequest);
}
// [END run]

// [START asyncQuery]
/**
* Inserts an asynchronous query Job for a particular query.
*
* @param bigquery an authorized BigQuery client
* @param bigquery an authorized BigQuery client
* @param projectId a String containing the project ID
* @param querySql the actual query string
* @param batch True if you want to run the query as BATCH
* @return a reference to the inserted query job
* @throws IOException Thrown if there's a network exception
*/
public static Job asyncQuery(final Bigquery bigquery,
final String projectId,
final String querySql,
final boolean batch) throws IOException {
public static Job asyncQuery(
final Bigquery bigquery, final String projectId, final String querySql, final boolean batch)
throws IOException {

JobConfigurationQuery queryConfig = new JobConfigurationQuery()
.setQuery(querySql);
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(querySql);

if (batch) {
queryConfig.setPriority("BATCH");
}

Job job = new Job().setConfiguration(
new JobConfiguration().setQuery(queryConfig));
Job job = new Job().setConfiguration(new JobConfiguration().setQuery(queryConfig));

return bigquery.jobs().insert(projectId, job).execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@
import java.util.Collection;

/**
* This class creates our Service to connect to Bigquery including auth.
* This class creates our Service to connect to BigQuery including auth.
*/
public final class BigqueryServiceFactory {
public final class BigQueryServiceFactory {

/**
* Private constructor to disable creation of this utility Factory class.
*/
private BigqueryServiceFactory() {

}
private BigQueryServiceFactory() {}

/**
* Singleton service used through the app.
Expand All @@ -52,9 +50,9 @@ private BigqueryServiceFactory() {
private static Object serviceLock = new Object();

/**
* Threadsafe Factory that provides an authorized Bigquery service.
* @return The Bigquery service
* @throws IOException Thronw if there is an error connecting to Bigquery.
* Threadsafe Factory that provides an authorized BigQuery service.
* @return The BigQuery service
* @throws IOException Throw if there is an error connecting to BigQuery.
*/
public static Bigquery getService() throws IOException {
if (service == null) {
Expand All @@ -68,7 +66,7 @@ public static Bigquery getService() throws IOException {
}

/**
* Creates an authorized client to Google Bigquery.
* Creates an authorized client to Google BigQuery.
*
* @return The BigQuery Service
* @throws IOException Thrown if there is an error connecting
Expand All @@ -78,18 +76,19 @@ private static Bigquery createAuthorizedClient() throws IOException {
// Create the credential
HttpTransport transport = new NetHttpTransport();
JsonFactory jsonFactory = new JacksonFactory();
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

// Depending on the environment that provides the default credentials (e.g. Compute Engine, App
// Engine), the credentials may require us to specify the scopes we need explicitly.
// Check for this case, and inject the Bigquery scope if required.
// Check for this case, and inject the BigQuery scope if required.
if (credential.createScopedRequired()) {
Collection<String> bigqueryScopes = BigqueryScopes.all();
credential = credential.createScoped(bigqueryScopes);
}

return new Bigquery.Builder(transport, jsonFactory, credential)
.setApplicationName("BigQuery Samples").build();
.setApplicationName("BigQuery Samples")
.build();
}
// [END get_service]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@
/**
* Helper functions for the other classes.
*/
public class BigqueryUtils {
public class BigQueryUtils {

/**
* Private contructor to prevent creation of this class, which is just all
* Private constructor to prevent creation of this class, which is just all
* static helper methods.
*/
protected BigqueryUtils() {
}
private BigQueryUtils() {}

/**
* Print rows to the output stream in a formatted way.
Expand Down Expand Up @@ -76,9 +75,8 @@ public static Job pollJob(final Bigquery.Jobs.Get request, final long interval)
throws IOException, InterruptedException {
Job job = request.execute();
while (!job.getStatus().getState().equals("DONE")) {
System.out.println("Job is "
+ job.getStatus().getState()
+ " waiting " + interval + " milliseconds...");
System.out.println(
"Job is " + job.getStatus().getState() + " waiting " + interval + " milliseconds...");
Thread.sleep(interval);
job = request.execute();
}
Expand Down Expand Up @@ -165,9 +163,10 @@ public void remove() {
public static TableSchema loadSchema(final Reader schemaSource) {
TableSchema sourceSchema = new TableSchema();

List<TableFieldSchema> fields = (new Gson())
.<List<TableFieldSchema>>fromJson(schemaSource,
(new ArrayList<TableFieldSchema>()).getClass());
List<TableFieldSchema> fields =
(new Gson())
.<List<TableFieldSchema>>fromJson(
schemaSource, (new ArrayList<TableFieldSchema>()).getClass());

sourceSchema.setFields(fields);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Sample of how to Export Cloud Data.
*/
public class ExportDataCloudStorageSample {
public class ExportDataCloudStorageSample {
/**
* Protected constructor since this is a collection of static functions.
*/
Expand All @@ -42,20 +42,17 @@ protected ExportDataCloudStorageSample() {
* @throws InterruptedException Should never be thrown.
*/
// [START main]
public static void main(final String[] args)
throws IOException, InterruptedException {
public static void main(final String[] args) throws IOException, InterruptedException {
Scanner scanner = new Scanner(System.in);
System.out.println("Enter your project id: ");
String projectId = scanner.nextLine();
System.out.println("Enter your dataset id: ");
String datasetId = scanner.nextLine();
System.out.println("Enter your table id: ");
String tableId = scanner.nextLine();
System.out.println("Enter the Google Cloud Storage Path to which you'd "
+ "like to export: ");
System.out.println("Enter the Google Cloud Storage Path to which you'd " + "like to export: ");
String cloudStoragePath = scanner.nextLine();
System.out.println("Enter how often to check if your job is complete "
+ "(milliseconds): ");
System.out.println("Enter how often to check if your job is complete " + "(milliseconds): ");
long interval = scanner.nextLong();
scanner.close();

Expand All @@ -79,30 +76,33 @@ public static void run(
final String projectId,
final String datasetId,
final String tableId,
final long interval) throws IOException, InterruptedException {
final long interval)
throws IOException, InterruptedException {

Bigquery bigquery = BigqueryServiceFactory.getService();
Bigquery bigquery = BigQueryServiceFactory.getService();

Job extractJob = extractJob(
bigquery,
cloudStoragePath,
new TableReference()
.setProjectId(projectId)
.setDatasetId(datasetId)
.setTableId(tableId));
Job extractJob =
extractJob(
bigquery,
cloudStoragePath,
new TableReference()
.setProjectId(projectId)
.setDatasetId(datasetId)
.setTableId(tableId));

Bigquery.Jobs.Get getJob = bigquery.jobs().get(
extractJob.getJobReference().getProjectId(),
extractJob.getJobReference().getJobId());
Bigquery.Jobs.Get getJob =
bigquery
.jobs()
.get(
extractJob.getJobReference().getProjectId(),
extractJob.getJobReference().getJobId());

BigqueryUtils.pollJob(getJob, interval);
BigQueryUtils.pollJob(getJob, interval);

System.out.println("Export is Done!");

}
// [END run]


/**
* A job that extracts data from a table.
* @param bigquery Bigquery service to use
Expand All @@ -113,16 +113,17 @@ public static void run(
*/
// [START extract_job]
public static Job extractJob(
final Bigquery bigquery,
final String cloudStoragePath,
final TableReference table) throws IOException {
final Bigquery bigquery, final String cloudStoragePath, final TableReference table)
throws IOException {

JobConfigurationExtract extract = new JobConfigurationExtract()
.setSourceTable(table)
.setDestinationUri(cloudStoragePath);
JobConfigurationExtract extract =
new JobConfigurationExtract().setSourceTable(table).setDestinationUri(cloudStoragePath);

return bigquery.jobs().insert(table.getProjectId(),
new Job().setConfiguration(new JobConfiguration().setExtract(extract)))
return bigquery
.jobs()
.insert(
table.getProjectId(),
new Job().setConfiguration(new JobConfiguration().setExtract(extract)))
.execute();
}
// [END extract_job]
Expand Down
Loading

0 comments on commit b54ad45

Please sign in to comment.