Add multi export functionality to scheduled DataDoc #372

Merged · 1 commit · Jan 12, 2021
@@ -4,9 +4,12 @@ Success: "{{ doc_title }}" (Doc ID: {{ doc_id }}) has completed!
 Failure: "{{ doc_title }}" (Doc ID: {{ doc_id }}) has failed!
 {% endif %}

-* **Here is the url for the DataDoc: <{{ doc_url }}>**
-{% if export_url != None %}
-* **Here is the exported query result url: <{{ export_url }}>**
-{% elif is_success == False and error_msg != None%}
-* **The failure reason: {{ error_msg }}**
-{% endif %}
+- **Here is the url for the DataDoc: <{{ doc_url }}>**
+{% if export_urls|length > 0 %}
+- **Here is the exported query result url:**
+{% for export_url in export_urls %}
+  <{{ export_url }}>
+{% endfor %}
+{% elif is_success == False and error_msg != None %}
+- **The failure reason: {{ error_msg }}**
+{% endif %}
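A quick way to sanity-check the new multi-export branch of this template is to render it with jinja2 directly. This is an illustrative sketch with made-up values, not part of the PR; the whitespace-control dashes are added only to keep the sketch compact:

from jinja2 import Template

# Only the export section of the template; in the PR the full template is
# rendered by the notifier with params from notification.py below.
snippet = Template(
    """
{%- if export_urls|length > 0 %}
- **Here is the exported query result url:**
{%- for export_url in export_urls %}
  <{{ export_url }}>
{%- endfor %}
{%- elif is_success == False and error_msg != None %}
- **The failure reason: {{ error_msg }}**
{%- endif %}
"""
)

# Prints one bullet followed by both (hypothetical) export URLs.
print(snippet.render(export_urls=["https://example.com/export/1", "https://example.com/export/2"]))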
44 changes: 16 additions & 28 deletions datahub/server/datasources/datadoc.py
@@ -13,9 +13,9 @@
 from env import DataHubSettings

 from lib.celery.cron import validate_cron
-from lib.form import validate_form
-from lib.export.all_exporters import get_exporter
 from lib.logger import get_logger
+from lib.scheduled_datadoc.validator import validate_datadoc_schedule_config
+from lib.scheduled_datadoc.legacy import convert_if_legacy_datadoc_schedule

 from logic import (
     datadoc_collab,
@@ -265,38 +265,25 @@ def get_datadoc_schedule(id):
         verify_data_doc_permission(id, session=session)

         schedule_name = get_data_doc_schedule_name(id)
-        return schedule_logic.get_task_schedule_by_name(schedule_name, session=session)
-
-
-def validate_datadoc_schedule_kwargs(kwargs):
-    allowed_keys = [
-        "notify_with",
-        "notify_on",
-        "exporter_cell_id",
-        "exporter_name",
-        "exporter_params",
-    ]
-    for key in kwargs.keys():
-        api_assert(key in allowed_keys, "Invalid field {}".format(key))
-
-    # Check if export_cell_id is provided then export name must be valid
-    if kwargs.get("exporter_cell_id", None) is not None:
-        exporter_name = kwargs.get("exporter_name", None)
-        exporter = get_exporter(exporter_name)
-        api_assert(exporter is not None, "Invalid exporter {}".format(exporter_name))
+        schedule = schedule_logic.get_task_schedule_by_name(
+            schedule_name, session=session
+        )
+        if not schedule:
+            return None

-        exporter_params = kwargs.get("exporter_params", {})
-        exporter_form = exporter.export_form
-        if not (exporter_form is None and not exporter_params):
-            valid, reason = validate_form(exporter_form, exporter_params)
-            api_assert(valid, "Invalid exporter params, reason: " + reason)
+        schedule_dict = schedule.to_dict()
+        schedule_dict["kwargs"] = convert_if_legacy_datadoc_schedule(
+            schedule_dict["kwargs"]
+        )
+        return schedule_dict


 @register("/datadoc/<int:id>/schedule/", methods=["POST"])
 def create_datadoc_schedule(
     id, cron, kwargs,
 ):
-    validate_datadoc_schedule_kwargs(kwargs)
+    kwargs_valid, kwargs_valid_reason = validate_datadoc_schedule_config(kwargs)
+    api_assert(kwargs_valid, kwargs_valid_reason)
     api_assert(validate_cron(cron), "Invalid cron expression")

     schedule_name = get_data_doc_schedule_name(id)
@@ -318,7 +305,8 @@ def create_datadoc_schedule(
 @register("/datadoc/<int:id>/schedule/", methods=["PUT"])
 def update_datadoc_schedule(id, cron=None, enabled=None, kwargs=None):
     if kwargs is not None:
-        validate_datadoc_schedule_kwargs(kwargs)
+        kwargs_valid, kwargs_valid_reason = validate_datadoc_schedule_config(kwargs)
+        api_assert(kwargs_valid, kwargs_valid_reason)
     if cron is not None:
         api_assert(validate_cron(cron), "Invalid cron expression")
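For illustration, here is a hedged sketch of a schedule payload in the new multi-export format passing through these endpoints. Field names follow the validator added below; the exporter name and the NotifyOn value are assumptions:

kwargs = {
    "notify_with": "slack",
    "notify_on": 0,  # assumed to be NotifyOn.ALL.value
    "exports": [
        {
            "exporter_cell_id": 12,      # a query cell in the doc (illustrative id)
            "exporter_name": "gspread",  # hypothetical exporter name
            "exporter_params": {},
        }
    ],
}

kwargs_valid, kwargs_valid_reason = validate_datadoc_schedule_config(kwargs)
api_assert(kwargs_valid, kwargs_valid_reason)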
1 change: 1 addition & 0 deletions datahub/server/lib/export/all_exporters.py
@@ -1,5 +1,6 @@
from lib.utils.plugin import import_plugin


ALL_PLUGIN_EXPORTERS = import_plugin("exporter_plugin", "ALL_PLUGIN_EXPORTERS", [])

# No default exporter is provided
4 changes: 2 additions & 2 deletions datahub/server/lib/notify/all_notifiers.py
@@ -1,8 +1,8 @@
 from lib.utils.plugin import import_plugin
-from lib.notify.notifier.email_notifier import EmailNotifier
+from .notifier.email_notifier import EmailNotifier

 ALL_PLUGIN_NOTIFIERS = import_plugin(
-    "notifier_plugin", "ALL_PLUGIN_NOTIFIERS", [EmailNotifier()]
+    "notifier_plugin", "ALL_PLUGIN_NOTIFIERS", [EmailNotifier(),],
 )

 ALL_NOTIFIERS = ALL_PLUGIN_NOTIFIERS
6 changes: 4 additions & 2 deletions datahub/server/lib/notify/notifier/slack_notifier.py
@@ -4,6 +4,9 @@


 class SlackNotifier(BaseNotifier):
+    def __init__(self, token=None):
+        self.token = token if token is not None else DataHubSettings.DATAHUB_SLACK_TOKEN
+
     @property
     def notifier_name(self):
         return "slack"
@@ -14,9 +17,8 @@ def notifier_format(self):

     def notify(self, user, message):
         to = f"@{user.username}"
-        token = DataHubSettings.DATAHUB_SLACK_TOKEN
         url = "https://slack.com/api/chat.postMessage"
-        headers = {"Authorization": "Bearer {}".format(token)}
+        headers = {"Authorization": "Bearer {}".format(self.token)}
         text = self._convert_markdown(message)
         data = {
             "text": text,
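A side effect of this change is that the Slack token is now injectable. A deployment could register a notifier with an explicit token from its notifier plugin; this is a hypothetical plugin snippet, not part of the PR (the plugin mechanism itself is real, see all_notifiers.py above; the token value is a placeholder):

from lib.notify.notifier.slack_notifier import SlackNotifier

# Hypothetical contents of a notifier_plugin module: use an explicit token
# instead of relying on DataHubSettings.DATAHUB_SLACK_TOKEN.
ALL_PLUGIN_NOTIFIERS = [
    SlackNotifier(token="xoxb-placeholder-token"),
]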
Empty file.
59 changes: 59 additions & 0 deletions datahub/server/lib/scheduled_datadoc/export.py
@@ -0,0 +1,59 @@
from collections import defaultdict
from typing import List

from app.db import with_session
from const.query_execution import QueryExecutionStatus
from lib.export.all_exporters import get_exporter
from logic.datadoc import get_data_doc_by_id
from logic.query_execution import get_last_query_execution_from_cell


@with_session
def export_datadoc(doc_id, uid, exports, session=None):
    if not len(exports):
        return []

    export_by_cell = group_export_by_cell_id(exports)
    datadoc = get_data_doc_by_id(doc_id, session=session)
    query_cells = datadoc.get_query_cells()
    export_urls = []
    for cell in query_cells:
        if cell.id not in export_by_cell:
            continue
        export_urls.extend(
            _export_query_cell(cell, uid, export_by_cell[cell.id], session)
        )

    return export_urls


def group_export_by_cell_id(exports: List):
    export_by_cell = defaultdict(list)
    for export in exports:
        export_by_cell[export["exporter_cell_id"]].append(export)
    return export_by_cell


def _export_query_cell(
    cell,
    uid,
    cell_exports,
    session,  # Not decorated with @with_session: `cell` is already bound to the caller's session
):
    query_execution = get_last_query_execution_from_cell(cell.id, session=session)
    if not query_execution:
        return []
    if query_execution.status != QueryExecutionStatus.DONE:
        # Surface the non-terminal status in place of an export url
        return [query_execution.status]
    statement_execution_id = query_execution.statement_executions[-1].id

    export_urls = []
    for export in cell_exports:
        exporter_name = export["exporter_name"]
        exporter_params = export.get("exporter_params", {})

        exporter = get_exporter(exporter_name)
        export_urls.append(
            exporter.export(statement_execution_id, uid, **exporter_params)
        )
    return export_urls
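Since group_export_by_cell_id is a pure function, its behavior is easy to illustrate; the exporter names below are made up:

exports = [
    {"exporter_cell_id": 1, "exporter_name": "gspread"},  # hypothetical exporter
    {"exporter_cell_id": 1, "exporter_name": "csv"},      # hypothetical exporter
    {"exporter_cell_id": 2, "exporter_name": "gspread"},
]
grouped = group_export_by_cell_id(exports)
# Cell 1 gets two export configs, cell 2 gets one.
assert len(grouped[1]) == 2 and len(grouped[2]) == 1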
41 changes: 41 additions & 0 deletions datahub/server/lib/scheduled_datadoc/legacy.py
@@ -0,0 +1,41 @@
from typing import Dict


def set_key_if_exists(to_dict: Dict, from_dict: Dict, key: str):
    if key in from_dict:
        to_dict[key] = from_dict[key]


def convert_if_legacy_datadoc_schedule(schedule_config: Dict) -> Dict:
    """Previously, the datadoc schedule format could only support a
    single export. The exports config is now an array to support
    multiple export options. This function ensures that the config
    used is always converted to the multi-export format.

    Args:
        schedule_config (Dict): Can be a legacy or an up-to-date config

    Returns:
        Dict: Up-to-date config
    """

    is_legacy_config = "exporter_cell_id" in schedule_config
    if not is_legacy_config:
        return schedule_config

    export_config = {
        "exporter_cell_id": schedule_config["exporter_cell_id"],
        "exporter_name": schedule_config["exporter_name"],
    }
    set_key_if_exists(export_config, schedule_config, "exporter_params")

    new_schedule_config = {
        "exports": [export_config],
        "doc_id": schedule_config["doc_id"],
        "user_id": schedule_config["user_id"],
    }
    set_key_if_exists(new_schedule_config, schedule_config, "notify_with")
    set_key_if_exists(new_schedule_config, schedule_config, "notify_on")

    return new_schedule_config
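For example, a legacy single-export config (values illustrative) converts as follows:

legacy = {
    "doc_id": 7,
    "user_id": 3,
    "exporter_cell_id": 12,
    "exporter_name": "gspread",  # hypothetical exporter name
    "notify_on": 0,
}
converted = convert_if_legacy_datadoc_schedule(legacy)
# converted == {
#     "exports": [{"exporter_cell_id": 12, "exporter_name": "gspread"}],
#     "doc_id": 7,
#     "user_id": 3,
#     "notify_on": 0,
# }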
72 changes: 72 additions & 0 deletions datahub/server/lib/scheduled_datadoc/notification.py
@@ -0,0 +1,72 @@
from typing import Dict, List

from app.db import DBSession, with_session
from const.schedule import NotifyOn
from env import DataHubSettings
from lib.notify.utils import notify_user
from logic.datadoc import get_data_doc_by_id
from models.user import User


def notify_on_datadoc_complete(
    doc_id: int,
    user_id: int,
    is_success: bool,
    notify_with: str,
    notify_on: int,  # NotifyOn.value
    error_msg: str,
    export_urls: List[str],
):
    if _should_notify(notify_with, notify_on, is_success):
        with DBSession() as session:
            _send_datadoc_notification(
                user_id,
                notify_with,
                _get_datadoc_notification_params(
                    doc_id, is_success, error_msg, export_urls, session=session
                ),
                session=session,
            )


def _should_notify(notify_with: str, notify_on: int, is_success: bool):
    return bool(notify_with) and (
        notify_on == NotifyOn.ALL.value
        or (notify_on == NotifyOn.ON_SUCCESS.value and is_success)
        or (notify_on == NotifyOn.ON_FAILURE.value and not is_success)
    )


@with_session
def _get_datadoc_notification_params(
    doc_id: int, is_success: bool, error_msg: str, export_urls: List[str], session=None
):
    datadoc = get_data_doc_by_id(doc_id, session=session)
    doc_title = datadoc.title or "Untitled"
    env_name = datadoc.environment.name
    doc_url = f"{DataHubSettings.PUBLIC_URL}/{env_name}/datadoc/{doc_id}/"

    return dict(
        is_success=is_success,
        doc_title=doc_title,
        doc_url=doc_url,
        doc_id=doc_id,
        export_urls=export_urls,
        error_msg=error_msg,
    )


@with_session
def _send_datadoc_notification(
    user_id: int, notify_with: str, notification_params: Dict, session=None,
):
    user = User.get(id=user_id, session=session)
    notify_user(
        user=user,
        template_name="datadoc_completion_notification",
        template_params=notification_params,
        notifier_name=notify_with,
        session=session,
    )
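The gating logic in _should_notify is pure, so a few sanity checks illustrate it (assuming the NotifyOn enum exposes the ALL / ON_SUCCESS / ON_FAILURE members used above):

assert _should_notify("slack", NotifyOn.ALL.value, is_success=False)
assert _should_notify("email", NotifyOn.ON_SUCCESS.value, is_success=True)
assert not _should_notify("email", NotifyOn.ON_FAILURE.value, is_success=True)
assert not _should_notify("", NotifyOn.ALL.value, is_success=True)  # notify_with unset: never notify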
55 changes: 55 additions & 0 deletions datahub/server/lib/scheduled_datadoc/validator.py
@@ -0,0 +1,55 @@
from typing import Dict, List
from lib.export.all_exporters import get_exporter
from lib.form import validate_form


class InvalidScheduleException(Exception):
    pass


valid_schedule_config_keys = ["notify_with", "notify_on", "exports"]
valid_export_config_keys = ["exporter_cell_id", "exporter_name", "exporter_params"]


def validate_datadoc_schedule_config(schedule_config):
    try:
        validate_dict_keys(schedule_config, valid_schedule_config_keys)
        validate_exporters_config(schedule_config.get("exports", []))
    except InvalidScheduleException as e:
        return False, str(e)
    return True, ""


def validate_dict_keys(d: Dict, allowed_keys: List):
    for key in d.keys():
        if key not in allowed_keys:
            raise InvalidScheduleException(f"Invalid field {key}")


def validate_exporters_config(export_configs: List):
    if not export_configs:
        return

    for export_config in export_configs:
        validate_dict_keys(export_config, valid_export_config_keys)

        if export_config.get("exporter_cell_id", None) is None:
            raise InvalidScheduleException("exporter_cell_id is required")

        exporter = _get_exporter(export_config)
        exporter_params = export_config.get("exporter_params", {})
        exporter_form = exporter.export_form
        if exporter_form is not None or exporter_params:
            valid, reason = validate_form(exporter_form, exporter_params)
            if not valid:
                raise InvalidScheduleException(
                    f"Invalid exporter params, reason: {reason}"
                )


def _get_exporter(export_config):
    exporter_name = export_config.get("exporter_name", None)
    try:
        return get_exporter(exporter_name)
    except ValueError:
        raise InvalidScheduleException(f"Invalid exporter {exporter_name}")
10 changes: 10 additions & 0 deletions datahub/server/logic/query_execution.py
@@ -41,6 +41,16 @@ def get_datadoc_id_from_query_execution_id(query_execution_id, session=None):
    )


@with_session
def get_last_query_execution_from_cell(cell_id, session=None):
    return (
        session.query(QueryExecution)
        .join(DataCellQueryExecution)
        .filter(DataCellQueryExecution.data_cell_id == cell_id)
        .order_by(QueryExecution.id.desc())
        .first()
    )


@with_session
def search_query_execution(
    environment_id, filters, orderBy, limit, offset, session=None
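A hedged usage sketch, mirroring how _export_query_cell consumes this helper: the query returns the most recent execution for the cell (newest QueryExecution.id first), or None when the cell has never run. It assumes an open SQLAlchemy `session` is in scope and uses an illustrative cell id:

execution = get_last_query_execution_from_cell(cell_id=12, session=session)
if execution is not None and execution.status == QueryExecutionStatus.DONE:
    statement_execution_id = execution.statement_executions[-1].id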
3 changes: 3 additions & 0 deletions datahub/server/models/datadoc.py
@@ -66,6 +66,9 @@ def to_dict(self, with_cells=False):

        return data_doc_dict

    def get_query_cells(self):
        return [cell for cell in self.cells if cell.cell_type == DataCellType.query]


class DataCell(Base):
    __tablename__ = "data_cell"