diff --git a/dbt-athena-community/README.md b/dbt-athena-community/README.md new file mode 100644 index 00000000..c5487c05 --- /dev/null +++ b/dbt-athena-community/README.md @@ -0,0 +1,882 @@ + +

+ dbt logo +

+

+ + + + + + + + + + + + + + +

+ + +- [Features](#features) + - [Quick start](#quick-start) + - [Installation](#installation) + - [Prerequisites](#prerequisites) + - [Credentials](#credentials) + - [Configuring your profile](#configuring-your-profile) + - [Additional information](#additional-information) + - [Models](#models) + - [Table configuration](#table-configuration) + - [Table location](#table-location) + - [Incremental models](#incremental-models) + - [On schema change](#on-schema-change) + - [Iceberg](#iceberg) + - [Highly available table (HA)](#highly-available-table-ha) + - [HA known issues](#ha-known-issues) + - [Update glue data catalog](#update-glue-data-catalog) + - [Snapshots](#snapshots) + - [Timestamp strategy](#timestamp-strategy) + - [Check strategy](#check-strategy) + - [Hard-deletes](#hard-deletes) + - [Working example](#working-example) + - [Snapshots known issues](#snapshots-known-issues) + - [AWS Lake Formation integration](#aws-lake-formation-integration) + - [Python models](#python-models) + - [Contracts](#contracts) + - [Contributing](#contributing) + - [Contributors ✨](#contributors-) + + +# Features + +- Supports dbt version `1.7.*` +- Support for Python +- Supports [seeds][seeds] +- Correctly detects views and their columns +- Supports [table materialization][table] + - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location** + (see table location section below) + - Hive tables are supported by both Athena engines +- Supports [incremental models][incremental] + - On Iceberg tables: + - Supports the use of `unique_key` only with the `merge` strategy + - Supports the `append` strategy + - On Hive tables: + - Supports two incremental update strategies: `insert_overwrite` and `append` + - Does **not** support the use of `unique_key` +- Supports [snapshots][snapshots] +- Supports [Python models][python-models] + +[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds + +[incremental]: https://docs.getdbt.com/docs/build/incremental-models + +[table]: https://docs.getdbt.com/docs/build/materializations#table + +[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models + +[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html + +[snapshots]: https://docs.getdbt.com/docs/build/snapshots + +## Quick start + +### Installation + +- `pip install dbt-athena-community` +- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git` + +### Prerequisites + +To start, you will need an S3 bucket, for instance `my-bucket` and an Athena database: + +```sql +CREATE DATABASE IF NOT EXISTS analytics_dev +COMMENT 'Analytics models generated by dbt (development)' +LOCATION 's3://my-bucket/' +WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com'); +``` + +Notes: + +- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.). +- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena + databases. + +### Credentials + +Credentials can be passed directly to the adapter, or they can +be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based +on `aws cli`/`boto3` conventions. +You can either: + +- Configure `aws_access_key_id` and `aws_secret_access_key` +- Configure `aws_profile_name` to match a profile defined in your AWS credentials file. + Checkout dbt profile configuration below for details. + +### Configuring your profile + +A dbt profile can be configured to run against AWS Athena using the following configuration: + +| Option | Description | Required? | Example | +|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------| +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | +| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | +| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` | +| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` | +| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` | +| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` | +| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` | +| num_retries | Number of times to retry a failing query | Optional | `3` | +| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` | +| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` | +| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` | +| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` | +| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` | + +**Example profiles.yml entry:** + +```yaml +athena: + target: dev + outputs: + dev: + type: athena + s3_staging_dir: s3://athena-query-results/dbt/ + s3_data_dir: s3://your_s3_bucket/dbt/ + s3_data_naming: schema_table + s3_tmp_table_dir: s3://your_s3_bucket/temp/ + region_name: eu-west-1 + schema: dbt + database: awsdatacatalog + threads: 4 + aws_profile_name: my-profile + work_group: my-workgroup + spark_work_group: my-spark-workgroup + seed_s3_upload_args: + ACL: bucket-owner-full-control +``` + +### Additional information + +- `threads` is supported +- `database` and `catalog` can be used interchangeably + +## Models + +### Table configuration + +- `external_location` (`default=none`) + - If set, the full S3 path to which the table will be saved + - Works only with incremental models + - Does not work with Hive table with `ha` set to true +- `partitioned_by` (`default=none`) + - An array list of columns by which the table will be partitioned + - Limited to creation of 100 partitions (*currently*) +- `bucketed_by` (`default=none`) + - An array list of columns to bucket data, ignored if using Iceberg +- `bucket_count` (`default=none`) + - The number of buckets for bucketing your data, ignored if using Iceberg +- `table_type` (`default='hive'`) + - The type of table + - Supports `hive` or `iceberg` +- `ha` (`default=false`) + - If the table should be built using the high-availability method. This option is only available for Hive tables + since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha)) +- `format` (`default='parquet'`) + - The data format for the table + - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` +- `write_compression` (`default=none`) + - The compression type to use for any storage format that allows compression to be specified. To see which options are + available, check out [CREATE TABLE AS][create-table-as] +- `field_delimiter` (`default=none`) + - Custom field delimiter, for when format is set to `TEXTFILE` +- `table_properties`: table properties to add to the table, valid for Iceberg only +- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be + made to manage data in S3. Data in S3 will only be cleared up for Iceberg + tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that + Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds. +- `seed_by_insert` (`default=false`) + - Default behaviour uploads seed data to S3. This flag will create seeds using an SQL insert statement + - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would + exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) +- `force_batch` (`default=false`) + - Skip creating the table as CTAS and run the operation directly in batch insert mode + - This is particularly useful when the standard table creation process fails due to partition limitations, + allowing you to work with temporary tables and persist the dataset more efficiently +- `unique_tmp_table_suffix` (`default=false`) + - For incremental models using insert overwrite strategy on hive table + - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid + - Useful if you are looking to run multiple dbt build inserting in the same table in parallel +- `temp_schema` (`default=none`) + - For incremental models, it allows to define a schema to hold temporary create statements + used in incremental model runs + - Schema will be created in the model target database if does not exist +- `lf_tags_config` (`default=none`) + - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns + - `enabled` (`default=False`) whether LF tags management is enabled for a model + - `tags` dictionary with tags and their values to assign for the model + - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to + - `lf_inherited_tags` (`default=none`) + - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be + removed during association of those defined in `lf_tags_config` + - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from + tables and columns before associating the ones currently defined for a given model + - This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly + +```sql +{{ + config( + materialized='incremental', + incremental_strategy='append', + on_schema_change='append_new_columns', + table_type='iceberg', + schema='test_schema', + lf_tags_config={ + 'enabled': true, + 'tags': { + 'tag1': 'value1', + 'tag2': 'value2' + }, + 'tags_columns': { + 'tag1': { + 'value1': ['column1', 'column2'], + 'value2': ['column3', 'column4'] + } + }, + 'inherited_tags': ['tag1', 'tag2'] + } + ) +}} +``` + +- Format for `dbt_project.yml`: + +```yaml + +lf_tags_config: + enabled: true + tags: + tag1: value1 + tag2: value2 + tags_columns: + tag1: + value1: [ column1, column2 ] + inherited_tags: [ tag1, tag2 ] +``` + +- `lf_grants` (`default=none`) + - Lake Formation grants config for data_cell filters + - Format: + + ```python + lf_grants={ + 'data_cell_filters': { + 'enabled': True | False, + 'filters': { + 'filter_name': { + 'row_filter': '', + 'principals': ['principal_arn1', 'principal_arn2'] + } + } + } + } + ``` + +> Notes: +> +> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. +> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use +> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or +> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. +> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table +> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every +> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and +> apply changes if they occur: drop, create, update filters and their permissions. +> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at + the table and column level +> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made +> previously +> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed + prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform + where the inherited value is configured nor in the DBT project where the override previously existed but now is + gone) + +[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties + +### Table location + +The location a table is saved to is determined by: + +1. If `external_location` is defined, that value is used +2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` +3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/` + +Here all the options available for `s3_data_naming`: + +- `unique`: `{s3_data_dir}/{uuid4()}/` +- `table`: `{s3_data_dir}/{table}/` +- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` +- `schema_table`: `{s3_data_dir}/{schema}/{table}/` +- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` + +It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, +or setting up the value for groups of model in dbt_project.yml. + +> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets +> are ignored and the location configured in the workgroup is used. + +### Incremental models + +Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models). + +These strategies are supported: + +- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination + table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no + partitions are defined, dbt will fall back to the `append` strategy. +- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate + data (e.g. great for log or historical data). +- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`. + Only available when using Iceberg. + +### On schema change + +`on_schema_change` is an option to reflect changes of schema in incremental models. +The following options are supported: + +- `ignore` (default) +- `fail` +- `append_new_columns` +- `sync_all_columns` + +For details, please refer +to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change). + +### Iceberg + +The adapter supports table materialization for Iceberg. + +To get started just add this as your model: + +```sql +{{ config( + materialized='table', + table_type='iceberg', + format='parquet', + partitioned_by=['bucket(user_id, 5)'], + table_properties={ + 'optimize_rewrite_delete_file_threshold': '2' + } +) }} + +select 'A' as user_id, + 'pi' as name, + 'active' as status, + 17.89 as cost, + 1 as quantity, + 100000000 as quantity_big, + current_date as my_date +``` + +Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing +conditions. + +Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`. + +It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported: + +- `append`: New records are appended to the table, this can lead to duplicates. +- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only + available with Athena engine version 3. + - `unique_key` **(required)**: columns that define a unique record in the source and target tables. + - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can + be useful for improving performance via predicate pushdown on the target table. + - `delete_condition` (optional): SQL condition used to identify records that should be deleted. + - `update_condition` (optional): SQL condition used to identify records that should be updated. + - `insert_condition` (optional): SQL condition used to identify records that should be inserted. + - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of + the incremental table (`src`) or the final table (`target`). + Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error. + +`delete_condition` example: + +```sql +{{ config( + materialized='incremental', + table_type='iceberg', + incremental_strategy='merge', + unique_key='user_id', + incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"], + delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year", + format='parquet' +) }} + +select 'A' as user_id, + 'pi' as name, + 'active' as status, + 17.89 as cost, + 1 as quantity, + 100000000 as quantity_big, + current_date as my_date +``` + +`update_condition` example: + +```sql +{{ config( + materialized='incremental', + incremental_strategy='merge', + unique_key=['id'], + update_condition='target.id > 1', + schema='sandbox' + ) +}} + +{% if is_incremental() %} + +select * from ( + values + (1, 'v1-updated') + , (2, 'v2-updated') +) as t (id, value) + +{% else %} + +select * from ( + values + (-1, 'v-1') + , (0, 'v0') + , (1, 'v1') + , (2, 'v2') +) as t (id, value) + +{% endif %} +``` + +`insert_condition` example: + +```sql +{{ config( + materialized='incremental', + incremental_strategy='merge', + unique_key=['id'], + insert_condition='target.status != 0', + schema='sandbox' + ) +}} + +select * from ( + values + (1, 0) + , (2, 1) +) as t (id, status) + +``` + +### Highly available table (HA) + +The current implementation of the table materialization can lead to downtime, as the target table is +dropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on +your `table` materialized models. It leverages the table versions feature of glue catalog, creating +a temp table and swapping the target table to the location of the temp table. This materialization +is only available for `table_type=hive` and requires using unique locations. For iceberg, high +availability is the default. + +```sql +{{ config( + materialized='table', + ha=true, + format='parquet', + table_type='hive', + partitioned_by=['status'], + s3_data_naming='table_unique' +) }} + +select 'a' as user_id, + 'pi' as user_name, + 'active' as status +union all +select 'b' as user_id, + 'sh' as user_name, + 'disabled' as status +``` + +By default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`. + +#### HA known issues + +- When swapping from a table with partitions to a table without (and the other way around), there could be a little + downtime. + If high performances is needed consider bucketing instead of partitions +- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location +- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed + +### Update glue data catalog + +Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as +[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties) +and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html). +By default, documentation persistence is disabled, but it can be enabled for specific resources or +groups of resources as needed. + +For example: + +```yaml +models: + - name: test_deduplicate + description: another value + config: + persist_docs: + relation: true + columns: true + meta: + test: value + columns: + - name: id + meta: + primary_key: true +``` + +See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details. + +## Snapshots + +The adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot +create a snapshot file in the snapshots directory. If the directory does not exist create one. + +### Timestamp strategy + +To use the timestamp strategy refer to +the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended) + +### Check strategy + +To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy) + +### Hard-deletes + +The materialization also supports invalidating hard deletes. Check +the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. + +### Working example + +seed file - employent_indicators_november_2022_csv_tables.csv + +```csv +Series_reference,Period,Data_value,Suppressed +MEIM.S1WA,1999.04,80267, +MEIM.S1WA,1999.05,70803, +MEIM.S1WA,1999.06,65792, +MEIM.S1WA,1999.07,66194, +MEIM.S1WA,1999.08,67259, +MEIM.S1WA,1999.09,69691, +MEIM.S1WA,1999.1,72475, +MEIM.S1WA,1999.11,79263, +MEIM.S1WA,1999.12,86540, +MEIM.S1WA,2000.01,82552, +MEIM.S1WA,2000.02,81709, +MEIM.S1WA,2000.03,84126, +MEIM.S1WA,2000.04,77089, +MEIM.S1WA,2000.05,73811, +MEIM.S1WA,2000.06,70070, +MEIM.S1WA,2000.07,69873, +MEIM.S1WA,2000.08,71468, +MEIM.S1WA,2000.09,72462, +MEIM.S1WA,2000.1,74897, +``` + +model.sql + +```sql +{{ config( + materialized='table' +) }} + +select row_number() over() as id + , * + , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp +from {{ ref('employment_indicators_november_2022_csv_tables') }} +``` + +timestamp strategy - model_snapshot_1 + +```sql +{% snapshot model_snapshot_1 %} + +{{ + config( + strategy='timestamp', + updated_at='refresh_timestamp', + unique_key='id' + ) +}} + +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +invalidate hard deletes - model_snapshot_2 + +```sql +{% snapshot model_snapshot_2 %} + +{{ + config + ( + unique_key='id', + strategy='timestamp', + updated_at='refresh_timestamp', + invalidate_hard_deletes=True, + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +check strategy - model_snapshot_3 + +```sql +{% snapshot model_snapshot_3 %} + +{{ + config + ( + unique_key='id', + strategy='check', + check_cols=['series_reference','data_value'] + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +### Snapshots known issues + +- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning. + The only way, from a dbt perspective, is to do a full-refresh of the incremental model. + +- Tables, schemas and database names should only be lowercase + +- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not + installed in the target environment. + See for more details. + +- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column + from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history + +## AWS Lake Formation integration + +The adapter implements AWS Lake Formation tags management in the following way: + +- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default) +- Once you enable the feature, lf-tags will be updated on every dbt run +- First, all lf-tags for columns are removed to avoid inheritance issues +- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied +- Finally, lf-tags for columns are applied + +It's important to understand the following points: + +- dbt does not manage lf-tags for databases +- dbt does not manage Lake Formation permissions + +That's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc. +You may find the following links useful to manage that: + + +* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) +* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags) + + +## Python models + +The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). + +### Setup + +- A Spark-enabled workgroup created in Athena +- Spark execution role granted access to Athena, Glue and S3 +- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used + is referenced in `dbt_project.yml` + +### Spark-specific table configuration + +- `timeout` (`default=43200`) + - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. +- `spark_encryption` (`default=false`) + - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored + locally by Spark. +- `spark_cross_account_catalog` (`default=false`) + - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same + AWS account by default. However, sometimes you want to query another catalog located on an external AWS + account. Setting this additional Spark properties parameter to true will enable querying external catalogs. + You can use the syntax `external_catalog_id/database.table` to access the external table on the external + catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID) +- `spark_requester_pays` (`default=false`) + - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for + data access and data transfer fees associated with the query. + - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark. + +### Spark notes + +- A session is created for each unique engine configuration defined in the models that are part of the invocation. +- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation + (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. +- The number of Python models running at a time depends on the `threads`. The number of sessions created for the + entire run depends on the number of unique engine configurations and the availability of sessions to maintain + thread concurrency. +- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. + This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. + +### Example models + +#### Simple pandas model + +```python +import pandas as pd + + +def model(dbt, session): + dbt.config(materialized="table") + + model_df = pd.DataFrame({"A": [1, 2, 3, 4]}) + + return model_df +``` + +#### Simple spark + +```python +def model(dbt, spark_session): + dbt.config(materialized="table") + + data = [(1,), (2,), (3,), (4,)] + + df = spark_session.createDataFrame(data, ["A"]) + + return df +``` + +#### Spark incremental + +```python +def model(dbt, spark_session): + dbt.config(materialized="incremental") + df = dbt.ref("model") + + if dbt.is_incremental: + max_from_this = ( + f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}" + ) + df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0]) + + return df +``` + +#### Config spark model + +```python +def model(dbt, spark_session): + dbt.config( + materialized="table", + engine_config={ + "CoordinatorDpuSize": 1, + "MaxConcurrentDpus": 3, + "DefaultExecutorDpuSize": 1 + }, + spark_encryption=True, + spark_cross_account_catalog=True, + spark_requester_pays=True + polling_interval=15, + timeout=120, + ) + + data = [(1,), (2,), (3,), (4,)] + + df = spark_session.createDataFrame(data, ["A"]) + + return df +``` + +#### Create pySpark udf using imported external python files + +```python +def model(dbt, spark_session): + dbt.config( + materialized="incremental", + incremental_strategy="merge", + unique_key="num", + ) + sc = spark_session.sparkContext + sc.addPyFile("s3://athena-dbt/test/file1.py") + sc.addPyFile("s3://athena-dbt/test/file2.py") + + def func(iterator): + from file2 import transform + + return [transform(i) for i in iterator] + + from pyspark.sql.functions import udf + from pyspark.sql.functions import col + + udf_with_import = udf(func) + + data = [(1, "a"), (2, "b"), (3, "c")] + cols = ["num", "alpha"] + df = spark_session.createDataFrame(data, cols) + + return df.withColumn("udf_test_col", udf_with_import(col("alpha"))) +``` + +### Known issues in Python models + +- Python models cannot + [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). +- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list] + or [imported manually][imported manually]. +- Python models can only reference or write to tables with names meeting the + regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not + supported by Spark, even though Athena supports them. +- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which + runs on Trino. +- Snapshot materializations are not supported. +- Spark can only reference tables within the same catalog. +- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error +when trying to create the table. + +[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html +[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html + +## Contracts + +The adapter partly supports contract definitions: + +- `data_type` is supported but needs to be adjusted for complex types. Types must be specified + entirely (for instance `array`) even though they won't be checked. Indeed, as dbt recommends, we only compare + the broader type (array, map, int, varchar). The complete definition is used in order to check that the data types + defined in Athena are ok (pre-flight check). +- The adapter does not support the constraints since there is no constraint concept in Athena. + +## Contributing + +See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project. + +## Contributors ✨ + +Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): + + + + + +Contributions of any kind welcome! diff --git a/dbt-athena-community/pyproject.toml b/dbt-athena-community/pyproject.toml index b2dfe528..bf3028b2 100644 --- a/dbt-athena-community/pyproject.toml +++ b/dbt-athena-community/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "dbt-athena-community" description = "The athena adapter plugin for dbt (data build tool)" -readme = "../README.md" +readme = "README.md" keywords = ["dbt", "adapter", "adapters", "database", "elt", "dbt-core", "dbt Core", "dbt Cloud", "dbt Labs", "athena"] requires-python = ">=3.9.0" authors = [ diff --git a/dbt-athena/README.md b/dbt-athena/README.md new file mode 100644 index 00000000..c5487c05 --- /dev/null +++ b/dbt-athena/README.md @@ -0,0 +1,882 @@ + +

+ dbt logo +

+

+ + + + + + + + + + + + + + +

+ + +- [Features](#features) + - [Quick start](#quick-start) + - [Installation](#installation) + - [Prerequisites](#prerequisites) + - [Credentials](#credentials) + - [Configuring your profile](#configuring-your-profile) + - [Additional information](#additional-information) + - [Models](#models) + - [Table configuration](#table-configuration) + - [Table location](#table-location) + - [Incremental models](#incremental-models) + - [On schema change](#on-schema-change) + - [Iceberg](#iceberg) + - [Highly available table (HA)](#highly-available-table-ha) + - [HA known issues](#ha-known-issues) + - [Update glue data catalog](#update-glue-data-catalog) + - [Snapshots](#snapshots) + - [Timestamp strategy](#timestamp-strategy) + - [Check strategy](#check-strategy) + - [Hard-deletes](#hard-deletes) + - [Working example](#working-example) + - [Snapshots known issues](#snapshots-known-issues) + - [AWS Lake Formation integration](#aws-lake-formation-integration) + - [Python models](#python-models) + - [Contracts](#contracts) + - [Contributing](#contributing) + - [Contributors ✨](#contributors-) + + +# Features + +- Supports dbt version `1.7.*` +- Support for Python +- Supports [seeds][seeds] +- Correctly detects views and their columns +- Supports [table materialization][table] + - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location** + (see table location section below) + - Hive tables are supported by both Athena engines +- Supports [incremental models][incremental] + - On Iceberg tables: + - Supports the use of `unique_key` only with the `merge` strategy + - Supports the `append` strategy + - On Hive tables: + - Supports two incremental update strategies: `insert_overwrite` and `append` + - Does **not** support the use of `unique_key` +- Supports [snapshots][snapshots] +- Supports [Python models][python-models] + +[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds + +[incremental]: https://docs.getdbt.com/docs/build/incremental-models + +[table]: https://docs.getdbt.com/docs/build/materializations#table + +[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models + +[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html + +[snapshots]: https://docs.getdbt.com/docs/build/snapshots + +## Quick start + +### Installation + +- `pip install dbt-athena-community` +- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git` + +### Prerequisites + +To start, you will need an S3 bucket, for instance `my-bucket` and an Athena database: + +```sql +CREATE DATABASE IF NOT EXISTS analytics_dev +COMMENT 'Analytics models generated by dbt (development)' +LOCATION 's3://my-bucket/' +WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com'); +``` + +Notes: + +- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.). +- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena + databases. + +### Credentials + +Credentials can be passed directly to the adapter, or they can +be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based +on `aws cli`/`boto3` conventions. +You can either: + +- Configure `aws_access_key_id` and `aws_secret_access_key` +- Configure `aws_profile_name` to match a profile defined in your AWS credentials file. + Checkout dbt profile configuration below for details. + +### Configuring your profile + +A dbt profile can be configured to run against AWS Athena using the following configuration: + +| Option | Description | Required? | Example | +|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------| +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | +| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | +| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` | +| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` | +| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` | +| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` | +| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` | +| num_retries | Number of times to retry a failing query | Optional | `3` | +| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` | +| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` | +| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` | +| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` | +| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` | + +**Example profiles.yml entry:** + +```yaml +athena: + target: dev + outputs: + dev: + type: athena + s3_staging_dir: s3://athena-query-results/dbt/ + s3_data_dir: s3://your_s3_bucket/dbt/ + s3_data_naming: schema_table + s3_tmp_table_dir: s3://your_s3_bucket/temp/ + region_name: eu-west-1 + schema: dbt + database: awsdatacatalog + threads: 4 + aws_profile_name: my-profile + work_group: my-workgroup + spark_work_group: my-spark-workgroup + seed_s3_upload_args: + ACL: bucket-owner-full-control +``` + +### Additional information + +- `threads` is supported +- `database` and `catalog` can be used interchangeably + +## Models + +### Table configuration + +- `external_location` (`default=none`) + - If set, the full S3 path to which the table will be saved + - Works only with incremental models + - Does not work with Hive table with `ha` set to true +- `partitioned_by` (`default=none`) + - An array list of columns by which the table will be partitioned + - Limited to creation of 100 partitions (*currently*) +- `bucketed_by` (`default=none`) + - An array list of columns to bucket data, ignored if using Iceberg +- `bucket_count` (`default=none`) + - The number of buckets for bucketing your data, ignored if using Iceberg +- `table_type` (`default='hive'`) + - The type of table + - Supports `hive` or `iceberg` +- `ha` (`default=false`) + - If the table should be built using the high-availability method. This option is only available for Hive tables + since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha)) +- `format` (`default='parquet'`) + - The data format for the table + - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` +- `write_compression` (`default=none`) + - The compression type to use for any storage format that allows compression to be specified. To see which options are + available, check out [CREATE TABLE AS][create-table-as] +- `field_delimiter` (`default=none`) + - Custom field delimiter, for when format is set to `TEXTFILE` +- `table_properties`: table properties to add to the table, valid for Iceberg only +- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be + made to manage data in S3. Data in S3 will only be cleared up for Iceberg + tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that + Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds. +- `seed_by_insert` (`default=false`) + - Default behaviour uploads seed data to S3. This flag will create seeds using an SQL insert statement + - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would + exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) +- `force_batch` (`default=false`) + - Skip creating the table as CTAS and run the operation directly in batch insert mode + - This is particularly useful when the standard table creation process fails due to partition limitations, + allowing you to work with temporary tables and persist the dataset more efficiently +- `unique_tmp_table_suffix` (`default=false`) + - For incremental models using insert overwrite strategy on hive table + - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid + - Useful if you are looking to run multiple dbt build inserting in the same table in parallel +- `temp_schema` (`default=none`) + - For incremental models, it allows to define a schema to hold temporary create statements + used in incremental model runs + - Schema will be created in the model target database if does not exist +- `lf_tags_config` (`default=none`) + - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns + - `enabled` (`default=False`) whether LF tags management is enabled for a model + - `tags` dictionary with tags and their values to assign for the model + - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to + - `lf_inherited_tags` (`default=none`) + - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be + removed during association of those defined in `lf_tags_config` + - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from + tables and columns before associating the ones currently defined for a given model + - This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly + +```sql +{{ + config( + materialized='incremental', + incremental_strategy='append', + on_schema_change='append_new_columns', + table_type='iceberg', + schema='test_schema', + lf_tags_config={ + 'enabled': true, + 'tags': { + 'tag1': 'value1', + 'tag2': 'value2' + }, + 'tags_columns': { + 'tag1': { + 'value1': ['column1', 'column2'], + 'value2': ['column3', 'column4'] + } + }, + 'inherited_tags': ['tag1', 'tag2'] + } + ) +}} +``` + +- Format for `dbt_project.yml`: + +```yaml + +lf_tags_config: + enabled: true + tags: + tag1: value1 + tag2: value2 + tags_columns: + tag1: + value1: [ column1, column2 ] + inherited_tags: [ tag1, tag2 ] +``` + +- `lf_grants` (`default=none`) + - Lake Formation grants config for data_cell filters + - Format: + + ```python + lf_grants={ + 'data_cell_filters': { + 'enabled': True | False, + 'filters': { + 'filter_name': { + 'row_filter': '', + 'principals': ['principal_arn1', 'principal_arn2'] + } + } + } + } + ``` + +> Notes: +> +> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. +> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use +> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or +> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. +> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table +> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every +> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and +> apply changes if they occur: drop, create, update filters and their permissions. +> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at + the table and column level +> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made +> previously +> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed + prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform + where the inherited value is configured nor in the DBT project where the override previously existed but now is + gone) + +[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties + +### Table location + +The location a table is saved to is determined by: + +1. If `external_location` is defined, that value is used +2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` +3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/` + +Here all the options available for `s3_data_naming`: + +- `unique`: `{s3_data_dir}/{uuid4()}/` +- `table`: `{s3_data_dir}/{table}/` +- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` +- `schema_table`: `{s3_data_dir}/{schema}/{table}/` +- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` + +It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, +or setting up the value for groups of model in dbt_project.yml. + +> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets +> are ignored and the location configured in the workgroup is used. + +### Incremental models + +Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models). + +These strategies are supported: + +- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination + table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no + partitions are defined, dbt will fall back to the `append` strategy. +- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate + data (e.g. great for log or historical data). +- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`. + Only available when using Iceberg. + +### On schema change + +`on_schema_change` is an option to reflect changes of schema in incremental models. +The following options are supported: + +- `ignore` (default) +- `fail` +- `append_new_columns` +- `sync_all_columns` + +For details, please refer +to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change). + +### Iceberg + +The adapter supports table materialization for Iceberg. + +To get started just add this as your model: + +```sql +{{ config( + materialized='table', + table_type='iceberg', + format='parquet', + partitioned_by=['bucket(user_id, 5)'], + table_properties={ + 'optimize_rewrite_delete_file_threshold': '2' + } +) }} + +select 'A' as user_id, + 'pi' as name, + 'active' as status, + 17.89 as cost, + 1 as quantity, + 100000000 as quantity_big, + current_date as my_date +``` + +Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing +conditions. + +Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`. + +It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported: + +- `append`: New records are appended to the table, this can lead to duplicates. +- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only + available with Athena engine version 3. + - `unique_key` **(required)**: columns that define a unique record in the source and target tables. + - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can + be useful for improving performance via predicate pushdown on the target table. + - `delete_condition` (optional): SQL condition used to identify records that should be deleted. + - `update_condition` (optional): SQL condition used to identify records that should be updated. + - `insert_condition` (optional): SQL condition used to identify records that should be inserted. + - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of + the incremental table (`src`) or the final table (`target`). + Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error. + +`delete_condition` example: + +```sql +{{ config( + materialized='incremental', + table_type='iceberg', + incremental_strategy='merge', + unique_key='user_id', + incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"], + delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year", + format='parquet' +) }} + +select 'A' as user_id, + 'pi' as name, + 'active' as status, + 17.89 as cost, + 1 as quantity, + 100000000 as quantity_big, + current_date as my_date +``` + +`update_condition` example: + +```sql +{{ config( + materialized='incremental', + incremental_strategy='merge', + unique_key=['id'], + update_condition='target.id > 1', + schema='sandbox' + ) +}} + +{% if is_incremental() %} + +select * from ( + values + (1, 'v1-updated') + , (2, 'v2-updated') +) as t (id, value) + +{% else %} + +select * from ( + values + (-1, 'v-1') + , (0, 'v0') + , (1, 'v1') + , (2, 'v2') +) as t (id, value) + +{% endif %} +``` + +`insert_condition` example: + +```sql +{{ config( + materialized='incremental', + incremental_strategy='merge', + unique_key=['id'], + insert_condition='target.status != 0', + schema='sandbox' + ) +}} + +select * from ( + values + (1, 0) + , (2, 1) +) as t (id, status) + +``` + +### Highly available table (HA) + +The current implementation of the table materialization can lead to downtime, as the target table is +dropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on +your `table` materialized models. It leverages the table versions feature of glue catalog, creating +a temp table and swapping the target table to the location of the temp table. This materialization +is only available for `table_type=hive` and requires using unique locations. For iceberg, high +availability is the default. + +```sql +{{ config( + materialized='table', + ha=true, + format='parquet', + table_type='hive', + partitioned_by=['status'], + s3_data_naming='table_unique' +) }} + +select 'a' as user_id, + 'pi' as user_name, + 'active' as status +union all +select 'b' as user_id, + 'sh' as user_name, + 'disabled' as status +``` + +By default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`. + +#### HA known issues + +- When swapping from a table with partitions to a table without (and the other way around), there could be a little + downtime. + If high performances is needed consider bucketing instead of partitions +- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location +- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed + +### Update glue data catalog + +Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as +[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties) +and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html). +By default, documentation persistence is disabled, but it can be enabled for specific resources or +groups of resources as needed. + +For example: + +```yaml +models: + - name: test_deduplicate + description: another value + config: + persist_docs: + relation: true + columns: true + meta: + test: value + columns: + - name: id + meta: + primary_key: true +``` + +See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details. + +## Snapshots + +The adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot +create a snapshot file in the snapshots directory. If the directory does not exist create one. + +### Timestamp strategy + +To use the timestamp strategy refer to +the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended) + +### Check strategy + +To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy) + +### Hard-deletes + +The materialization also supports invalidating hard deletes. Check +the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. + +### Working example + +seed file - employent_indicators_november_2022_csv_tables.csv + +```csv +Series_reference,Period,Data_value,Suppressed +MEIM.S1WA,1999.04,80267, +MEIM.S1WA,1999.05,70803, +MEIM.S1WA,1999.06,65792, +MEIM.S1WA,1999.07,66194, +MEIM.S1WA,1999.08,67259, +MEIM.S1WA,1999.09,69691, +MEIM.S1WA,1999.1,72475, +MEIM.S1WA,1999.11,79263, +MEIM.S1WA,1999.12,86540, +MEIM.S1WA,2000.01,82552, +MEIM.S1WA,2000.02,81709, +MEIM.S1WA,2000.03,84126, +MEIM.S1WA,2000.04,77089, +MEIM.S1WA,2000.05,73811, +MEIM.S1WA,2000.06,70070, +MEIM.S1WA,2000.07,69873, +MEIM.S1WA,2000.08,71468, +MEIM.S1WA,2000.09,72462, +MEIM.S1WA,2000.1,74897, +``` + +model.sql + +```sql +{{ config( + materialized='table' +) }} + +select row_number() over() as id + , * + , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp +from {{ ref('employment_indicators_november_2022_csv_tables') }} +``` + +timestamp strategy - model_snapshot_1 + +```sql +{% snapshot model_snapshot_1 %} + +{{ + config( + strategy='timestamp', + updated_at='refresh_timestamp', + unique_key='id' + ) +}} + +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +invalidate hard deletes - model_snapshot_2 + +```sql +{% snapshot model_snapshot_2 %} + +{{ + config + ( + unique_key='id', + strategy='timestamp', + updated_at='refresh_timestamp', + invalidate_hard_deletes=True, + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +check strategy - model_snapshot_3 + +```sql +{% snapshot model_snapshot_3 %} + +{{ + config + ( + unique_key='id', + strategy='check', + check_cols=['series_reference','data_value'] + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +### Snapshots known issues + +- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning. + The only way, from a dbt perspective, is to do a full-refresh of the incremental model. + +- Tables, schemas and database names should only be lowercase + +- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not + installed in the target environment. + See for more details. + +- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column + from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history + +## AWS Lake Formation integration + +The adapter implements AWS Lake Formation tags management in the following way: + +- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default) +- Once you enable the feature, lf-tags will be updated on every dbt run +- First, all lf-tags for columns are removed to avoid inheritance issues +- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied +- Finally, lf-tags for columns are applied + +It's important to understand the following points: + +- dbt does not manage lf-tags for databases +- dbt does not manage Lake Formation permissions + +That's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc. +You may find the following links useful to manage that: + + +* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) +* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags) + + +## Python models + +The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). + +### Setup + +- A Spark-enabled workgroup created in Athena +- Spark execution role granted access to Athena, Glue and S3 +- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used + is referenced in `dbt_project.yml` + +### Spark-specific table configuration + +- `timeout` (`default=43200`) + - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. +- `spark_encryption` (`default=false`) + - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored + locally by Spark. +- `spark_cross_account_catalog` (`default=false`) + - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same + AWS account by default. However, sometimes you want to query another catalog located on an external AWS + account. Setting this additional Spark properties parameter to true will enable querying external catalogs. + You can use the syntax `external_catalog_id/database.table` to access the external table on the external + catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID) +- `spark_requester_pays` (`default=false`) + - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for + data access and data transfer fees associated with the query. + - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark. + +### Spark notes + +- A session is created for each unique engine configuration defined in the models that are part of the invocation. +- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation + (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. +- The number of Python models running at a time depends on the `threads`. The number of sessions created for the + entire run depends on the number of unique engine configurations and the availability of sessions to maintain + thread concurrency. +- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. + This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. + +### Example models + +#### Simple pandas model + +```python +import pandas as pd + + +def model(dbt, session): + dbt.config(materialized="table") + + model_df = pd.DataFrame({"A": [1, 2, 3, 4]}) + + return model_df +``` + +#### Simple spark + +```python +def model(dbt, spark_session): + dbt.config(materialized="table") + + data = [(1,), (2,), (3,), (4,)] + + df = spark_session.createDataFrame(data, ["A"]) + + return df +``` + +#### Spark incremental + +```python +def model(dbt, spark_session): + dbt.config(materialized="incremental") + df = dbt.ref("model") + + if dbt.is_incremental: + max_from_this = ( + f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}" + ) + df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0]) + + return df +``` + +#### Config spark model + +```python +def model(dbt, spark_session): + dbt.config( + materialized="table", + engine_config={ + "CoordinatorDpuSize": 1, + "MaxConcurrentDpus": 3, + "DefaultExecutorDpuSize": 1 + }, + spark_encryption=True, + spark_cross_account_catalog=True, + spark_requester_pays=True + polling_interval=15, + timeout=120, + ) + + data = [(1,), (2,), (3,), (4,)] + + df = spark_session.createDataFrame(data, ["A"]) + + return df +``` + +#### Create pySpark udf using imported external python files + +```python +def model(dbt, spark_session): + dbt.config( + materialized="incremental", + incremental_strategy="merge", + unique_key="num", + ) + sc = spark_session.sparkContext + sc.addPyFile("s3://athena-dbt/test/file1.py") + sc.addPyFile("s3://athena-dbt/test/file2.py") + + def func(iterator): + from file2 import transform + + return [transform(i) for i in iterator] + + from pyspark.sql.functions import udf + from pyspark.sql.functions import col + + udf_with_import = udf(func) + + data = [(1, "a"), (2, "b"), (3, "c")] + cols = ["num", "alpha"] + df = spark_session.createDataFrame(data, cols) + + return df.withColumn("udf_test_col", udf_with_import(col("alpha"))) +``` + +### Known issues in Python models + +- Python models cannot + [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). +- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list] + or [imported manually][imported manually]. +- Python models can only reference or write to tables with names meeting the + regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not + supported by Spark, even though Athena supports them. +- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which + runs on Trino. +- Snapshot materializations are not supported. +- Spark can only reference tables within the same catalog. +- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error +when trying to create the table. + +[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html +[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html + +## Contracts + +The adapter partly supports contract definitions: + +- `data_type` is supported but needs to be adjusted for complex types. Types must be specified + entirely (for instance `array`) even though they won't be checked. Indeed, as dbt recommends, we only compare + the broader type (array, map, int, varchar). The complete definition is used in order to check that the data types + defined in Athena are ok (pre-flight check). +- The adapter does not support the constraints since there is no constraint concept in Athena. + +## Contributing + +See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project. + +## Contributors ✨ + +Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): + + + + + +Contributions of any kind welcome! diff --git a/dbt-athena/pyproject.toml b/dbt-athena/pyproject.toml index 4832e320..6aa8426e 100644 --- a/dbt-athena/pyproject.toml +++ b/dbt-athena/pyproject.toml @@ -2,7 +2,7 @@ dynamic = ["version"] name = "dbt-athena" description = "The athena adapter plugin for dbt (data build tool)" -readme = "../README.md" +readme = "README.md" keywords = ["dbt", "adapter", "adapters", "database", "elt", "dbt-core", "dbt Core", "dbt Cloud", "dbt Labs", "athena"] requires-python = ">=3.9.0" authors = [