Skip to content

Commit

Permalink
Add apply_grants call to materialization macros (dbt-labs#381)
Browse files Browse the repository at this point in the history
* Add apply_grants call to materialization macros

* add standardize_grants_dict

* Working grant macros

* Initialize tests in CI

* Refactor to account for core macro changes. Passing tests

* Fix code checks

* Try default__reset_csv_table

* Code checks

* Revert "Try default__reset_csv_table"

This reverts commit 8bd4145.

* Account for refactor in dbt-labs/dbt-core@c763601

* Account for test changes in dbt-labs/dbt-core@debc867

* add changelog

* Empty-Commit

* rerun ci

* rerun ci

* re-add a persist_docs call to snapshot.sql

* fix whitespace

Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
Co-authored-by: Matthew McKnight <matthew.mcknight@dbtlabs.com>
  • Loading branch information
3 people authored and francescomucio committed Jul 26, 2022
1 parent af7fae3 commit f5db7e6
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ jobs:
environment:
DBT_INVOCATION_ENV: circle
DBT_DATABRICKS_RETRY_ALL: True
DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
docker:
- image: fishtownanalytics/test-container:10
steps:
Expand All @@ -78,6 +81,9 @@ jobs:
environment:
DBT_INVOCATION_ENV: circle
ODBC_DRIVER: Simba # TODO: move env var to Docker image
DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
docker:
# image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed
- image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
- Incremental materialization updated to not drop table first if full refresh for delta lake format, as it already runs _create or replace table_ ([#286](https://github.com/dbt-labs/dbt-spark/issues/286), [#287](https://github.com/dbt-labs/dbt-spark/pull/287/))
- Apache Spark version upgraded to 3.1.1 ([#348](https://github.com/dbt-labs/dbt-spark/issues/348), [#349](https://github.com/dbt-labs/dbt-spark/pull/349))

### Features
- Add grants to materializations ([#366](https://github.com/dbt-labs/dbt-spark/issues/366), [#381](https://github.com/dbt-labs/dbt-spark/pull/381))

### Under the hood
- Update `SparkColumn.numeric_type` to return `decimal` instead of `numeric`, since SparkSQL exclusively supports the former ([#380](https://github.com/dbt-labs/dbt-spark/pull/380))

Expand Down
17 changes: 17 additions & 0 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,23 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

def standardize_grants_dict(self, grants_table: "agate.Table") -> dict:
    """Translate the raw output of Spark's ``SHOW GRANTS`` into dbt's
    standard grants dict: ``{privilege: [grantee, ...]}``.

    :param grants_table: rows exposing ``Principal``, ``ActionType`` and
        ``ObjectType`` columns, as returned by ``SHOW GRANTS ON <relation>``.
    :return: mapping of privilege name to the list of grantees holding it.
    """
    grants_dict: Dict[str, List[str]] = {}
    for row in grants_table:
        grantee = row["Principal"]
        privilege = row["ActionType"]
        object_type = row["ObjectType"]

        # Only grants on this object are relevant: both views and tables
        # are reported with ObjectType == 'TABLE'. The OWN privilege is
        # ownership metadata, not a grant dbt should manage, so skip it.
        if object_type == "TABLE" and privilege != "OWN":
            grants_dict.setdefault(privilege, []).append(grantee)
    return grants_dict


# spark does something interesting with joins when both tables have the same
# static values for the join condition and complains that the join condition is
Expand Down
39 changes: 39 additions & 0 deletions dbt/include/spark/macros/apply_grants.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{#-- Tell dbt-core whether existing grants survive a create-or-replace of
  -- this relation. If they might survive, core must fetch and diff current
  -- grants (and possibly revoke); if not, it can simply apply the config. --#}
{% macro spark__copy_grants() %}

    {% if config.materialized == 'view' %}
        {#-- Spark views don't copy grants when they're replaced --#}
        {{ return(False) }}

    {% else %}
        {#-- This depends on how we're replacing the table, which depends on its file format
          -- Just play it safe by assuming that grants have been copied over, and need to be checked / possibly revoked
          -- We can make this more efficient in the future
        #}
        {{ return(True) }}

    {% endif %}
{% endmacro %}


{#-- Render one GRANT statement. Spark cannot grant to multiple grantees in a
  -- single statement (see spark__support_multiple_grantees_per_dcl_statement),
  -- so only grantees[0] is used; core calls this once per grantee. --#}
{%- macro spark__get_grant_sql(relation, privilege, grantees) -%}
    grant {{ privilege }} on {{ relation }} to {{ adapter.quote(grantees[0]) }}
{%- endmacro %}


{#-- Render one REVOKE statement. Mirrors spark__get_grant_sql: one grantee
  -- per statement, so only grantees[0] is used. --#}
{%- macro spark__get_revoke_sql(relation, privilege, grantees) -%}
    revoke {{ privilege }} on {{ relation }} from {{ adapter.quote(grantees[0]) }}
{%- endmacro %}


{#-- Spark GRANT/REVOKE syntax accepts a single principal per statement, so
  -- dbt-core must emit one DCL statement per grantee. --#}
{%- macro spark__support_multiple_grantees_per_dcl_statement() -%}
    {{ return(False) }}
{%- endmacro -%}


{#-- Execute each rendered GRANT/REVOKE statement as its own query. The
  -- default implementation joins them with ';', which Spark connections
  -- don't accept in a single execute call. --#}
{% macro spark__call_dcl_statements(dcl_statement_list) %}
    {% for dcl_statement in dcl_statement_list %}
        {% call statement('grant_or_revoke') %}
            {{ dcl_statement }}
        {% endcall %}
    {% endfor %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='append') -%}
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
Expand Down Expand Up @@ -45,6 +46,9 @@
{{ build_sql }}
{%- endcall -%}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}
Expand Down
4 changes: 4 additions & 0 deletions dbt/include/spark/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
{%- set file_format = config.get('file_format', 'parquet') -%}
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=none,
Expand Down Expand Up @@ -163,6 +164,9 @@
{{ final_sql }}
{% endcall %}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}
Expand Down
4 changes: 4 additions & 0 deletions dbt/include/spark/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% materialization table, adapter = 'spark' %}

{%- set identifier = model['alias'] -%}
{%- set grant_config = config.get('grants') -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
Expand All @@ -22,6 +23,9 @@
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}
Expand Down
5 changes: 5 additions & 0 deletions test.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ DBT_DATABRICKS_HOST_NAME=
DBT_DATABRICKS_TOKEN=
# file path to local ODBC driver
ODBC_DRIVER=

# users for testing 'grants' functionality
DBT_TEST_USER_1=
DBT_TEST_USER_2=
DBT_TEST_USER_3=
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,4 @@ def skip_by_profile_type(request):
if request.node.get_closest_marker("skip_profile"):
for skip_profile_type in request.node.get_closest_marker("skip_profile").args:
if skip_profile_type == profile_type:
pytest.skip("skipped on '{profile_type}' profile")
pytest.skip(f"skipped on '{profile_type}' profile")
60 changes: 60 additions & 0 deletions tests/functional/adapter/test_grants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest
from dbt.tests.adapter.grants.test_model_grants import BaseModelGrants
from dbt.tests.adapter.grants.test_incremental_grants import BaseIncrementalGrants
from dbt.tests.adapter.grants.test_invalid_grants import BaseInvalidGrants
from dbt.tests.adapter.grants.test_seed_grants import BaseSeedGrants
from dbt.tests.adapter.grants.test_snapshot_grants import BaseSnapshotGrants


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestModelGrantsSpark(BaseModelGrants):
    """Run core's model-grants suite against Spark profiles that support grants."""

    def privilege_grantee_name_overrides(self):
        # Spark has no INSERT privilege; writes are granted via MODIFY.
        # All other names pass through unchanged.
        return dict(
            select="select",
            insert="modify",
            fake_privilege="fake_privilege",
            invalid_user="invalid_user",
        )


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestIncrementalGrantsSpark(BaseIncrementalGrants):
    """Run core's incremental-grants suite against Spark profiles that support grants."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Incremental merges require a transactional file format (Delta).
        model_overrides = {
            "+file_format": "delta",
            "+incremental_strategy": "merge",
        }
        return {"models": model_overrides}


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestSeedGrantsSpark(BaseSeedGrants):
    """Run core's seed-grants suite against Spark profiles that support grants."""

    def seeds_support_partial_refresh(self):
        # seeds in dbt-spark are currently "full refreshed," in such a way that
        # the grants are not carried over
        # see https://github.com/dbt-labs/dbt-spark/issues/388
        return False


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestSnapshotGrantsSpark(BaseSnapshotGrants):
    """Run core's snapshot-grants suite against Spark profiles that support grants."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Snapshots merge into the target, which requires Delta.
        snapshot_overrides = {
            "+file_format": "delta",
            "+incremental_strategy": "merge",
        }
        return {"snapshots": snapshot_overrides}


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestInvalidGrantsSpark(BaseInvalidGrants):
    """Run core's invalid-grants suite with Spark's error message markers."""

    # Substrings Spark emits for an unknown grantee / unknown privilege.
    _UNKNOWN_GRANTEE_MARKER = "RESOURCE_DOES_NOT_EXIST"
    _UNKNOWN_PRIVILEGE_MARKER = "Action Unknown"

    def grantee_does_not_exist_error(self):
        return self._UNKNOWN_GRANTEE_MARKER

    def privilege_does_not_exist_error(self):
        return self._UNKNOWN_PRIVILEGE_MARKER

0 comments on commit f5db7e6

Please sign in to comment.