0.15.0 upgrade #46

Merged
merged 11 commits on Mar 23, 2020
Changes from 8 commits
4 changes: 4 additions & 0 deletions .gitignore
@@ -3,3 +3,7 @@ env/
*.pyc
__pycache__
.tox/
.idea/
build/
dist/
dbt-integration-tests
36 changes: 26 additions & 10 deletions README.md
@@ -6,9 +6,6 @@ For more information on using Spark with dbt, consult the [dbt documentation](ht
### Installation
This plugin can be installed via pip:
```
# Install prerequisites:
$ pip install pyhive[hive]

# Install dbt-spark:
$ pip install dbt-spark
```
@@ -79,18 +76,20 @@ your_profile_name:
The following configurations can be supplied to models run with the dbt-spark plugin:


| Option | Description | Required? | Example |
|---------|----------------------------------------------------|-------------------------|--------------------------|
| file_format | The file format to use when creating tables | Optional | `parquet` |
| Option | Description | Required? | Example |
|-------------|----------------------------------------------------|-------------------------|------------------------|
| file_format | The file format to use when creating tables (`parquet`, `delta`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | `delta` |
| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` |
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Required | `['date_day']` |
| cluster_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. This is typically used with partitioning to read and shuffle less data. | Optional | `['name', 'address']` |
| num_buckets | Used in conjunction with `cluster_by`. | Optional (required if `cluster_by` is specified) | `3` |
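As a rough sketch of how these options fit together (the column names and bucket count here are illustrative, not taken from this repo), a table model might be configured like:

```
{{ config(
    materialized='table',
    file_format='parquet',
    partition_by=['date_day'],
    cluster_by=['user_id'],
    num_buckets=3
) }}

select
    date_day,
    user_id,
    count(*) as events

from {{ ref('events') }}
group by 1, 2
```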



**Incremental Models**

Spark does not natively support `delete`, `update`, or `merge` statements. As such, [incremental models](https://docs.getdbt.com/docs/configuring-incremental-models)
are implemented differently than usual in this plugin. To use incremental models, specify a `partition_by` clause in your model config.
dbt will use an `insert overwrite` query to overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
data for a partition when using incremental models.
To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
data for a partition when using the `insert_overwrite` strategy.

```
{{ config(
@@ -113,6 +112,23 @@ where date_day::date >= '2019-01-01'
group by 1
```
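Conceptually, this maps onto Spark's partition-overwrite behavior: only the partitions produced by the query are replaced. A sketch in plain Spark SQL (table names illustrative, not the exact statements the plugin generates):

```
-- with dynamic partition overwrite, only partitions present in the
-- select results are rewritten; other partitions are left untouched
set spark.sql.sources.partitionOverwriteMode=dynamic;

insert overwrite table analytics.my_model
partition (date_day)
select
    count(*) as users,
    date_day
from analytics.events
group by date_day;
```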

The `merge` strategy is only supported when using the `delta` file format (supported in Databricks). It also requires you to specify a `unique_key` to match existing records.

```
{{ config(
materialized='incremental',
incremental_strategy='merge',
partition_by=['date_day'],
file_format='delta'
) }}

select *
from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```
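With a `unique_key` configured (say, a hypothetical `event_id` column), the `merge` strategy boils down to a Delta Lake `MERGE` statement along these lines (table names illustrative, not the exact SQL the plugin generates):

```
merge into analytics.my_model as target
using analytics.my_model__tmp as source
on source.event_id = target.event_id
when matched then update set *
when not matched then insert *
```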

### Reporting bugs and contributing code

- Want to report a bug or request a feature? Let us know on [Slack](http://slack.getdbt.com/), or open [an issue](https://github.com/fishtown-analytics/dbt-spark/issues/new).
1 change: 1 addition & 0 deletions dbt/adapters/spark/__init__.py
@@ -1,6 +1,7 @@
from dbt.adapters.spark.connections import SparkConnectionManager # noqa
from dbt.adapters.spark.connections import SparkCredentials
from dbt.adapters.spark.relation import SparkRelation # noqa
from dbt.adapters.spark.column import SparkColumn # noqa
from dbt.adapters.spark.impl import SparkAdapter

from dbt.adapters.base import AdapterPlugin
41 changes: 41 additions & 0 deletions dbt/adapters/spark/column.py
@@ -0,0 +1,41 @@
from dataclasses import dataclass
from typing import TypeVar

from dbt.adapters.base.column import Column

Self = TypeVar('Self', bound='SparkColumn')


@dataclass(init=False)
class SparkColumn(Column):
column: str
dtype: str
comment: str

def __init__(
self,
column: str,
dtype: str,
comment: str = None
**Contributor:** If comments can be None, instead just do:

    @dataclass
    class SparkColumn(Column):
        column: str
        dtype: str
        comment: Optional[str]

That will add `comment` as a new optional str field, which is what it is based on this argument spec.

**Author:** Perfect, thanks.

) -> None:
super().__init__(column, dtype)

self.comment = comment

@classmethod
def translate_type(cls, dtype: str) -> str:
return dtype

def can_expand_to(self: Self, other_column: Self) -> bool:
"""returns True if both columns are strings"""
return self.is_string() and other_column.is_string()

def literal(self, value):
return "cast({} as {})".format(value, self.dtype)

@property
def data_type(self) -> str:
return self.dtype

def __repr__(self) -> str:
return "<SparkColumn {} ({})>".format(self.name, self.data_type)