Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add apply_grants call to materialization macros #381

Merged
merged 18 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ jobs:
environment:
DBT_INVOCATION_ENV: circle
DBT_DATABRICKS_RETRY_ALL: True
DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
docker:
- image: fishtownanalytics/test-container:10
steps:
Expand All @@ -78,6 +81,9 @@ jobs:
environment:
DBT_INVOCATION_ENV: circle
ODBC_DRIVER: Simba # TODO: move env var to Docker image
DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
docker:
# image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed
- image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
- Incremental materialization updated to not drop table first if full refresh for delta lake format, as it already runs _create or replace table_ ([#286](https://github.com/dbt-labs/dbt-spark/issues/286), [#287](https://github.com/dbt-labs/dbt-spark/pull/287/))
- Apache Spark version upgraded to 3.1.1 ([#348](https://github.com/dbt-labs/dbt-spark/issues/348), [#349](https://github.com/dbt-labs/dbt-spark/pull/349))

### Features
- Add grants to materializations ([#366](https://github.com/dbt-labs/dbt-spark/issues/366), [#381](https://github.com/dbt-labs/dbt-spark/pull/381))

### Under the hood
- Update `SparkColumn.numeric_type` to return `decimal` instead of `numeric`, since SparkSQL exclusively supports the former ([#380](https://github.com/dbt-labs/dbt-spark/pull/380))

Expand Down
17 changes: 17 additions & 0 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,23 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

def standardize_grants_dict(self, grants_table: "agate.Table") -> dict:
    """Translate the raw grants table returned by the warehouse into dbt's
    standard grants dict: ``{privilege: [grantee, ...]}``.

    :param grants_table: rows with ``Principal`` (grantee), ``ActionType``
        (privilege), and ``ObjectType`` columns. Presumably the output of a
        ``SHOW GRANTS`` statement — TODO confirm against the caller.
    :return: mapping of privilege name to the list of grantees holding it.
    """
    grants_dict: Dict[str, List[str]] = {}
    for row in grants_table:
        grantee = row["Principal"]
        privilege = row["ActionType"]
        object_type = row["ObjectType"]

        # Only grants on this object count (views and tables both report
        # an ObjectType of 'TABLE'), and the OWN privilege is excluded
        # from dbt's grants handling.
        if object_type == "TABLE" and privilege != "OWN":
            # setdefault replaces the manual "append or create list" branch.
            grants_dict.setdefault(privilege, []).append(grantee)
    return grants_dict


# spark does something interesting with joins when both tables have the same
# static values for the join condition and complains that the join condition is
Expand Down
39 changes: 39 additions & 0 deletions dbt/include/spark/macros/apply_grants.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{% macro spark__copy_grants() %}
    {#-- Whether existing grants survive replacing this object.
       Replaced Spark views never keep their grants. For tables it depends on
       the file format and how the replacement happens, so play it safe:
       assume grants were carried over and let dbt check / possibly revoke
       them. This could be made more efficient in the future. --#}
    {{ return(config.materialized != 'view') }}
{% endmacro %}


{%- macro spark__get_grant_sql(relation, privilege, grantees) -%}
    {#-- Render one GRANT statement. Only grantees[0] is used:
        spark__support_multiple_grantees_per_dcl_statement() returns False,
        so callers issue one statement per grantee. --#}
    grant {{ privilege }} on {{ relation }} to {{ adapter.quote(grantees[0]) }}
{%- endmacro %}


{%- macro spark__get_revoke_sql(relation, privilege, grantees) -%}
    {#-- Render one REVOKE statement; mirrors spark__get_grant_sql and
        likewise uses only grantees[0] (one statement per grantee). --#}
    revoke {{ privilege }} on {{ relation }} from {{ adapter.quote(grantees[0]) }}
{%- endmacro %}


{%- macro spark__support_multiple_grantees_per_dcl_statement() -%}
    {#-- This adapter emits one grant/revoke statement per grantee;
        the grant/revoke SQL macros above only ever read grantees[0]. --#}
    {{ return(False) }}
{%- endmacro -%}


{% macro spark__call_dcl_statements(dcl_statement_list) %}
    {#-- Execute each rendered grant/revoke statement against the warehouse,
        one `statement` call per DCL statement. --#}
    {% for dcl_statement in dcl_statement_list %}
        {% call statement('grant_or_revoke') %}
            {{ dcl_statement }}
        {% endcall %}
    {% endfor %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='append') -%}
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
Expand Down Expand Up @@ -45,6 +46,9 @@
{{ build_sql }}
{%- endcall -%}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}
Expand Down
4 changes: 4 additions & 0 deletions dbt/include/spark/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
{%- set file_format = config.get('file_format', 'parquet') -%}
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=none,
Expand Down Expand Up @@ -163,6 +164,9 @@
{{ final_sql }}
{% endcall %}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}
McKnight-42 marked this conversation as resolved.
Show resolved Hide resolved

{{ run_hooks(post_hooks, inside_transaction=True) }}
Expand Down
4 changes: 4 additions & 0 deletions dbt/include/spark/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% materialization table, adapter = 'spark' %}

{%- set identifier = model['alias'] -%}
{%- set grant_config = config.get('grants') -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
Expand All @@ -22,6 +23,9 @@
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}
Expand Down
5 changes: 5 additions & 0 deletions test.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ DBT_DATABRICKS_HOST_NAME=
DBT_DATABRICKS_TOKEN=
# file path to local ODBC driver
ODBC_DRIVER=

# users for testing 'grants' functionality
DBT_TEST_USER_1=
DBT_TEST_USER_2=
DBT_TEST_USER_3=
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,4 @@ def skip_by_profile_type(request):
if request.node.get_closest_marker("skip_profile"):
for skip_profile_type in request.node.get_closest_marker("skip_profile").args:
if skip_profile_type == profile_type:
pytest.skip("skipped on '{profile_type}' profile")
pytest.skip(f"skipped on '{profile_type}' profile")
60 changes: 60 additions & 0 deletions tests/functional/adapter/test_grants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest
from dbt.tests.adapter.grants.test_model_grants import BaseModelGrants
from dbt.tests.adapter.grants.test_incremental_grants import BaseIncrementalGrants
from dbt.tests.adapter.grants.test_invalid_grants import BaseInvalidGrants
from dbt.tests.adapter.grants.test_seed_grants import BaseSeedGrants
from dbt.tests.adapter.grants.test_snapshot_grants import BaseSnapshotGrants


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestModelGrantsSpark(BaseModelGrants):
    """Standard model-grants tests, adjusted for Spark privilege names."""

    def privilege_grantee_name_overrides(self):
        # Spark spells the write privilege "modify"; the other names used
        # by the base tests pass through unchanged.
        return dict(
            select="select",
            insert="modify",
            fake_privilege="fake_privilege",
            invalid_user="invalid_user",
        )


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestIncrementalGrantsSpark(BaseIncrementalGrants):
    """Incremental-model grants tests, configured for delta + merge."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Run the incremental models as delta tables with the merge strategy.
        model_settings = {
            "+file_format": "delta",
            "+incremental_strategy": "merge",
        }
        return {"models": model_settings}


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestSeedGrantsSpark(BaseSeedGrants):
    """Seed grants tests for Spark."""

    def seeds_support_partial_refresh(self):
        # Re-running a seed in dbt-spark currently "full refreshes" it in a
        # way that does not carry existing grants over; see
        # https://github.com/dbt-labs/dbt-spark/issues/388
        return False


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestSnapshotGrantsSpark(BaseSnapshotGrants):
    """Snapshot grants tests, configured for delta + merge."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Run the snapshots as delta tables with the merge strategy.
        snapshot_settings = {
            "+file_format": "delta",
            "+incremental_strategy": "merge",
        }
        return {"snapshots": snapshot_settings}


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestInvalidGrantsSpark(BaseInvalidGrants):
    """Checks the error strings the warehouse returns for invalid grants."""

    # Expected substrings of the warehouse error messages.
    _unknown_grantee_error = "RESOURCE_DOES_NOT_EXIST"
    _unknown_privilege_error = "Action Unknown"

    def grantee_does_not_exist_error(self):
        return self._unknown_grantee_error

    def privilege_does_not_exist_error(self):
        return self._unknown_privilege_error