Skip to content

Commit

Permalink
fix no partition test
Browse files Browse the repository at this point in the history
  • Loading branch information
isha97 committed Dec 5, 2024
1 parent 356c1ad commit bf17005
Showing 1 changed file with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,8 @@ public void testWriteToTimestampField() {
assertThat(head.get(head.fieldIndex("timestamp1"))).isEqualTo(timestamp1);
}

protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(
Dataset<Row> df, boolean isPartitioned) {
df.write()
.format("bigquery")
.mode(SaveMode.Overwrite)
Expand All @@ -1591,10 +1592,12 @@ protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row>
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.save();

IntegrationTestUtils.runQuery(
String.format(
"ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
testDataset, testTable));
if (isPartitioned) {
IntegrationTestUtils.runQuery(
String.format(
"ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
testDataset, testTable));
}

return spark
.read()
Expand Down Expand Up @@ -1626,7 +1629,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1670,7 +1673,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1714,7 +1717,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1758,7 +1761,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1801,7 +1804,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1842,7 +1845,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1883,7 +1886,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1925,7 +1928,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1967,7 +1970,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2009,7 +2012,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2051,7 +2054,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2091,7 +2094,7 @@ public void testOverwriteDynamicPartition_noTimePartitioning() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, false);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2132,7 +2135,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(5);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2179,7 +2182,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2214,7 +2217,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2249,7 +2252,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2288,7 +2291,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);

List<Row> rows = result.collectAsList();
Expand Down

0 comments on commit bf17005

Please sign in to comment.