Skip to content

Commit

Permalink
u
Browse files Browse the repository at this point in the history
  • Loading branch information
Drew Bittenbender committed Dec 4, 2023
1 parent 16cd88d commit d993210
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 1 deletion.
77 changes: 77 additions & 0 deletions databuilder/databuilder/models/score.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import (
Iterator, Optional
)
from datetime import datetime

from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.serializers.atlas_serializer import get_entity_attrs


class Owner(GraphSerializable):
LABELS_PERMITTED_TO_HAVE_SCORE = ['Table']

SCORE_NODE_LABEL = 'Score'
SCORE_NODE_SCORE_DATE = 'score_date'
SCORE_NODE_SCORE_VERSION = 'score_version'
SCORE_RELATION_TYPE = 'SCORE'
SCORE_OF_OBJECT_RELATION_TYPE = 'SCORE_OF'

def __init__(self,
start_label: str,
start_key: str,
score: float,
score_dt: datetime,
score_version: str
) -> None:
if start_label not in Owner.LABELS_PERMITTED_TO_HAVE_SCORE:
raise Exception(f'scores for {start_label} are not supported')
self.start_label = start_label
self.start_key = start_key
self.score = score
self.score_dt = score_dt
self.score_version = score_version

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()


def __repr__(self) -> str:
return f'Score({self.start_label!r}, {self.start_key!r}, {self.score!r}, {self.score_dt!r}, {self.score_version!r})'

def create_next_node(self) -> Optional[GraphNode]:
try:
return next(self._node_iter)
except StopIteration:
return None

def create_next_relation(self) -> Optional[GraphRelationship]:
try:
return next(self._relation_iter)
except StopIteration:
return None

def _create_node_iterator(self) -> Iterator[GraphNode]:
yield GraphNode(
key=self.get_score_key(),
label=self.SCORE_NODE_LABEL,
attributes={
self.SCORE_NODE_SCORE_DATE: self.score_dt,
self.SCORE_NODE_SCORE_VERSION: self.score_version,
}
)

def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
yield GraphRelationship(
start_label=self.start_label,
start_key=self.start_key,
end_label=self.SCORE_NODE_LABEL,
end_key=self.get_score_key(),
type=self.SCORE_RELATION_TYPE,
reverse_type=self.SCORE_OF_OBJECT_RELATION_TYPE,
attributes={}
)

def get_score_key(self) -> str:
return f"{self.start_key}/score"
32 changes: 32 additions & 0 deletions databuilder/databuilder/models/table_score.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import List, Union
from datetime import datetime

from databuilder.models.score import Score
from databuilder.models.table_metadata import TableMetadata


class TableScore(Score):
"""
Table Score model.
"""

def __init__(self,
db_name: str,
schema: str,
table_name: str,
cluster: str,
score: float,
score_dt: datetime,
score_version: str
) -> None:
self.start_label = TableMetadata.TABLE_NODE_LABEL
self.start_key = f'{db_name}://{cluster}.{schema}/{table_name}'

Score.__init__(
self,
start_label=self.start_label,
start_key=self.start_key,
score=score,
score_dt=score_dt,
score_version=score_version
)
113 changes: 113 additions & 0 deletions databuilder/databuilder/models/table_update_frequency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from typing import (
Iterator, Optional, Union,
)
from enum import Enum
import logging

from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.table_metadata import TableMetadata


LOGGER = logging.getLogger(__name__)


class UpdateFrequency(Enum):
MONTHLY = "monthly"
WEEKLY = "weekly"
DAILY = "daily"

def get_enum_by_value(value):
for member in UpdateFrequency:
if member.value == value:
return member
return None


class TableUpdateFrequency(GraphSerializable):
LABEL = 'Update_Frequency'
UPDATE_FREQUENCY_TABLE_RELATION_TYPE = 'UPDATE_FREQUENCY_OF'
TABLE_UPDATE_FREQUENCY_RELATION_TYPE = 'UPDATE_FREQUENCY'

def __init__(self,
db_name: str,
schema: str,
table_name: str,
cluster: str,
frequency: str,
) -> None:
self.db = db_name
self.schema = schema
self.table = table_name
self.cluster = cluster

self.frequency: UpdateFrequency = UpdateFrequency.get_enum_by_value(frequency.lower())
if not self.frequency:
LOGGER.warning("No Update Frequency provide, node will not be created")
self._node_iter = None
self._relation_iter = None
else:
self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()

def create_next_node(self) -> Optional[GraphNode]:
# return the string representation of the data
try:
if self._node_iter:
return next(self._node_iter)
else:
None
except StopIteration:
return None

def create_next_relation(self) -> Optional[GraphRelationship]:
try:
if self._relation_iter:
return next(self._relation_iter)
else:
return None
except StopIteration:
return None

def get_source_model_key(self) -> str:
return f'{self.get_table_model_key()}/updatefrequency'

def get_table_model_key(self) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.db,
cluster=self.cluster,
schema=self.schema,
tbl=self.table)

def _create_node_iterator(self) -> Iterator[GraphNode]:
"""
Create a table source node
:return:
"""
node = GraphNode(
key=self.get_source_model_key(),
label=self.LABEL,
attributes={
'frequency': self.frequency.value
}
)
yield node

def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
"""
Create relation map between owner record with original hive table
:return:
"""
relationship = GraphRelationship(
start_label=self.LABEL,
start_key=self.get_source_model_key(),
end_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self.get_table_model_key(),
type=self.UPDATE_FREQUENCY_TABLE_RELATION_TYPE,
reverse_type=self.TABLE_UPDATE_FREQUENCY_RELATION_TYPE,
attributes={}
)
yield relationship

def __repr__(self) -> str:
return f'TableUpdateFrequency({self.db!r}, {self.cluster!r}, {self.schema!r}, {self.table!r}, {self.frequency!r})'
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '7.4.4+foodtruck.2'
__version__ = '7.4.4+foodtruck.3'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'requirements.txt')
Expand Down

0 comments on commit d993210

Please sign in to comment.