Skip to content

Commit

Permalink
Enforce notification settings on schema change (#7104)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Jun 12, 2023
1 parent d50fc0a commit e7897a8
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.UseNotificationWorkflow;
import io.airbyte.notification.NotificationType;
import io.airbyte.notification.NotificationEvent;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.client.WorkflowClient;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -72,7 +71,8 @@ private void callNotificationWorkflow(final UUID connectionId,
throw new RuntimeException(e);
}
try {
notificationWorkflow.sendNotification(connectionId, "", message, List.of(NotificationType.webhook, NotificationType.customerio));
notificationWorkflow.sendNotification(connectionId, "", message,
containsBreakingChange ? NotificationEvent.onBreakingChange : NotificationEvent.onNonBreakingChange);
} catch (final RuntimeException e) {
log.error("There was an error while sending a Schema Change Notification", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.notification.NotificationType;
import io.airbyte.notification.NotificationEvent;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.List;
import java.util.UUID;

/**
Expand All @@ -17,6 +16,6 @@
public interface NotificationWorkflow {

@WorkflowMethod
public void sendNotification(UUID connectionId, String subject, String message, List<NotificationType> notificationType);
public void sendNotification(UUID connectionId, String subject, String message, NotificationEvent notificationEvent);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.airbyte.featureflag.UseNotificationWorkflow;
import io.airbyte.notification.NotificationType;
import io.airbyte.notification.NotificationEvent;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.client.WorkflowClient;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -51,7 +50,7 @@ void testCallNewNotifyWorkflow() {

notificationClient.sendSchemaChangeNotification(connectionId, "", false);

verify(notificationWorkflow).sendNotification(eq(connectionId), any(), any(), eq(List.of(NotificationType.webhook, NotificationType.customerio)));
verify(notificationWorkflow).sendNotification(eq(connectionId), any(), any(), eq(NotificationEvent.onNonBreakingChange));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.notification;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.NotificationItem;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import jakarta.inject.Singleton;
import java.util.UUID;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

/**
* Fetching notification settings from workspace.
*/
@Singleton
@Slf4j
public class WorkspaceNotificationConfigFetcher {

private final WorkspaceApi workspaceApi;

public WorkspaceNotificationConfigFetcher(WorkspaceApi workspaceApi) {
this.workspaceApi = workspaceApi;
}

@Value
class NotificationItemWithCustomerIoConfig {

NotificationItem notificationItem;
CustomerIoEmailConfig customerIoEmailConfig;

}

/**
* Fetch corresponding notificationItem based on notification action.
*/
public NotificationItemWithCustomerIoConfig fetchNotificationConfig(final UUID connectionId, NotificationEvent notificationEvent) {
final WorkspaceRead workspaceRead = AirbyteApiClient.retryWithJitter(
() -> workspaceApi.getWorkspaceByConnectionId(new ConnectionIdRequestBody().connectionId(connectionId)),
"retrieve workspace for notification use.",
/* jitterMaxIntervalSecs= */10,
/* finalInternvalSecs= */10,
/* maxTries= */ 3);
if (workspaceRead == null) {
log.error(
String.format("Unable to fetch workspace by connection %s. Not blocking but we are not sending any notifications. \n", connectionId));
return new NotificationItemWithCustomerIoConfig(new NotificationItem(), new CustomerIoEmailConfig(""));
}

NotificationItem item;

switch (notificationEvent) {
case onBreakingChange -> {
item = workspaceRead.getNotificationSettings().getSendOnConnectionUpdateActionRequired();
break;
}
case onNonBreakingChange -> {
item = workspaceRead.getNotificationSettings().getSendOnConnectionUpdate();
break;
}
default -> throw new RuntimeException("Unexpected notification action: " + notificationEvent);
}

return new NotificationItemWithCustomerIoConfig(item, new CustomerIoEmailConfig(workspaceRead.getEmail()));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ enum class NotificationType {
webhook, customerio
}

enum class NotificationEvent {
onNonBreakingChange, onBreakingChange
}

@Singleton
open class NotificationHandler(
private val maybeWebhookConfigFetcher: WebhookConfigFetcher?,
private val maybeCustomerIoConfigFetcher: CustomerIoEmailConfigFetcher?,
private val maybeWebhookNotificationSender: WebhookNotificationSender?,
private val maybeCustomerIoNotificationSender: CustomerIoEmailNotificationSender?,
private val maybeWorkspaceNotificationConfigFetcher: WorkspaceNotificationConfigFetcher?,
) {
/**
* Send a notification with a subject and a message if a configuration is present
Expand All @@ -34,4 +39,33 @@ open class NotificationHandler(
}
}
}
}

open fun sendNotification(connectionId: UUID, subject: String, message: String, notificationEvent: NotificationEvent) {
val notificationItemWithCustomerIoEmailConfig = maybeWorkspaceNotificationConfigFetcher?.fetchNotificationConfig(connectionId, notificationEvent)

if (notificationItemWithCustomerIoEmailConfig?.notificationItem == null) {
return;
}
notificationItemWithCustomerIoEmailConfig!!.notificationItem.notificationType!!.forEach { notificationType ->
runCatching {
if (maybeWebhookNotificationSender != null
&& notificationType == io.airbyte.api.client.model.generated.NotificationType.SLACK) {
val webhookConfig = WebhookConfig(
notificationItemWithCustomerIoEmailConfig
!!.notificationItem
!!.slackConfiguration
!!.webhook)
maybeWebhookNotificationSender.sendNotification(webhookConfig, subject, message)
}

if (maybeCustomerIoNotificationSender != null
&& notificationType == io.airbyte.api.client.model.generated.NotificationType.CUSTOMERIO) {
maybeCustomerIoNotificationSender
.sendNotification(notificationItemWithCustomerIoEmailConfig
!!.customerIoEmailConfig, subject, message)
}
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.notification;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.NotificationItem;
import io.airbyte.api.client.model.generated.NotificationSettings;
import io.airbyte.api.client.model.generated.NotificationType;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.notification.WorkspaceNotificationConfigFetcher.NotificationItemWithCustomerIoConfig;
import java.util.UUID;
import org.junit.jupiter.api.Test;

class WorkspaceNotificationConfigFetcherTest {

private final WorkspaceApi workspaceApi = mock(WorkspaceApi.class);

private final WorkspaceNotificationConfigFetcher workspaceNotificationConfigFetcher = new WorkspaceNotificationConfigFetcher(workspaceApi);

@Test
void testReturnTheRightConfig() throws ApiException {
final UUID connectionId = UUID.randomUUID();
final String email = "em@il.com";
final NotificationItem notificationItem = new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO);
when(workspaceApi.getWorkspaceByConnectionId(new ConnectionIdRequestBody().connectionId(connectionId)))
.thenReturn(
new WorkspaceRead().email(email).notificationSettings(new NotificationSettings().sendOnConnectionUpdateActionRequired(notificationItem)));

NotificationItemWithCustomerIoConfig result =
workspaceNotificationConfigFetcher.fetchNotificationConfig(connectionId, NotificationEvent.onBreakingChange);
assertEquals(notificationItem, result.getNotificationItem());
assertEquals(email, result.getCustomerIoEmailConfig().getTo());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class NotificationHandlerTest {
private val customerIoConfigFetcher: CustomerIoEmailConfigFetcher = mockk()
private val webhookNotificationSender: WebhookNotificationSender = mockk()
private val customerIoNotificationSender: CustomerIoEmailNotificationSender = mockk()
private val workspaceNotificationConfigFetcher: WorkspaceNotificationConfigFetcher = mockk()

private val webhookConfig: WebhookConfig = WebhookConfig("http://webhook.com")
private val customerIoConfig: CustomerIoEmailConfig = CustomerIoEmailConfig( "to@to.com")
Expand All @@ -22,7 +23,7 @@ class NotificationHandlerTest {

@Test
fun testNoBeanPresent() {
val notificationHandler = NotificationHandler(null, null, null, null)
val notificationHandler = NotificationHandler(null, null, null, null, null,)

notificationHandler.sendNotification(connectionId, subject, message, listOf(NotificationType.webhook))

Expand All @@ -37,7 +38,8 @@ class NotificationHandlerTest {
val notificationHandler = NotificationHandler(webhookConfigFetcher,
customerIoConfigFetcher,
webhookNotificationSender,
customerIoNotificationSender,)
customerIoNotificationSender,
workspaceNotificationConfigFetcher,)

every {
webhookConfigFetcher.fetchConfig(connectionId)
Expand Down Expand Up @@ -69,7 +71,8 @@ class NotificationHandlerTest {
val notificationHandler = NotificationHandler(webhookConfigFetcher,
customerIoConfigFetcher,
webhookNotificationSender,
customerIoNotificationSender,)
customerIoNotificationSender,
workspaceNotificationConfigFetcher,)

notificationHandler.sendNotification(connectionId, subject, message, listOf())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
package io.airbyte.workers.temporal.scheduling;

import io.airbyte.commons.temporal.scheduling.NotificationWorkflow;
import io.airbyte.notification.NotificationType;
import io.airbyte.notification.NotificationEvent;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.NotifyActivity;
import java.util.List;
import java.util.UUID;

/**
Expand All @@ -21,8 +20,8 @@ public class NotificationWorkflowImpl implements NotificationWorkflow {
private NotifyActivity notifyActivity;

@Override
public void sendNotification(UUID connectionId, String subject, String message, List<NotificationType> notificationType) {
notifyActivity.sendNotification(connectionId, subject, message, notificationType);
public void sendNotification(UUID connectionId, String subject, String message, NotificationEvent notificationEvent) {
notifyActivity.sendNotificationWithEvent(connectionId, subject, message, notificationEvent);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.notification.NotificationEvent;
import io.airbyte.notification.NotificationType;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
Expand All @@ -19,4 +20,7 @@ public interface NotifyActivity {
@ActivityMethod
void sendNotification(UUID connectionId, String subject, String message, List<NotificationType> notificationType);

@ActivityMethod
void sendNotificationWithEvent(UUID connectionId, String subject, String message, NotificationEvent notificationEvent);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.notification.NotificationEvent;
import io.airbyte.notification.NotificationHandler;
import io.airbyte.notification.NotificationType;
import jakarta.inject.Singleton;
Expand All @@ -27,4 +28,9 @@ public void sendNotification(UUID connectionId, String subject, String message,
notificationHandler.sendNotification(connectionId, subject, message, notificationType);
}

@Override
public void sendNotificationWithEvent(UUID connectionId, String subject, String message, NotificationEvent notificationEvent) {
notificationHandler.sendNotification(connectionId, subject, message, notificationEvent);
}

}

0 comments on commit e7897a8

Please sign in to comment.