diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 94e1657e62b..1b6dd6bdc4e 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -111,6 +111,9 @@ def __init__( notify_on_success=False, notify_slack_webhook_url=None, notify_pager_duty_integration_key=None, + notify_incident_io_api_key=None, + incident_io_success_severity_id=None, + incident_io_error_severity_id=None, enable_heartbeat_daemon=True, enable_error_msg_capture=False, ): @@ -160,6 +163,9 @@ def __init__( self.notify_on_success = notify_on_success self.notify_slack_webhook_url = notify_slack_webhook_url self.notify_pager_duty_integration_key = notify_pager_duty_integration_key + self.notify_incident_io_api_key = notify_incident_io_api_key + self.incident_io_success_severity_id = incident_io_success_severity_id + self.incident_io_error_severity_id = incident_io_error_severity_id self.enable_heartbeat_daemon = enable_heartbeat_daemon self.enable_error_msg_capture = enable_error_msg_capture self.parameters = self._process_parameters() @@ -891,6 +897,17 @@ def _compile_workflow_template(self): and self.notify_pager_duty_integration_key else {} ), + **( + { + # workflow status maps to Completed + "notify-incident-io-on-success": LifecycleHook() + .expression("workflow.status == 'Succeeded'") + .template("notify-incident-io-on-success"), + } + if self.notify_on_success + and self.notify_incident_io_api_key + else {} + ), **( { # workflow status maps to Failed or Error @@ -918,6 +935,19 @@ def _compile_workflow_template(self): and self.notify_pager_duty_integration_key else {} ), + **( + { + # workflow status maps to Failed or Error + "notify-incident-io-on-failure": LifecycleHook() + .expression("workflow.status == 'Failed'") + .template("notify-incident-io-on-error"), + "notify-incident-io-on-error": LifecycleHook() + .expression("workflow.status == 'Error'") + .template("notify-incident-io-on-error"), + } + if 
def _incident_io_alert_template(self):
    """Build the exit-hook template that opens an incident for a failed run.

    Returns the "notify-incident-io-on-error" HTTP template, or None when no
    incident.io API key is configured.

    Raises MetaflowException when a key is configured but no error severity
    id has been provided.
    """
    # Truthiness (not `is None`) keeps this guard in step with the lifecycle
    # hook conditions, which only reference this template when the key is
    # truthy. An empty key must not produce a template or demand a severity.
    if not self.notify_incident_io_api_key:
        return None
    if self.incident_io_error_severity_id is None:
        raise MetaflowException(
            "Creating incidents for errors requires a severity id."
        )
    return self._incident_io_incident_template(
        template_name="notify-incident-io-on-error",
        severity_id=self.incident_io_error_severity_id,
        outcome="failed",
    )


def _incident_io_change_template(self):
    """Build the exit-hook template that opens an incident for a successful run.

    Returns the "notify-incident-io-on-success" HTTP template, or None when
    no incident.io API key is configured.

    Raises MetaflowException when a key is configured but no success severity
    id has been provided.
    """
    # Same truthiness guard as the error template, for the same reason.
    if not self.notify_incident_io_api_key:
        return None
    if self.incident_io_success_severity_id is None:
        raise MetaflowException(
            "Creating incidents for successes requires a severity id."
        )
    # TODO: Do we need to make incident type configurable for successes?
    # otherwise they are created as 'investigating' (no "incident_type_id"
    # is sent in the request body).
    return self._incident_io_incident_template(
        template_name="notify-incident-io-on-success",
        severity_id=self.incident_io_success_severity_id,
        outcome="succeeded",
    )


def _incident_io_incident_template(self, template_name, severity_id, outcome):
    """Return an HTTP template that POSTs a new incident to incident.io.

    template_name is the name given to the Argo template, severity_id is
    sent as the incident's "severity_id", and outcome is the verb placed in
    the incident name and summary ("failed" or "succeeded").
    """
    summary = "Metaflow run %s/argo-{{workflow.name}} %s!" % (
        self.flow.name,
        outcome,
    )
    run_links = self._incident_io_ui_urls_for_run()
    if run_links:
        # Only add the separator when there is something to append, so the
        # summary neither runs into the links nor ends in a stray space.
        summary = "%s %s" % (summary, run_links)
    return Template(template_name).http(
        Http("POST")
        .url("https://api.incident.io/v2/incidents")
        .header("Content-Type", "application/json")
        .header("Authorization", "Bearer %s" % self.notify_incident_io_api_key)
        .body(
            json.dumps(
                {
                    # use run id to deduplicate alerts.
                    "idempotency_key": "argo-{{workflow.name}}",
                    "visibility": "public",
                    "severity_id": severity_id,
                    "name": "Flow %s has %s." % (self.flow.name, outcome),
                    "summary": summary,
                    # TODO: Add support for custom field entries.
                }
            )
        )
    )


def _incident_io_ui_urls_for_run(self):
    """Return markdown links to the configured UIs for this run.

    The result is an empty string when neither UI_URL nor
    ARGO_WORKFLOWS_UI_URL is set; otherwise it is an introductory line
    followed by one link per configured UI, separated by blank lines.
    """
    links = []
    if UI_URL:
        links.append(
            "[Metaflow UI](%s/%s/%s)"
            % (UI_URL.rstrip("/"), self.flow.name, "argo-{{workflow.name}}")
        )
    if ARGO_WORKFLOWS_UI_URL:
        links.append(
            "[Argo UI](%s/workflows/%s/%s)"
            % (
                ARGO_WORKFLOWS_UI_URL.rstrip("/"),
                "{{workflow.namespace}}",
                "{{workflow.name}}",
            )
        )
    if links:
        links = ["See details for the run at: ", *links]
    return "\n\n".join(links)
notify_incident_io_api_key, + incident_io_success_severity_id, + incident_io_error_severity_id, enable_heartbeat_daemon, enable_error_msg_capture, ) @@ -442,6 +463,9 @@ def make_flow( notify_on_success, notify_slack_webhook_url, notify_pager_duty_integration_key, + notify_incident_io_api_key, + incident_io_success_severity_id, + incident_io_error_severity_id, enable_heartbeat_daemon, enable_error_msg_capture, ): @@ -453,17 +477,30 @@ def make_flow( ) if (notify_on_error or notify_on_success) and not ( - notify_slack_webhook_url or notify_pager_duty_integration_key + notify_slack_webhook_url + or notify_pager_duty_integration_key + or notify_incident_io_api_key ): raise MetaflowException( - "Notifications require specifying an incoming Slack webhook url via --notify-slack-webhook-url or " - "PagerDuty events v2 integration key via --notify-pager-duty-integration-key.\n If you would like to set up " - "notifications for your Slack workspace, follow the instructions at " - "https://api.slack.com/messaging/webhooks to generate a webhook url.\n For notifications through PagerDuty, " - "generate an integration key by following the instructions at " - "https://support.pagerduty.com/docs/services-and-integrations#create-a-generic-events-api-integration" + "Notifications require specifying an incoming Slack webhook url via --notify-slack-webhook-url, PagerDuty events v2 integration key via --notify-pager-duty-integration-key or\n" + "Incident.io integration API key via --notify-incident-io-api-key.\n" + " If you would like to set up notifications for your Slack workspace, follow the instructions at " + "https://api.slack.com/messaging/webhooks to generate a webhook url.\n" + " For notifications through PagerDuty, generate an integration key by following the instructions at " + "https://support.pagerduty.com/docs/services-and-integrations#create-a-generic-events-api-integration\n" + " For notifications through Incident.io, generate an API key with a permission to create 
incidents." ) + if notify_incident_io_api_key: + if notify_on_error and incident_io_error_severity_id is None: + raise MetaflowException( + "Incident.io error notifications require a severity id. Please set one with --incident-io-error-severity-id" + ) + + if notify_on_success and incident_io_success_severity_id is None: + raise MetaflowException( + "Incident.io success notifications require a severity id. Please set one with --incident-io-success-severity-id" + ) # Attach @kubernetes and @environment decorator to the flow to # ensure that the related decorator hooks are invoked. decorators._attach_decorators( @@ -507,6 +544,9 @@ def make_flow( notify_on_success=notify_on_success, notify_slack_webhook_url=notify_slack_webhook_url, notify_pager_duty_integration_key=notify_pager_duty_integration_key, + notify_incident_io_api_key=notify_incident_io_api_key, + incident_io_success_severity_id=incident_io_success_severity_id, + incident_io_error_severity_id=incident_io_error_severity_id, enable_heartbeat_daemon=enable_heartbeat_daemon, enable_error_msg_capture=enable_error_msg_capture, )