Support writing with RangePartitioning (Issue #867)
vishalkarve15 committed Jul 7, 2023
1 parent 0acdf95 commit 5fe8ad0
Showing 4 changed files with 92 additions and 0 deletions.
@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -578,6 +579,12 @@ public void loadDataIntoTable(
options.getPartitionField().ifPresent(timePartitionBuilder::setField);
jobConfiguration.setTimePartitioning(timePartitionBuilder.build());
}
if (options.getPartitionField().isPresent() || options.getPartitionRange().isPresent()) {
RangePartitioning.Builder rangePartitionBuilder = RangePartitioning.newBuilder();
options.getPartitionField().ifPresent(rangePartitionBuilder::setField);
options.getPartitionRange().ifPresent(rangePartitionBuilder::setRange);
jobConfiguration.setRangePartitioning(rangePartitionBuilder.build());
}

options
.getClusteredFields()
@@ -687,6 +694,8 @@ public interface LoadDataOptions {

Optional<TimePartitioning.Type> getPartitionType();

Optional<RangePartitioning.Range> getPartitionRange();

TimePartitioning.Type getPartitionTypeOrDefault();

OptionalLong getPartitionExpirationMs();
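The added block above mirrors the existing time-partitioning setup: when a range is configured, LoadDataOptions feeds a RangePartitioning value into the load job. A minimal standalone sketch of the same BigQuery Java client calls, assuming placeholder project, bucket, and column names that are not taken from this commit:

import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;

public class RangePartitionedLoadSketch {

  static LoadJobConfiguration rangePartitionedLoad() {
    // Placeholder destination and source URI; the connector derives these from LoadDataOptions.
    TableId tableId = TableId.of("my_project", "my_dataset", "my_table");
    String sourceUri = "gs://my-bucket/staging/part-*.parquet";

    // Partitions of width 2 covering [1, 21); out-of-range values go to __UNPARTITIONED__.
    RangePartitioning.Range range =
        RangePartitioning.Range.newBuilder().setStart(1L).setEnd(21L).setInterval(2L).build();

    return LoadJobConfiguration.newBuilder(tableId, sourceUri, FormatOptions.parquet())
        .setRangePartitioning(
            RangePartitioning.newBuilder()
                .setField("rng") // integer column to partition on
                .setRange(range)
                .build())
        .build();
  }
}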
@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.ParquetOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
@@ -182,6 +183,9 @@ public static WriteMethod from(@Nullable String writeMethod) {
Long partitionExpirationMs = null;
com.google.common.base.Optional<Boolean> partitionRequireFilter = empty();
com.google.common.base.Optional<TimePartitioning.Type> partitionType = empty();
com.google.common.base.Optional<Long> partitionRangeStart = empty();
com.google.common.base.Optional<Long> partitionRangeEnd = empty();
com.google.common.base.Optional<Long> partitionRangeInterval = empty();
com.google.common.base.Optional<String[]> clusteredFields = empty();
com.google.common.base.Optional<JobInfo.CreateDisposition> createDisposition = empty();
boolean optimizedEmptyProjection = true;
@@ -287,6 +291,11 @@ public static SparkBigQueryConfig from(
firstPresent(getOption(options, "project").toJavaUtil(), fallbackProject);
config.partitionType =
getOption(options, "partitionType").transform(TimePartitioning.Type::valueOf);
config.partitionRangeStart =
getOption(options, "partitionRangeStart").transform(Long::parseLong);
config.partitionRangeEnd = getOption(options, "partitionRangeEnd").transform(Long::parseLong);
config.partitionRangeInterval =
getOption(options, "partitionRangeInterval").transform(Long::parseLong);
Optional<String> datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
datePartitionParam.ifPresent(
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
@@ -816,6 +825,20 @@ public Optional<TimePartitioning.Type> getPartitionType() {
return partitionType.toJavaUtil();
}

public Optional<RangePartitioning.Range> getPartitionRange() {
if (partitionRangeStart.isPresent()
&& partitionRangeEnd.isPresent()
&& partitionRangeInterval.isPresent()) {
return Optional.of(
RangePartitioning.Range.newBuilder()
.setStart(partitionRangeStart.get())
.setEnd(partitionRangeEnd.get())
.setInterval(partitionRangeInterval.get())
.build());
}
return Optional.empty();
}

public TimePartitioning.Type getPartitionTypeOrDefault() {
return partitionType.or(TimePartitioning.Type.DAY);
}
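getPartitionRange() only produces a value when partitionRangeStart, partitionRangeEnd, and partitionRangeInterval are all supplied; with any of them missing it returns Optional.empty() and no range partitioning is applied. One way to confirm that a table written with these options picked up the range spec is to read its definition back with the BigQuery client; a hedged sketch, with the table reference as a placeholder:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;

public class VerifyRangePartitioningSketch {

  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Placeholder reference; point this at the table the connector just wrote.
    StandardTableDefinition definition =
        bigquery.getTable(TableId.of("test_p", "test_d", "test_t")).getDefinition();

    RangePartitioning rangePartitioning = definition.getRangePartitioning();
    if (rangePartitioning != null) {
      System.out.printf(
          "field=%s start=%d end=%d interval=%d%n",
          rangePartitioning.getField(),
          rangePartitioning.getRange().getStart(),
          rangePartitioning.getRange().getEnd(),
          rangePartitioning.getRange().getInterval());
    }
  }
}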
@@ -24,6 +24,7 @@
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec;
@@ -107,6 +108,7 @@ public void testDefaults() {
assertThat(config.getPartitionExpirationMs()).isEqualTo(OptionalLong.empty());
assertThat(config.getPartitionRequireFilter()).isEqualTo(Optional.empty());
assertThat(config.getPartitionType()).isEqualTo(Optional.empty());
assertThat(config.getPartitionRange()).isEqualTo(Optional.empty());
assertThat(config.getClusteredFields()).isEqualTo(Optional.empty());
assertThat(config.getCreateDisposition()).isEqualTo(Optional.empty());
assertThat(config.getLoadSchemaUpdateOptions()).isEqualTo(ImmutableList.of());
@@ -228,6 +230,38 @@ public void testConfigFromOptions() {
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
}

@Test
public void testConfigFromOptions_rangePartitioning() {
Configuration hadoopConfiguration = new Configuration();
DataSourceOptions options =
new DataSourceOptions(
ImmutableMap.<String, String>builder()
.put("table", "test_t")
.put("dataset", "test_d")
.put("project", "test_p")
.put("partitionRangeStart", "1")
.put("partitionRangeEnd", "20")
.put("partitionRangeInterval", "2")
.put("partitionField", "some_field")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
options.asMap(),
defaultGlobalOptions,
hadoopConfiguration,
ImmutableMap.of(),
DEFAULT_PARALLELISM,
new SQLConf(),
SPARK_VERSION,
Optional.empty(), /* tableIsMandatory */
true);
RangePartitioning.Range expectedRange =
RangePartitioning.Range.newBuilder().setStart(1L).setEnd(20L).setInterval(2L).build();
assertThat(config.getTableId()).isEqualTo(TableId.of("test_p", "test_d", "test_t"));
assertThat(config.getPartitionRange()).isEqualTo(Optional.of(expectedRange));
assertThat(config.getPartitionField()).isEqualTo(Optional.of("some_field"));
}

@Test
public void testCacheExpirationSetToZero() {
Configuration hadoopConfiguration = new Configuration();
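The new unit test exercises the fully specified case. A hypothetical companion test, not part of this commit but following the same pattern in the same test class, could pin down the partial-options behavior implied by getPartitionRange():

@Test
public void testConfigFromOptions_partialRangeOptionsYieldNoRange() {
  // Hypothetical test: only the start boundary is supplied.
  Configuration hadoopConfiguration = new Configuration();
  DataSourceOptions options =
      new DataSourceOptions(
          ImmutableMap.<String, String>builder()
              .put("table", "test_t")
              .put("dataset", "test_d")
              .put("project", "test_p")
              .put("partitionRangeStart", "1")
              .build());
  SparkBigQueryConfig config =
      SparkBigQueryConfig.from(
          options.asMap(),
          defaultGlobalOptions,
          hadoopConfiguration,
          ImmutableMap.of(),
          DEFAULT_PARALLELISM,
          new SQLConf(),
          SPARK_VERSION,
          Optional.empty(), /* tableIsMandatory */
          true);
  // getPartitionRange() requires start, end, and interval to all be present.
  assertThat(config.getPartitionRange()).isEqualTo(Optional.empty());
}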
@@ -41,6 +41,7 @@
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
import com.google.cloud.spark.bigquery.integration.model.RangeData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.ProvisionException;
@@ -894,6 +895,31 @@ private void testPartition(String partitionType) {
assertThat(readDF.count()).isEqualTo(3);
}

@Test
public void testPartitionRange() {
// Partitioned writes are not supported by the BigQuery Storage Write API
assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

List<RangeData> data =
Arrays.asList(new RangeData("a", 1L), new RangeData("b", 5L), new RangeData("c", 11L));
Dataset<Row> df = spark.createDataset(data, Encoders.bean(RangeData.class)).toDF();
String table = testDataset.toString() + "." + testTable + "_range";
df.write()
.format("bigquery")
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("partitionField", "rng")
.option("partitionRangeStart", "1")
.option("partitionRangeEnd", "21")
.option("partitionRangeInterval", "2")
.option("partitionRequireFilter", "true")
.option("table", table)
.option("writeMethod", writeMethod.toString())
.save();

Dataset<Row> readDF = spark.read().format("bigquery").load(table);
assertThat(readDF.count()).isEqualTo(3);
}

@Test
public void testCacheDataFrameInDataSource() {
// It takes some time for the data to be available for read via the Storage Read API, after it
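The integration test relies on a RangeData bean that this diff only imports, so its definition is not shown here. A plausible shape, inferred from the two-argument constructor and the "rng" partition field (the String field's name is an assumption), compatible with Encoders.bean:

// Assumed shape of the RangeData model used above; the actual class lives in
// com.google.cloud.spark.bigquery.integration.model and is not part of this commit.
public class RangeData {
  private String name; // assumed name for the String column
  private Long rng;    // integer column used as the range partitioning field

  public RangeData() {}

  public RangeData(String name, Long rng) {
    this.name = name;
    this.rng = rng;
  }

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public Long getRng() { return rng; }
  public void setRng(Long rng) { this.rng = rng; }
}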
