0.15.3 upgrade #65

Merged
merged 18 commits on Mar 23, 2020
Changes from 17 commits
27 changes: 21 additions & 6 deletions README.md
@@ -116,20 +116,18 @@ The following configurations can be supplied to models run with the dbt-spark plugin


| Option | Description | Required? | Example |
|---------|----------------------------------------------------|-------------------------|--------------------------|
| file_format | The file format to use when creating tables | Optional | `parquet` |
| file_format | The file format to use when creating tables (`parquet`, `delta`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | `parquet` |
| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | `/mnt/root` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` |
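
For illustration, several of these options can be combined in a single model config. A minimal sketch follows; the column names (`date_day`, `user_id`) are hypothetical and not part of this diff:

```
{{ config(
    materialized='table',
    file_format='parquet',
    location_root='/mnt/root',
    partition_by=['date_day'],
    clustered_by=['user_id'],
    buckets=8
) }}

select * from {{ ref('events') }}
```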


**Incremental Models**

Spark does not natively support `delete`, `update`, or `merge` statements. As such, [incremental models](https://docs.getdbt.com/docs/configuring-incremental-models)
are implemented differently than usual in this plugin. To use incremental models, specify a `partition_by` clause in your model config.
dbt will use an `insert overwrite` query to overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
data for a partition when using incremental models.
To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy is `insert_overwrite`, which overwrites the partitions included in your query. Be sure to re-select _all_ of the relevant data for a partition when using the `insert_overwrite` strategy.

```
{{ config(
@@ -152,6 +150,23 @@ where date_day::date >= '2019-01-01'
group by 1
```

The `merge` strategy is only supported when using the `delta` file format (as supported on Databricks). It also requires you to specify a `unique_key` to match existing records.

```
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    partition_by=['date_day'],
    file_format='delta'
) }}

select *
from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```

### Running locally

A `docker-compose` environment starts a Spark Thrift server and a Postgres database as a Hive Metastore backend.
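
As a rough sketch (the exact commands live in the collapsed portion of this diff), bringing the environment up typically looks something like:

```
docker-compose up -d
```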
1 change: 1 addition & 0 deletions dbt/adapters/spark/__init__.py
@@ -1,6 +1,7 @@
from dbt.adapters.spark.connections import SparkConnectionManager # noqa
from dbt.adapters.spark.connections import SparkCredentials
from dbt.adapters.spark.relation import SparkRelation # noqa
from dbt.adapters.spark.column import SparkColumn # noqa
from dbt.adapters.spark.impl import SparkAdapter

from dbt.adapters.base import AdapterPlugin
63 changes: 63 additions & 0 deletions dbt/adapters/spark/column.py
@@ -0,0 +1,63 @@
from dataclasses import dataclass
from typing import TypeVar, Optional, Dict, Any

from dbt.adapters.base.column import Column

Self = TypeVar('Self', bound='SparkColumn')


@dataclass
class SparkColumn(Column):
table_database: Optional[str] = None
table_schema: Optional[str] = None
table_name: Optional[str] = None
table_type: Optional[str] = None
table_owner: Optional[str] = None
table_stats: Optional[Dict[str, Any]] = None
column_index: Optional[int] = None

@classmethod
def translate_type(cls, dtype: str) -> str:
return dtype

def can_expand_to(self: Self, other_column: Self) -> bool:
"""returns True if both columns are strings"""
return self.is_string() and other_column.is_string()

def literal(self, value):
return "cast({} as {})".format(value, self.dtype)

@property
def quoted(self) -> str:
return '`{}`'.format(self.column)

@property
def data_type(self) -> str:
return self.dtype

def __repr__(self) -> str:
return "<SparkColumn {} ({})>".format(self.name, self.data_type)

@staticmethod
def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
table_stats = {}
if raw_stats:
# format: 1109049927 bytes, 14093476 rows
stats = {
stats.split(" ")[1]: int(stats.split(" ")[0])
for stats in raw_stats.split(', ')
}
for key, val in stats.items():
table_stats[f'stats:{key}:label'] = key
table_stats[f'stats:{key}:value'] = val
table_stats[f'stats:{key}:description'] = ''
table_stats[f'stats:{key}:include'] = True
return table_stats

def to_dict(self, omit_none=False):
original_dict = super().to_dict(omit_none=omit_none)
# If there are stats, merge them into the root of the dict
original_stats = original_dict.pop('table_stats')
if original_stats:
original_dict.update(original_stats)
return original_dict
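
As a hypothetical usage sketch (not part of this diff), `convert_table_stats` expands the raw statistics string that Spark reports for a table into the `stats:` metadata keys dbt expects:

```
# Hypothetical sketch, not part of this diff: exercising
# SparkColumn.convert_table_stats on a raw Spark stats string.
from dbt.adapters.spark.column import SparkColumn

raw_stats = "1109049927 bytes, 14093476 rows"
stats = SparkColumn.convert_table_stats(raw_stats)

# Each statistic is expanded into label/value/description/include keys.
assert stats["stats:bytes:value"] == 1109049927
assert stats["stats:rows:label"] == "rows"
assert stats["stats:rows:include"] is True
```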