Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Athena adapter #3154

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,11 @@ workflows:
- redshift
- bigquery
- clickhouse-cloud
- athena
filters:
branches:
only:
- main
branches:
only:
- main
- trigger_private_tests:
requires:
- style_and_slow_tests
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ venv/
ENV/
env.bak/
venv.bak/
venv*/

# Spyder project settings
.spyderproject
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ install-doc:
pip3 install -r ./docs/requirements.txt

install-engine-test:
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql,clickhouse]"
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql,clickhouse,athena]"

install-pre-commit:
pre-commit install
Expand Down Expand Up @@ -209,3 +209,6 @@ redshift-test: guard-REDSHIFT_HOST guard-REDSHIFT_USER guard-REDSHIFT_PASSWORD g

clickhouse-cloud-test: guard-CLICKHOUSE_CLOUD_HOST guard-CLICKHOUSE_CLOUD_USERNAME guard-CLICKHOUSE_CLOUD_PASSWORD engine-clickhouse-install
pytest -n auto -x -m "clickhouse_cloud" --retries 3 --junitxml=test-results/junit-clickhouse-cloud.xml

athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3_WAREHOUSE_LOCATION engine-athena-install
pytest -n auto -x -m "athena" --retries 3 --retry-delay 10 --junitxml=test-results/junit-athena.xml
1 change: 1 addition & 0 deletions docs/guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ Example snowflake connection configuration:

These pages describe the connection configuration options for each execution engine.

* [Athena](../integrations/engines/athena.md)
* [BigQuery](../integrations/engines/bigquery.md)
* [Databricks](../integrations/engines/databricks.md)
* [DuckDB](../integrations/engines/duckdb.md)
Expand Down
67 changes: 67 additions & 0 deletions docs/integrations/engines/athena.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Athena

## Installation

```
pip install "sqlmesh[athena]"
```

## Connection options

### PyAthena connection options

SQLMesh leverages the [PyAthena](https://github.com/laughingman7743/PyAthena) DBAPI driver to connect to Athena. Therefore, the connection options relate to the PyAthena connection options.
Note that PyAthena uses [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) under the hood so you can also use [boto3 environment variables](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables) for configuration.

| Option | Description | Type | Required |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
| `type` | Engine type name - must be `athena` | string | Y |
| `aws_access_key_id` | The access key for your AWS user | string | N |
| `aws_secret_access_key` | The secret key for your AWS user | string | N |
| `role_arn` | The ARN of a role to assume once authenticated | string | N |
| `role_session_name` | The session name to use when assuming `role_arn` | string | N |
| `region_name` | The AWS region to use | string | N |
| `work_group` | The Athena [workgroup](https://docs.aws.amazon.com/athena/latest/ug/workgroups-manage-queries-control-costs.html) to send queries to | string | N |
| `s3_staging_dir` | The S3 location for Athena to write query results. Only required if not using `work_group` OR the configured `work_group` doesnt have a results location set | string | N |
| `schema_name` | The default schema to place objects in if a schema isnt specified. Defaults to `default` | string | N |
| `catalog_name` | The default catalog to place schemas in. Defaults to `AwsDataCatalog` | string | N |

### SQLMesh connection options

These options are specific to SQLMesh itself and are not passed to PyAthena

| Option | Description | Type | Required |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
| `s3_warehouse_location` | Set the base path in S3 where SQLMesh will instruct Athena to place table data. Only required if you arent specifying the location in the model itself. See [S3 Locations](#s3-locations) below. | string | N |

## Model properties

The Athena adapter recognises the following model [physical_properties](../../concepts/models/overview.md#physical_properties):

| Name | Description | Type | Default |
|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|
| `s3_base_location`| `s3://` base URI of where the snapshot tables for this model should be written. Overrides `s3_warehouse_location` if one is configured. | string | |
| `table_type` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | `hive` |


## S3 Locations
When creating tables, Athena needs to know where in S3 the table data is located. You cannot issue a `CREATE TABLE` statement without specifying a `LOCATION` for the table data.

In addition, unlike other engines such as Trino, Athena will not infer a table location if you set a _schema_ location via `CREATE SCHEMA <schema> LOCATION 's3://schema/location'`.

Therefore, in order for SQLMesh to issue correct `CREATE TABLE` statements to Athena, you need to configure where the tables should be stored. There are two options for this:

- **Project-wide:** set `s3_warehouse_location` in the connection config. SQLMesh will set the table `LOCATION` to be `<s3_warehouse_location>/<schema_name>/<snapshot_table_name>` when it creates a snapshot of your model.
- **Per-model:** set `s3_base_location` in the model `physical_properties`. SQLMesh will set the table `LOCATION` to be `<s3_base_location>/<snapshot_table_name>` every time it creates a snapshot of your model. This takes precedence over any `s3_warehouse_location` set in the connection config.


## Limitations
Athena was initially designed to read data stored in S3 and to do so without changing that data. This means that it does not have good support for mutating tables. In particular, it will not delete data from Hive tables.

Consequently, any SQLMesh model types that needs to delete or merge data from existing tables will not work on Hive tables. In addition, [forward only changes](../../concepts/plans.md#forward-only-change) that mutate the schemas of existing tables have a high chance of failure because Athena supports very limited schema modifications on Hive tables.

However, Athena does support [Apache Iceberg](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) tables which allow a full range of operations. These can be used for more complex model types such as [`INCREMENTAL_BY_UNIQUE_KEY`](../../concepts/models/model_kinds.md#incremental_by_unique_key) and [`SCD_TYPE_2`](../../concepts/models/model_kinds.md#scd-type-2).

To use an Iceberg table for a model, set `table_type='iceberg'` in the model [physical_properties](../../concepts/models/overview.md#physical_properties).

In general, Iceberg tables offer the most flexibility and you'll run into the least SQLMesh limitations when using them. However, we create Hive tables by default because Athena creates Hive tables by default, so Iceberg tables are opt-in rather than opt-out.
1 change: 1 addition & 0 deletions docs/integrations/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SQLMesh supports integrations with the following tools:
## Execution engines
SQLMesh supports the following execution engines for running SQLMesh projects:

* [Athena](./engines/athena.md)
* [BigQuery](./engines/bigquery.md)
* [Databricks](./engines/databricks.md)
* [DuckDB](./engines/duckdb.md)
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ nav:
- integrations/dbt.md
- integrations/github.md
- Execution engines:
- integrations/engines/athena.md
- integrations/engines/bigquery.md
- integrations/engines/clickhouse.md
- integrations/engines/databricks.md
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ markers =
spark_pyspark: test for Spark with PySpark dependency
# Engine Adapters
engine: test all engine adapters
athena: test for Athena
bigquery: test for BigQuery
clickhouse: test for Clickhouse (standalone mode)
clickhouse_cluster: test for Clickhouse (cluster mode)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"sqlglot[rs]~=25.22.0",
],
extras_require={
"athena": ["PyAthena[Pandas]"],
"bigquery": [
"google-cloud-bigquery[pandas]",
"google-cloud-bigquery-storage",
Expand Down
16 changes: 15 additions & 1 deletion sqlmesh/core/config/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from enum import Enum

from sqlmesh.utils import classproperty
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.errors import ConfigError, SQLMeshError
from sqlmesh.utils.pydantic import field_validator


Expand Down Expand Up @@ -86,3 +86,17 @@ def _validate_type(v: t.Any) -> None:
mode="before",
check_fields=False,
)(_variables_validator)


def validate_s3_location(value: str, error_type: t.Type[Exception] = SQLMeshError) -> str:
if not value.startswith("s3://"):
raise error_type(f"Location '{value}' must be a s3:// URI")

if not value.endswith("/"):
value += "/"

# To avoid HIVE_METASTORE_ERROR: S3 resource path length must be less than or equal to 700.
if len(value) > 700:
raise error_type(f"Location '{value}' cannot be more than 700 characters")

return value
72 changes: 72 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sqlmesh.core.config.common import (
concurrent_tasks_validator,
http_headers_validator,
validate_s3_location,
)
from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.utils.errors import ConfigError
Expand Down Expand Up @@ -1483,6 +1484,77 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
return {"compress": compress, "client_name": f"SQLMesh/{__version__}", **settings}


class AthenaConnectionConfig(ConnectionConfig):
# PyAthena connection options
aws_access_key_id: t.Optional[str] = None
aws_secret_access_key: t.Optional[str] = None
role_arn: t.Optional[str] = None
role_session_name: t.Optional[str] = None
region_name: t.Optional[str] = None
work_group: t.Optional[str] = None
s3_staging_dir: t.Optional[str] = None
schema_name: t.Optional[str] = None
catalog_name: t.Optional[str] = None

# SQLMesh options
s3_warehouse_location: t.Optional[str] = None
concurrent_tasks: int = 4
register_comments: Literal[False] = (
False # because Athena doesnt support comments in most cases
)
pre_ping: Literal[False] = False

type_: Literal["athena"] = Field(alias="type", default="athena")

@model_validator(mode="after")
@model_validator_v1_args
def _root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
work_group = values.get("work_group")
s3_staging_dir = values.get("s3_staging_dir")
s3_warehouse_location = values.get("s3_warehouse_location")

if not work_group and not s3_staging_dir:
raise ConfigError("At least one of work_group or s3_staging_dir must be set")

if s3_staging_dir:
values["s3_staging_dir"] = validate_s3_location(s3_staging_dir, error_type=ConfigError)

if s3_warehouse_location:
values["s3_warehouse_location"] = validate_s3_location(
s3_warehouse_location, error_type=ConfigError
)

return values

@property
def _connection_kwargs_keys(self) -> t.Set[str]:
return {
"aws_access_key_id",
"aws_secret_access_key",
"role_arn",
"role_session_name",
"region_name",
"work_group",
"s3_staging_dir",
"schema_name",
"catalog_name",
}

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
return engine_adapter.AthenaEngineAdapter

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {"s3_warehouse_location": self.s3_warehouse_location}

@property
def _connection_factory(self) -> t.Callable:
from pyathena import connect # type: ignore

return connect


CONNECTION_CONFIG_TO_TYPE = {
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
tpe.all_field_infos()["type_"].default: tpe
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter

DIALECT_TO_ENGINE_ADAPTER = {
"hive": SparkEngineAdapter,
Expand All @@ -31,6 +32,7 @@
"mysql": MySQLEngineAdapter,
"mssql": MSSQLEngineAdapter,
"trino": TrinoEngineAdapter,
"athena": AthenaEngineAdapter,
}

DIALECT_ALIASES = {
Expand Down
Loading