Skip to content

Commit

Permalink
Merge pull request #104 from astronomer/dev
Browse files Browse the repository at this point in the history
v2.0.4
  • Loading branch information
fritz-astronomer authored Jun 7, 2024
2 parents cb97bbc + 61b6836 commit d216a76
Show file tree
Hide file tree
Showing 25 changed files with 1,354 additions and 211 deletions.
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.0.3"
__version__ = "2.0.4"


def get_provider_info():
Expand Down
Empty file.
Empty file.
Empty file.
253 changes: 253 additions & 0 deletions astronomer_starship/providers/starship/hooks/starship.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
"""
Hooks for interacting with Starship migrations
"""
from abc import ABC, abstractmethod

from typing import List

from airflow.providers.http.hooks.http import HttpHook
from airflow.hooks.base import BaseHook

from astronomer_starship.starship_api import starship_compat

POOLS_ROUTE = "/api/starship/pools"
CONNECTIONS_ROUTE = "/api/starship/connections"
VARIABLES_ROUTE = "/api/starship/variables"
DAGS_ROUTE = "/api/starship/dags"
DAG_RUNS_ROUTE = "/api/starship/dag_runs"
TASK_INSTANCES_ROUTE = "/api/starship/task_instances"


class StarshipHook(ABC):
@abstractmethod
def get_variables(self):
pass

@abstractmethod
def set_variable(self, **kwargs):
pass

@abstractmethod
def get_pools(self):
pass

@abstractmethod
def set_pool(self, **kwargs):
pass

@abstractmethod
def get_connections(self):
pass

@abstractmethod
def set_connection(self, **kwargs):
pass

@abstractmethod
def get_dags(self):
pass

@abstractmethod
def set_dag_is_paused(self, dag_id: str, is_paused: bool):
pass

@abstractmethod
def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
pass

@abstractmethod
def set_dag_runs(self, dag_runs: list):
pass

@abstractmethod
def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
pass

@abstractmethod
def set_task_instances(self, task_instances: list):
pass


class StarshipLocalHook(BaseHook, StarshipHook):
"""Hook to retrieve local Airflow data, which can then be sent to the Target Starship instance."""

def get_variables(self):
"""
Get all variables from the local Airflow instance.
"""
return starship_compat.get_variables()

def set_variable(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

def get_pools(self):
"""
Get all pools from the local Airflow instance.
"""
return starship_compat.get_pools()

def set_pool(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

# noinspection PyMethodOverriding
def get_connections(self):
"""
Get all connections from the local Airflow instance.
"""
return starship_compat.get_connections()

def set_connection(self, **kwargs):
raise RuntimeError("Setting local data is not supported")

def get_dags(self) -> dict:
"""
Get all DAGs from the local Airflow instance.
"""
return starship_compat.get_dags()

def set_dag_is_paused(self, dag_id: str, is_paused: bool):
"""
Set the paused status of a DAG in the local Airflow instance.
"""
return starship_compat.set_dag_is_paused(dag_id, is_paused)

def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
"""
Get DAG runs from the local Airflow instance.
"""
return starship_compat.get_dag_runs(dag_id, offset=offset, limit=limit)

def set_dag_runs(self, dag_runs: list):
raise RuntimeError("Setting local data is not supported")

def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
"""
Get task instances from the local Airflow instance.
"""
return starship_compat.get_task_instances(dag_id, offset=offset, limit=limit)

def set_task_instances(self, task_instances: list):
raise RuntimeError("Setting local data is not supported")


class StarshipHttpHook(HttpHook, StarshipHook):
def get_variables(self):
"""
Get all variables from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(VARIABLES_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_variable(self, **kwargs):
"""
Set a variable in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(VARIABLES_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

def get_pools(self):
"""
Get all pools from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(POOLS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_pool(self, **kwargs):
"""
Set a pool in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(POOLS_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

# noinspection PyMethodOverriding
def get_connections(self):
"""
Get all connections from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(CONNECTIONS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_connection(self, **kwargs):
"""
Set a connection in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(CONNECTIONS_ROUTE)
res = conn.post(url, json=kwargs)
res.raise_for_status()
return res.json()

def get_dags(self) -> dict:
"""
Get all DAGs from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAGS_ROUTE)
res = conn.get(url)
res.raise_for_status()
return res.json()

def set_dag_is_paused(self, dag_id: str, is_paused: bool):
"""
Set the paused status of a DAG in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAGS_ROUTE)
res = conn.patch(url, json={"dag_id": dag_id, "is_paused": is_paused})
res.raise_for_status()
return res.json()

def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
"""
Get DAG runs from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAG_RUNS_ROUTE)
res = conn.get(url, params={"dag_id": dag_id, "limit": limit})
res.raise_for_status()
return res.json()

def set_dag_runs(self, dag_runs: List[dict]) -> dict:
"""
Set DAG runs in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(DAG_RUNS_ROUTE)
res = conn.post(url, json={"dag_runs": dag_runs})
res.raise_for_status()
return res.json()

def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
"""
Get task instances from the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
res = conn.get(url, params={"dag_id": dag_id, "limit": limit})
res.raise_for_status()
return res.json()

def set_task_instances(self, task_instances: list[dict]) -> dict:
"""
Set task instances in the Target Starship instance.
"""
conn = self.get_conn()
url = self.url_from_endpoint(TASK_INSTANCES_ROUTE)
res = conn.post(url, json={"task_instances": task_instances})
res.raise_for_status()
return res.json()
Empty file.
Loading

0 comments on commit d216a76

Please sign in to comment.