Skip to content

Commit e05d6ae

Browse files
authored
Add Integration tests for Delta tables for Spark Client (#1500)
1 parent b55b4e7 commit e05d6ae

File tree

9 files changed

+453
-40
lines changed

9 files changed

+453
-40
lines changed

plugins/spark/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,5 @@ Following describes the current functionality limitations of the Polaris Spark c
9696
is also not supported, since it relies on the CTAS support.
9797
2) Creating a Delta table without an explicit location is not supported.
9898
3) Renaming a Delta table is not supported.
99-
4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for DELTA table.
99+
4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
100100
5) Other non-Iceberg table formats, such as CSV, are not supported today.

plugins/spark/v3.5/integration/build.gradle.kts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,19 @@ dependencies {
4949
testImplementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}"))
5050

5151
testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
52-
// exclude log4j dependencies
52+
// exclude log4j dependencies. Explicit dependencies for the log4j libraries are
53+
// enforced below to ensure version compatibility
5354
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
54-
exclude("org.apache.logging.log4j", "log4j-api")
5555
exclude("org.apache.logging.log4j", "log4j-1.2-api")
56+
exclude("org.apache.logging.log4j", "log4j-core")
5657
exclude("org.slf4j", "jul-to-slf4j")
5758
}
59+
// enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
60+
// of the spark-sql dependency
61+
testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
62+
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")
63+
64+
testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
5865

5966
testImplementation(platform(libs.jackson.bom))
6067
testImplementation("com.fasterxml.jackson.core:jackson-annotations")
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.spark.quarkus.it;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
24+
import io.quarkus.test.junit.QuarkusIntegrationTest;
25+
import java.io.File;
26+
import java.nio.file.Path;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
import org.apache.commons.io.FileUtils;
30+
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
31+
import org.apache.spark.sql.Dataset;
32+
import org.apache.spark.sql.Row;
33+
import org.apache.spark.sql.RowFactory;
34+
import org.apache.spark.sql.delta.DeltaAnalysisException;
35+
import org.apache.spark.sql.types.DataTypes;
36+
import org.apache.spark.sql.types.Metadata;
37+
import org.apache.spark.sql.types.StructField;
38+
import org.apache.spark.sql.types.StructType;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.junit.jupiter.api.io.TempDir;
43+
44+
@QuarkusIntegrationTest
45+
public class SparkDeltaIT extends SparkIntegrationBase {
46+
private String defaultNs;
47+
private String tableRootDir;
48+
49+
private String getTableLocation(String tableName) {
50+
return String.format("%s/%s", tableRootDir, tableName);
51+
}
52+
53+
private String getTableNameWithRandomSuffix() {
54+
return generateName("deltatb");
55+
}
56+
57+
@BeforeEach
58+
public void createDefaultResources(@TempDir Path tempDir) {
59+
spark.sparkContext().setLogLevel("WARN");
60+
defaultNs = generateName("delta");
61+
// create a default namespace
62+
sql("CREATE NAMESPACE %s", defaultNs);
63+
sql("USE NAMESPACE %s", defaultNs);
64+
tableRootDir =
65+
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
66+
}
67+
68+
@AfterEach
69+
public void cleanupDeltaData() {
70+
// clean up delta data
71+
File dirToDelete = new File(tableRootDir);
72+
FileUtils.deleteQuietly(dirToDelete);
73+
sql("DROP NAMESPACE %s", defaultNs);
74+
}
75+
76+
@Test
77+
public void testBasicTableOperations() {
78+
// create a regular delta table
79+
String deltatb1 = "deltatb1";
80+
sql(
81+
"CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
82+
deltatb1, getTableLocation(deltatb1));
83+
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb1);
84+
List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", deltatb1);
85+
assertThat(results.size()).isEqualTo(1);
86+
assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
87+
88+
// create a detla table with partition
89+
String deltatb2 = "deltatb2";
90+
sql(
91+
"CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
92+
deltatb2, getTableLocation(deltatb2));
93+
sql(
94+
"INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
95+
deltatb2);
96+
results = sql("SELECT name, country FROM %s ORDER BY age", deltatb2);
97+
assertThat(results.size()).isEqualTo(3);
98+
assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
99+
assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
100+
assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
101+
102+
// verify the partition dir is created
103+
List<String> subDirs = listDirs(getTableLocation(deltatb2));
104+
assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US");
105+
106+
// test listTables
107+
List<Object[]> tables = sql("SHOW TABLES");
108+
assertThat(tables.size()).isEqualTo(2);
109+
assertThat(tables)
110+
.contains(
111+
new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false});
112+
113+
sql("DROP TABLE %s", deltatb1);
114+
sql("DROP TABLE %s", deltatb2);
115+
tables = sql("SHOW TABLES");
116+
assertThat(tables.size()).isEqualTo(0);
117+
}
118+
119+
@Test
120+
public void testAlterOperations() {
121+
String deltatb = getTableNameWithRandomSuffix();
122+
sql(
123+
"CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
124+
deltatb, getTableLocation(deltatb));
125+
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb);
126+
127+
// test alter columns
128+
// add two new columns to the table
129+
sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb);
130+
// add one more row to the table
131+
sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb);
132+
// verify the table now have 4 columns with correct result
133+
List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb);
134+
assertThat(results.size()).isEqualTo(3);
135+
assertThat(results).contains(new Object[] {1, "anna", null, null});
136+
assertThat(results).contains(new Object[] {2, "bob", null, null});
137+
assertThat(results).contains(new Object[] {3, "john", "SFO", 20});
138+
139+
// drop and rename column require set the delta.columnMapping property
140+
sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb);
141+
// drop column age
142+
sql("Alter TABLE %s DROP COLUMN age", deltatb);
143+
// verify the table now have 3 columns with correct result
144+
results = sql("SELECT * FROM %s ORDER BY id", deltatb);
145+
assertThat(results.size()).isEqualTo(3);
146+
assertThat(results).contains(new Object[] {1, "anna", null});
147+
assertThat(results).contains(new Object[] {2, "bob", null});
148+
assertThat(results).contains(new Object[] {3, "john", "SFO"});
149+
150+
// rename column city to address
151+
sql("Alter TABLE %s RENAME COLUMN city TO address", deltatb);
152+
// verify column address exists
153+
results = sql("SELECT id, address FROM %s ORDER BY id", deltatb);
154+
assertThat(results.size()).isEqualTo(3);
155+
assertThat(results).contains(new Object[] {1, null});
156+
assertThat(results).contains(new Object[] {2, null});
157+
assertThat(results).contains(new Object[] {3, "SFO"});
158+
159+
// test alter properties
160+
sql(
161+
"ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')",
162+
deltatb);
163+
List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb);
164+
// find the table properties result
165+
String properties = null;
166+
for (Object[] info : tableInfo) {
167+
if (info[0].equals("Table Properties")) {
168+
properties = (String) info[1];
169+
break;
170+
}
171+
}
172+
assertThat(properties).contains("description=people table,test-owner=test-user");
173+
sql("DROP TABLE %s", deltatb);
174+
}
175+
176+
@Test
177+
public void testUnsupportedAlterTableOperations() {
178+
String deltatb = getTableNameWithRandomSuffix();
179+
sql(
180+
"CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
181+
deltatb, getTableLocation(deltatb));
182+
183+
// ALTER TABLE ... RENAME TO ... fails
184+
assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", deltatb))
185+
.isInstanceOf(UnsupportedOperationException.class);
186+
187+
// ALTER TABLE ... SET LOCATION ... fails
188+
assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", deltatb))
189+
.isInstanceOf(DeltaAnalysisException.class);
190+
191+
sql("DROP TABLE %s", deltatb);
192+
}
193+
194+
@Test
195+
public void testUnsupportedTableCreateOperations() {
196+
String deltatb = getTableNameWithRandomSuffix();
197+
// create delta table with no location
198+
assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING DELTA", deltatb))
199+
.isInstanceOf(UnsupportedOperationException.class);
200+
201+
// CTAS fails
202+
assertThatThrownBy(
203+
() ->
204+
sql(
205+
"CREATE TABLE %s USING DELTA LOCATION '%s' AS SELECT 1 AS id",
206+
deltatb, getTableLocation(deltatb)))
207+
.isInstanceOf(IllegalArgumentException.class);
208+
}
209+
210+
@Test
211+
public void testDataframeSaveOperations() {
212+
List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25));
213+
StructType schema =
214+
new StructType(
215+
new StructField[] {
216+
new StructField("name", DataTypes.StringType, false, Metadata.empty()),
217+
new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
218+
});
219+
Dataset<Row> df = spark.createDataFrame(data, schema);
220+
221+
String deltatb = getTableNameWithRandomSuffix();
222+
// saveAsTable requires support for delta requires CTAS support for third party catalog
223+
// in delta catalog, which is currently not supported.
224+
assertThatThrownBy(
225+
() ->
226+
df.write()
227+
.format("delta")
228+
.option("path", getTableLocation(deltatb))
229+
.saveAsTable(deltatb))
230+
.isInstanceOf(IllegalArgumentException.class);
231+
232+
// verify regular dataframe saving still works
233+
df.write().format("delta").save(getTableLocation(deltatb));
234+
235+
// verify the partition dir is created
236+
List<String> subDirs = listDirs(getTableLocation(deltatb));
237+
assertThat(subDirs).contains("_delta_log");
238+
239+
// verify we can create a table out of the exising delta location
240+
sql("CREATE TABLE %s USING DELTA LOCATION '%s'", deltatb, getTableLocation(deltatb));
241+
List<Object[]> tables = sql("SHOW TABLES");
242+
assertThat(tables.size()).isEqualTo(1);
243+
assertThat(tables).contains(new Object[] {defaultNs, deltatb, false});
244+
245+
sql("INSERT INTO %s VALUES ('Anna', 11)", deltatb);
246+
247+
List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", deltatb);
248+
assertThat(results.size()).isEqualTo(3);
249+
assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30});
250+
assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11});
251+
assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25});
252+
253+
sql("DROP TABLE %s", deltatb);
254+
}
255+
}

0 commit comments

Comments
 (0)