diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java index 384d3d9701a9..7daa4ca826a8 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java @@ -63,13 +63,13 @@ public CustomerioNotificationClient(final Notification notification, } @Override - public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) + public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId) throws IOException, InterruptedException { throw new NotImplementedException(); } @Override - public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) + public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId) throws IOException, InterruptedException { throw new NotImplementedException(); } diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java index fb6e8da8a124..c4be2734ef21 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java @@ -23,14 +23,16 @@ public abstract boolean notifyJobFailure( String sourceConnector, String destinationConnector, String jobDescription, - String logUrl) + String logUrl, + Long jobId) throws IOException, InterruptedException; public abstract boolean notifyJobSuccess( String sourceConnector, String destinationConnector, String jobDescription, - String logUrl) + String logUrl, + Long jobId) throws IOException, InterruptedException; public abstract boolean notifyConnectionDisabled(String receiverEmail, diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java index c4cd4e259f2d..14b41134f2e5 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java @@ -44,25 +44,27 @@ public SlackNotificationClient(final Notification notification) { } @Override - public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) + public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId) throws IOException, InterruptedException { return notifyFailure(renderTemplate( "slack/failure_slack_notification_template.txt", sourceConnector, destinationConnector, jobDescription, - logUrl)); + logUrl, + String.valueOf(jobId))); } @Override - public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) + public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId) throws IOException, InterruptedException { return notifySuccess(renderTemplate( "slack/success_slack_notification_template.txt", sourceConnector, destinationConnector, jobDescription, - logUrl)); + logUrl, + String.valueOf(jobId))); } @Override diff --git a/airbyte-notification/src/main/resources/slack/failure_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/failure_slack_notification_template.txt index b1c8eaabb4a7..4eb2827f56a5 100644 --- a/airbyte-notification/src/main/resources/slack/failure_slack_notification_template.txt +++ b/airbyte-notification/src/main/resources/slack/failure_slack_notification_template.txt @@ -2,3 +2,5 @@ Your connection from %s to %s just failed... This happened with %s You can access its logs here: %s + +Job ID: %s \ No newline at end of file diff --git a/airbyte-notification/src/main/resources/slack/success_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/success_slack_notification_template.txt index 564e72af41b2..f8ea6cea9566 100644 --- a/airbyte-notification/src/main/resources/slack/success_slack_notification_template.txt +++ b/airbyte-notification/src/main/resources/slack/success_slack_notification_template.txt @@ -2,3 +2,5 @@ Your connection from %s to %s succeeded This was for %s You can access its logs here: %s + +Job ID: %s \ No newline at end of file diff --git a/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java b/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java index aecb3c757321..36713df5b59f 100644 --- a/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java +++ b/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java @@ -40,6 +40,7 @@ class SlackNotificationClientTest { private static final String JOB_DESCRIPTION = "job description"; private static final String LOG_URL = "logUrl"; private static final String SOURCE_TEST = "source-test"; + private static final Long JOB_ID = 1L; public static final String WEBHOOK_URL = "http://localhost:"; private static final String EXPECTED_FAIL_MESSAGE = "Your connection from source-test to destination-test just failed...\n" @@ -81,7 +82,7 @@ void testBadWebhookUrl() { new SlackNotificationClient(new Notification() .withNotificationType(NotificationType.SLACK) .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + "/bad"))); - assertThrows(IOException.class, () -> client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL)); + assertThrows(IOException.class, () -> client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID)); } @Test @@ -89,7 +90,7 @@ void testEmptyWebhookUrl() throws IOException, InterruptedException { final SlackNotificationClient client = new SlackNotificationClient( new Notification().withNotificationType(NotificationType.SLACK).withSlackConfiguration(new SlackNotificationConfiguration())); - assertFalse(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL)); + assertFalse(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID)); } @Test @@ -111,7 +112,7 @@ void testNotifyJobFailure() throws IOException, InterruptedException { new SlackNotificationClient(new Notification() .withNotificationType(NotificationType.SLACK) .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + TEST_PATH))); - assertTrue(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL)); + assertTrue(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID)); } @Test @@ -122,7 +123,7 @@ void testNotifyJobSuccess() throws IOException, InterruptedException { .withNotificationType(NotificationType.SLACK) .withSendOnSuccess(true) .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + TEST_PATH))); - assertTrue(client.notifyJobSuccess(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL)); + assertTrue(client.notifyJobSuccess(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID)); } @Test diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java index 947b4080e107..59b93cb425f5 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java @@ -103,12 +103,12 @@ private void notifyJob(final String reason, MoreMaps.merge(jobMetadata, sourceMetadata, destinationMetadata, notificationMetadata.build())); if (FAILURE_NOTIFICATION == action) { - if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) { + if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl, job.getId())) { LOGGER.warn("Failed to successfully notify failure: {}", notification); } break; } else if (SUCCESS_NOTIFICATION == action) { - if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) { + if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl, job.getId())) { LOGGER.warn("Failed to successfully notify success: {}", notification); } break; diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java index 01abe668d711..68dba755783d 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java @@ -4,8 +4,7 @@ package io.airbyte.scheduler.persistence; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -85,7 +84,7 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep when(configRepository.getStandardDestinationDefinition(any())).thenReturn(destinationDefinition); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(getWorkspace()); when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(job.getId())).thenReturn(WORKSPACE_ID); - when(notificationClient.notifyJobFailure(anyString(), anyString(), anyString(), anyString())).thenReturn(true); + when(notificationClient.notifyJobFailure(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(true); jobNotifier.failJob("JobNotifierTest was running", job); final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault()); @@ -94,7 +93,8 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep "destination-test", String.format("sync started on %s, running for 1 day 10 hours 17 minutes 36 seconds, as the JobNotifierTest was running.", formatter.format(Instant.ofEpochSecond(job.getStartedAtInSecond().get()))), - String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope())); + String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope()), + job.getId()); final Builder metadata = ImmutableMap.builder(); metadata.put("connection_id", UUID.fromString(job.getScope()));