Issue GoogleCloudDataproc#867: Support writing with RangePartitioning
vishalkarve15 committed Aug 8, 2023
1 parent bd33526 commit 5834477
Showing 6 changed files with 173 additions and 0 deletions.
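
Taken together, the changes below let a Spark write request BigQuery integer range partitioning through the new partitionRangeStart, partitionRangeEnd, and partitionRangeInterval options together with the existing partitionField. A minimal usage sketch, assuming a DataFrame df with an INT64 column id; the bucket, dataset, and table names are placeholders (the integration test added in this commit exercises the same path):

    // Sketch only: the option names come from this commit; the quoted names are
    // placeholders. Range partitioning is applied on the indirect (load job) path.
    df.write()
        .format("bigquery")
        .option("writeMethod", "indirect")
        .option("temporaryGcsBucket", "some-bucket")
        .option("partitionField", "id")
        .option("partitionRangeStart", "0")
        .option("partitionRangeEnd", "100")
        .option("partitionRangeInterval", "10")
        .save("my_dataset.my_table");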
1 change: 1 addition & 0 deletions CHANGES.md
@@ -2,6 +2,7 @@

## Next

* Issue #867: Support writing with RangePartitioning
* Issue #144: allow writing Spark String to BQ TIME type
* PR #1038: Logical plan now shows the BigQuery table of DirectBigQueryRelation. Thanks @idc101 !

BigQueryClient.java

@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -604,6 +605,12 @@ public void loadDataIntoTable(
options.getPartitionField().ifPresent(timePartitionBuilder::setField);
jobConfiguration.setTimePartitioning(timePartitionBuilder.build());
}
if (options.getPartitionField().isPresent() && options.getPartitionRange().isPresent()) {
RangePartitioning.Builder rangePartitionBuilder = RangePartitioning.newBuilder();
rangePartitionBuilder.setField(options.getPartitionField().get());
rangePartitionBuilder.setRange(options.getPartitionRange().get());
jobConfiguration.setRangePartitioning(rangePartitionBuilder.build());
}

options
.getClusteredFields()
@@ -713,6 +720,8 @@ public interface LoadDataOptions {

Optional<TimePartitioning.Type> getPartitionType();

Optional<RangePartitioning.Range> getPartitionRange();

TimePartitioning.Type getPartitionTypeOrDefault();

OptionalLong getPartitionExpirationMs();
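
For context, the new branch in loadDataIntoTable produces the same configuration a caller would build by hand against the BigQuery Java client. A hedged sketch, with placeholder table, URI, and field names:

    // Illustrative only: the RangePartitioning the connector now attaches to its
    // load job, built directly with the BigQuery client library.
    RangePartitioning rangePartitioning =
        RangePartitioning.newBuilder()
            .setField("id") // placeholder INT64 column
            .setRange(
                RangePartitioning.Range.newBuilder()
                    .setStart(0L)
                    .setEnd(100L)
                    .setInterval(10L)
                    .build())
            .build();
    LoadJobConfiguration loadConfig =
        LoadJobConfiguration.newBuilder(
                TableId.of("my_dataset", "my_table"), // placeholder
                "gs://some-bucket/path/part-*.parquet") // placeholder
            .setFormatOptions(FormatOptions.parquet())
            .setRangePartitioning(rangePartitioning)
            .build();

Note that the guard above requires both partitionField and a complete range; if either is missing, no range partitioning is attached to the load job.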
SparkBigQueryConfig.java

@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.ParquetOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
@@ -183,6 +184,9 @@ public static WriteMethod from(@Nullable String writeMethod) {
Long partitionExpirationMs = null;
com.google.common.base.Optional<Boolean> partitionRequireFilter = empty();
com.google.common.base.Optional<TimePartitioning.Type> partitionType = empty();
com.google.common.base.Optional<Long> partitionRangeStart = empty();
com.google.common.base.Optional<Long> partitionRangeEnd = empty();
com.google.common.base.Optional<Long> partitionRangeInterval = empty();
com.google.common.base.Optional<String[]> clusteredFields = empty();
com.google.common.base.Optional<JobInfo.CreateDisposition> createDisposition = empty();
boolean optimizedEmptyProjection = true;
@@ -289,6 +293,11 @@ public static SparkBigQueryConfig from(
firstPresent(getOption(options, "project").toJavaUtil(), fallbackProject);
config.partitionType =
getOption(options, "partitionType").transform(TimePartitioning.Type::valueOf);
config.partitionRangeStart =
getOption(options, "partitionRangeStart").transform(Long::parseLong);
config.partitionRangeEnd = getOption(options, "partitionRangeEnd").transform(Long::parseLong);
config.partitionRangeInterval =
getOption(options, "partitionRangeInterval").transform(Long::parseLong);
Optional<String> datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
datePartitionParam.ifPresent(
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
@@ -820,6 +829,20 @@ public Optional<TimePartitioning.Type> getPartitionType() {
return partitionType.toJavaUtil();
}

public Optional<RangePartitioning.Range> getPartitionRange() {
if (partitionRangeStart.isPresent()
&& partitionRangeEnd.isPresent()
&& partitionRangeInterval.isPresent()) {
return Optional.of(
RangePartitioning.Range.newBuilder()
.setStart(partitionRangeStart.get())
.setEnd(partitionRangeEnd.get())
.setInterval(partitionRangeInterval.get())
.build());
}
return Optional.empty();
}

public TimePartitioning.Type getPartitionTypeOrDefault() {
return partitionType.or(TimePartitioning.Type.DAY);
}
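
getPartitionRange() above is deliberately all-or-nothing: a Range is built only when start, end, and interval are all supplied, and a partial set of options quietly yields Optional.empty(). Under BigQuery's documented integer range partitioning semantics (a hedged reading, not connector code), rows are bucketed into fixed-width partitions covering [start, end), and everything else lands in the implicit __UNPARTITIONED__ partition:

    // Hedged sketch of BigQuery's bucket assignment for integer range partitioning.
    static long partitionIndex(long value, long start, long end, long interval) {
      if (value < start || value >= end) {
        return -1; // stands in for the __UNPARTITIONED__ partition
      }
      return (value - start) / interval; // zero-based bucket of width `interval`
    }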
SparkBigQueryConfigTest.java

@@ -24,6 +24,7 @@
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.CompressionCodec;
@@ -107,6 +108,7 @@ public void testDefaults() {
assertThat(config.getPartitionExpirationMs()).isEqualTo(OptionalLong.empty());
assertThat(config.getPartitionRequireFilter()).isEqualTo(Optional.empty());
assertThat(config.getPartitionType()).isEqualTo(Optional.empty());
assertThat(config.getPartitionRange()).isEqualTo(Optional.empty());
assertThat(config.getClusteredFields()).isEqualTo(Optional.empty());
assertThat(config.getCreateDisposition()).isEqualTo(Optional.empty());
assertThat(config.getLoadSchemaUpdateOptions()).isEqualTo(ImmutableList.of());
@@ -228,6 +230,38 @@ public void testConfigFromOptions() {
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
}

@Test
public void testConfigFromOptions_rangePartitioning() {
Configuration hadoopConfiguration = new Configuration();
DataSourceOptions options =
new DataSourceOptions(
ImmutableMap.<String, String>builder()
.put("table", "test_t")
.put("dataset", "test_d")
.put("project", "test_p")
.put("partitionRangeStart", "1")
.put("partitionRangeEnd", "20")
.put("partitionRangeInterval", "2")
.put("partitionField", "some_field")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
options.asMap(),
defaultGlobalOptions,
hadoopConfiguration,
ImmutableMap.of(),
DEFAULT_PARALLELISM,
new SQLConf(),
SPARK_VERSION,
Optional.empty(), /* tableIsMandatory */
true);
RangePartitioning.Range expectedRange =
RangePartitioning.Range.newBuilder().setStart(1L).setEnd(20L).setInterval(2L).build();
assertThat(config.getTableId()).isEqualTo(TableId.of("test_p", "test_d", "test_t"));
assertThat(config.getPartitionRange()).isEqualTo(Optional.of(expectedRange));
assertThat(config.getPartitionField()).isEqualTo(Optional.of("some_field"));
}

@Test
public void testCacheExpirationSetToZero() {
Configuration hadoopConfiguration = new Configuration();
WriteIntegrationTestBase.java

@@ -19,6 +19,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;

import com.google.cloud.bigquery.BigQuery;
@@ -27,6 +28,7 @@
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
@@ -41,6 +43,7 @@
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
import com.google.cloud.spark.bigquery.integration.model.RangeData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.ProvisionException;
@@ -1052,6 +1055,41 @@ private void testPartition(String partitionType) {
assertThat(readDF.count()).isEqualTo(3);
}

@Test
public void testPartitionRange() {
// partition write not supported in BQ Storage Write API
assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

List<RangeData> data =
Arrays.asList(new RangeData("a", 1L), new RangeData("b", 5L), new RangeData("c", 11L));
Dataset<Row> df = spark.createDataset(data, Encoders.bean(RangeData.class)).toDF();
String table = testDataset.toString() + "." + testTable + "_range";
df.write()
.format("bigquery")
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("partitionField", "rng")
.option("partitionRangeStart", "1")
.option("partitionRangeEnd", "21")
.option("partitionRangeInterval", "2")
.option("partitionRequireFilter", "true")
.option("table", table)
.option("writeMethod", writeMethod.toString())
.save();

Dataset<Row> readDF = spark.read().format("bigquery").load(table);
assertThat(readDF.count()).isEqualTo(3);
Table bqTable = bq.getTable(TableId.of(testDataset.toString(), testTable + "_range"));
assertThat(bqTable).isNotNull();
assertTrue(bqTable.getDefinition() instanceof StandardTableDefinition);
StandardTableDefinition bqTableDef = bqTable.getDefinition();
assertThat(bqTableDef.getRangePartitioning()).isNotNull();
RangePartitioning.Range expectedRange =
RangePartitioning.Range.newBuilder().setStart(1L).setEnd(21L).setInterval(2L).build();
String expectedField = "rng";
assertThat(bqTableDef.getRangePartitioning().getRange()).isEqualTo(expectedRange);
assertThat(bqTableDef.getRangePartitioning().getField()).isEqualTo(expectedField);
}
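
Worked through under the same semantics, start=1, end=21, interval=2 yields ten partitions [1,3), [3,5), ..., [19,21), and the test rows with rng = 1, 5, and 11 fall into the first, third, and sixth buckets. The count assertion confirms the load round-trips all three rows; the definition assertions confirm the range spec was actually applied to the table.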

@Test
public void testCacheDataFrameInDataSource() {
// It takes some time for the data to be available for read via the Storage Read API, after it
RangeData.java (new file)

@@ -0,0 +1,68 @@
/*
* Copyright 2021 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery.integration.model;

import com.google.common.base.Objects;
import java.io.Serializable;

public class RangeData implements Serializable {

private String str;
private Long rng;

public RangeData(String str, Long rng) {
this.str = str;
this.rng = rng;
}

public String getStr() {
return str;
}

public void setStr(String str) {
this.str = str;
}

public Long getRng() {
return rng;
}

public void setRng(Long rng) {
this.rng = rng;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RangeData)) {
return false;
}
RangeData data = (RangeData) o;
return Objects.equal(str, data.str) && Objects.equal(rng, data.rng);
}

@Override
public int hashCode() {
return Objects.hashCode(str, rng);
}

@Override
public String toString() {
return "Data{" + "str='" + str + '\'' + ", rng=" + rng + '}';
}
}
