[SC-78072][Delta] Fix partitionedBy in DeltaTableBuilder Python API
Fix partitionedBy in DeltaTableBuilder Python API

Added more usages in the unit tests.

Author: Yijia Cui <yijia.cui@databricks.com>

GitOrigin-RevId: 99e2e89242c8fa41c960df25dcc382faae439ab1
yijiacui-db authored and Yaohua628 committed Jun 3, 2021
1 parent 50fcd97 commit 59aa330
Showing 2 changed files with 67 additions and 15 deletions.
python/delta/tables.py (6 changes: 3 additions & 3 deletions)

@@ -875,7 +875,7 @@ def addColumns(self, cols):
         return self
 
     @since(1.0)
-    def partitionedBy(self, col, *cols):
+    def partitionedBy(self, *cols):
         """
         Specify columns for partitioning
@@ -886,8 +886,8 @@ def partitionedBy(self, col, *cols):
         .. note:: Evolving
         """
-        if type(col) is not str:
-            self._raise_type_error("Partitioning columns must be str.", [col])
+        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+            cols = cols[0]
         for c in cols:
             if type(c) is not str:
                 self._raise_type_error("Partitioning column must be str.", [c])
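For illustration, a minimal sketch of the two call styles the fixed builder accepts. It assumes an existing SparkSession named spark with Delta Lake configured; the table and column names are hypothetical:

    from delta.tables import DeltaTable

    # Variadic form: each partitioning column as its own argument.
    DeltaTable.create(spark).tableName("events_a") \
        .addColumn("date", "DATE") \
        .addColumn("country", "STRING") \
        .partitionedBy("date", "country") \
        .execute()

    # List form: a single list (or tuple) is unpacked by the fix
    # before each element is validated as a str.
    DeltaTable.create(spark).tableName("events_b") \
        .addColumn("date", "DATE") \
        .addColumn("country", "STRING") \
        .partitionedBy(["date", "country"]) \
        .execute()

Either form raises a TypeError if any element is not a string, which the new bad-args test below exercises.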
python/delta/tests/test_deltatable.py (76 changes: 64 additions & 12 deletions)

@@ -18,6 +18,7 @@
 import os
 
 from pyspark.sql import Row
+from pyspark.sql.column import _to_seq
 from pyspark.sql.functions import col, lit, expr
 from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
 from pyspark.sql.utils import AnalysisException, ParseException
@@ -353,7 +354,8 @@ def test_isDeltaTable(self):
 
     def __verify_table_schema(self, tableName, schema, cols,
                               types, nullables={}, comments={},
-                              properties={}):
+                              properties={}, partitioningColumns=[],
+                              tblComment=None):
         fields = []
         for i in range(len(cols)):
             col = cols[i]
@@ -369,6 +371,13 @@ def __verify_table_schema(self, tableName, schema, cols,
         for key in properties:
             assert (key in tablePropertyMap)
             assert (tablePropertyMap[key] == properties[key])
+        tableDetails = self.spark.sql("DESCRIBE DETAIL {}".format(tableName))\
+            .collect()[0]
+        assert(tableDetails.format == "delta")
+        actualComment = tableDetails.description
+        assert(actualComment == tblComment)
+        partitionCols = tableDetails.partitionColumns
+        assert(sorted(partitionCols) == sorted(partitioningColumns))
 
     def __verify_generated_column(self, tableName, deltaTable):
         cmd = "INSERT INTO {table} (col1, col2) VALUES (1, 11)".format(table=tableName)
@@ -405,22 +414,28 @@ def test_create_table_with_existing_schema(self):
         df = self.spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["key", "value"])
         deltaTable = DeltaTable.create(self.spark).tableName("test") \
             .addColumns(df.schema) \
-            .addColumn("value2", dataType="int").execute()
+            .addColumn("value2", dataType="int") \
+            .partitionedBy(["value2", "value"]) \
+            .execute()
         self.__verify_table_schema("test",
                                    deltaTable.toDF().schema,
                                    ["key", "value", "value2"],
                                    [StringType(), LongType(), IntegerType()],
-                                   nullables={"key", "value", "value2"})
+                                   nullables={"key", "value", "value2"},
+                                   partitioningColumns=["value", "value2"])
 
         # verify creating table with list of structFields
         deltaTable2 = DeltaTable.create(self.spark).tableName("test2").addColumns(
             df.schema.fields) \
-            .addColumn("value2", dataType="int").execute()
+            .addColumn("value2", dataType="int") \
+            .partitionedBy("value2", "value") \
+            .execute()
         self.__verify_table_schema("test2",
                                    deltaTable2.toDF().schema,
                                    ["key", "value", "value2"],
                                    [StringType(), LongType(), IntegerType()],
-                                   nullables={"key", "value", "value2"})
+                                   nullables={"key", "value", "value2"},
+                                   partitioningColumns=["value", "value2"])
 
     def test_create_replace_table_with_no_spark_session_passed(self):
         # create table.
@@ -471,7 +486,9 @@ def test_create_table_with_name_only(self):
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
                                    comments={"col1": "foo"},
-                                   properties={"foo": "bar"})
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column(tableName, deltaTable)
         self.spark.sql("DROP TABLE IF EXISTS {}".format(tableName))
@@ -486,7 +503,9 @@ def test_create_table_with_location_only(self):
                                    ["col1", "col2"],
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
-                                   comments={"col1": "foo"})
+                                   comments={"col1": "foo"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column("delta.`{}`".format(path), deltaTable)
 
@@ -503,7 +522,9 @@ def test_create_table_with_name_and_location(self):
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
                                    comments={"col1": "foo"},
-                                   properties={"foo": "bar"})
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column(tableName, deltaTable)
         self.spark.sql("DROP TABLE IF EXISTS {}".format(tableName))
@@ -539,7 +560,9 @@ def test_replace_table_with_name_only(self):
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
                                    comments={"col1": "foo"},
-                                   properties={"foo": "bar"})
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column(tableName, deltaTable)
         self.spark.sql("DROP TABLE IF EXISTS {}".format(tableName))
@@ -555,7 +578,10 @@ def test_replace_table_with_location_only(self):
                                    ["col1", "col2"],
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
-                                   comments={"col1": "foo"})
+                                   comments={"col1": "foo"},
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column("delta.`{}`".format(path), deltaTable)
 
@@ -574,7 +600,9 @@ def test_replace_table_with_name_and_location(self):
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
                                    comments={"col1": "foo"},
-                                   properties={"foo": "bar"})
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
         # verify generated columns.
         self.__verify_generated_column(tableName, deltaTable)
         self.spark.sql("DROP TABLE IF EXISTS {}".format(tableName))
@@ -594,7 +622,28 @@ def test_replace_table_behavior(self):
                                    [IntegerType(), IntegerType()],
                                    nullables={"col2"},
                                    comments={"col1": "foo"},
-                                   properties={"foo": "bar"})
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
+
+    def test_verify_paritionedBy_compatibility(self):
+        tableBuilder = DeltaTable.create(self.spark).tableName("testTable") \
+            .addColumn("col1", "int", comment="foo", nullable=False) \
+            .addColumn("col2", IntegerType(), generatedAlwaysAs="col1 + 10") \
+            .property("foo", "bar") \
+            .comment("comment")
+        tableBuilder._jbuilder = tableBuilder._jbuilder \
+            .partitionedBy(_to_seq(self.spark._sc, ["col1"]))
+        deltaTable = tableBuilder.execute()
+        self.__verify_table_schema("testTable",
+                                   deltaTable.toDF().schema,
+                                   ["col1", "col2"],
+                                   [IntegerType(), IntegerType()],
+                                   nullables={"col2"},
+                                   comments={"col1": "foo"},
+                                   properties={"foo": "bar"},
+                                   partitioningColumns=["col1"],
+                                   tblComment="comment")
 
     def test_delta_table_builder_with_bad_args(self):
         builder = DeltaTable.create(self.spark)
@@ -650,6 +699,9 @@ def test_delta_table_builder_with_bad_args(self):
         with self.assertRaises(TypeError):
             builder.partitionedBy(1, "1")
 
+        with self.assertRaises(TypeError):
+            builder.partitionedBy([1])
+
         # bad property key
         with self.assertRaises(TypeError):
             builder.property(1, "1")
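For context on the new assertions in __verify_table_schema: a minimal sketch of reading the same table metadata back with DESCRIBE DETAIL. The SparkSession spark and table name events are hypothetical; the row fields used (format, description, partitionColumns) are the ones the updated helper checks:

    # DESCRIBE DETAIL returns a single row of table-level metadata.
    details = spark.sql("DESCRIBE DETAIL events").collect()[0]
    assert details.format == "delta"   # storage format of the table
    print(details.description)         # table comment, or None if unset
    print(details.partitionColumns)    # e.g. ['date', 'country']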
