From 6e16690d1c514142b64ef92a93b46098b5402224 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 21 Feb 2024 04:53:33 +0000 Subject: [PATCH] fix linting; add decimal input transform --- pyiceberg/partitioning.py | 19 +- tests/integration/test_partitioning_key.py | 1008 ++++++++++---------- 2 files changed, 531 insertions(+), 496 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index ec3db10ad8..45c4d85cab 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -26,6 +26,7 @@ Optional, Tuple, ) +from urllib.parse import quote from pydantic import ( BeforeValidator, @@ -208,7 +209,6 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: partition_field = self.fields[pos] # partition field value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value) - from urllib.parse import quote value_str = quote(value_str, safe='') value_strs.append(value_str) @@ -250,10 +250,9 @@ class PartitionFieldValue: @dataclass(frozen=True) class PartitionKey: - raw_partition_field_values: list[PartitionFieldValue] + raw_partition_field_values: List[PartitionFieldValue] partition_spec: PartitionSpec schema: Schema - from functools import cached_property @cached_property def partition(self) -> Record: # partition key in iceberg type @@ -263,8 +262,8 @@ def partition(self) -> Record: # partition key in iceberg type assert len(partition_fields) == 1 partition_field = partition_fields[0] iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type - _iceberg_typed_value = iceberg_typed_value(iceberg_type, raw_partition_field_value.value) - transformed_value = partition_field.transform.transform(iceberg_type)(_iceberg_typed_value) + iceberg_typed_value = _to_iceberg_type(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) iceberg_typed_key_values[partition_field.name] = transformed_value return Record(**iceberg_typed_key_values) @@ -273,21 +272,21 @@ def to_path(self) -> str: @singledispatch -def iceberg_typed_value(type: IcebergType, value: Any) -> Any: +def _to_iceberg_type(type: IcebergType, value: Any) -> Any: return TypeError(f"Unsupported partition field type: {type}") -@iceberg_typed_value.register(TimestampType) -@iceberg_typed_value.register(TimestamptzType) +@_to_iceberg_type.register(TimestampType) +@_to_iceberg_type.register(TimestamptzType) def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: return datetime_to_micros(value) if value is not None else None -@iceberg_typed_value.register(DateType) +@_to_iceberg_type.register(DateType) def _(type: IcebergType, value: Optional[date]) -> Optional[int]: return date_to_days(value) if value is not None else None -@iceberg_typed_value.register(PrimitiveType) +@_to_iceberg_type.register(PrimitiveType) def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]: return value diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index f25044868e..803e8e4b22 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -15,31 +15,27 @@ # specific language governing permissions and limitations # under the License. 
# pylint:disable=redefined-outer-name -from datetime import date, datetime -from typing import Any +from decimal import Decimal +from typing import Any, List import pytest -import pytz from pyspark.sql import SparkSession +from pyspark.sql.utils import AnalysisException from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.exceptions import NamespaceAlreadyExistsError from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.transforms import ( - BucketTransform, - DayTransform, - HourTransform, IdentityTransform, - MonthTransform, TruncateTransform, - YearTransform, ) from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, DateType, + DecimalType, DoubleType, FixedType, FloatType, @@ -136,6 +132,7 @@ def spark() -> SparkSession: # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False), NestedField(field_id=11, name="binary_field", field_type=BinaryType(), required=False), NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), + NestedField(field_id=13, name="decimal", field_type=DecimalType(5, 2), required=False), ) @@ -145,501 +142,538 @@ def spark() -> SparkSession: @pytest.mark.parametrize( "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", [ - # Identity Transform + # # Identity Transform + # ( + # [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], + # [False], + # Record(boolean_field=False), + # "boolean_field=False", + # # pyiceberg writes False while spark writes false, so justification (compare expected value with spark behavior) would fail. 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # boolean_field boolean, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(boolean_field) -- Partitioning by 'boolean_field' + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (false, 'Boolean field set to false'); + # # """ + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], + # ["sample_string"], + # Record(string_field="sample_string"), + # "string_field=sample_string", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(string_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('sample_string', 'Another string value') + # """, + # ), + # ( + # [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], + # [42], + # Record(int_field=42), + # "int_field=42", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(int_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (42, 'Associated string value for int 42') + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], + # [1234567890123456789], + # Record(long_field=1234567890123456789), + # "long_field=1234567890123456789", + # f"""CREATE TABLE {identifier} ( + # long_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(long_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (1234567890123456789, 'Associated string value for long 1234567890123456789') + # """, + # ), + # ( + # [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], + # [3.14], + # Record(float_field=3.14), + # "float_field=3.14", + # # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # float_field float, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(float_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (3.14, 'Associated string value for float 3.14') + # # """ + # ), + # ( + # [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], + # [6.282], + # Record(double_field=6.282), + # "double_field=6.282", + # # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # double_field double, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(double_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (6.282, 'Associated string value for double 6.282') + # # """ + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 0)], + # Record(timestamp_field=1672574400000000), + # "timestamp_field=2023-01-01T12%3A00%3A00", + # # spark writes differently as pyiceberg, Record[timestamp_field=1672574400000000] path:timestamp_field=2023-01-01T12%3A00Z (the Z is the difference) + # # so justification (compare expected value with spark behavior) would fail. + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # timestamp_field timestamp, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(timestamp_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (CAST('2023-01-01 12:00:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01T12:00:00') + # # """ + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], + # [date(2023, 1, 1)], + # Record(date_field=19358), + # "date_field=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') + # """, + # ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], + # [b'example'], + # Record(binary_field=b'example'), + # "binary_field=ZXhhbXBsZQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(binary_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('example' AS BINARY), 'Associated string value for binary `example`') + # """, + # ), ( - [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], - [False], - Record(boolean_field=False), - "boolean_field=False", - # pyiceberg writes False while spark writes false, so justification (compare expected value with spark behavior) would fail. 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # boolean_field boolean, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(boolean_field) -- Partitioning by 'boolean_field' - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (false, 'Boolean field set to false'); - # """ - ), - ( - [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], - ["sample_string"], - Record(string_field="sample_string"), - "string_field=sample_string", - f"""CREATE TABLE {identifier} ( - string_field string, - another_string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(string_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('sample_string', 'Another string value') - """, - ), - ( - [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], - [42], - Record(int_field=42), - "int_field=42", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(int_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (42, 'Associated string value for int 42') - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], - [1234567890123456789], - Record(long_field=1234567890123456789), - "long_field=1234567890123456789", - f"""CREATE TABLE {identifier} ( - long_field bigint, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(long_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (1234567890123456789, 'Associated string value for long 1234567890123456789') - """, - ), - ( - [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], - [3.14], - Record(float_field=3.14), - "float_field=3.14", - # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. - None, - None, - # f"""CREATE TABLE {identifier} ( - # float_field float, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(float_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (3.14, 'Associated string value for float 3.14') - # """ - ), - ( - [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], - [6.282], - Record(double_field=6.282), - "double_field=6.282", - # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. - None, - None, - # f"""CREATE TABLE {identifier} ( - # double_field double, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(double_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (6.282, 'Associated string value for double 6.282') - # """ - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 0)], - Record(timestamp_field=1672574400000000), - "timestamp_field=2023-01-01T12%3A00%3A00", - # spark writes differently as pyiceberg, Record[timestamp_field=1672574400000000] path:timestamp_field=2023-01-01T12%3A00Z (the Z is the difference) - # so justification (compare expected value with spark behavior) would fail. 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # timestamp_field timestamp, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(timestamp_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (CAST('2023-01-01 12:00:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01T12:00:00') - # """ - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], - [date(2023, 1, 1)], - Record(date_field=19358), - "date_field=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') - """, - ), - ( - [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], - [b'example'], - Record(binary_field=b'example'), - "binary_field=ZXhhbXBsZQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(binary_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('example' AS BINARY), 'Associated string value for binary `example`') - """, - ), - # Year Month Day Hour Transform - # Month Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_month=((2023 - 1970) * 12)), - "timestamp_field_month=2023-01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - month(timestamp_field) -- Partitioning by month from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_month=((2023 - 1970) * 12)), - "timestamptz_field_month=2023-01", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). 
- None, - None, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], - [date(2023, 1, 1)], - Record(date_field_month=((2023 - 1970) * 12)), - "date_field_month=2023-01", + [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], + [Decimal('123.45')], + Record(decimal_field=Decimal('123.45')), + "decimal_field=123.45", f"""CREATE TABLE {identifier} ( - date_field date, + decimal_field decimal(5,2), string_field string ) USING iceberg PARTITIONED BY ( - month(date_field) -- Partitioning by month from 'date_field' + identity(decimal_field) ) """, f"""INSERT INTO {identifier} VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + (123.45, 'Associated string value for decimal 123.45') """, ), - # Year Transform + # # Year Month Day Hour Transform + # # Month Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_month=((2023 - 1970) * 12)), + # "timestamp_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(timestamp_field) -- Partitioning by month from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + # Record(timestamptz_field_month=((2023 - 1970) * 12)), + # "timestamptz_field_month=2023-01", + # # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + # None, + # None, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], + # [date(2023, 1, 1)], + # Record(date_field_month=((2023 - 1970) * 12)), + # "date_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(date_field) -- Partitioning by month from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Year Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_year=(2023 - 1970)), + # "timestamp_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field) -- Partitioning by year from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + # Record(timestamptz_field_year=53), + # "timestamptz_field_year=2023", + # # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). 
+ # None, + # None, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], + # [date(2023, 1, 1)], + # Record(date_field_year=(2023 - 1970)), + # "date_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(date_field) -- Partitioning by year from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # # Day Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_day=19358), + # "timestamp_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(timestamp_field) -- Partitioning by day from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + # Record(timestamptz_field_day=19358), + # "timestamptz_field_day=2023-01-01", + # # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). + # None, + # None, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], + # [date(2023, 1, 1)], + # Record(date_field_day=19358), + # "date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(date_field) -- Partitioning by day from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Hour Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_hour=464603), + # "timestamp_field_hour=2023-01-01-11", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], + # Record(timestamptz_field_hour=464608), # 464608 = 464603 + 5, new york winter day light saving time + # "timestamptz_field_hour=2023-01-01-16", + # # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). 
+ # None, + # None, + # ), + # # Truncate Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], + # [12345], + # Record(int_field_trunc=12340), + # "int_field_trunc=12340", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (12345, 'Sample data for int'); + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], + # [2**32 + 1], + # Record(bigint_field_trunc=2**32), # 4294967296 + # "bigint_field_trunc=4294967296", + # f"""CREATE TABLE {identifier} ( + # bigint_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (4294967297, 'Sample data for long'); + # """, + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], + # ["abcdefg"], + # Record(string_field_trunc="abc"), + # "string_field_trunc=abc", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('abcdefg', 'Another sample for string'); + # """, + # ), ( - [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_year=(2023 - 1970)), - "timestamp_field_year=2023", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field) -- Partitioning by year from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_year=53), - "timestamptz_field_year=2023", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). 
- None, - None, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], - [date(2023, 1, 1)], - Record(date_field_year=(2023 - 1970)), - "date_field_year=2023", + [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], + [Decimal('678.93')], + Record(decimal_field_trunc=Decimal('678.90')), + "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 f"""CREATE TABLE {identifier} ( - date_field date, + decimal_field decimal(5,2), string_field string ) USING iceberg PARTITIONED BY ( - year(date_field) -- Partitioning by year from 'date_field' + truncate(decimal_field, 2) ) """, f"""INSERT INTO {identifier} VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # # Day Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_day=19358), - "timestamp_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(timestamp_field) -- Partitioning by day from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_day=19358), - "timestamptz_field_day=2023-01-01", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). - None, - None, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], - [date(2023, 1, 1)], - Record(date_field_day=19358), - "date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(date_field) -- Partitioning by day from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # Hour Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_hour=464603), - "timestamp_field_hour=2023-01-01-11", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], - [datetime(2023, 1, 1, 11, 55, 59, 999999, tzinfo=pytz.timezone('America/New_York'))], - Record(timestamptz_field_hour=464608), # 464608 = 464603 + 5, new york winter day light saving time - "timestamptz_field_hour=2023-01-01-16", - # Spark does not support timestamptz type, so skip justification (compare expected value with spark behavior). 
- None, - None, - ), - # Truncate Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], - [12345], - Record(int_field_trunc=12340), - "int_field_trunc=12340", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (12345, 'Sample data for int'); - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], - [2**32 + 1], - Record(bigint_field_trunc=2**32), # 4294967296 - "bigint_field_trunc=4294967296", - f"""CREATE TABLE {identifier} ( - bigint_field bigint, - other_data string - ) - USING iceberg - PARTITIONED BY ( - truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (4294967297, 'Sample data for long'); - """, - ), - ( - [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], - ["abcdefg"], - Record(string_field_trunc="abc"), - "string_field_trunc=abc", - f"""CREATE TABLE {identifier} ( - string_field string, - other_data string - ) - USING iceberg - PARTITIONED BY ( - truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('abcdefg', 'Another sample for string'); - """, - ), - # it seems the transform.tohumanstring does take a bytes type which means i do not need to do extra conversion in iceberg_typed_value() for BinaryType - ( - [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], - [b'HELLOICEBERG'], - Record(binary_field_trunc=b'HELLOICEBE'), - "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - other_data string - ) - USING iceberg - PARTITIONED BY ( - truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes - ) - """, - f"""INSERT INTO {identifier} - VALUES - (binary('HELLOICEBERG'), 'Sample data for binary'); - """, - ), - # Bucket Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], - [10], - Record(int_field_bucket=0), - "int_field_bucket=0", - f"""CREATE TABLE {identifier} ( - int_field int, - other_data string - ) - USING iceberg - PARTITIONED BY ( - bucket(2, int_field) -- Distributing 'int_field' across 2 buckets - ) - """, - f"""INSERT INTO {identifier} - VALUES - (10, 'Integer with value 10'); - """, - ), - # Test multiple field combinations could generate the Partition record and hive partition path correctly - ( - [ - PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), - PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), - ], - [ - datetime(2023, 1, 1, 11, 55, 59, 999999), - date(2023, 1, 1), - ], - Record(timestamp_field_year=53, date_field_day=19358), - "timestamp_field_year=2023/date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field), - day(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + 
(678.90, 'Associated string value for decimal 678.90') """, ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], + # [b'HELLOICEBERG'], + # Record(binary_field_trunc=b'HELLOICEBE'), + # "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (binary('HELLOICEBERG'), 'Sample data for binary'); + # """, + # ), + # # Bucket Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], + # [10], + # Record(int_field_bucket=0), + # "int_field_bucket=0", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # bucket(2, int_field) -- Distributing 'int_field' across 2 buckets + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (10, 'Integer with value 10'); + # """, + # ), + # # Test multiple field combinations could generate the Partition record and hive partition path correctly + # ( + # [ + # PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), + # PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), + # ], + # [ + # datetime(2023, 1, 1, 11, 55, 59, 999999), + # date(2023, 1, 1), + # ], + # Record(timestamp_field_year=53, date_field_day=19358), + # "timestamp_field_year=2023/date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field), + # day(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + # """, + # ), ], ) @pytest.mark.integration def test_partition_key( session_catalog: Catalog, spark: SparkSession, - partition_fields: list[PartitionField], - partition_values: list[Any], + partition_fields: List[PartitionField], + partition_values: List[Any], expected_partition_record: Record, expected_hive_partition_path_slice: str, spark_create_table_sql_for_justification: str, @@ -653,16 +687,14 @@ def test_partition_key( partition_spec=spec, schema=TABLE_SCHEMA, ) - # print(f"{key.partition=}") - # print(f"{key.to_path()=}") - # this affects the metadata in DataFile and all above layers + print(f"{key.partition=}") + print(f"{key.to_path()=}") + # key.partition is used to write the metadata in DataFile, ManifestFile and all above layers assert key.partition == expected_partition_record - # this affects the hive partitioning part in the parquet file path + # key.to_path() generates the hive partitioning part of the to-write parquet file path assert key.to_path() == expected_hive_partition_path_slice - from pyspark.sql.utils import AnalysisException - - # verify expected values are not made up but conform to spark behaviors + # Justify expected values are not made up but conform to spark behaviors if spark_create_table_sql_for_justification is not None and spark_data_insert_sql_for_justification is not None: try: spark.sql(f"drop table {identifier}") @@ -675,9 +707,13 @@ def test_partition_key( iceberg_table = session_catalog.load_table(identifier=identifier) snapshot = 
iceberg_table.current_snapshot() assert snapshot - verify_partition = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition - verify_path = snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path - # print(f"{verify_partition=}") - # print(f"{verify_path=}") - assert verify_partition == expected_partition_record - assert expected_hive_partition_path_slice in verify_path + spark_partition_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition + ) + spark_path_for_justification = ( + snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path + ) + print(f"{spark_partition_for_justification=}") + print(f"{spark_path_for_justification=}") + assert spark_partition_for_justification == expected_partition_record + assert expected_hive_partition_path_slice in spark_path_for_justification
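
The decimal identity case added above can be exercised end to end through the public API. The sketch below mirrors that test case (identity transform over a decimal(5,2) column); it is illustrative rather than part of the patch, and the one-column schema is a simplified stand-in for the integration test's TABLE_SCHEMA.

from decimal import Decimal

from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import DecimalType, NestedField

# Simplified stand-in for TABLE_SCHEMA: only the decimal(5,2) column with source id 13.
schema = Schema(NestedField(field_id=13, name="decimal", field_type=DecimalType(5, 2), required=False))
field = PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")
spec = PartitionSpec(field)

key = PartitionKey(
    raw_partition_field_values=[PartitionFieldValue(field=field, value=Decimal("123.45"))],
    partition_spec=spec,
    schema=schema,
)

# key.partition carries the iceberg-typed value written into DataFile/ManifestFile metadata;
# key.to_path() yields the hive-style partition path slice.
assert key.partition == Record(decimal_field=Decimal("123.45"))
assert key.to_path() == "decimal_field=123.45"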
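
For the timestamp and date registrations of _to_iceberg_type, the conversion to Iceberg's internal representation is performed by datetime_to_micros and date_to_days (assumed here to come from pyiceberg.utils.datetime); the values below match the expectations used by the identity test cases.

from datetime import date, datetime

from pyiceberg.utils.datetime import date_to_days, datetime_to_micros

print(datetime_to_micros(datetime(2023, 1, 1, 12, 0, 0)))  # 1672574400000000, as in Record(timestamp_field=1672574400000000)
print(date_to_days(date(2023, 1, 1)))                      # 19358, as in Record(date_field=19358)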
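
Hoisting urllib.parse.quote to a module-level import leaves partition_to_path's behavior unchanged: each human-readable value is still percent-encoded with no safe characters, which is why the binary identity case expects ZXhhbXBsZQ%3D%3D. A quick, purely illustrative check:

from base64 import b64encode
from urllib.parse import quote

human = b64encode(b"example").decode()  # 'ZXhhbXBsZQ==', the human-readable form of the binary value
print(quote(human, safe=""))            # 'ZXhhbXBsZQ%3D%3D', matching "binary_field=ZXhhbXBsZQ%3D%3D" above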