
feat(snowflake): add missing pushdown_deny_usernames config to be used when use_queries_v2 #12527

Merged

Conversation

@sgomezvillamor (Contributor) commented Jan 31, 2025

The legacy SnowflakeQueriesSource's config, SnowflakeQueriesSourceConfig, inherits from SnowflakeQueriesExtractorConfig and so exposes the pushdown_deny_usernames config param:

class SnowflakeQueriesSourceConfig(
    SnowflakeQueriesExtractorConfig, SnowflakeIdentifierConfig, SnowflakeFilterConfig
):

and then uses it:

def fetch_query_log(
    self, users: UsersMapping
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
    query_log_query = _build_enriched_query_log_query(
        start_time=self.config.window.start_time,
        end_time=self.config.window.end_time,
        bucket_duration=self.config.window.bucket_duration,
        deny_usernames=self.config.pushdown_deny_usernames,
    )
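
To make the "pushdown" concrete: the deny list is rendered into the SQL sent to Snowflake, so excluded users are filtered server-side rather than after fetching the query log. A minimal sketch of the idea; the helper name _deny_predicate and the user_name column are assumptions for illustration, not the actual internals of _build_enriched_query_log_query:

from typing import List, Optional

def _deny_predicate(deny_usernames: Optional[List[str]]) -> str:
    # Hypothetical helper: render the deny list as a SQL predicate so the
    # filtering happens inside Snowflake, not in the ingestion process.
    if not deny_usernames:
        return "TRUE"
    quoted = ", ".join(f"'{u.upper()}'" for u in deny_usernames)
    return f"user_name NOT IN ({quoted})"

# _deny_predicate(["service_account", "airflow"]) returns:
#   "user_name NOT IN ('SERVICE_ACCOUNT', 'AIRFLOW')"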

However, the SnowflakeV2Source's config, SnowflakeV2Config, doesn't:

class SnowflakeV2Config(
    SnowflakeConfig,
    SnowflakeUsageConfig,
    StatefulLineageConfigMixin,
    StatefulUsageConfigMixin,
    StatefulProfilingConfigMixin,
    ClassificationSourceConfigMixin,
    IncrementalPropertiesConfigMixin,
):

and so the proposal in this PR is to define the config param there as well, so that it can be passed along when instantiating the SnowflakeQueriesExtractor.
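
For illustration, a minimal self-contained sketch of the resulting pattern, assuming pydantic models; the class and field names mirror the PR, but the default, description, and build_extractor_config helper here are assumptions (the actual field definition appears in the diff further down):

from typing import List
from pydantic import BaseModel, Field

class SnowflakeQueriesExtractorConfig(BaseModel):
    # The queries v2 extractor already exposes this param.
    pushdown_deny_usernames: List[str] = Field(default_factory=list)

class SnowflakeV2Config(BaseModel):
    # The PR adds the same param here so the v2 source can forward it.
    pushdown_deny_usernames: List[str] = Field(
        default_factory=list,
        description="Usernames to exclude from the query log; "
        "filtering is pushed down into Snowflake.",
    )

def build_extractor_config(cfg: SnowflakeV2Config) -> SnowflakeQueriesExtractorConfig:
    # Forward the value when instantiating the extractor's config.
    return SnowflakeQueriesExtractorConfig(
        pushdown_deny_usernames=cfg.pushdown_deny_usernames
    )

With this in place, a recipe that sets pushdown_deny_usernames under the snowflake source config takes effect on the queries v2 path as well when use_queries_v2 is enabled.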

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions bot added the ingestion (PR or Issue related to the ingestion of metadata) label on Jan 31, 2025
codecov bot commented Jan 31, 2025

❌ 5 Tests Failed:

Tests completed | Failed | Passed | Skipped
           1995 |      5 |   1990 |      40
View the top 3 failed tests by shortest run time
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_ingest_stateful
Stack Traces | 0.001s run time
kafka_connect_runner = Services(_docker_compose=DockerComposeExecutor(_compose_command='docker compose', _compose_files=['.../runner/work/d...tegration/kafka-connect/docker-compose.override.yml'], _compose_project_name='pytest4874-kafka-connect'), _services={})

    @pytest.fixture(scope="module")
    def loaded_kafka_connect(kafka_connect_runner):
        # # Setup mongo cluster
        command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-init.js"
        ret = subprocess.run(command, shell=True, capture_output=True)
        assert ret.returncode == 0
    
        # Creating MySQL source with no transformations , only topic prefix
        r = requests.post(
            KAFKA_CONNECT_ENDPOINT,
            headers={"Content-Type": "application/json"},
            data="""{
                "name": "mysql_source1",
                "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                    "mode": "incrementing",
                    "incrementing.column.name": "id",
                    "topic.prefix": "test-mysql-jdbc-",
                    "tasks.max": "1",
                    "connection.url": "${env:MYSQL_CONNECTION_URL}"
                }
            }
            """,
        )
>       assert r.status_code == 201  # Created
E       assert 500 == 201
E        +  where 500 = <Response [500]>.status_code

.../integration/kafka-connect/test_kafka_connect.py:111: AssertionError
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_mongosourceconnect_ingest
Stack Traces | 0.001s run time
(same loaded_kafka_connect fixture failure as above: assert 500 == 201 at test_kafka_connect.py:111)
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_s3sink_ingest
Stack Traces | 0.001s run time
(same loaded_kafka_connect fixture failure as above: assert 500 == 201 at test_kafka_connect.py:111)


@@ -308,6 +308,13 @@ class SnowflakeV2Config(
         " assertions CLI in snowflake",
     )

+    pushdown_deny_usernames: List[str] = Field(
Collaborator commented:

we should think about how we can avoid needing to specify every config twice - not sure if there's a good way to do it though

Contributor Author (@sgomezvillamor) replied:

While everything is possible, those refactors will be easier once we remove the queries v1 code base.
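
One possible direction for avoiding the duplication, purely as a hedged sketch and not something this PR does: hoist the shared field into a small mixin that both config classes inherit (the mixin name here is hypothetical):

from typing import List
from pydantic import BaseModel, Field

class PushdownDenyUsernamesMixin(BaseModel):
    # Declared once, inherited by both config classes.
    pushdown_deny_usernames: List[str] = Field(default_factory=list)

class SnowflakeQueriesExtractorConfig(PushdownDenyUsernamesMixin):
    pass

class SnowflakeV2Config(PushdownDenyUsernamesMixin):
    pass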

@sgomezvillamor sgomezvillamor marked this pull request as ready for review February 3, 2025 07:42
@sgomezvillamor (Contributor Author) commented:

The CI errors are about kafka connect and so are unrelated to this PR. Merging.

@sgomezvillamor sgomezvillamor merged commit ffc98da into master Feb 3, 2025
208 of 217 checks passed
@sgomezvillamor sgomezvillamor deleted the feat-snowflake-queriesv2-pushdown-deny-usernames branch February 3, 2025 08:54
eagle-25 pushed a commit to eagle-25/datahub that referenced this pull request Feb 12, 2025
eagle-25 pushed a commit to eagle-25/datahub that referenced this pull request Feb 17, 2025
Labels: ingestion (PR or Issue related to the ingestion of metadata), pending-submitter-merge