Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove JobInfo hierarchy, add JobConfiguration hierarchy #584

Merged
merged 6 commits into from
Jan 29, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ import com.google.gcloud.bigquery.BigQuery;
import com.google.gcloud.bigquery.BigQueryOptions;
import com.google.gcloud.bigquery.Field;
import com.google.gcloud.bigquery.JobStatus;
import com.google.gcloud.bigquery.LoadJobInfo;
import com.google.gcloud.bigquery.JobInfo;
import com.google.gcloud.bigquery.Schema;
import com.google.gcloud.bigquery.TableId;
import com.google.gcloud.bigquery.TableInfo;
Expand All @@ -144,7 +144,8 @@ if (info == null) {
bigquery.create(TableInfo.of(tableId, Schema.of(integerField)));
} else {
System.out.println("Loading data into table " + tableId);
LoadJobInfo loadJob = LoadJobInfo.of(tableId, "gs://bucket/path");
LoadJobConfiguration configuration = LoadJobConfiguration.of(tableId, "gs://bucket/path");
JobInfo loadJob = JobInfo.of(configuration);
loadJob = bigquery.create(loadJob);
while (loadJob.status().state() != JobStatus.State.DONE) {
Thread.sleep(1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ protected final boolean baseEquals(BaseTableInfo tableInfo) {
return Objects.equals(toPb(), tableInfo.toPb());
}

BaseTableInfo setProjectId(String projectId) {
return toBuilder().tableId(tableId().setProjectId(projectId)).build();
}

Table toPb() {
Table tablePb = new Table();
tablePb.setTableReference(tableId.toPb());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ public static JobListOption startPageToken(String pageToken) {
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()}, {@link JobStatus#state()},
* {@link JobStatus#error()} as well as type-specific configuration (e.g.
* {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if not specified.
* {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when listing jobs.
* {@link QueryJobConfiguration#query()} for Query Jobs) are always returned, even if not
* specified. {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when
* listing jobs.
*/
public static JobListOption fields(JobField... fields) {
String selector = JobField.selector(fields);
Expand All @@ -397,8 +398,8 @@ private JobOption(BigQueryRpc.Option option, Object value) {
* Returns an option to specify the job's fields to be returned by the RPC call. If this option
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()} as well as type-specific
* configuration (e.g. {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if
* not specified.
* configuration (e.g. {@link QueryJobConfiguration#query()} for Query Jobs) are always
* returned, even if not specified.
*/
public static JobOption fields(JobField... fields) {
return new JobOption(BigQueryRpc.Option.FIELDS, JobField.selector(fields));
Expand Down Expand Up @@ -470,7 +471,7 @@ public static QueryResultsOption maxWaitTime(long maxWaitTime) {
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException;
JobInfo create(JobInfo job, JobOption... options) throws BigQueryException;

/**
* Returns the requested dataset or {@code null} if not found.
Expand Down Expand Up @@ -611,14 +612,14 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(String jobId, JobOption... options) throws BigQueryException;

/**
* Returns the requested job or {@code null} if not found.
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(JobId jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(JobId jobId, JobOption... options) throws BigQueryException;

/**
* Lists the jobs.
Expand Down Expand Up @@ -665,9 +666,9 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt

/**
* Returns a channel to write data to be inserted into a BigQuery table. Data format and other
* options can be configured using the {@link LoadConfiguration} parameter.
* options can be configured using the {@link WriteChannelConfiguration} parameter.
*
* @throws BigQueryException upon failure
*/
TableDataWriteChannel writer(LoadConfiguration loadConfiguration);
TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -159,7 +158,7 @@ public QueryResult nextPage() {
@Override
public DatasetInfo create(DatasetInfo dataset, DatasetOption... options)
throws BigQueryException {
final Dataset datasetPb = setProjectId(dataset).toPb();
final Dataset datasetPb = dataset.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return DatasetInfo.fromPb(runWithRetries(new Callable<Dataset>() {
Expand All @@ -176,7 +175,7 @@ public Dataset call() {
@Override
public <T extends BaseTableInfo> T create(T table, TableOption... options)
throws BigQueryException {
final Table tablePb = setProjectId(table).toPb();
final Table tablePb = table.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return BaseTableInfo.fromPb(runWithRetries(new Callable<Table>() {
Expand All @@ -191,8 +190,8 @@ public Table call() {
}

@Override
public <T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException {
final Job jobPb = setProjectId(job).toPb();
public JobInfo create(JobInfo job, JobOption... options) throws BigQueryException {
final Job jobPb = job.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return JobInfo.fromPb(runWithRetries(new Callable<Job>() {
Expand Down Expand Up @@ -295,7 +294,7 @@ public Boolean call() {
@Override
public DatasetInfo update(DatasetInfo dataset, DatasetOption... options)
throws BigQueryException {
final Dataset datasetPb = setProjectId(dataset).toPb();
final Dataset datasetPb = dataset.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return DatasetInfo.fromPb(runWithRetries(new Callable<Dataset>() {
Expand All @@ -312,7 +311,7 @@ public Dataset call() {
@Override
public <T extends BaseTableInfo> T update(T table, TableOption... options)
throws BigQueryException {
final Table tablePb = setProjectId(table).toPb();
final Table tablePb = table.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return BaseTableInfo.fromPb(runWithRetries(new Callable<Table>() {
Expand Down Expand Up @@ -442,12 +441,12 @@ public List<FieldValue> apply(TableRow rowPb) {
}

@Override
public <T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException {
public JobInfo getJob(String jobId, JobOption... options) throws BigQueryException {
return getJob(JobId.of(jobId), options);
}

@Override
public <T extends JobInfo> T getJob(final JobId jobId, JobOption... options)
public JobInfo getJob(final JobId jobId, JobOption... options)
throws BigQueryException {
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand All @@ -457,7 +456,7 @@ public Job call() {
return bigQueryRpc.getJob(jobId.job(), optionsMap);
}
}, options().retryParams(), EXCEPTION_HANDLER);
return answer == null ? null : JobInfo.<T>fromPb(answer);
return answer == null ? null : JobInfo.fromPb(answer);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -508,7 +507,7 @@ public QueryResponse query(final QueryRequest request) throws BigQueryException
runWithRetries(new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
return bigQueryRpc.query(setProjectId(request).toPb());
return bigQueryRpc.query(request.setProjectId(options().projectId()).toPb());
}
}, options().retryParams(), EXCEPTION_HANDLER);
QueryResponse.Builder builder = QueryResponse.builder();
Expand Down Expand Up @@ -596,8 +595,9 @@ private static QueryResult.Builder transformQueryResults(JobId jobId, List<Table
.results(transformTableData(rowsPb));
}

public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
return new TableDataWriteChannel(options(), setProjectId(loadConfiguration));
public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) {
return new TableDataWriteChannel(options(),
writeChannelConfiguration.setProjectId(options().projectId()));
}

private Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
Expand All @@ -608,93 +608,4 @@ public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
}
return optionMap;
}

private DatasetInfo setProjectId(DatasetInfo dataset) {
DatasetInfo.Builder datasetBuilder = dataset.toBuilder();
datasetBuilder.datasetId(setProjectId(dataset.datasetId()));
if (dataset.acl() != null) {
List<Acl> acls = Lists.newArrayListWithCapacity(dataset.acl().size());
for (Acl acl : dataset.acl()) {
if (acl.entity().type() == Acl.Entity.Type.VIEW) {
Dataset.Access accessPb = acl.toPb();
TableReference viewReferencePb = accessPb.getView();
if (viewReferencePb.getProjectId() == null) {
viewReferencePb.setProjectId(options().projectId());
}
acls.add(Acl.of(new Acl.View(TableId.fromPb(viewReferencePb))));
} else {
acls.add(acl);
}
}
datasetBuilder.acl(acls);
}
return datasetBuilder.build();
}

private DatasetId setProjectId(DatasetId dataset) {
return dataset.project() != null ? dataset
: DatasetId.of(options().projectId(), dataset.dataset());
}

private BaseTableInfo setProjectId(BaseTableInfo table) {
return table.toBuilder().tableId(setProjectId(table.tableId())).build();
}

private TableId setProjectId(TableId table) {
return table.project() != null ? table
: TableId.of(options().projectId(), table.dataset(), table.table());
}

private JobInfo setProjectId(JobInfo job) {
if (job instanceof CopyJobInfo) {
CopyJobInfo copyJob = (CopyJobInfo) job;
CopyJobInfo.Builder copyBuilder = copyJob.toBuilder();
copyBuilder.destinationTable(setProjectId(copyJob.destinationTable()));
copyBuilder.sourceTables(
Lists.transform(copyJob.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
return copyBuilder.build();
}
if (job instanceof QueryJobInfo) {
QueryJobInfo queryJob = (QueryJobInfo) job;
QueryJobInfo.Builder queryBuilder = queryJob.toBuilder();
if (queryJob.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryJob.destinationTable()));
}
if (queryJob.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryJob.defaultDataset()));
}
return queryBuilder.build();
}
if (job instanceof ExtractJobInfo) {
ExtractJobInfo extractJob = (ExtractJobInfo) job;
ExtractJobInfo.Builder extractBuilder = extractJob.toBuilder();
extractBuilder.sourceTable(setProjectId(extractJob.sourceTable()));
return extractBuilder.build();
}
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
}

private QueryRequest setProjectId(QueryRequest request) {
QueryRequest.Builder builder = request.toBuilder();
if (request.defaultDataset() != null) {
builder.defaultDataset(setProjectId(request.defaultDataset()));
}
return builder.build();
}

private LoadConfiguration setProjectId(LoadConfiguration configuration) {
LoadConfiguration.Builder builder = configuration.toBuilder();
builder.destinationTable(setProjectId(configuration.destinationTable()));
return builder.build();
}
}
Loading