diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md
index 52a2281b6..9e0e40c13 100644
--- a/docs/concepts/models/overview.md
+++ b/docs/concepts/models/overview.md
@@ -258,8 +258,13 @@ Learn more about these properties and their default values in the [model configu
 ### depends_on
 : Depends on explicitly specifies the models on which the model depends, in addition to the ones automatically inferred by from the model code.
 
+### table_format
+: Table format is an optional property for engines that support table formats like `iceberg` and `hive` where the physical file format is configurable. The intention is to define the table type using `table_format` and then the on-disk format of the files within the table using `storage_format`.
+
+    Note that this property is only implemented for engines that allow the `table_format` to be configured independently of the `storage_format`.
+
 ### storage_format
-: Storage format is a property for engines such as Spark or Hive that support storage formats such as `parquet` and `orc`.
+: Storage format is a property for engines such as Spark or Hive that support storage formats such as `parquet` and `orc`. Note that some engines don't make a distinction between `table_format` and `storage_format`, in which case `storage_format` is used and `table_format` is ignored.
 
 ### partitioned_by
 : Partitioned by plays two roles. For most model kinds, it is an optional property for engines that support table partitioning such as Spark or BigQuery.
diff --git a/docs/integrations/engines/athena.md b/docs/integrations/engines/athena.md
index fbf4fef77..9bd2baa60 100644
--- a/docs/integrations/engines/athena.md
+++ b/docs/integrations/engines/athena.md
@@ -36,12 +36,18 @@ These options are specific to SQLMesh itself and are not passed to PyAthena
 
 ## Model properties
 
+The Athena adapter utilises the following model top-level [properties](../../concepts/models/overview.md#model-properties):
+
+| Name | Description | Type | Required |
+|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
+| `table_format` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | N |
+| `storage_format` | Configures the file format to be used by the `table_format`. For Hive tables, this sets the [STORED AS](https://docs.aws.amazon.com/athena/latest/ug/create-table.html#parameters) option. For Iceberg tables, this sets the [format](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) property. | string | N |
+
 The Athena adapter recognises the following model [physical_properties](../../concepts/models/overview.md#physical_properties):
 
 | Name | Description | Type | Default |
|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|
| `s3_base_location`| `s3://` base URI of where the snapshot tables for this model should be written. Overrides `s3_warehouse_location` if one is configured. 
| string | | -| `table_type` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | `hive` | ## S3 Locations @@ -62,6 +68,6 @@ Consequently, any SQLMesh model types that needs to delete or merge data from ex However, Athena does support [Apache Iceberg](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) tables which allow a full range of operations. These can be used for more complex model types such as [`INCREMENTAL_BY_UNIQUE_KEY`](../../concepts/models/model_kinds.md#incremental_by_unique_key) and [`SCD_TYPE_2`](../../concepts/models/model_kinds.md#scd-type-2). -To use an Iceberg table for a model, set `table_type='iceberg'` in the model [physical_properties](../../concepts/models/overview.md#physical_properties). +To use an Iceberg table for a model, set `table_format iceberg` in the model [properties](../../concepts/models/overview.md#model-properties). In general, Iceberg tables offer the most flexibility and you'll run into the least SQLMesh limitations when using them. However, we create Hive tables by default because Athena creates Hive tables by default, so Iceberg tables are opt-in rather than opt-out. \ No newline at end of file diff --git a/docs/reference/model_configuration.md b/docs/reference/model_configuration.md index 8f1d9b479..b9d0fdaf5 100644 --- a/docs/reference/model_configuration.md +++ b/docs/reference/model_configuration.md @@ -11,7 +11,7 @@ Configuration options for SQLMesh model properties. Supported by all model kinds | Option | Description | Type | Required | |-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------:|:--------:| | `name` | The model name. Must include at least a qualifying schema (`.`) and may include a catalog (`..`). Can be omitted if [infer_names](#model-naming) is set to true. | str | N | -| `project` | The name of the project the model belongs to - used in multi-repo deployments | str | N | +| `project` | The name of the project the model belongs to - used in multi-repo deployments | str | N | | `kind` | The model kind ([Additional Details](#model-kind-properties)). (Default: `VIEW`) | str \| dict | N | | `audits` | SQLMesh [audits](../concepts/audits.md) that should run against the model's output | array[str] | N | | `dialect` | The SQL dialect in which the model's query is written. All SQL dialects [supported by the SQLGlot library](https://github.com/tobymao/sqlglot/blob/main/sqlglot/dialects/dialect.py) are allowed. | str | N | @@ -26,14 +26,15 @@ Configuration options for SQLMesh model properties. Supported by all model kinds | `column_descriptions` | A key-value mapping of column names to column comments that will be registered in the SQL engine's table COMMENT field (if supported by the engine). Specified as key-value pairs (`column_name = 'column comment'`). If present, [inline column comments](../concepts/models/overview.md#inline-column-comments) will not be registered in the SQL engine. 
| dict | N |
| `grains` | The column(s) whose combination uniquely identifies each row in the model | str \| array[str] | N |
| `references` | The model column(s) used to join to other models' grains | str \| array[str] | N |
-| `depends_on` | Models on which this model depends, in addition to the ones inferred from the model's query. (Default: dependencies inferred from model code) | array[str] | N |
-| `storage_format` | The storage format that should be used to store physical tables; only applicable to engines such as Spark | str | N |
+| `depends_on` | Models on which this model depends, in addition to the ones inferred from the model's query. (Default: dependencies inferred from model code) | array[str] | N |
+| `table_format` | The table format that should be used to manage the physical files (e.g. `iceberg`, `hive`, `delta`); only applicable to engines such as Spark and Athena | str | N |
+| `storage_format` | The storage format that should be used to store physical files (e.g. `parquet`, `orc`); only applicable to engines such as Spark and Athena | str | N |
| `partitioned_by` | The column(s) and/or column expressions used define a model's partitioning key. Required for the `INCREMENTAL_BY_PARTITION` model kind. Optional for all other model kinds; used to partition the model's physical table in engines that support partitioning. | str \| array[str] | N |
| `clustered_by` | The column(s) used to cluster the model's physical table; only applicable to engines that support clustering | str | N |
| `columns` | The column names and data types returned by the model. Disables [automatic inference of column names and types](../concepts/models/overview.md#conventions) from the SQL query. | array[str] | N |
| `physical_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the model table / view in the physical layer. Specified as key-value pairs (`key = value`). | dict | N |
| `virtual_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the model view in the virtual layer. Specified as key-value pairs (`key = value`). | dict | N |
-| `session_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the engine session. Specified as key-value pairs (`key = value`). | dict | N |
+| `session_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the engine session. Specified as key-value pairs (`key = value`). | dict | N |
| `allow_partials` | Whether this model can process partial (incomplete) data intervals | bool | N |
| `enabled` | Whether the model is enabled. This attribute is `true` by default. Setting it to `false` causes SQLMesh to ignore this model when loading the project. | bool | N |
diff --git a/sqlmesh/core/config/model.py b/sqlmesh/core/config/model.py
index 31f9a7467..ec8d2aaaf 100644
--- a/sqlmesh/core/config/model.py
+++ b/sqlmesh/core/config/model.py
@@ -27,8 +27,10 @@ class ModelDefaultsConfig(BaseConfig):
     start: The earliest date that the model will be backfilled for. If this is None,
         then the date is inferred by taking the most recent start date of its ancestors.
         The start date can be a static datetime or a relative datetime like "1 year ago"
+    table_format: The table format used to manage the physical table files, whose on-disk format is defined by `storage_format`; only applicable in certain engines.
+        (eg. 'iceberg', 'delta', 'hudi')
     storage_format: The storage format used to store the physical table, only applicable in certain engines.
-        (eg. 'parquet')
+        (eg. 'parquet', 'orc')
     on_destructive_change: What should happen when a forward-only model requires a destructive schema change.
     audits: The audits to be applied globally to all models in the project.
     """
@@ -38,6 +40,7 @@
     cron: t.Optional[str] = None
     owner: t.Optional[str] = None
     start: t.Optional[TimeLike] = None
+    table_format: t.Optional[str] = None
     storage_format: t.Optional[str] = None
     on_destructive_change: t.Optional[OnDestructiveChange] = None
     session_properties: t.Optional[t.Dict[str, t.Any]] = None
diff --git a/sqlmesh/core/engine_adapter/athena.py b/sqlmesh/core/engine_adapter/athena.py
index 0f23cf1be..4058ca5e7 100644
--- a/sqlmesh/core/engine_adapter/athena.py
+++ b/sqlmesh/core/engine_adapter/athena.py
@@ -66,10 +66,8 @@ def create_state_table(
             table_name,
             columns_to_types,
             primary_key=primary_key,
-            table_properties={
-                # it's painfully slow, but it works
-                "table_type": exp.Literal.string("iceberg")
-            },
+            # it's painfully slow, but it works
+            table_format="iceberg",
         )
 
     def _get_data_objects(
@@ -189,7 +187,7 @@ def _build_create_table_exp(
             **kwargs,
         )
 
-        is_hive = self._table_type(table_properties) == "hive"
+        is_hive = self._table_type(kwargs.get("table_format", None)) == "hive"
 
         # Filter any PARTITIONED BY properties from the main column list since they cant be specified in both places
         # ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
@@ -214,6 +212,7 @@ def _build_table_properties_exp(
     def _build_table_properties_exp(
         self,
         catalog_name: t.Optional[str] = None,
+        table_format: t.Optional[str] = None,
         storage_format: t.Optional[str] = None,
         partitioned_by: t.Optional[t.List[exp.Expression]] = None,
         partition_interval_unit: t.Optional[IntervalUnit] = None,
@@ -229,7 +228,7 @@
         properties: t.List[exp.Expression] = []
 
         table_properties = table_properties or {}
-        is_hive = self._table_type(table_properties) == "hive"
+        is_hive = self._table_type(table_format) == "hive"
         is_iceberg = not is_hive
 
         if is_hive and not expression:
@@ -237,6 +236,11 @@
             # Unless it's a CTAS, those are always CREATE TABLE
             properties.append(exp.ExternalProperty())
 
+        if table_format:
+            properties.append(
+                exp.Property(this=exp.var("table_type"), value=exp.Literal.string(table_format))
+            )
+
         if table_description:
             properties.append(exp.SchemaCommentProperty(this=exp.Literal.string(table_description)))
@@ -297,15 +301,12 @@ def _truncate_table(self, table_name: TableName) -> None:
         self.execute(f"DELETE FROM {table.sql(dialect=self.dialect, identify=True)}")
 
     def _table_type(
-        self, table_properties: t.Optional[t.Dict[str, exp.Expression]] = None
+        self, table_type: t.Optional[str] = None
     ) -> t.Union[t.Literal["hive"], t.Literal["iceberg"]]:
-        """
-        Use the user-specified table_properties to figure out of this is a Hive or an Iceberg table
-        """
+        if table_type and table_type.lower() == "iceberg":
+            return "iceberg"
+
         # if we cant detect any indication of Iceberg, this is a Hive table
-        if table_properties and (table_type := table_properties.get("table_type", None)):
-            if "iceberg" in table_type.sql(dialect=self.dialect).lower():
-                return "iceberg"
         return "hive"
 
     @lru_cache()
@@ -392,12 +393,3 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]
             raise SQLMeshError("Cannot delete from non-empty Hive 
table") return None - - def _drop_object( - self, - name: TableName | SchemaName, - exists: bool = True, - kind: str = "TABLE", - **drop_args: t.Any, - ) -> None: - return super()._drop_object(name, exists=exists, kind=kind, **drop_args) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 152a24174..e2be37ad8 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -2078,6 +2078,7 @@ def _build_clustered_by_exp( def _build_table_properties_exp( self, catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, storage_format: t.Optional[str] = None, partitioned_by: t.Optional[t.List[exp.Expression]] = None, partition_interval_unit: t.Optional[IntervalUnit] = None, diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 2324bbdbd..9a7e547fe 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -599,6 +599,7 @@ def _build_partitioned_by_exp( def _build_table_properties_exp( self, catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, storage_format: t.Optional[str] = None, partitioned_by: t.Optional[t.List[exp.Expression]] = None, partition_interval_unit: t.Optional[IntervalUnit] = None, diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index ba4eb08e8..3e1f615e5 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -373,6 +373,7 @@ def _build_settings_property( def _build_table_properties_exp( self, catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, storage_format: t.Optional[str] = None, partitioned_by: t.Optional[t.List[exp.Expression]] = None, partition_interval_unit: t.Optional[IntervalUnit] = None, diff --git a/sqlmesh/core/engine_adapter/mixins.py b/sqlmesh/core/engine_adapter/mixins.py index 90e274dcb..4da61f46b 100644 --- a/sqlmesh/core/engine_adapter/mixins.py +++ b/sqlmesh/core/engine_adapter/mixins.py @@ -149,6 +149,7 @@ def _build_partitioned_by_exp( def _build_table_properties_exp( self, catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, storage_format: t.Optional[str] = None, partitioned_by: t.Optional[t.List[exp.Expression]] = None, partition_interval_unit: t.Optional[IntervalUnit] = None, @@ -160,7 +161,15 @@ def _build_table_properties_exp( ) -> t.Optional[exp.Properties]: properties: t.List[exp.Expression] = [] - if storage_format: + if table_format and self.dialect == "spark": + properties.append(exp.FileFormatProperty(this=exp.Var(this=table_format))) + if storage_format: + properties.append( + exp.Property( + this="write.format.default", value=exp.Literal.string(storage_format) + ) + ) + elif storage_format: properties.append(exp.FileFormatProperty(this=exp.Var(this=storage_format))) if partitioned_by: diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 0a79fe967..d7f2bdb84 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -157,6 +157,7 @@ def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None def _build_table_properties_exp( self, catalog_name: t.Optional[str] = None, + table_format: t.Optional[str] = None, storage_format: t.Optional[str] = None, partitioned_by: t.Optional[t.List[exp.Expression]] = None, partition_interval_unit: t.Optional[IntervalUnit] = None, diff --git 
a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py
index 88306c634..c052be51a 100644
--- a/sqlmesh/core/model/definition.py
+++ b/sqlmesh/core/model/definition.py
@@ -107,8 +107,10 @@ class _Model(ModelMeta, frozen=True):
     end: The date that the model will be backfilled up until. Follows the same syntax as 'start',
         should be omitted if there is no end date.
     lookback: The number of previous incremental intervals in the lookback window.
+    table_format: The table format used to manage the physical table files, whose on-disk format is defined by `storage_format`; only applicable in certain engines.
+        (eg. 'iceberg', 'delta', 'hudi')
     storage_format: The storage format used to store the physical table, only applicable in certain engines.
-        (eg. 'parquet')
+        (eg. 'parquet', 'orc')
     partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
     clustered_by: The cluster columns, only applicable in certain engines. (eg. (ds, hour))
     python_env: Dictionary containing all global variables needed to render the model's macros.
@@ -821,6 +823,7 @@ def _data_hash_values(self) -> t.List[str]:
                 [(k, v) for k, v in self.sorted_python_env if not v.is_metadata]
             ),
             *self.kind.data_hash_values,
+            self.table_format,
             self.storage_format,
             str(self.lookback),
             *(gen(expr) for expr in (self.partitioned_by or [])),
diff --git a/sqlmesh/core/model/meta.py b/sqlmesh/core/model/meta.py
index 2b9f64701..3c4f705ea 100644
--- a/sqlmesh/core/model/meta.py
+++ b/sqlmesh/core/model/meta.py
@@ -57,6 +57,7 @@ class ModelMeta(_Node):
     name: str
     kind: ModelKind = ViewKind()
     retention: t.Optional[int] = None  # not implemented yet
+    table_format: t.Optional[str] = None
     storage_format: t.Optional[str] = None
     partitioned_by_: t.List[exp.Expression] = Field(default=[], alias="partitioned_by")
     clustered_by: t.List[str] = []
@@ -151,9 +152,9 @@ def _normalize(value: t.Any) -> t.Any:
 
         return v
 
-    @field_validator("storage_format", mode="before")
+    @field_validator("table_format", "storage_format", mode="before")
     @field_validator_v1_args
-    def _storage_format_validator(cls, v: t.Any, values: t.Dict[str, t.Any]) -> t.Optional[str]:
+    def _format_validator(cls, v: t.Any, values: t.Dict[str, t.Any]) -> t.Optional[str]:
         if isinstance(v, exp.Expression) and not (isinstance(v, (exp.Literal, exp.Identifier))):
             return v.sql(values.get("dialect"))
         return str_or_exp_to_str(v)
@@ -329,6 +330,18 @@ def _pre_root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
     @model_validator_v1_args
     def _root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
         values = cls._kind_validator(values)
+
+        # needs to be in a mode=after model validator so that the field validators have run to convert from Expression -> str
+        if (storage_format := values.get("storage_format")) and storage_format.lower() in {
+            "iceberg",
+            "hive",
+            "hudi",
+            "delta",
+        }:
+            logger.warning(
+                f"Model {values['name']} has `storage_format` set to a table format '{storage_format}', which is deprecated. 
Please use the `table_format` property instead" + ) + return values @classmethod diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 47439ce43..b25ee6482 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1052,6 +1052,7 @@ def _replace_query_for_model(self, model: Model, name: str, query_or_df: QueryOr name, query_or_df, columns_to_types=model.columns_to_types if model.annotated else None, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, @@ -1180,6 +1181,7 @@ def create( self.adapter.create_table( table_name, columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, @@ -1202,6 +1204,7 @@ def create( table_name, ctas_query, model.columns_to_types, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, @@ -1408,6 +1411,7 @@ def create( self.adapter.create_table( table_name, columns_to_types=columns_to_types, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, diff --git a/sqlmesh/dbt/basemodel.py b/sqlmesh/dbt/basemodel.py index 71f1d6f40..aa963d03b 100644 --- a/sqlmesh/dbt/basemodel.py +++ b/sqlmesh/dbt/basemodel.py @@ -103,6 +103,7 @@ class BaseModelConfig(GeneralConfig): # sqlmesh fields owner: t.Optional[str] = None stamp: t.Optional[str] = None + table_format: t.Optional[str] = None storage_format: t.Optional[str] = None path: Path = Path() dependencies: Dependencies = Dependencies() diff --git a/sqlmesh/migrations/v0057_add_table_format.py b/sqlmesh/migrations/v0057_add_table_format.py new file mode 100644 index 000000000..e34b6a4a5 --- /dev/null +++ b/sqlmesh/migrations/v0057_add_table_format.py @@ -0,0 +1,5 @@ +"""Add table_format to the model top-level properties""" + + +def migrate(state_sync, **kwargs): # type: ignore + pass diff --git a/tests/core/engine_adapter/test_athena.py b/tests/core/engine_adapter/test_athena.py index b7c161be6..8ba1266b9 100644 --- a/tests/core/engine_adapter/test_athena.py +++ b/tests/core/engine_adapter/test_athena.py @@ -149,9 +149,9 @@ def test_create_table_iceberg(adapter: AthenaEngineAdapter) -> None: name test_table, kind FULL, partitioned_by (colc, bucket(16, cola)), + table_format iceberg, storage_format parquet, physical_properties ( - table_type = 'iceberg', s3_base_location = 's3://foo' ) ); @@ -166,6 +166,7 @@ def test_create_table_iceberg(adapter: AthenaEngineAdapter) -> None: columns_to_types=model.columns_to_types_or_raise, table_properties=model.physical_properties, partitioned_by=model.partitioned_by, + table_format=model.table_format, storage_format=model.storage_format, ) @@ -227,11 +228,11 @@ def test_ctas_iceberg(adapter: AthenaEngineAdapter): table_name="foo.bar", columns_to_types={"a": exp.DataType.build("int")}, query_or_df=parse_one("select 1", into=exp.Select), - table_properties={"table_type": exp.Literal.string("iceberg")}, + table_format="iceberg", ) assert to_sql_calls(adapter) == [ - 'CREATE TABLE IF NOT EXISTS "foo"."bar" WITH (location=\'s3://bucket/prefix/foo/bar/\', is_external=false, table_type=\'iceberg\') AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1) AS "_subquery"' + 'CREATE TABLE IF 
NOT EXISTS "foo"."bar" WITH (table_type=\'iceberg\', location=\'s3://bucket/prefix/foo/bar/\', is_external=false) AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1) AS "_subquery"' ] diff --git a/tests/core/engine_adapter/test_integration.py b/tests/core/engine_adapter/test_integration.py index 22b472d86..4b381ff44 100644 --- a/tests/core/engine_adapter/test_integration.py +++ b/tests/core/engine_adapter/test_integration.py @@ -181,13 +181,6 @@ def table(self, table_name: str, schema: str = TEST_SCHEMA) -> exp.Table: ) ) - def physical_properties( - self, properties_for_dialect: t.Dict[str, t.Dict[str, str | exp.Expression]] - ) -> t.Dict[str, exp.Expression]: - if props := properties_for_dialect.get(self.dialect): - return {k: exp.Literal.string(v) if isinstance(v, str) else v for k, v in props.items()} - return {} - def schema(self, schema_name: str, catalog_name: t.Optional[str] = None) -> str: return exp.table_name( normalize_model_name( @@ -1327,15 +1320,11 @@ def test_merge(ctx: TestContext): ctx.init() table = ctx.table("test_table") - table_properties = ctx.physical_properties( - { - # Athena only supports MERGE on Iceberg tables - # And it cant fall back to a logical merge on Hive tables because it cant delete records - "athena": {"table_type": "iceberg"} - } - ) + # Athena only supports MERGE on Iceberg tables + # And it cant fall back to a logical merge on Hive tables because it cant delete records + table_format = "iceberg" if ctx.dialect == "athena" else None - ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_properties=table_properties) + ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_format=table_format) input_data = pd.DataFrame( [ {"id": 1, "ds": "2022-01-01"}, @@ -1405,13 +1394,11 @@ def test_scd_type_2_by_time(ctx: TestContext): input_schema = { k: v for k, v in ctx.columns_to_types.items() if k not in ("valid_from", "valid_to") } - table_properties = ctx.physical_properties( - { - # Athena only supports the operations required for SCD models on Iceberg tables - "athena": {"table_type": "iceberg"} - } - ) - ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_properties=table_properties) + + # Athena only supports the operations required for SCD models on Iceberg tables + table_format = "iceberg" if ctx.dialect == "athena" else None + + ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_format=table_format) input_data = pd.DataFrame( [ {"id": 1, "name": "a", "updated_at": "2022-01-01 00:00:00"}, @@ -1429,7 +1416,7 @@ def test_scd_type_2_by_time(ctx: TestContext): execution_time="2023-01-01 00:00:00", updated_at_as_valid_from=False, columns_to_types=input_schema, - table_properties=table_properties, + table_format=table_format, ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -1490,7 +1477,7 @@ def test_scd_type_2_by_time(ctx: TestContext): execution_time="2023-01-05 00:00:00", updated_at_as_valid_from=False, columns_to_types=input_schema, - table_properties=table_properties, + table_format=table_format, ) results = ctx.get_metadata_results() assert len(results.views) == 0 @@ -1556,13 +1543,11 @@ def test_scd_type_2_by_column(ctx: TestContext): input_schema = { k: v for k, v in ctx.columns_to_types.items() if k not in ("valid_from", "valid_to") } - table_properties = ctx.physical_properties( - { - # Athena only supports the operations required for SCD models on Iceberg tables - "athena": {"table_type": "iceberg"} - } - ) - ctx.engine_adapter.create_table(table, 
ctx.columns_to_types, table_properties=table_properties) + + # Athena only supports the operations required for SCD models on Iceberg tables + table_format = "iceberg" if ctx.dialect == "athena" else None + + ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_format=table_format) input_data = pd.DataFrame( [ {"id": 1, "name": "a", "status": "active"}, @@ -1795,14 +1780,10 @@ def test_truncate_table(ctx: TestContext): ctx.init() table = ctx.table("test_table") - table_properties = ctx.physical_properties( - { - # Athena only supports TRUNCATE (DELETE FROM ) on Iceberg tables - "athena": {"table_type": "iceberg"} - } - ) + # Athena only supports TRUNCATE (DELETE FROM
) on Iceberg tables + table_format = "iceberg" if ctx.dialect == "athena" else None - ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_properties=table_properties) + ctx.engine_adapter.create_table(table, ctx.columns_to_types, table_format=table_format) input_data = pd.DataFrame( [ {"id": 1, "ds": "2022-01-01"}, @@ -1943,10 +1924,10 @@ def test_sushi(mark_gateway: t.Tuple[str, str], ctx: TestContext): # Athena needs models that get mutated after creation to be using Iceberg if ctx.dialect == "athena": - for model_name in {"sushi.customer_revenue_lifetime"}: - context.get_model(model_name).physical_properties["table_type"] = exp.Literal.string( - "iceberg" - ) + for model_name in {"customer_revenue_lifetime"}: + model_key = next(k for k in context._models if model_name in k) + model = context._models[model_key].copy(update={"table_format": "iceberg"}) + context._models.update({model_key: model}) plan: Plan = context.plan( environment="test_prod", @@ -2470,10 +2451,10 @@ def _mutate_config(current_gateway_name: str, config: Config): create_sql_model(name=f"{schema}.seed_model", query=seed_query, kind="FULL") ) - physical_properties = "" + table_format = "" if ctx.dialect == "athena": # INCREMENTAL_BY_UNIQUE_KEY uses MERGE which is only supported in Athena on Iceberg tables - physical_properties = "physical_properties (table_type = 'iceberg')," + table_format = "table_format iceberg," context.upsert_model( load_sql_based_model( @@ -2484,7 +2465,7 @@ def _mutate_config(current_gateway_name: str, config: Config): unique_key item_id, batch_size 1 ), - {physical_properties} + {table_format} start '2020-01-01', end '2020-01-07', cron '@daily' diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index 2b303b9bd..a4ea97006 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -12,10 +12,18 @@ from sqlmesh.core.engine_adapter import SparkEngineAdapter from sqlmesh.utils.errors import SQLMeshError from tests.core.engine_adapter import to_sql_calls +import sqlmesh.core.dialect as d +from sqlmesh.core.model import load_sql_based_model +from sqlmesh.core.model.definition import SqlModel pytestmark = [pytest.mark.engine, pytest.mark.spark] +@pytest.fixture +def adapter(make_mocked_engine_adapter: t.Callable) -> SparkEngineAdapter: + return make_mocked_engine_adapter(SparkEngineAdapter) + + def test_create_table_properties(make_mocked_engine_adapter: t.Callable): adapter = make_mocked_engine_adapter(SparkEngineAdapter) @@ -987,3 +995,61 @@ def test_replace_query_with_wap_self_reference( "INSERT OVERWRITE TABLE `catalog`.`schema`.`table`.`branch_wap_12345` (`a`) SELECT 1 AS `a` FROM `catalog`.`schema`.`temp_branch_wap_12345_abcdefgh`", "DROP TABLE IF EXISTS `catalog`.`schema`.`temp_branch_wap_12345_abcdefgh`", ] + + +def test_table_format(adapter: SparkEngineAdapter, mocker: MockerFixture): + mocker.patch( + "sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists", + return_value=True, + ) + + expressions = d.parse( + """ + MODEL ( + name test_table, + kind FULL, + table_format iceberg, + storage_format orc + ); + + SELECT 1::timestamp AS cola, 2::varchar as colb, 'foo' as colc; + """ + ) + model: SqlModel = t.cast(SqlModel, load_sql_based_model(expressions)) + + # both table_format and storage_format + adapter.create_table( + table_name=model.name, + columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, + storage_format=model.storage_format, + ) + + # just 
table_format + adapter.create_table( + table_name=model.name, + columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, + ) + + # just storage_format set to a table format (test for backwards compatibility) + adapter.create_table( + table_name=model.name, + columns_to_types=model.columns_to_types_or_raise, + storage_format=model.table_format, + ) + + adapter.ctas( + table_name=model.name, + query_or_df=model.query, + columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, + storage_format=model.storage_format, + ) + + assert to_sql_calls(adapter) == [ + "CREATE TABLE IF NOT EXISTS `test_table` (`cola` TIMESTAMP, `colb` STRING, `colc` STRING) USING ICEBERG TBLPROPERTIES ('write.format.default'='orc')", + "CREATE TABLE IF NOT EXISTS `test_table` (`cola` TIMESTAMP, `colb` STRING, `colc` STRING) USING ICEBERG", + "CREATE TABLE IF NOT EXISTS `test_table` (`cola` TIMESTAMP, `colb` STRING, `colc` STRING) USING ICEBERG", + "CREATE TABLE IF NOT EXISTS `test_table` USING ICEBERG TBLPROPERTIES ('write.format.default'='orc') AS SELECT CAST(`cola` AS TIMESTAMP) AS `cola`, CAST(`colb` AS STRING) AS `colb`, CAST(`colc` AS STRING) AS `colc` FROM (SELECT CAST(1 AS TIMESTAMP) AS `cola`, CAST(2 AS STRING) AS `colb`, 'foo' AS `colc`) AS `_subquery`", + ] diff --git a/tests/core/engine_adapter/test_trino.py b/tests/core/engine_adapter/test_trino.py index 0ad7662e3..c2720164d 100644 --- a/tests/core/engine_adapter/test_trino.py +++ b/tests/core/engine_adapter/test_trino.py @@ -400,3 +400,48 @@ def test_delta_timestamps(make_mocked_engine_adapter: t.Callable): "ts_tz": ts3_tz, "ts_tz_1": ts3_tz, } + + +def test_table_format(trino_mocked_engine_adapter: TrinoEngineAdapter, mocker: MockerFixture): + adapter = trino_mocked_engine_adapter + mocker.patch( + "sqlmesh.core.engine_adapter.trino.TrinoEngineAdapter.get_current_catalog", + return_value="iceberg", + ) + + expressions = d.parse( + """ + MODEL ( + name iceberg.test_table, + kind FULL, + table_format iceberg, + storage_format orc + ); + + SELECT 1::timestamp AS cola, 2::varchar as colb, 'foo' as colc; + """ + ) + model: SqlModel = t.cast(SqlModel, load_sql_based_model(expressions)) + + adapter.create_table( + table_name=model.name, + columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, + storage_format=model.storage_format, + ) + + adapter.ctas( + table_name=model.name, + query_or_df=t.cast(exp.Query, model.query), + columns_to_types=model.columns_to_types_or_raise, + table_format=model.table_format, + storage_format=model.storage_format, + ) + + # Trino needs to ignore the `table_format` property because to create Iceberg tables, you target an Iceberg catalog + # rather than explicitly telling it to create an Iceberg table. 
So this is testing that `FORMAT='ORC'` is output + # instead of `FORMAT='ICEBERG'` which would be invalid + assert to_sql_calls(adapter) == [ + 'CREATE TABLE IF NOT EXISTS "iceberg"."test_table" ("cola" TIMESTAMP, "colb" VARCHAR, "colc" VARCHAR) WITH (FORMAT=\'ORC\')', + 'CREATE TABLE IF NOT EXISTS "iceberg"."test_table" WITH (FORMAT=\'ORC\') AS SELECT CAST("cola" AS TIMESTAMP) AS "cola", CAST("colb" AS VARCHAR) AS "colb", CAST("colc" AS VARCHAR) AS "colc" FROM (SELECT CAST(1 AS TIMESTAMP) AS "cola", CAST(2 AS VARCHAR) AS "colb", \'foo\' AS "colc") AS "_subquery"', + ] diff --git a/tests/core/test_model.py b/tests/core/test_model.py index c07f3c73a..8dd74f063 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -113,6 +113,7 @@ def test_load(assert_exp_eq): assert model.name == "db.table" assert model.owner == "owner_name" assert model.dialect == "spark" + assert model.table_format is None assert model.storage_format == "iceberg" assert [col.sql() for col in model.partitioned_by] == ['"a"', '"d"'] assert model.clustered_by == ["e"] @@ -3921,7 +3922,7 @@ def test_when_matched_multiple(): unique_key name, when_matched WHEN MATCHED AND source.x = 1 THEN UPDATE SET target.salary = COALESCE(source.salary, target.salary), WHEN MATCHED THEN UPDATE SET target.salary = COALESCE(source.salary, target.salary) - + ) ); SELECT 'name' AS name, 1 AS salary; @@ -3950,7 +3951,7 @@ def test_default_catalog_sql(assert_exp_eq): The system is not designed to actually support having an engine that doesn't support default catalog to start supporting it or the reverse of that. If that did happen then bugs would occur. """ - HASH_WITH_CATALOG = "3198762995" + HASH_WITH_CATALOG = "1833514724" # Test setting default catalog doesn't change hash if it matches existing logic expressions = d.parse( @@ -4116,7 +4117,7 @@ def test_default_catalog_sql(assert_exp_eq): def test_default_catalog_python(): - HASH_WITH_CATALOG = "2928466080" + HASH_WITH_CATALOG = "753636858" @model(name="db.table", kind="full", columns={'"COL"': "int"}) def my_model(context, **kwargs): @@ -4207,7 +4208,7 @@ def test_default_catalog_external_model(): Since external models fqns are the only thing affected by default catalog, and when they change new snapshots are made, the hash will be the same across different names. 
""" - EXPECTED_HASH = "1837375494" + EXPECTED_HASH = "4263688522" model = create_external_model("db.table", columns={"a": "int", "limit": "int"}) assert model.default_catalog is None diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index 077b22464..d995457f6 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -702,7 +702,7 @@ def test_fingerprint(model: Model, parent_model: Model): fingerprint = fingerprint_from_node(model, nodes={}) original_fingerprint = SnapshotFingerprint( - data_hash="3582214120", + data_hash="720070643", metadata_hash="2793463216", ) @@ -762,7 +762,7 @@ def test_fingerprint_seed_model(): ) expected_fingerprint = SnapshotFingerprint( - data_hash="2156038176", + data_hash="2233743260", metadata_hash="3403817841", ) @@ -801,7 +801,7 @@ def test_fingerprint_jinja_macros(model: Model): } ) original_fingerprint = SnapshotFingerprint( - data_hash="2973224250", + data_hash="1896101134", metadata_hash="2793463216", ) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 88c698714..65ec1d7fb 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -172,6 +172,7 @@ def x(evaluator, y=None) -> None: common_kwargs = dict( columns_to_types={"a": exp.DataType.build("int")}, + table_format=None, storage_format="parquet", partitioned_by=[exp.to_column("a", quoted=True)], partition_interval_unit=IntervalUnit.DAY, @@ -544,7 +545,7 @@ def test_evaluate_materialized_view_with_partitioned_by_cluster_by( execute_mock.assert_has_calls( [ call( - "CREATE MATERIALIZED VIEW `sqlmesh__test_schema`.`test_schema__test_model__2383078413` PARTITION BY `a` CLUSTER BY `b` AS SELECT `a` AS `a`, `b` AS `b` FROM `tbl` AS `tbl`" + "CREATE MATERIALIZED VIEW `sqlmesh__test_schema`.`test_schema__test_model__2531876769` PARTITION BY `a` CLUSTER BY `b` AS SELECT `a` AS `a`, `b` AS `b` FROM `tbl` AS `tbl`" ), ] ) @@ -661,6 +662,7 @@ def test_evaluate_incremental_unmanaged_no_intervals( columns_to_types=None, partition_interval_unit=model.interval_unit, partitioned_by=model.partitioned_by, + table_format=None, storage_format=None, table_description=None, table_properties={}, @@ -1214,6 +1216,7 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot) adapter_mock.create_table.assert_called_once_with( f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__temp__schema_migration_source", columns_to_types={"a": exp.DataType.build("int"), "ds": exp.DataType.build("date")}, + table_format=None, storage_format=None, partitioned_by=[exp.to_column("ds", quoted=True)], partition_interval_unit=IntervalUnit.DAY, @@ -1321,6 +1324,7 @@ def test_create_clone_in_dev_self_referencing(mocker: MockerFixture, adapter_moc adapter_mock.create_table.assert_called_once_with( f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__temp__schema_migration_source", columns_to_types={"a": exp.DataType.build("int"), "ds": exp.DataType.build("date")}, + table_format=None, storage_format=None, partitioned_by=[exp.to_column("ds", quoted=True)], partition_interval_unit=IntervalUnit.DAY, @@ -1435,6 +1439,7 @@ def test_forward_only_snapshot_for_added_model(mocker: MockerFixture, adapter_mo common_create_args = dict( columns_to_types={"a": exp.DataType.build("int"), "ds": exp.DataType.build("date")}, + table_format=None, storage_format=None, partitioned_by=[exp.to_column("ds", quoted=True)], partition_interval_unit=IntervalUnit.DAY, @@ -1485,6 +1490,7 @@ def 
test_create_scd_type_2_by_time(adapter_mock, make_snapshot): "valid_from": exp.DataType.build("TIMESTAMPTZ"), "valid_to": exp.DataType.build("TIMESTAMPTZ"), }, + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -1533,6 +1539,7 @@ def test_create_ctas_scd_type_2_by_time(adapter_mock, make_snapshot): # Verify that managed columns are included in CTAS with types common_kwargs = dict( + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -1649,6 +1656,7 @@ def test_create_scd_type_2_by_column(adapter_mock, make_snapshot): "valid_from": exp.DataType.build("TIMESTAMP"), "valid_to": exp.DataType.build("TIMESTAMP"), }, + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -1697,6 +1705,7 @@ def test_create_ctas_scd_type_2_by_column(adapter_mock, make_snapshot): # Verify that managed columns are included in CTAS with types common_kwargs = dict( + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -1958,6 +1967,7 @@ def test_create_incremental_by_unique_no_intervals(adapter_mock, make_snapshot): columns_to_types=model.columns_to_types, partition_interval_unit=model.interval_unit, partitioned_by=model.partitioned_by, + table_format=None, storage_format=None, table_description=None, table_properties={}, @@ -1987,6 +1997,7 @@ def test_create_seed(mocker: MockerFixture, adapter_mock, make_snapshot): common_create_kwargs: t.Dict[str, t.Any] = dict( columns_to_types={"id": exp.DataType.build("bigint"), "name": exp.DataType.build("text")}, + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -2070,6 +2081,7 @@ def test_create_seed_on_error(mocker: MockerFixture, adapter_mock, make_snapshot mocker.ANY, column_descriptions={}, columns_to_types={"id": exp.DataType.build("bigint"), "name": exp.DataType.build("text")}, + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -2125,6 +2137,7 @@ def test_create_seed_no_intervals(mocker: MockerFixture, adapter_mock, make_snap mocker.ANY, column_descriptions={}, columns_to_types={"id": exp.DataType.build("bigint"), "name": exp.DataType.build("text")}, + table_format=None, storage_format=None, partitioned_by=[], partition_interval_unit=IntervalUnit.DAY, @@ -2528,6 +2541,7 @@ def test_create_managed(adapter_mock, make_snapshot, mocker: MockerFixture): f"{snapshot.table_name()}__temp", mocker.ANY, model.columns_to_types, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, @@ -2607,6 +2621,7 @@ def test_evaluate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): f"{snapshot.table_name()}__temp", mocker.ANY, columns_to_types=None, + table_format=model.table_format, storage_format=model.storage_format, partitioned_by=model.partitioned_by, partition_interval_unit=model.interval_unit, @@ -2648,10 +2663,10 @@ def test_cleanup_managed(adapter_mock, make_snapshot, mocker: MockerFixture): evaluator.cleanup(target_snapshots=[cleanup_task]) adapter_mock.drop_table.assert_called_once_with( - "sqlmesh__test_schema.test_schema__test_model__1556851963__temp" + "sqlmesh__test_schema.test_schema__test_model__2998759427__temp" ) adapter_mock.drop_managed_table.assert_called_once_with( - "sqlmesh__test_schema.test_schema__test_model__1556851963" + 
"sqlmesh__test_schema.test_schema__test_model__2998759427" ) diff --git a/tests/core/test_state_sync.py b/tests/core/test_state_sync.py index 0efdf0432..ca0f2a26a 100644 --- a/tests/core/test_state_sync.py +++ b/tests/core/test_state_sync.py @@ -2229,7 +2229,7 @@ def test_snapshot_batching(state_sync, mocker, make_snapshot): call( exp.to_table("sqlmesh._snapshots"), where=parse_one( - f"(name, identifier) in (('\"a\"', '{snapshot_b.identifier}'), ('\"a\"', '{snapshot_a.identifier}'))" + f"(name, identifier) in (('\"a\"', '{snapshot_a.identifier}'), ('\"a\"', '{snapshot_b.identifier}'))" ), ), call( diff --git a/tests/schedulers/airflow/test_client.py b/tests/schedulers/airflow/test_client.py index 4f991762f..6471c125f 100644 --- a/tests/schedulers/airflow/test_client.py +++ b/tests/schedulers/airflow/test_client.py @@ -180,7 +180,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot): "models_to_backfill": ['"test_model"'], "end_bounded": False, "ensure_finalized_snapshots": False, - "directly_modified_snapshots": [{"identifier": "844700562", "name": '"test_model"'}], + "directly_modified_snapshots": [{"identifier": "4011362914", "name": '"test_model"'}], "indirectly_modified_snapshots": {}, "removed_snapshots": [], "restatements": { diff --git a/web/server/api/endpoints/models.py b/web/server/api/endpoints/models.py index 51288a927..052e65100 100644 --- a/web/server/api/endpoints/models.py +++ b/web/server/api/endpoints/models.py @@ -95,6 +95,7 @@ def serialize_model(context: Context, model: Model, render_query: bool = False) stamp=model.stamp, start=model.start, retention=model.retention, + table_format=model.table_format, storage_format=model.storage_format, time_column=time_column, tags=tags, diff --git a/web/server/models.py b/web/server/models.py index 2f3553851..bd1283d5f 100644 --- a/web/server/models.py +++ b/web/server/models.py @@ -155,6 +155,7 @@ class ModelDetails(PydanticModel): stamp: t.Optional[TimeLike] = None start: t.Optional[TimeLike] = None retention: t.Optional[int] = None + table_format: t.Optional[str] = None storage_format: t.Optional[str] = None time_column: t.Optional[str] = None tags: t.Optional[str] = None