diff --git a/.circleci/config.yml b/.circleci/config.yml
index 0a1a3e1b2..8f0afa6ce 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -63,6 +63,9 @@ jobs:
     environment:
       DBT_INVOCATION_ENV: circle
       DBT_DATABRICKS_RETRY_ALL: True
+      DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
+      DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
+      DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
     docker:
       - image: fishtownanalytics/test-container:10
     steps:
@@ -78,6 +81,9 @@ jobs:
     environment:
       DBT_INVOCATION_ENV: circle
       ODBC_DRIVER: Simba # TODO: move env var to Docker image
+      DBT_TEST_USER_1: "buildbot+dbt_test_user_1@dbtlabs.com"
+      DBT_TEST_USER_2: "buildbot+dbt_test_user_2@dbtlabs.com"
+      DBT_TEST_USER_3: "buildbot+dbt_test_user_3@dbtlabs.com"
     docker:
       # image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed
       - image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 36958eff3..1abed6ec9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,9 @@
 - Incremental materialization updated to not drop table first if full refresh for delta lake format, as it already runs _create or replace table_ ([#286](https://github.com/dbt-labs/dbt-spark/issues/286), [#287](https://github.com/dbt-labs/dbt-spark/pull/287/))
 - Apache Spark version upgraded to 3.1.1 ([#348](https://github.com/dbt-labs/dbt-spark/issues/348), [#349](https://github.com/dbt-labs/dbt-spark/pull/349))
 
+### Features
+- Add grants to materializations ([#366](https://github.com/dbt-labs/dbt-spark/issues/366), [#381](https://github.com/dbt-labs/dbt-spark/pull/381))
+
 ### Under the hood
 - Update `SparkColumn.numeric_type` to return `decimal` instead of `numeric`, since SparkSQL exclusively supports the former ([#380](https://github.com/dbt-labs/dbt-spark/pull/380))
 
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 699eca9d2..3fb9978d8 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -380,6 +380,23 @@ def run_sql_for_tests(self, sql, fetch, conn):
         finally:
             conn.transaction_open = False
 
+    def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
+        grants_dict: Dict[str, List[str]] = {}
+        for row in grants_table:
+            grantee = row["Principal"]
+            privilege = row["ActionType"]
+            object_type = row["ObjectType"]
+
+            # we only want to consider grants on this object
+            # (view or table both appear as 'TABLE')
+            # and we don't want to consider the OWN privilege
+            if object_type == "TABLE" and privilege != "OWN":
+                if privilege in grants_dict.keys():
+                    grants_dict[privilege].append(grantee)
+                else:
+                    grants_dict.update({privilege: [grantee]})
+        return grants_dict
+
 
 # spark does something interesting with joins when both tables have the same
 # static values for the join condition and complains that the join condition is
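Note: `standardize_grants_dict` reshapes the output of Spark's `show grants on <relation>` into the `{privilege: [grantees, ...]}` form that dbt-core compares against the configured grants. A minimal sketch of that transformation, with plain dicts standing in for the agate rows and made-up principals (the column names are the ones the hunk above reads):

```python
rows = [
    {"Principal": "user_a", "ActionType": "SELECT", "ObjectType": "TABLE"},
    {"Principal": "user_b", "ActionType": "SELECT", "ObjectType": "TABLE"},
    {"Principal": "user_a", "ActionType": "MODIFY", "ObjectType": "TABLE"},
    {"Principal": "owner", "ActionType": "OWN", "ObjectType": "TABLE"},  # dropped: ownership isn't a grant
    {"Principal": "user_c", "ActionType": "SELECT", "ObjectType": "DATABASE"},  # dropped: not this object
]

grants_dict: dict = {}
for row in rows:
    # same filter as the method above: table-level grants only, never OWN
    if row["ObjectType"] == "TABLE" and row["ActionType"] != "OWN":
        grants_dict.setdefault(row["ActionType"], []).append(row["Principal"])

assert grants_dict == {"SELECT": ["user_a", "user_b"], "MODIFY": ["user_a"]}
```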
diff --git a/dbt/include/spark/macros/apply_grants.sql b/dbt/include/spark/macros/apply_grants.sql
new file mode 100644
index 000000000..49dae95dc
--- /dev/null
+++ b/dbt/include/spark/macros/apply_grants.sql
@@ -0,0 +1,39 @@
+{% macro spark__copy_grants() %}
+
+    {% if config.materialized == 'view' %}
+        {#-- Spark views don't copy grants when they're replaced --#}
+        {{ return(False) }}
+
+    {% else %}
+        {#-- This depends on how we're replacing the table, which depends on its file format
+          -- Just play it safe by assuming that grants have been copied over, and need to be checked / possibly revoked
+          -- We can make this more efficient in the future
+        #}
+        {{ return(True) }}
+
+    {% endif %}
+{% endmacro %}
+
+
+{%- macro spark__get_grant_sql(relation, privilege, grantees) -%}
+    grant {{ privilege }} on {{ relation }} to {{ adapter.quote(grantees[0]) }}
+{%- endmacro %}
+
+
+{%- macro spark__get_revoke_sql(relation, privilege, grantees) -%}
+    revoke {{ privilege }} on {{ relation }} from {{ adapter.quote(grantees[0]) }}
+{%- endmacro %}
+
+
+{%- macro spark__support_multiple_grantees_per_dcl_statement() -%}
+    {{ return(False) }}
+{%- endmacro -%}
+
+
+{% macro spark__call_dcl_statements(dcl_statement_list) %}
+    {% for dcl_statement in dcl_statement_list %}
+        {% call statement('grant_or_revoke') %}
+            {{ dcl_statement }}
+        {% endcall %}
+    {% endfor %}
+{% endmacro %}
diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql
index 99cd31db1..b80510b71 100644
--- a/dbt/include/spark/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -3,6 +3,7 @@
   {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
   {%- set raw_file_format = config.get('file_format', default='parquet') -%}
   {%- set raw_strategy = config.get('incremental_strategy', default='append') -%}
+  {%- set grant_config = config.get('grants') -%}
 
   {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
   {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
@@ -45,6 +46,9 @@
     {{ build_sql }}
   {%- endcall -%}
 
+  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
+  {% do apply_grants(target_relation, grant_config, should_revoke) %}
+
   {% do persist_docs(target_relation, model) %}
 
   {{ run_hooks(post_hooks) }}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 9c891ef04..a5304682e 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -75,6 +75,7 @@
   {%- set strategy_name = config.get('strategy') -%}
   {%- set unique_key = config.get('unique_key') %}
   {%- set file_format = config.get('file_format', 'parquet') -%}
+  {%- set grant_config = config.get('grants') -%}
 
   {% set target_relation_exists, target_relation = get_or_create_relation(
           database=none,
@@ -163,6 +164,9 @@
       {{ final_sql }}
   {% endcall %}
 
+  {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
+  {% do apply_grants(target_relation, grant_config, should_revoke) %}
+
   {% do persist_docs(target_relation, model) %}
 
   {{ run_hooks(post_hooks, inside_transaction=True) }}
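Note: because `spark__support_multiple_grantees_per_dcl_statement()` returns `False`, dbt-core renders one grant/revoke statement per grantee, which is why the macros above only ever read `grantees[0]`. A rough Python rendering of that expansion (illustrative names, not dbt internals; backtick quoting mirrors `adapter.quote` on Spark):

```python
def quote(identifier: str) -> str:
    # Spark identifiers are quoted with backticks
    return f"`{identifier}`"

def grant_statements(relation: str, grant_config: dict) -> list:
    # one statement per (privilege, grantee) pair
    return [
        f"grant {privilege} on {relation} to {quote(grantee)}"
        for privilege, grantees in grant_config.items()
        for grantee in grantees
    ]

for stmt in grant_statements("my_schema.my_model", {"select": ["user_a", "user_b"]}):
    print(stmt)
# grant select on my_schema.my_model to `user_a`
# grant select on my_schema.my_model to `user_b`
```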
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index 2eeb806fd..3462d3332 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -1,6 +1,7 @@
 {% materialization table, adapter = 'spark' %}
 
   {%- set identifier = model['alias'] -%}
+  {%- set grant_config = config.get('grants') -%}
 
   {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
   {%- set target_relation = api.Relation.create(identifier=identifier,
@@ -22,6 +23,9 @@
     {{ create_table_as(False, target_relation, sql) }}
   {%- endcall %}
 
+  {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
+  {% do apply_grants(target_relation, grant_config, should_revoke) %}
+
   {% do persist_docs(target_relation, model) %}
 
   {{ run_hooks(post_hooks) }}
diff --git a/test.env.example b/test.env.example
index bf4cf2eee..e69f700b7 100644
--- a/test.env.example
+++ b/test.env.example
@@ -8,3 +8,8 @@ DBT_DATABRICKS_HOST_NAME=
 DBT_DATABRICKS_TOKEN=
 # file path to local ODBC driver
 ODBC_DRIVER=
+
+# users for testing 'grants' functionality
+DBT_TEST_USER_1=
+DBT_TEST_USER_2=
+DBT_TEST_USER_3=
diff --git a/tests/conftest.py b/tests/conftest.py
index 0c624713c..0771566b7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -108,4 +108,4 @@ def skip_by_profile_type(request):
     if request.node.get_closest_marker("skip_profile"):
         for skip_profile_type in request.node.get_closest_marker("skip_profile").args:
             if skip_profile_type == profile_type:
-                pytest.skip("skipped on '{profile_type}' profile")
+                pytest.skip(f"skipped on '{profile_type}' profile")
diff --git a/tests/functional/adapter/test_grants.py b/tests/functional/adapter/test_grants.py
new file mode 100644
index 000000000..8e0341df6
--- /dev/null
+++ b/tests/functional/adapter/test_grants.py
@@ -0,0 +1,60 @@
+import pytest
+from dbt.tests.adapter.grants.test_model_grants import BaseModelGrants
+from dbt.tests.adapter.grants.test_incremental_grants import BaseIncrementalGrants
+from dbt.tests.adapter.grants.test_invalid_grants import BaseInvalidGrants
+from dbt.tests.adapter.grants.test_seed_grants import BaseSeedGrants
+from dbt.tests.adapter.grants.test_snapshot_grants import BaseSnapshotGrants
+
+
+@pytest.mark.skip_profile("apache_spark", "spark_session")
+class TestModelGrantsSpark(BaseModelGrants):
+    def privilege_grantee_name_overrides(self):
+        # insert --> modify
+        return {
+            "select": "select",
+            "insert": "modify",
+            "fake_privilege": "fake_privilege",
+            "invalid_user": "invalid_user",
+        }
+
+
+@pytest.mark.skip_profile("apache_spark", "spark_session")
+class TestIncrementalGrantsSpark(BaseIncrementalGrants):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+file_format": "delta",
+                "+incremental_strategy": "merge",
+            }
+        }
+
+
+@pytest.mark.skip_profile("apache_spark", "spark_session")
+class TestSeedGrantsSpark(BaseSeedGrants):
+    # seeds in dbt-spark are currently always "full refreshed" (dropped and
+    # recreated), so existing grants are not carried over
+    # see https://github.com/dbt-labs/dbt-spark/issues/388
+    def seeds_support_partial_refresh(self):
+        return False
+
+
+@pytest.mark.skip_profile("apache_spark", "spark_session")
+class TestSnapshotGrantsSpark(BaseSnapshotGrants):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "snapshots": {
+                "+file_format": "delta",
+                "+incremental_strategy": "merge",
+            }
+        }
+
+
+@pytest.mark.skip_profile("apache_spark", "spark_session")
+class TestInvalidGrantsSpark(BaseInvalidGrants):
+    def grantee_does_not_exist_error(self):
+        return "RESOURCE_DOES_NOT_EXIST"
+
+    def privilege_does_not_exist_error(self):
+        return "Action Unknown"
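Note: the `DBT_TEST_USER_*` variables added to CI and `test.env.example` are the three principals the base grant tests grant to and revoke from, so all three must be set for `tests/functional/adapter/test_grants.py` to pass. An illustrative pre-flight check (the env var names are real; this script is not part of the repo):

```python
import os

# fail fast if any of the grants-test users is unset
required = [f"DBT_TEST_USER_{i}" for i in (1, 2, 3)]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit(f"set {', '.join(missing)} before running the grants tests")
```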