Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Improve logging and display of information #34

Merged
merged 8 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ requests==2.28.1
tomli==2.0.1
typing_extensions==4.4.0
urllib3==1.26.12
croniter==1.3.7
croniter==1.3.7
rich==12.6.0
59 changes: 59 additions & 0 deletions src/changeset/change_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Optional
from pydantic import BaseModel
import string
from rich.table import Table


class Change(BaseModel):
"""Describes what a given change is and hot to apply it."""

identifier: str
type: str
action: str
sync_function: object
parameters: dict

def __str__(self):
return f"{self.action.upper()} {string.capwords(self.type)} {self.identifier}"

def apply(self):
self.sync_function(**self.parameters)


class ChangeSet(BaseModel):
"""Store the set of changes to be displayed or applied."""

__root__: Optional[list[Change]] = []

def __iter__(self):
return iter(self.__root__)

def append(self, change: Change):
self.__root__.append(change)

def __str__(self):
list_str = [str(change) for change in self.__root__]
return "\n".join(list_str)

def to_table(self) -> Table:
"""Return a table representation of the changeset."""

table = Table(title="Changes detected")

table.add_column("Action", style="cyan", no_wrap=True)
table.add_column("Type", style="magenta")
table.add_column("ID", style="green")

for change in self.__root__:
table.add_row(
change.action.upper(), string.capwords(change.type), change.identifier
)

return table
Comment on lines +38 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is beautiful! Excellent use of rich.


def __len__(self):
return len(self.__root__)

def apply(self):
for change in self.__root__:
change.apply()
48 changes: 12 additions & 36 deletions src/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CustomEnvironmentVariablePayload,
)
from schemas.job import JobDefinition
from schemas import check_env_var_same


class DBTCloud:
Expand Down Expand Up @@ -65,7 +66,7 @@ def update_job(self, job: JobDefinition) -> JobDefinition:
if response.status_code >= 400:
logger.error(response.json())

logger.success("Updated successfully.")
logger.success("Job updated successfully.")

return JobDefinition(**(response.json()["data"]), identifier=job.identifier)

Expand All @@ -83,7 +84,7 @@ def create_job(self, job: JobDefinition) -> JobDefinition:
if response.status_code >= 400:
logger.error(response.json())

logger.success("Created successfully.")
logger.success("Job created successfully.")

return JobDefinition(**(response.json()["data"]), identifier=job.identifier)

Expand All @@ -100,7 +101,7 @@ def delete_job(self, job: JobDefinition) -> None:
if response.status_code >= 400:
logger.error(response.json())

logger.warning("Deleted successfully.")
logger.success("Job deleted successfully.")

def get_jobs(self) -> List[JobDefinition]:
"""Return a list of Jobs for all the dbt Cloud jobs in an environment."""
Expand Down Expand Up @@ -206,41 +207,16 @@ def create_env_var(
return response.json()["data"]

def update_env_var(
self, custom_env_var: CustomEnvironmentVariable, project_id: int, job_id: int
) -> Optional[CustomEnvironmentVariablePayload]:
self, custom_env_var: CustomEnvironmentVariable, project_id: int, job_id: int, env_var_id: int, yml_job_identifier: str = None) -> Optional[CustomEnvironmentVariablePayload]:
"""Update env vars job overwrite in dbt Cloud."""

self._check_for_creds()

all_env_vars = self.get_env_vars(project_id, job_id)

if custom_env_var.name not in all_env_vars:
raise Exception(
f"Custom environment variable {custom_env_var.name} not found in dbt Cloud, "
f"you need to create it first."
)

env_var_id: Optional[int]

# TODO: Move this logic out of the client layer, and move it into
# at least one layer higher up. We want the dbt Cloud client to be
# as naive as possible.
if custom_env_var.name not in all_env_vars:
return self.create_env_var(
CustomEnvironmentVariablePayload(
account_id=self.account_id,
project_id=project_id,
**custom_env_var.dict(),
)
)

if all_env_vars[custom_env_var.name].value == custom_env_var.value:
logger.debug(
f"The env var {custom_env_var.name} is already up to date for the job {job_id}."
)
return None

env_var_id: int = all_env_vars[custom_env_var.name].id
# handle the case where the job was not created when we queued the function call
if yml_job_identifier and not job_id:
mapping_job_identifier_job_id = self.build_mapping_job_identifier_job_id()
job_id = mapping_job_identifier_job_id[yml_job_identifier]
custom_env_var.job_definition_id = job_id

# the endpoint is different for updating an overwrite vs creating one
if env_var_id:
Expand All @@ -266,7 +242,7 @@ def update_env_var(

self._clear_env_var_cache(job_definition_id=payload.job_definition_id)

logger.info(f"Updated the env_var {custom_env_var.name} for job {job_id}")
logger.success(f"Updated the env_var {custom_env_var.name} for job {job_id}")
return CustomEnvironmentVariablePayload(**(response.json()["data"]))

def delete_env_var(self, project_id: int, env_var_id: int) -> None:
Expand All @@ -282,7 +258,7 @@ def delete_env_var(self, project_id: int, env_var_id: int) -> None:
if response.status_code >= 400:
logger.error(response.json())

logger.warning("Deleted successfully.")
logger.success("Env Var Job Overwrite deleted successfully.")

def get_environments(self) -> Dict:
"""Return a list of Environments for all the dbt Cloud jobs in an account"""
Expand Down
Loading