Feat!: Add new table_format property alongside storage_format (#3175)
erindru authored Sep 25, 2024
1 parent e2829a1 commit 8996278
Showing 26 changed files with 251 additions and 94 deletions.
7 changes: 6 additions & 1 deletion docs/concepts/models/overview.md
@@ -258,8 +258,13 @@ Learn more about these properties and their default values in the [model configu
### depends_on
: Depends on explicitly specifies the models on which the model depends, in addition to the ones automatically inferred from the model code.

### table_format
: Table format is an optional property for engines that support table formats such as `iceberg` and `hive`, where the physical file format is configurable. The intention is to define the table type with `table_format` and the on-disk format of the files within the table with `storage_format`.

Note that this property is only implemented for engines that allow the `table_format` to be configured independently of the `storage_format`.
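
For example, the two properties can be paired like so (a minimal sketch; the model name and query are hypothetical):

```sql
MODEL (
  name analytics.example_model,
  kind FULL,
  table_format iceberg,   -- table metadata format
  storage_format parquet  -- on-disk file format within the table
);

SELECT 1 AS id;
```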

### storage_format
: Storage format is a property for engines such as Spark or Hive that support storage formats such as `parquet` and `orc`. Note that some engines don't make a distinction between `table_format` and `storage_format`, in which case `storage_format` is used and `table_format` is ignored.

### partitioned_by
: Partitioned by plays two roles. For most model kinds, it is an optional property for engines that support table partitioning such as Spark or BigQuery.
10 changes: 8 additions & 2 deletions docs/integrations/engines/athena.md
@@ -36,12 +36,18 @@ These options are specific to SQLMesh itself and are not passed to PyAthena

## Model properties

The Athena adapter utilises the following model top-level [properties](../../concepts/models/overview.md#model-properties):

| Name | Description | Type | Required |
|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
| `table_format` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | N |
| `storage_format` | Configures the file format to be used by the `table_format`. For Hive tables, this sets the [STORED AS](https://docs.aws.amazon.com/athena/latest/ug/create-table.html#parameters) option. For Iceberg tables, this sets the [format](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) property. | string | N |
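
For instance, a Hive table stored as Parquet might be declared like this (a sketch; the model name is hypothetical):

```sql
MODEL (
  name analytics.example_model,
  table_format hive,
  storage_format parquet
);
```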

The Athena adapter recognises the following model [physical_properties](../../concepts/models/overview.md#physical_properties):

| Name | Description | Type | Default |
|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|
| `s3_base_location`| `s3://` base URI of where the snapshot tables for this model should be written. Overrides `s3_warehouse_location` if one is configured. | string | |
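
For example, `s3_base_location` is set via `physical_properties` (illustrative values; the bucket and model name are hypothetical):

```sql
MODEL (
  name analytics.example_model,
  physical_properties (
    s3_base_location = 's3://my-data-bucket/sqlmesh/'
  )
);
```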


## S3 Locations
@@ -62,6 +68,6 @@ Consequently, any SQLMesh model types that need to delete or merge data from ex

However, Athena does support [Apache Iceberg](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) tables which allow a full range of operations. These can be used for more complex model types such as [`INCREMENTAL_BY_UNIQUE_KEY`](../../concepts/models/model_kinds.md#incremental_by_unique_key) and [`SCD_TYPE_2`](../../concepts/models/model_kinds.md#scd-type-2).

To use an Iceberg table for a model, set `table_format iceberg` in the model [properties](../../concepts/models/overview.md#model-properties).
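
A minimal sketch (the model name and unique key are hypothetical):

```sql
MODEL (
  name analytics.events,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key event_id
  ),
  table_format iceberg
);
```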

In general, Iceberg tables offer the most flexibility and you'll run into the fewest SQLMesh limitations when using them. However, we create Hive tables by default because Athena creates Hive tables by default, so Iceberg tables are opt-in rather than opt-out.
9 changes: 5 additions & 4 deletions docs/reference/model_configuration.md
@@ -11,7 +11,7 @@ Configuration options for SQLMesh model properties. Supported by all model kinds
| Option | Description | Type | Required |
|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------:|:--------:|
| `name` | The model name. Must include at least a qualifying schema (`<schema>.<model>`) and may include a catalog (`<catalog>.<schema>.<model>`). Can be omitted if [infer_names](#model-naming) is set to true. | str | N |
| `project` | The name of the project the model belongs to - used in multi-repo deployments | str | N |
| `kind` | The model kind ([Additional Details](#model-kind-properties)). (Default: `VIEW`) | str \| dict | N |
| `audits` | SQLMesh [audits](../concepts/audits.md) that should run against the model's output | array[str] | N |
| `dialect` | The SQL dialect in which the model's query is written. All SQL dialects [supported by the SQLGlot library](https://github.com/tobymao/sqlglot/blob/main/sqlglot/dialects/dialect.py) are allowed. | str | N |
@@ -26,14 +26,15 @@ Configuration options for SQLMesh model properties. Supported by all model kinds
| `column_descriptions` | A key-value mapping of column names to column comments that will be registered in the SQL engine's table COMMENT field (if supported by the engine). Specified as key-value pairs (`column_name = 'column comment'`). If present, [inline column comments](../concepts/models/overview.md#inline-column-comments) will not be registered in the SQL engine. | dict | N |
| `grains` | The column(s) whose combination uniquely identifies each row in the model | str \| array[str] | N |
| `references` | The model column(s) used to join to other models' grains | str \| array[str] | N |
| `depends_on` | Models on which this model depends, in addition to the ones inferred from the model's query. (Default: dependencies inferred from model code) | array[str] | N |
| `table_format` | The table format that should be used to manage the physical files (eg `iceberg`, `hive`, `delta`); only applicable to engines such as Spark and Athena | str | N |
| `storage_format` | The storage format that should be used to store physical files (eg `parquet`, `orc`); only applicable to engines such as Spark and Athena | str | N |
| `partitioned_by` | The column(s) and/or column expressions used to define a model's partitioning key. Required for the `INCREMENTAL_BY_PARTITION` model kind. Optional for all other model kinds; used to partition the model's physical table in engines that support partitioning. | str \| array[str] | N |
| `clustered_by` | The column(s) used to cluster the model's physical table; only applicable to engines that support clustering | str | N |
| `columns` | The column names and data types returned by the model. Disables [automatic inference of column names and types](../concepts/models/overview.md#conventions) from the SQL query. | array[str] | N |
| `physical_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the model table / view in the physical layer. Specified as key-value pairs (`key = value`). | dict | N |
| `virtual_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the model view in the virtual layer. Specified as key-value pairs (`key = value`). | dict | N |
| `session_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the engine session. Specified as key-value pairs (`key = value`). | dict | N |
| `allow_partials` | Whether this model can process partial (incomplete) data intervals | bool | N |
| `enabled` | Whether the model is enabled. This attribute is `true` by default. Setting it to `false` causes SQLMesh to ignore this model when loading the project. | bool | N |
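
As an illustration, several of these options combined in one model definition (names and values are hypothetical):

```sql
MODEL (
  name analytics.orders,
  kind FULL,
  dialect spark,
  owner data_team,
  grain order_id,
  table_format iceberg,
  storage_format parquet
);
```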

5 changes: 4 additions & 1 deletion sqlmesh/core/config/model.py
@@ -27,8 +27,10 @@ class ModelDefaultsConfig(BaseConfig):
start: The earliest date that the model will be backfilled for. If this is None,
then the date is inferred by taking the most recent start date of its ancestors.
The start date can be a static datetime or a relative datetime like "1 year ago"
table_format: The table format used to manage the physical files (whose on-disk format is defined by `storage_format`); only applicable in certain engines.
(eg, 'iceberg', 'delta', 'hudi')
storage_format: The storage format used to store the physical table, only applicable in certain engines.
(eg. 'parquet', 'orc')
on_destructive_change: What should happen when a forward-only model requires a destructive schema change.
audits: The audits to be applied globally to all models in the project.
"""
@@ -38,6 +40,7 @@
cron: t.Optional[str] = None
owner: t.Optional[str] = None
start: t.Optional[TimeLike] = None
table_format: t.Optional[str] = None
storage_format: t.Optional[str] = None
on_destructive_change: t.Optional[OnDestructiveChange] = None
session_properties: t.Optional[t.Dict[str, t.Any]] = None
36 changes: 14 additions & 22 deletions sqlmesh/core/engine_adapter/athena.py
@@ -66,10 +66,8 @@ def create_state_table(
table_name,
columns_to_types,
primary_key=primary_key,
# it's painfully slow, but it works
table_format="iceberg",
)

def _get_data_objects(
@@ -189,7 +187,7 @@ def _build_create_table_exp(
**kwargs,
)

is_hive = self._table_type(kwargs.get("table_format", None)) == "hive"

# Filter any PARTITIONED BY properties from the main column list since they can't be specified in both places
# ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
@@ -214,6 +212,7 @@
def _build_table_properties_exp(
self,
catalog_name: t.Optional[str] = None,
table_format: t.Optional[str] = None,
storage_format: t.Optional[str] = None,
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
partition_interval_unit: t.Optional[IntervalUnit] = None,
@@ -229,14 +228,19 @@ def _build_table_properties_exp(
properties: t.List[exp.Expression] = []
table_properties = table_properties or {}

is_hive = self._table_type(table_format) == "hive"
is_iceberg = not is_hive

if is_hive and not expression:
# Hive tables are CREATE EXTERNAL TABLE, Iceberg tables are CREATE TABLE
# Unless it's a CTAS, those are always CREATE TABLE
properties.append(exp.ExternalProperty())

if table_format:
properties.append(
exp.Property(this=exp.var("table_type"), value=exp.Literal.string(table_format))
)

if table_description:
properties.append(exp.SchemaCommentProperty(this=exp.Literal.string(table_description)))

@@ -297,15 +301,12 @@ def _truncate_table(self, table_name: TableName) -> None:
self.execute(f"DELETE FROM {table.sql(dialect=self.dialect, identify=True)}")

def _table_type(
self, table_type: t.Optional[str] = None
) -> t.Union[t.Literal["hive"], t.Literal["iceberg"]]:
"""
Use the user-specified table format to figure out if this is a Hive or an Iceberg table
"""
if table_type and table_type.lower() == "iceberg":
return "iceberg"

# if we can't detect any indication of Iceberg, this is a Hive table
return "hive"

@lru_cache()
@@ -392,12 +393,3 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]
raise SQLMeshError("Cannot delete from non-empty Hive table")

return None

def _drop_object(
self,
name: TableName | SchemaName,
exists: bool = True,
kind: str = "TABLE",
**drop_args: t.Any,
) -> None:
return super()._drop_object(name, exists=exists, kind=kind, **drop_args)
