Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Updates DbtCloudCredentials to implement CredentialsBlock interface #109

Merged
merged 5 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `DbtCloudMetadataClient` and `get_metadata_client` method to `DbtCloudCredentials` to enable interaction with the dbt Cloud metadata API - [#109](https://github.com/PrefectHQ/prefect-dbt/pull/109)
- Added `get_client` method to `DbtCloudCredentials` - [#109](https://github.com/PrefectHQ/prefect-dbt/pull/109)

### Changed

### Deprecated
Expand Down
46 changes: 45 additions & 1 deletion prefect_dbt/cloud/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import prefect
from httpx import AsyncClient, Response
from sgqlc.endpoint.http import HTTPEndpoint
from typing_extensions import Literal

from prefect_dbt.cloud.models import TriggerJobRunOptions
Expand All @@ -18,7 +19,7 @@ class DbtCloudAdministrativeClient:
domain: Domain at which the dbt Cloud API is hosted.
"""

def __init__(self, api_key: str, account_id: int, domain: str):
def __init__(self, api_key: str, account_id: int, domain: str = "cloud.getdbt.com"):
self._closed = False
self._started = False

Expand Down Expand Up @@ -188,3 +189,46 @@ async def __aenter__(self):
async def __aexit__(self, *exc):
self._closed = True
await self._admin_client.__aexit__()


class DbtCloudMetadataClient:
"""
Client for interacting with the dbt cloud Administrative API.

Args:
api_key: API key to authenticate with the dbt Cloud administrative API.
account_id: ID of dbt Cloud account with which to interact.
domain: Domain at which the dbt Cloud API is hosted.
"""

def __init__(self, api_key: str, domain: str = "metadata.cloud.getdbt.com"):
self._http_endpoint = HTTPEndpoint(
base_headers={
"Authorization": f"Bearer {api_key}",
"user-agent": f"prefect-{prefect.__version__}",
"content-type": "application/json",
},
url=f"https://{domain}/graphql",
)

def query(
self,
query: str,
variables: Optional[Dict] = None,
operation_name: Optional[str] = None,
) -> Dict[str, Any]:
"""
Run a GraphQL query against the dbt Cloud metadata API.

Args:
query: The GraphQL query to run.
variables: The values of any variables defined in the GraphQL query.
operation_name: The name of the operation to run if multiple operations
are defined in the provided query.

Returns:
The result of the GraphQL query.
"""
return self._http_endpoint(
query=query, variables=variables, operation_name=operation_name
)
130 changes: 121 additions & 9 deletions prefect_dbt/cloud/credentials.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Module containing credentials for interacting with dbt Cloud"""
from prefect.blocks.core import Block
from pydantic import SecretStr
from typing import Union

from prefect_dbt.cloud.clients import DbtCloudAdministrativeClient
from prefect.blocks.abstract import CredentialsBlock
from pydantic import Field, SecretStr
from typing_extensions import Literal

from prefect_dbt.cloud.clients import (
DbtCloudAdministrativeClient,
DbtCloudMetadataClient,
)

class DbtCloudCredentials(Block):

class DbtCloudCredentials(CredentialsBlock):
"""
Credentials block for credential use across dbt Cloud tasks and flows.

Expand Down Expand Up @@ -55,17 +61,123 @@ def trigger_dbt_cloud_job_run_flow():
_block_type_name = "dbt Cloud Credentials"
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa

api_key: SecretStr
account_id: int
domain: str = "cloud.getdbt.com"

def get_administrative_client(self):
api_key: SecretStr = Field(
default=...,
title="API Key",
description="A dbt Cloud API key to use for authentication.",
)
account_id: int = Field(
default=..., title="Account ID", description="The ID of your dbt Cloud account."
)
domain: str = Field(
default="cloud.getdbt.com",
description="The base domain of your dbt Cloud instance.",
)

def get_administrative_client(self) -> DbtCloudAdministrativeClient:
"""
Returns a newly instantiated client for working with the dbt Cloud
administrative API.

Returns:
An authenticated dbt Cloud administrative API client.
"""
return DbtCloudAdministrativeClient(
api_key=self.api_key.get_secret_value(),
account_id=self.account_id,
domain=self.domain,
)

def get_metadata_client(self) -> DbtCloudMetadataClient:
"""
Returns a newly instantiated client for working with the dbt Cloud
metadata API.

Example:
Sending queries via the returned metadata client:
```python
from prefect_dbt import DbtCloudCredentials

credentials_block = DbtCloudCredentials.load("test-account")
metadata_client = credentials_block.get_metadata_client()
query = \"\"\"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the \ needed here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, need to escape the quotes so that the docstring continues.

{
metrics(jobId: 123) {
uniqueId
name
packageName
tags
label
runId
description
type
sql
timestamp
timeGrains
dimensions
meta
resourceType
filters {
field
operator
value
}
model {
name
}
}
}
\"\"\"
metadata_client.query(query)
# Result:
# {
# "data": {
# "metrics": [
# {
# "uniqueId": "metric.tpch.total_revenue",
# "name": "total_revenue",
# "packageName": "tpch",
# "tags": [],
# "label": "Total Revenue ($)",
# "runId": 108952046,
# "description": "",
# "type": "sum",
# "sql": "net_item_sales_amount",
# "timestamp": "order_date",
# "timeGrains": ["day", "week", "month"],
# "dimensions": ["status_code", "priority_code"],
# "meta": {},
# "resourceType": "metric",
# "filters": [],
# "model": { "name": "fct_orders" }
# }
# ]
# }
# }
```

Returns:
An authenticated dbt Cloud metadata API client.
"""
return DbtCloudMetadataClient(
api_key=self.api_key.get_secret_value(),
domain=f"metadata.{self.domain}",
)

def get_client(
self, client_type: Literal["administrative", "metadata"]
) -> Union[DbtCloudAdministrativeClient, DbtCloudMetadataClient]:
"""
Returns a newly instantiated client for working with the dbt Cloud API.

Args:
client_type: Type of client to return. Accepts either 'administrative'
or 'metadata'.

Returns:
The authenticated client of the requested type.
"""
get_client_method = getattr(self, f"get_{client_type}_client", None)
if get_client_method is None:
raise ValueError(f"'{client_type}' is not a supported client type.")
return get_client_method()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
prefect>=2.7.2
prefect_shell>=0.1.0
sgqlc>=16.0.0
67 changes: 67 additions & 0 deletions tests/cloud/test_clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import json
from unittest.mock import MagicMock

from prefect_dbt.cloud.clients import DbtCloudMetadataClient


def test_metadata_client_query(monkeypatch):
mock_response = {
"data": {
"metrics": [
{
"uniqueId": "metric.tpch.total_revenue",
"name": "total_revenue",
"packageName": "tpch",
"tags": [],
"label": "Total Revenue ($)",
"runId": 108952046,
"description": "",
"type": "sum",
"sql": "net_item_sales_amount",
"timestamp": "order_date",
"timeGrains": ["day", "week", "month"],
"dimensions": ["status_code", "priority_code"],
"meta": {},
"resourceType": "metric",
"filters": [],
"model": {"name": "fct_orders"},
}
]
}
}
urlopen_mock = MagicMock()
urlopen_mock.getcode.return_value = 200
urlopen_mock.return_value = urlopen_mock
urlopen_mock.read.return_value = json.dumps(mock_response).encode()
urlopen_mock.__enter__.return_value = urlopen_mock
monkeypatch.setattr("urllib.request.urlopen", urlopen_mock)
dbt_cloud_metadata_client = DbtCloudMetadataClient(api_key="my_api_key")
mock_query = """
{
metrics(jobId: 123) {
uniqueId
name
packageName
tags
label
runId
description
type
sql
timestamp
timeGrains
dimensions
meta
resourceType
filters {
field
operator
value
}
model {
name
}
}
}
"""
assert dbt_cloud_metadata_client.query(mock_query) == mock_response
35 changes: 35 additions & 0 deletions tests/cloud/test_cloud_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from prefect_dbt import DbtCloudCredentials
from prefect_dbt.cloud.clients import (
DbtCloudAdministrativeClient,
DbtCloudMetadataClient,
)


@pytest.fixture
def dbt_cloud_credentials():
return DbtCloudCredentials(api_key="my_api_key", account_id=123456789)


def test_get_administrative_client(dbt_cloud_credentials: DbtCloudCredentials):
assert isinstance(
dbt_cloud_credentials.get_administrative_client(), DbtCloudAdministrativeClient
)


def test_get_metadata_client(dbt_cloud_credentials: DbtCloudCredentials):
assert isinstance(
dbt_cloud_credentials.get_metadata_client(), DbtCloudMetadataClient
)


def test_get_client(dbt_cloud_credentials: DbtCloudCredentials):
assert isinstance(
dbt_cloud_credentials.get_client("administrative"), DbtCloudAdministrativeClient
)
assert isinstance(
dbt_cloud_credentials.get_client("metadata"), DbtCloudMetadataClient
)
with pytest.raises(ValueError, match="'blorp' is not a supported client type"):
dbt_cloud_credentials.get_client("blorp")