diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/LoadJobConfiguration.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/LoadJobConfiguration.java
index ee142be01..321d542a6 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/LoadJobConfiguration.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/LoadJobConfiguration.java
@@ -22,6 +22,7 @@
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +59,10 @@ public final class LoadJobConfiguration extends JobConfiguration implements Load
   private final HivePartitioningOptions hivePartitioningOptions;
   private final String referenceFileSchemaUri;
+  private final List<ConnectionProperty> connectionProperties;
+
+  private final Boolean createSession;
+
   public static final class Builder extends JobConfiguration.Builder<LoadJobConfiguration, Builder>
       implements LoadConfiguration.Builder {
@@ -83,6 +88,8 @@ public static final class Builder extends JobConfiguration.Builder<LoadJobConfi
     private RangePartitioning rangePartitioning;
     private HivePartitioningOptions hivePartitioningOptions;
     private String referenceFileSchemaUri;
+    private List<ConnectionProperty> connectionProperties;
+    private Boolean createSession;
 
     private Builder() {
       super(Type.LOAD);
@@ -112,6 +119,8 @@ private Builder(LoadJobConfiguration loadConfiguration) {
       this.rangePartitioning = loadConfiguration.rangePartitioning;
       this.hivePartitioningOptions = loadConfiguration.hivePartitioningOptions;
       this.referenceFileSchemaUri = loadConfiguration.referenceFileSchemaUri;
+      this.connectionProperties = loadConfiguration.connectionProperties;
+      this.createSession = loadConfiguration.createSession;
     }
 
     private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
@@ -205,6 +214,13 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
       if (loadConfigurationPb.getReferenceFileSchemaUri() != null) {
         this.referenceFileSchemaUri = loadConfigurationPb.getReferenceFileSchemaUri();
       }
+      if (loadConfigurationPb.getConnectionProperties() != null) {
+
+        this.connectionProperties =
+            Lists.transform(
+                loadConfigurationPb.getConnectionProperties(), ConnectionProperty.FROM_PB_FUNCTION);
+      }
+      createSession = loadConfigurationPb.getCreateSession();
     }
 
     @Override
@@ -368,6 +384,16 @@ public Builder setReferenceFileSchemaUri(String referenceFileSchemaUri) {
       return this;
     }
 
+    public Builder setConnectionProperties(List<ConnectionProperty> connectionProperties) {
+      this.connectionProperties = ImmutableList.copyOf(connectionProperties);
+      return this;
+    }
+
+    public Builder setCreateSession(Boolean createSession) {
+      this.createSession = createSession;
+      return this;
+    }
+
     @Override
     public LoadJobConfiguration build() {
       return new LoadJobConfiguration(this);
@@ -397,6 +423,8 @@ private LoadJobConfiguration(Builder builder) {
     this.rangePartitioning = builder.rangePartitioning;
     this.hivePartitioningOptions = builder.hivePartitioningOptions;
     this.referenceFileSchemaUri = builder.referenceFileSchemaUri;
+    this.connectionProperties = builder.connectionProperties;
+    this.createSession = builder.createSession;
   }
 
   @Override
@@ -520,6 +548,14 @@ public String getReferenceFileSchemaUri() {
     return referenceFileSchemaUri;
   }
 
+  public List<ConnectionProperty> getConnectionProperties() {
+    return connectionProperties;
+  }
+
+  public Boolean getCreateSession() {
+    return createSession;
+  }
+
   @Override
   public Builder toBuilder() {
     return new Builder(this);
@@ -548,7 +584,9 @@ ToStringHelper toStringHelper() {
         .add("jobTimeoutMs", jobTimeoutMs)
.add("rangePartitioning", rangePartitioning) .add("hivePartitioningOptions", hivePartitioningOptions) - .add("referenceFileSchemaUri", referenceFileSchemaUri); + .add("referenceFileSchemaUri", referenceFileSchemaUri) + .add("connectionProperties", connectionProperties) + .add("createSession", createSession); } @Override @@ -654,6 +692,13 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() { if (referenceFileSchemaUri != null) { loadConfigurationPb.setReferenceFileSchemaUri(referenceFileSchemaUri); } + if (connectionProperties != null) { + loadConfigurationPb.setConnectionProperties( + Lists.transform(connectionProperties, ConnectionProperty.TO_PB_FUNCTION)); + } + if (createSession != null) { + loadConfigurationPb.setCreateSession(createSession); + } jobConfiguration.setLoad(loadConfigurationPb); return jobConfiguration; diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/LoadJobConfigurationTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/LoadJobConfigurationTest.java index deed2f11b..341965fb8 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/LoadJobConfigurationTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/LoadJobConfigurationTest.java @@ -57,6 +57,8 @@ public class LoadJobConfigurationTest { private static final Schema TABLE_SCHEMA = Schema.of(FIELD_SCHEMA); private static final Boolean AUTODETECT = true; private static final Boolean USE_AVRO_LOGICAL_TYPES = true; + + private static final boolean CREATE_SESSION = true; private static final EncryptionConfiguration JOB_ENCRYPTION_CONFIGURATION = EncryptionConfiguration.newBuilder().setKmsKeyName("KMS_KEY_1").build(); private static final TimePartitioning TIME_PARTITIONING = TimePartitioning.of(Type.DAY); @@ -71,6 +73,13 @@ public class LoadJobConfigurationTest { RangePartitioning.newBuilder().setField("IntegerField").setRange(RANGE).build(); private static final String MODE = "STRING"; private static final String SOURCE_URI_PREFIX = "gs://bucket/path_to_table"; + + private static final String KEY = "session_id"; + private static final String VALUE = "session_id_1234567890"; + private static final ConnectionProperty CONNECTION_PROPERTY = + ConnectionProperty.newBuilder().setKey(KEY).setValue(VALUE).build(); + private static final List CONNECTION_PROPERTIES = + ImmutableList.of(CONNECTION_PROPERTY); private static final HivePartitioningOptions HIVE_PARTITIONING_OPTIONS = HivePartitioningOptions.newBuilder() .setMode(MODE) @@ -95,6 +104,8 @@ public class LoadJobConfigurationTest { .setRangePartitioning(RANGE_PARTITIONING) .setNullMarker("nullMarker") .setHivePartitioningOptions(HIVE_PARTITIONING_OPTIONS) + .setConnectionProperties(CONNECTION_PROPERTIES) + .setCreateSession(CREATE_SESSION) .build(); private static final DatastoreBackupOptions BACKUP_OPTIONS = @@ -253,5 +264,7 @@ private void compareLoadJobConfiguration( assertEquals(expected.getRangePartitioning(), value.getRangePartitioning()); assertEquals(expected.getNullMarker(), value.getNullMarker()); assertEquals(expected.getHivePartitioningOptions(), value.getHivePartitioningOptions()); + assertEquals(expected.getConnectionProperties(), value.getConnectionProperties()); + assertEquals(expected.getCreateSession(), value.getCreateSession()); } } diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 69c69f171..0bbe1efdd 100644 --- 
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -3655,6 +3655,56 @@ public void testQuerySessionSupport() throws InterruptedException {
     assertEquals(sessionId, statisticsWithSession.getSessionInfo().getSessionId());
   }
 
+  @Test
+  public void testLoadSessionSupport() throws InterruptedException {
+    // Start the session
+    TableId sessionTableId = TableId.of("_SESSION", "test_temp_destination_table");
+    LoadJobConfiguration configuration =
+        LoadJobConfiguration.newBuilder(
+                sessionTableId, "gs://" + BUCKET + "/" + JSON_LOAD_FILE, FormatOptions.json())
+            .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
+            .setSchema(TABLE_SCHEMA)
+            .setCreateSession(true)
+            .build();
+    Job job = bigquery.create(JobInfo.of(configuration));
+    job = job.waitFor();
+    assertNull(job.getStatus().getError());
+
+    Job loadJob = bigquery.getJob(job.getJobId());
+    JobStatistics.LoadStatistics statistics = loadJob.getStatistics();
+    String sessionId = statistics.getSessionInfo().getSessionId();
+    assertNotNull(sessionId);
+
+    // Load job in the same session.
+    // Should load the data to a temp table.
+    ConnectionProperty sessionConnectionProperty =
+        ConnectionProperty.newBuilder().setKey("session_id").setValue(sessionId).build();
+    LoadJobConfiguration loadJobConfigurationWithSession =
+        LoadJobConfiguration.newBuilder(
+                sessionTableId, "gs://" + BUCKET + "/" + JSON_LOAD_FILE, FormatOptions.json())
+            .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
+            .setSchema(TABLE_SCHEMA)
+            .setConnectionProperties(ImmutableList.of(sessionConnectionProperty))
+            .build();
+    Job remoteJobWithSession = bigquery.create(JobInfo.of(loadJobConfigurationWithSession));
+    remoteJobWithSession = remoteJobWithSession.waitFor();
+    assertNull(remoteJobWithSession.getStatus().getError());
+    Job queryJobWithSession = bigquery.getJob(remoteJobWithSession.getJobId());
+    LoadStatistics statisticsWithSession = queryJobWithSession.getStatistics();
+    assertNotNull(statisticsWithSession.getSessionInfo().getSessionId());
+
+    // Checking if the data loaded to the temp table in the session
+    String queryTempTable = "SELECT * FROM _SESSION.test_temp_destination_table;";
+    QueryJobConfiguration queryJobConfigurationWithSession =
+        QueryJobConfiguration.newBuilder(queryTempTable)
+            .setConnectionProperties(ImmutableList.of(sessionConnectionProperty))
+            .build();
+    Job queryTempTableJob = bigquery.create(JobInfo.of(queryJobConfigurationWithSession));
+    queryTempTableJob = queryTempTableJob.waitFor();
+    assertNull(queryTempTableJob.getStatus().getError());
+    assertNotNull(queryTempTableJob.getQueryResults());
+  }
+
   // TODO: uncomment this testcase when executeUpdate is implemented
   //  @Test
   //  public void testExecuteSelectWithSession() throws BigQuerySQLException {
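
Usage sketch (reviewer note, not part of the patch): how the two new builder options introduced above are meant to be combined. The first load job opens a session via setCreateSession(true); follow-up load jobs attach to it by passing the returned session ID through setConnectionProperties. The class name, bucket path, source URI, and schema below are illustrative placeholders, not values from this change; everything else mirrors the integration test.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;

public class LoadSessionExample {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Placeholder source file and schema; replace with real values.
    String sourceUri = "gs://my-bucket/data.json";
    Schema schema = Schema.of(Field.of("name", StandardSQLTypeName.STRING));
    TableId tempTable = TableId.of("_SESSION", "my_temp_table");

    // 1. The first load job opens the session (new setCreateSession option).
    LoadJobConfiguration createSession =
        LoadJobConfiguration.newBuilder(tempTable, sourceUri, FormatOptions.json())
            .setSchema(schema)
            .setCreateSession(true)
            .build();
    Job firstJob = bigquery.create(JobInfo.of(createSession)).waitFor();
    JobStatistics.LoadStatistics stats = firstJob.getStatistics();
    String sessionId = stats.getSessionInfo().getSessionId();

    // 2. Follow-up load jobs join the same session through the "session_id"
    //    connection property (new setConnectionProperties option).
    ConnectionProperty sessionProperty =
        ConnectionProperty.newBuilder().setKey("session_id").setValue(sessionId).build();
    LoadJobConfiguration loadInSession =
        LoadJobConfiguration.newBuilder(tempTable, sourceUri, FormatOptions.json())
            .setSchema(schema)
            .setConnectionProperties(ImmutableList.of(sessionProperty))
            .build();
    bigquery.create(JobInfo.of(loadInSession)).waitFor();
  }
}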