Skip to content

Commit

Permalink
Cleanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Jan 31, 2024
1 parent fa5cfe6 commit d9b1230
Showing 1 changed file with 122 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.CatalogUtil;
Expand All @@ -36,17 +37,10 @@
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.TestBase;
import org.apache.spark.sql.SparkSession;
<<<<<<< HEAD
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
=======
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
>>>>>>> a7f9c6000 (Fix agg pushodwn on struct)

public class TestAggregatePushDown extends CatalogTestBase {

Expand Down Expand Up @@ -255,78 +249,6 @@ public void testAggregateNotPushDownIfOneCantPushDown() {
assertEquals("expected and actual should equal", expected, actual);
}

@TestTemplate
public void testAggregationPushdownStructInteger() {
testAggregationPushdownStruct(
2L,
3L,
2L,
"(id BIGINT, struct_with_int STRUCT<c1:BIGINT>)",
"struct_with_int.c1",
"(1, named_struct(\"c1\", NULL))",
"(2, named_struct(\"c1\", 2))",
"(3, named_struct(\"c1\", 3))");
}

@TestTemplate
public void testAggregationPushdownNestedStruct() {
testAggregationPushdownStruct(
2L,
3L,
2L,
"(id BIGINT, struct_with_int STRUCT<c1:STRUCT<c2:STRUCT<c3:STRUCT<c4:BIGINT>>>>)",
"struct_with_int.c1.c2.c3.c4",
"(1, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", NULL)))))",
"(2, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", 2)))))",
"(3, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", 3)))))");
}

@TestTemplate
public void testAggregationPushdownStructTimestamp() {
long timestamp = System.currentTimeMillis();
long futureTimestamp = timestamp + 5000;
Timestamp expectedMax = new Timestamp(futureTimestamp / 1000 * 1000);
Timestamp expectedMin = new Timestamp(1000 * (timestamp / 1000));
testAggregationPushdownStruct(
2L,
expectedMax,
expectedMin,
"(id BIGINT, struct_with_ts STRUCT<c1:TIMESTAMP>)",
"struct_with_ts.c1",
"(1, named_struct(\"c1\", NULL))",
String.format(
"(2, named_struct(\"c1\", CAST(from_unixtime(%d/1000) AS TIMESTAMP)))", timestamp),
String.format(
"(3, named_struct(\"c1\", CAST(from_unixtime(%d/1000) AS TIMESTAMP)))",
timestamp + 5000));
}

private void testAggregationPushdownStruct(
Object expectedCount,
Object expectedMax,
Object expectedMin,
String schema,
String aggField,
String... rows) {
sql("CREATE TABLE %s %s USING iceberg", tableName, schema);
sql("INSERT INTO TABLE %s VALUES %s", tableName, String.join(",", rows));
List<Object[]> actual =
sql("SELECT COUNT(%s), MAX(%s), MIN(%s) FROM %s", aggField, aggField, aggField, tableName);
Object actualCount = actual.get(0)[0];
Object actualMax = actual.get(0)[1];
Object actualMin = actual.get(0)[2];
Assertions.assertThat(actualCount)
.withFailMessage("Expected and actual count should equal")
.isEqualTo(expectedCount);
Assertions.assertThat(actualMax)
.withFailMessage("Expected and actual max should equal")
.isEqualTo(expectedMax);
Assertions.assertThat(actualMin)
.withFailMessage("Expected and actual min should equal")
.isEqualTo(expectedMin);
sql("DROP TABLE %s", tableName);
}

@TestTemplate
public void testAggregatePushDownWithMetricsMode() {
sql("CREATE TABLE %s (id LONG, data DOUBLE) USING iceberg", tableName);
Expand Down Expand Up @@ -558,6 +480,126 @@ public void testAggregateWithComplexType() {
.isFalse();
}

@TestTemplate
public void testAggregationPushdownStructInteger() {
sql("CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:BIGINT>) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, named_struct(\"c1\", NULL))", tableName);
sql("INSERT INTO TABLE %s VALUES (2, named_struct(\"c1\", 2))", tableName);
sql("INSERT INTO TABLE %s VALUES (3, named_struct(\"c1\", 3))", tableName);

String query = "SELECT COUNT(%s), MAX(%s), MIN(%s) FROM %s";
String aggField = "struct_with_int.c1";
assertAggregates(sql(query, aggField, aggField, aggField, tableName), 2L, 3L, 2L);
assertExplainContains(
sql("EXPLAIN " + query, aggField, aggField, aggField, tableName),
"count(struct_with_int.c1)",
"max(struct_with_int.c1)",
"min(struct_with_int.c1)");
}

@TestTemplate
public void testAggregationPushdownNestedStruct() {
sql(
"CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:STRUCT<c2:STRUCT<c3:STRUCT<c4:BIGINT>>>>) USING iceberg",
tableName);
sql(
"INSERT INTO TABLE %s VALUES (1, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", NULL)))))",
tableName);
sql(
"INSERT INTO TABLE %s VALUES (2, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", 2)))))",
tableName);
sql(
"INSERT INTO TABLE %s VALUES (3, named_struct(\"c1\", named_struct(\"c2\", named_struct(\"c3\", named_struct(\"c4\", 3)))))",
tableName);

String query = "SELECT COUNT(%s), MAX(%s), MIN(%s) FROM %s";
String aggField = "struct_with_int.c1.c2.c3.c4";

assertAggregates(sql(query, aggField, aggField, aggField, tableName), 2L, 3L, 2L);

assertExplainContains(
sql("EXPLAIN " + query, aggField, aggField, aggField, tableName),
"count(struct_with_int.c1.c2.c3.c4)",
"max(struct_with_int.c1.c2.c3.c4)",
"min(struct_with_int.c1.c2.c3.c4)");
}

@TestTemplate
public void testAggregationPushdownStructTimestamp() {
sql(
"CREATE TABLE %s (id BIGINT, struct_with_ts STRUCT<c1:TIMESTAMP>) USING iceberg",
tableName);
sql("INSERT INTO TABLE %s VALUES (1, named_struct(\"c1\", NULL))", tableName);
sql(
"INSERT INTO TABLE %s VALUES (2, named_struct(\"c1\", timestamp('2023-01-30T22:22:22Z')))",
tableName);
sql(
"INSERT INTO TABLE %s VALUES (3, named_struct(\"c1\", timestamp('2023-01-30T22:23:23Z')))",
tableName);

String query = "SELECT COUNT(%s), MAX(%s), MIN(%s) FROM %s";
String aggField = "struct_with_ts.c1";

assertAggregates(
sql(query, aggField, aggField, aggField, tableName),
2L,
new Timestamp(1675117403000L),
new Timestamp(1675117342000L));

assertExplainContains(
sql("EXPLAIN " + query, aggField, aggField, aggField, tableName),
"count(struct_with_ts.c1)",
"max(struct_with_ts.c1)",
"min(struct_with_ts.c1)");
}

@TestTemplate
public void testAggregationPushdownOnBucketedColumn() {
sql(
"CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:INT>) USING iceberg PARTITIONED BY (bucket(8, id))",
tableName);

sql("INSERT INTO TABLE %s VALUES (1, named_struct(\"c1\", NULL))", tableName);
sql("INSERT INTO TABLE %s VALUES (null, named_struct(\"c1\", 2))", tableName);
sql("INSERT INTO TABLE %s VALUES (2, named_struct(\"c1\", 3))", tableName);

String query = "SELECT COUNT(%s), MAX(%s), MIN(%s) FROM %s";
String aggField = "id";
assertAggregates(sql(query, aggField, aggField, aggField, tableName), 2L, 2L, 1L);
assertExplainContains(
sql("EXPLAIN " + query, aggField, aggField, aggField, tableName),
"count(id)",
"max(id)",
"min(id)");
}

private void assertAggregates(
List<Object[]> actual, Object expectedCount, Object expectedMax, Object expectedMin) {
Object actualCount = actual.get(0)[0];
Object actualMax = actual.get(0)[1];
Object actualMin = actual.get(0)[2];

Assertions.assertThat(actualCount)
.as("Expected and actual count should equal")
.isEqualTo(expectedCount);
Assertions.assertThat(actualMax)
.as("Expected and actual max should equal")
.isEqualTo(expectedMax);
Assertions.assertThat(actualMin)
.as("Expected and actual min should equal")
.isEqualTo(expectedMin);
}

private void assertExplainContains(List<Object[]> explain, String... expectedFragments) {
String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
Arrays.stream(expectedFragments)
.forEach(
fragment ->
Assertions.assertThat(explainString.contains(fragment))
.isTrue()
.as("Expected to find plan fragment in explain plan"));
}

@TestTemplate
public void testAggregatePushDownInDeleteCopyOnWrite() {
sql("CREATE TABLE %s (id LONG, data INT) USING iceberg", tableName);
Expand Down

0 comments on commit d9b1230

Please sign in to comment.