0.15.0 upgrade #46
Conversation
@SamKosky Thank you for your tremendous effort in consolidating and coalescing all the proposed changes. From here on, I'm going to be running point as the primary maintainer of this plugin, with support from @drewbanin and @beckjake. Over the next week, I'm going to work through the open PRs one by one. Once all of those PRs are merged, would you be able to rebase this branch on top of the latest master?
After spending the start of this week re-familiarizing myself with the open issues and PRs, it's become clear to me that there's a lot of really heroic work baked into this PR. I'm particularly excited about several of the additions here.
On the areas where there's overlap with other PRs currently open, I'd like to work through those extensions one at a time. I'm also interested in whether we can leverage that work here.
@beckjake: Would you be able to look at some of the deeper Python elements? Particularly the switch to dataclasses.
Some feedback on the dataclasses parts; they mostly look pretty good. Also a quick note on a potential type-narrowing issue with floats.
dbt/adapters/spark/column.py (outdated)
        self,
        column: str,
        dtype: str,
        comment: str = None
If comments can be None, instead just do:
from dataclasses import dataclass
from typing import Optional

@dataclass
class SparkColumn(Column):
    column: str
    dtype: str
    comment: Optional[str] = None
That will add comment as a new optional str field, which is what it is based on this argument spec.
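A quick usage sketch of what the dataclass field buys (hypothetical column values; assumes the None default shown above):

# Hypothetical values; with a None default, the comment can simply be omitted.
col_with_comment = SparkColumn(column="id", dtype="bigint", comment="surrogate key")
col_without_comment = SparkColumn(column="name", dtype="string")  # comment is None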
Perfect, thanks.
dbt/adapters/spark/connections.py (outdated)

     @property
     def type(self):
         return 'spark'

     def _connection_keys(self):
-        return ('host', 'port', 'cluster', 'schema')
+        return 'host', 'port', 'schema', 'organization'
Is cluster secret? If it's not, it's probably worth including it again so it shows up in dbt debug output.
Cluster is just a random ID assigned by Spark/Databricks. I'll update this now.
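A sketch of what that update might look like, combining the keys from both versions above (the exact ordering is illustrative):

def _connection_keys(self):
    # All of these are non-secret, so they can safely appear in `dbt debug` output.
    return 'host', 'port', 'cluster', 'schema', 'organization'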
@@ -136,6 +108,10 @@ def execute(self, sql, bindings=None):
             ThriftState.FINISHED_STATE,
         ]

+        # Convert decimal.Decimal to float as PyHive doesn't work with decimals
+        if bindings:
+            bindings = [float(x) if isinstance(x, decimal.Decimal) else x for x in bindings]
Will this plugin have to handle numbers that double-precision floats can't represent? For example, not every integer near 2^64 can be represented exactly by a float. Does pyhive accept strs? If so, you could use str in place of float to get exact representations.
I think it's fairly unlikely. If it were to happen, chances are strings would be in use already instead of trying to represent it as a decimal. But I'm not opposed to changing this to str if you think it's a better approach.
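A minimal sketch of the str-based alternative discussed above (the helper name is hypothetical, and it assumes PyHive accepts string bindings, which is the open question here):

import decimal

def convert_bindings(bindings):
    # Render Decimals as strings so values outside double precision
    # (e.g. very large integers) keep an exact representation.
    return [str(x) if isinstance(x, decimal.Decimal) else x for x in bindings]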
@dataclass(frozen=True, eq=False, repr=False)
class SparkRelation(BaseRelation):
    quote_policy: SparkQuotePolicy = SparkQuotePolicy()
    include_policy: SparkIncludePolicy = SparkIncludePolicy()
Assuming this wasn't intended to change the quote character to ", I think you'll also want to include:
quote_character: str = '`'
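Putting that together with the snippet above, the relation class would look roughly like this (assuming BaseRelation already exposes a quote_character field, as the suggestion implies):

@dataclass(frozen=True, eq=False, repr=False)
class SparkRelation(BaseRelation):
    quote_policy: SparkQuotePolicy = SparkQuotePolicy()
    include_policy: SparkIncludePolicy = SparkIncludePolicy()
    quote_character: str = '`'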
I've actually overridden SparkQuotePolicy and set everything to not include quotes. I'm using this adapter with Databricks, and there are restrictions on table names, specifically:
Valid names only contain alphabet characters, numbers and _.
I'm not sure if this is the same with Spark, but I can only assume. I'd like to just enforce no quotes if possible.
It looks like Spark itself encodes that restriction on object name validity. I checked and saw that it does not enforce the same validation on column names, however.
There's a lot of dbt and dbt-adjacent functionality that uses the quoted versions of columns and relations by default, since it gives us more flexibility across databases. In general, even though quoting is far less important on Spark than on Redshift or Snowflake, I think it's a better idea to keep quoting implemented in this plugin, rather than needing to reimplement dozens of common macros.
A specific example that came up today: the dbt_utils.equality test uses the quoted attribute from get_columns_from_relation, and missing that attribute caused an integration test to fail. I think our answer would be to override the quoted property of the SparkColumn class in the same way BigQuery does here.
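A sketch of that override on SparkColumn, modeled on the backtick quoting the BigQuery plugin uses (field list abbreviated to the ones discussed earlier in this thread, imports as shown there; base-class details are assumed):

@dataclass
class SparkColumn(Column):
    column: str
    dtype: str
    comment: Optional[str] = None

    @property
    def quoted(self) -> str:
        # Quote with backticks, Spark's quote character, instead of double quotes.
        return '`{}`'.format(self.column)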
Makes sense. I have implemented this change.
@SamKosky Thanks for your patience on this. I think we're very close to merging the three currently open PRs, all of which are narrow in scope. At that point, I'm hopeful that we can test these changes, resolve any conflicts, and ship version 0.15.0.
In the meantime, would you be able to rebase this branch onto master?
@SamKosky Thanks for your contribution. I have a question about table overwrite. When a model is a "table", the current behavior is to drop and create the table. Since Spark doesn't support transactions, it is not good to drop the table first. I noticed that your change will create a temp table and rename it, which is great! I wonder if it is possible to support the "insert overwrite" statement (https://docs.databricks.com/spark/latest/spark-sql/language-manual/insert.html). That way, we can use the delta table's time travel feature. Any suggestions on how to make that an option?
@tongqqiu if you want to use the delta functionality, you need to change your materialisations accordingly.
@SamKosky Thanks for your information. I am working on a PR to make snapshots work on the Spark delta format. Please take a look when you have time: SamKosky#1
@SamKosky I left a few comments, specifically on the view, table, and incremental materializations.
Beyond that, I think the big blocker to shipping this code is rebasing or merging the changes from master. Is that something you'd be able to do in the near future? I'm hoping to cut a release next week.
   -- build model
   {% call statement('main') -%}
-    {{ create_table_as(False, target_relation, sql) }}
+    {{ create_table_as(False, intermediate_relation, sql) }}
We switched (in #17) from using the default (Postgres/Redshift) approach (creating an intermediate table and performing a rename-swap) to a naive approach (drop the existing table, create it anew) for two main reasons:
- The appeal of this clever approach on Postgres/Redshift is the atomicity of running the whole thing inside of a transaction (except for the final drop). Spark doesn't have transactions, so if a query fails, it gets stuck midway through the process. It felt better to have a simple/intuitive process of only two steps (drop + create) than potentially have __dbt_tmp and __dbt_backup relational objects hanging around.
- In Databricks (and potentially other hosted Spark providers), if it's a managed table, renaming a table physically moves the contents of that table from one file location to another. We found that the alter ... rename step can take longer than the create step.
I'd be open to hearing arguments in favor of returning to this default, cleverer approach.
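For reference, the two strategies discussed above boil down to roughly these statement sequences (table names are placeholders, not actual macro output):

# Default (Postgres/Redshift-style) rename-swap: leaves __dbt_tmp / __dbt_backup
# objects behind if any step fails partway through.
rename_swap = [
    "create table my_table__dbt_tmp as (...)",
    "alter table my_table rename to my_table__dbt_backup",
    "alter table my_table__dbt_tmp rename to my_table",
    "drop table my_table__dbt_backup",
]

# Naive approach adopted in #17: only two steps, nothing left behind.
drop_create = [
    "drop table if exists my_table",
    "create table my_table as (...)",
]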
@jtcohen6 For the drop + create approach, the table is not available until the create completes, right? Any suggestions if I want to use that for a dim table? That table might be temporarily unavailable.
That's correct, but it's true for the default/Redshift/Postgres approach too (create ... __dbt_tmp + alter ... rename + alter ... rename + drop), because Spark doesn't have transactions. The table will be unavailable while the second alter ... rename is occurring, and we found that many renames (especially for managed tables) take longer than simply recreating the table from scratch. So the drop + create approach is both simpler and, in many cases, results in less overall downtime.
The only atomic operation currently available in Spark for updating/replacing a table is insert overwrite. Though it feels a bit counterintuitive, you could materialize your dim table as an incremental model, and re-select all the data (= overwrite all partitions) every time.
I think the best answer is that create or replace table is coming in Spark 3.0.0.
Thanks for your comments.
  {%- set identifier = model['alias'] -%}
  {%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
In the case of the view materialization, since Spark supports create or replace view, I think we should move in the direction of Snowflake, BigQuery, and Presto (dbt-labs/dbt-presto#13) and away from the clever transactional approach in Postgres/Redshift (default).
I think this materialization would look like:

{% materialization view, adapter='spark' -%}
  {{ return(create_or_replace_view()) }}
{%- endmaterialization %}

And in the adapter macro:

{% macro spark__create_view_as(relation, sql) -%}
  create or replace view {{ relation }}
  as
    {{ sql }}
{% endmacro %}

This would slim the materialization code down significantly, and leverage some existing art in dbt-core's create_or_replace_view macro.
    {%- set old_relation = none -%}
  {%- endif %}

{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
  {%- if strategy == 'insert_overwrite' -%}
I split the insert_overwrite approach into its own macro, get_insert_overwrite_sql, in #60. I think you should pull out the merge SQL into a get_merge_sql macro, and then dbt_spark_get_incremental_sql can call get_insert_overwrite_sql and get_merge_sql based on the strategy.
A note to myself: dbt-core implements a common get_merge_sql macro here, and the syntax for Spark is roughly identical; it looks like Spark lets you use * for update set + insert, where other databases require an explicit list of columns. We're changing the names of a few of these macros in the 0.16.0 release of dbt, so it probably makes more sense to connect this back to the core macros for the 0.16.0 release of the plugin.
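For context, a hypothetical Python rendering (not the actual Jinja macro) of the merge statement a get_merge_sql macro would produce for Spark, using the * shorthand mentioned above; the aliases and single-column unique_key are illustrative:

def get_merge_sql(target, source, unique_key):
    # Spark/Delta allows `update set *` and `insert *` instead of an
    # explicit column list, which keeps the rendered SQL simple.
    return f"""
        merge into {target} as DBT_INTERNAL_DEST
        using {source} as DBT_INTERNAL_SOURCE
        on DBT_INTERNAL_SOURCE.{unique_key} = DBT_INTERNAL_DEST.{unique_key}
        when matched then update set *
        when not matched then insert *
    """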
Update: @beckjake has offered to rebase this, in time for a release next week. Let me know if that's okay by you, or if you'd prefer to see it through yourself.
Additions/Fixes
- merge (Databricks) or insert_overwrite
- dbt-integration-tests all pass. I haven't added many unit tests at this stage, so this is something to work on.