Skip to content

Commit

Permalink
backfill notification_settings - fix a bug (#7228)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Jun 14, 2023
1 parent 3e766d0 commit 7af2ac3
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.44.5.004";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.1.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.44.5.001";
private static final String CDK_VERSION = "1.2.3";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ public static io.airbyte.config.NotificationSettings toConfig(final io.airbyte.a

// Currently customerIoConfiguration is an empty object, so we tend to keep it as null.
private static io.airbyte.config.NotificationItem toConfig(final io.airbyte.api.model.generated.NotificationItem notificationItem) {
return new io.airbyte.config.NotificationItem()
io.airbyte.config.NotificationItem result = new io.airbyte.config.NotificationItem()
.withNotificationType(notificationItem.getNotificationType().stream()
.map(notificationType -> Enums.convertTo(notificationType, io.airbyte.config.Notification.NotificationType.class)).collect(
Collectors.toList()))
.withSlackConfiguration(toConfig(notificationItem.getSlackConfiguration()));
Collectors.toList()));

if (notificationItem.getSlackConfiguration() != null) {
result.withSlackConfiguration(toConfig(notificationItem.getSlackConfiguration()));
}
return result;
}

private static io.airbyte.config.SlackNotificationConfiguration toConfig(final io.airbyte.api.model.generated.SlackNotificationConfiguration notification) {
Expand Down Expand Up @@ -86,11 +90,14 @@ public static io.airbyte.api.model.generated.NotificationSettings toApi(final io
}

private static io.airbyte.api.model.generated.NotificationItem toApi(final io.airbyte.config.NotificationItem notificationItem) {
return new io.airbyte.api.model.generated.NotificationItem()
var result = new io.airbyte.api.model.generated.NotificationItem()
.notificationType(notificationItem.getNotificationType().stream()
.map(notificationType -> Enums.convertTo(notificationType, io.airbyte.api.model.generated.NotificationType.class)).collect(
Collectors.toList()))
.slackConfiguration(toApi(notificationItem.getSlackConfiguration()));
Collectors.toList()));
if (notificationItem.getSlackConfiguration() != null) {
result.slackConfiguration(toApi(notificationItem.getSlackConfiguration()));
}
return result;
}

private static io.airbyte.api.model.generated.SlackNotificationConfiguration toApi(final io.airbyte.config.SlackNotificationConfiguration notification) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.Geography;
import io.airbyte.api.model.generated.ListResourcesForWorkspacesRequestBody;
import io.airbyte.api.model.generated.NotificationItem;
import io.airbyte.api.model.generated.NotificationSettings;
import io.airbyte.api.model.generated.NotificationType;
import io.airbyte.api.model.generated.SlugRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.WorkspaceCreate;
Expand Down Expand Up @@ -103,6 +106,9 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate)
? Enums.convertTo(workspaceCreate.getDefaultGeography(), io.airbyte.config.Geography.class)
: io.airbyte.config.Geography.AUTO;

// NotificationSettings from input will be patched with default values.
final NotificationSettings notificationSettings = patchNotificationSettingsWithDefaultValue(workspaceCreate);

final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(uuidSupplier.get())
.withCustomerId(uuidSupplier.get())
Expand All @@ -115,7 +121,7 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate)
.withDisplaySetupWizard(displaySetupWizard != null ? displaySetupWizard : false)
.withTombstone(false)
.withNotifications(NotificationConverter.toConfigList(workspaceCreate.getNotifications()))
.withNotificationSettings(NotificationSettingsConverter.toConfig(workspaceCreate.getNotificationSettings()))
.withNotificationSettings(NotificationSettingsConverter.toConfig(notificationSettings))
.withDefaultGeography(defaultGeography)
.withWebhookOperationConfigs(WorkspaceWebhookConfigsConverter.toPersistenceWrite(workspaceCreate.getWebhookConfigs(), uuidSupplier));

Expand Down Expand Up @@ -149,6 +155,38 @@ public void deleteWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
persistStandardWorkspace(persistedWorkspace);
}

private NotificationSettings patchNotificationSettingsWithDefaultValue(final WorkspaceCreate workspaceCreate) {
NotificationSettings notificationSettings = new NotificationSettings()
.sendOnSuccess(new NotificationItem().notificationType(List.of()))
.sendOnFailure(new NotificationItem().notificationType(List.of()))
.sendOnConnectionUpdate(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO))
.sendOnConnectionUpdateActionRequired(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO))
.sendOnSyncDisabled(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO))
.sendOnSyncDisabledWarning(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO));
if (workspaceCreate.getNotificationSettings() != null) {
NotificationSettings inputNotificationSettings = workspaceCreate.getNotificationSettings();
if (inputNotificationSettings.getSendOnSuccess() != null) {
notificationSettings.setSendOnSuccess(inputNotificationSettings.getSendOnSuccess());
}
if (inputNotificationSettings.getSendOnFailure() != null) {
notificationSettings.setSendOnFailure(inputNotificationSettings.getSendOnFailure());
}
if (inputNotificationSettings.getSendOnConnectionUpdate() != null) {
notificationSettings.setSendOnConnectionUpdate(inputNotificationSettings.getSendOnConnectionUpdate());
}
if (inputNotificationSettings.getSendOnConnectionUpdateActionRequired() != null) {
notificationSettings.setSendOnConnectionUpdateActionRequired(inputNotificationSettings.getSendOnConnectionUpdateActionRequired());
}
if (inputNotificationSettings.getSendOnSyncDisabled() != null) {
notificationSettings.setSendOnSyncDisabled(inputNotificationSettings.getSendOnSyncDisabled());
}
if (inputNotificationSettings.getSendOnSyncDisabledWarning() != null) {
notificationSettings.setSendOnSyncDisabledWarning(inputNotificationSettings.getSendOnSyncDisabledWarning());
}
}
return notificationSettings;
}

public WorkspaceReadList listWorkspaces() throws JsonValidationException, IOException {
final List<WorkspaceRead> reads = configRepository.listStandardWorkspaces(false).stream()
.map(WorkspacesHandler::buildWorkspaceRead)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,37 @@ private io.airbyte.api.model.generated.NotificationSettings generateApiNotificat
.webhook(FAILURE_NOTIFICATION_WEBHOOK)));
}

private io.airbyte.api.model.generated.NotificationSettings generateApiNotificationSettingsWithDefaultValue() {
return new io.airbyte.api.model.generated.NotificationSettings()
.sendOnFailure(
new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of(io.airbyte.api.model.generated.NotificationType.SLACK))
.slackConfiguration(new io.airbyte.api.model.generated.SlackNotificationConfiguration()
.webhook(FAILURE_NOTIFICATION_WEBHOOK)))
.sendOnSuccess(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of()))
.sendOnConnectionUpdate(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnConnectionUpdateActionRequired(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnSyncDisabled(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnSyncDisabledWarning(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO));
}

private io.airbyte.api.model.generated.NotificationSettings generateDefaultApiNotificationSettings() {
return new io.airbyte.api.model.generated.NotificationSettings()
.sendOnSuccess(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of()))
.sendOnFailure(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of()))
.sendOnConnectionUpdate(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnConnectionUpdateActionRequired(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnSyncDisabled(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO))
.sendOnSyncDisabledWarning(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem(
io.airbyte.api.model.generated.NotificationType.CUSTOMERIO));
}

@Test
void testCreateWorkspace() throws JsonValidationException, IOException, ConfigNotFoundException {
workspace.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS);
Expand Down Expand Up @@ -190,13 +221,46 @@ void testCreateWorkspace() throws JsonValidationException, IOException, ConfigNo
.anonymousDataCollection(false)
.securityUpdates(false)
.notifications(List.of(generateApiNotification()))
.notificationSettings(generateApiNotificationSettings())
.notificationSettings(generateApiNotificationSettingsWithDefaultValue())
.defaultGeography(GEOGRAPHY_US)
.webhookConfigs(List.of(new WebhookConfigRead().id(uuid).name(TEST_NAME)));

assertEquals(expectedRead, actualRead);
}

@Test
void testCreateWorkspaceWithMinimumInput() throws JsonValidationException, IOException, ConfigNotFoundException {
when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(false))).thenReturn(workspace);

final UUID uuid = UUID.randomUUID();
when(uuidSupplier.get()).thenReturn(uuid);

configRepository.writeStandardWorkspaceNoSecrets(workspace);

final WorkspaceCreate workspaceCreate = new WorkspaceCreate()
.name(NEW_WORKSPACE)
.email(TEST_EMAIL);

final WorkspaceRead actualRead = workspacesHandler.createWorkspace(workspaceCreate);
final WorkspaceRead expectedRead = new WorkspaceRead()
.workspaceId(actualRead.getWorkspaceId())
.customerId(actualRead.getCustomerId())
.email(TEST_EMAIL)
.name(NEW_WORKSPACE)
.slug(actualRead.getSlug())
.initialSetupComplete(false)
.displaySetupWizard(false)
.news(false)
.anonymousDataCollection(false)
.securityUpdates(false)
.notifications(List.of())
.notificationSettings(generateDefaultApiNotificationSettings())
.defaultGeography(GEOGRAPHY_AUTO)
.webhookConfigs(Collections.emptyList());

assertEquals(expectedRead, actualRead);
}

@Test
void testCreateWorkspaceDuplicateSlug() throws JsonValidationException, IOException, ConfigNotFoundException {
when(configRepository.getWorkspaceBySlugOptional(any(String.class), eq(true)))
Expand Down Expand Up @@ -231,7 +295,7 @@ void testCreateWorkspaceDuplicateSlug() throws JsonValidationException, IOExcept
.anonymousDataCollection(false)
.securityUpdates(false)
.notifications(Collections.emptyList())
.notificationSettings(new io.airbyte.api.model.generated.NotificationSettings())
.notificationSettings(generateDefaultApiNotificationSettings())
.defaultGeography(GEOGRAPHY_AUTO)
.webhookConfigs(Collections.emptyList());

Expand Down Expand Up @@ -626,7 +690,7 @@ void testWorkspaceIsWrittenThroughSecretsWriter() throws JsonValidationException
.anonymousDataCollection(false)
.securityUpdates(false)
.notifications(List.of(generateApiNotification()))
.notificationSettings(generateApiNotificationSettings())
.notificationSettings(generateApiNotificationSettingsWithDefaultValue())
.defaultGeography(GEOGRAPHY_US)
.webhookConfigs(Collections.emptyList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public void writeStandardWorkspaceNoSecrets(final StandardWorkspace workspace) t
.set(WORKSPACE.DISPLAY_SETUP_WIZARD, workspace.getDisplaySetupWizard())
.set(WORKSPACE.TOMBSTONE, workspace.getTombstone() != null && workspace.getTombstone())
.set(WORKSPACE.NOTIFICATIONS, JSONB.valueOf(Jsons.serialize(workspace.getNotifications())))
.set(WORKSPACE.NOTIFICATION_SETTINGS, JSONB.valueOf(Jsons.serialize(workspace.getNotificationSettings())))
.set(WORKSPACE.FIRST_SYNC_COMPLETE, workspace.getFirstCompletedSync())
.set(WORKSPACE.FEEDBACK_COMPLETE, workspace.getFeedbackDone())
.set(WORKSPACE.CREATED_AT, timestamp)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static io.airbyte.db.instance.DatabaseConstants.WORKSPACE_TABLE;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Iterators;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.NotificationItem;
import io.airbyte.config.NotificationSettings;
import io.airbyte.config.SlackNotificationConfiguration;
import java.util.List;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Set default value for sendOnSuccess/sendOnFailure for notificationColumn under workspace table.
* Backfill for null values for notificationColumn under workspace caused by a bug.
*/
public class V0_50_1_001__NotificationSettingsBackfill extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_50_1_001__NotificationSettingsBackfill.class);

private static final Field<UUID> ID_COLUMN = DSL.field("id", SQLDataType.UUID);
private static final Field<JSONB> NOTIFICATION_COLUMN = DSL.field("notifications", SQLDataType.JSONB);
private static final Field<JSONB> NOTIFICATION_SETTINGS_COLUMN = DSL.field("notification_settings", SQLDataType.JSONB);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
backfillNotificationSettings(ctx);
}

static void backfillNotificationSettings(final DSLContext ctx) {
final var workspaceWithNotificationSettings = ctx.select(ID_COLUMN, NOTIFICATION_COLUMN)
.from(WORKSPACE_TABLE)
.stream()
.toList();

workspaceWithNotificationSettings.forEach(workspaceRecord -> {
final UUID workspaceId = workspaceRecord.getValue(ID_COLUMN);
if (workspaceRecord.get(NOTIFICATION_COLUMN) == null) {
// This case does not exist in prod, but adding check to satisfy testing requirements.
LOGGER.warn("Workspace {} does not have notification column", workspaceId);
return;
}
final var originalNotificationList =
Jsons.deserialize(workspaceRecord.get(NOTIFICATION_COLUMN).data(), new TypeReference<List<Notification>>() {});
var notificationSettings = new NotificationSettings();
// By default the following notifactions are all sent via emails. At this moment customers do not
// have an option to turn
// it off.
notificationSettings.setSendOnConnectionUpdateActionRequired(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO)));
notificationSettings.setSendOnConnectionUpdate(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO)));
notificationSettings.setSendOnSyncDisabled(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO)));
notificationSettings.setSendOnSyncDisabledWarning(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO)));

// By default we do not send sendOnSuccess or sendOnFailure notifications.
notificationSettings.setSendOnSuccess(new NotificationItem().withNotificationType(List.of()));
notificationSettings.setSendOnFailure(new NotificationItem().withNotificationType(List.of()));

if (!originalNotificationList.isEmpty()) {
final var originalNotification = Iterators.getOnlyElement(originalNotificationList.listIterator());
final NotificationType notificationType = originalNotification.getNotificationType();
final SlackNotificationConfiguration slackConfiguration = originalNotification.getSlackConfiguration();

if (originalNotification.getSendOnFailure()) {
notificationSettings
.withSendOnFailure(new NotificationItem().withNotificationType(List.of(notificationType)).withSlackConfiguration(slackConfiguration));
}
if (originalNotification.getSendOnSuccess()) {
notificationSettings
.withSendOnSuccess(new NotificationItem().withNotificationType(List.of(notificationType)).withSlackConfiguration(slackConfiguration));
}
}

ctx.update(DSL.table(WORKSPACE_TABLE))
.set(NOTIFICATION_SETTINGS_COLUMN, JSONB.valueOf(Jsons.serialize(notificationSettings)))
.where(ID_COLUMN.eq(workspaceId))
.execute();
});
}

}
Loading

0 comments on commit 7af2ac3

Please sign in to comment.