From 7af2ac3f811eb2f9a7cbc165c882c1eb555d33c4 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Wed, 14 Jun 2023 13:56:56 -0700 Subject: [PATCH] backfill notification_settings - fix a bug (#7228) --- .../io/airbyte/bootloader/BootloaderTest.java | 2 +- .../NotificationSettingsConverter.java | 19 ++- .../server/handlers/WorkspacesHandler.java | 40 ++++- .../handlers/WorkspacesHandlerTest.java | 70 ++++++++- .../config/persistence/ConfigRepository.java | 1 + ...0_1_001__NotificationSettingsBackfill.java | 102 +++++++++++++ ...001__NotificationSettingsBackfillTest.java | 143 ++++++++++++++++++ .../airbyte/persistence/job/JobNotifier.java | 5 + 8 files changed, 371 insertions(+), 11 deletions(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfill.java create mode 100644 airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfillTest.java diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java index 80b79f18181..a0ca7275944 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -90,7 +90,7 @@ class BootloaderTest { // ⚠️ This line should change with every new migration to show that you meant to make a new // migration to the prod database - private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.44.5.004"; + private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.1.001"; private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.44.5.001"; private static final String CDK_VERSION = "1.2.3"; diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/NotificationSettingsConverter.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/NotificationSettingsConverter.java index a0e851ae98a..1a531ab7a5a 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/NotificationSettingsConverter.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/NotificationSettingsConverter.java @@ -43,11 +43,15 @@ public static io.airbyte.config.NotificationSettings toConfig(final io.airbyte.a // Currently customerIoConfiguration is an empty object, so we tend to keep it as null. private static io.airbyte.config.NotificationItem toConfig(final io.airbyte.api.model.generated.NotificationItem notificationItem) { - return new io.airbyte.config.NotificationItem() + io.airbyte.config.NotificationItem result = new io.airbyte.config.NotificationItem() .withNotificationType(notificationItem.getNotificationType().stream() .map(notificationType -> Enums.convertTo(notificationType, io.airbyte.config.Notification.NotificationType.class)).collect( - Collectors.toList())) - .withSlackConfiguration(toConfig(notificationItem.getSlackConfiguration())); + Collectors.toList())); + + if (notificationItem.getSlackConfiguration() != null) { + result.withSlackConfiguration(toConfig(notificationItem.getSlackConfiguration())); + } + return result; } private static io.airbyte.config.SlackNotificationConfiguration toConfig(final io.airbyte.api.model.generated.SlackNotificationConfiguration notification) { @@ -86,11 +90,14 @@ public static io.airbyte.api.model.generated.NotificationSettings toApi(final io } private static io.airbyte.api.model.generated.NotificationItem toApi(final io.airbyte.config.NotificationItem notificationItem) { - return new io.airbyte.api.model.generated.NotificationItem() + var result = new io.airbyte.api.model.generated.NotificationItem() .notificationType(notificationItem.getNotificationType().stream() .map(notificationType -> Enums.convertTo(notificationType, io.airbyte.api.model.generated.NotificationType.class)).collect( - Collectors.toList())) - .slackConfiguration(toApi(notificationItem.getSlackConfiguration())); + Collectors.toList())); + if (notificationItem.getSlackConfiguration() != null) { + result.slackConfiguration(toApi(notificationItem.getSlackConfiguration())); + } + return result; } private static io.airbyte.api.model.generated.SlackNotificationConfiguration toApi(final io.airbyte.config.SlackNotificationConfiguration notification) { diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WorkspacesHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WorkspacesHandler.java index 0d9ed1575f1..44b5d0e0331 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WorkspacesHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WorkspacesHandler.java @@ -14,6 +14,9 @@ import io.airbyte.api.model.generated.DestinationRead; import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.ListResourcesForWorkspacesRequestBody; +import io.airbyte.api.model.generated.NotificationItem; +import io.airbyte.api.model.generated.NotificationSettings; +import io.airbyte.api.model.generated.NotificationType; import io.airbyte.api.model.generated.SlugRequestBody; import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.WorkspaceCreate; @@ -103,6 +106,9 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate) ? Enums.convertTo(workspaceCreate.getDefaultGeography(), io.airbyte.config.Geography.class) : io.airbyte.config.Geography.AUTO; + // NotificationSettings from input will be patched with default values. + final NotificationSettings notificationSettings = patchNotificationSettingsWithDefaultValue(workspaceCreate); + final StandardWorkspace workspace = new StandardWorkspace() .withWorkspaceId(uuidSupplier.get()) .withCustomerId(uuidSupplier.get()) @@ -115,7 +121,7 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate) .withDisplaySetupWizard(displaySetupWizard != null ? displaySetupWizard : false) .withTombstone(false) .withNotifications(NotificationConverter.toConfigList(workspaceCreate.getNotifications())) - .withNotificationSettings(NotificationSettingsConverter.toConfig(workspaceCreate.getNotificationSettings())) + .withNotificationSettings(NotificationSettingsConverter.toConfig(notificationSettings)) .withDefaultGeography(defaultGeography) .withWebhookOperationConfigs(WorkspaceWebhookConfigsConverter.toPersistenceWrite(workspaceCreate.getWebhookConfigs(), uuidSupplier)); @@ -149,6 +155,38 @@ public void deleteWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) persistStandardWorkspace(persistedWorkspace); } + private NotificationSettings patchNotificationSettingsWithDefaultValue(final WorkspaceCreate workspaceCreate) { + NotificationSettings notificationSettings = new NotificationSettings() + .sendOnSuccess(new NotificationItem().notificationType(List.of())) + .sendOnFailure(new NotificationItem().notificationType(List.of())) + .sendOnConnectionUpdate(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO)) + .sendOnConnectionUpdateActionRequired(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO)) + .sendOnSyncDisabled(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO)) + .sendOnSyncDisabledWarning(new NotificationItem().addNotificationTypeItem(NotificationType.CUSTOMERIO)); + if (workspaceCreate.getNotificationSettings() != null) { + NotificationSettings inputNotificationSettings = workspaceCreate.getNotificationSettings(); + if (inputNotificationSettings.getSendOnSuccess() != null) { + notificationSettings.setSendOnSuccess(inputNotificationSettings.getSendOnSuccess()); + } + if (inputNotificationSettings.getSendOnFailure() != null) { + notificationSettings.setSendOnFailure(inputNotificationSettings.getSendOnFailure()); + } + if (inputNotificationSettings.getSendOnConnectionUpdate() != null) { + notificationSettings.setSendOnConnectionUpdate(inputNotificationSettings.getSendOnConnectionUpdate()); + } + if (inputNotificationSettings.getSendOnConnectionUpdateActionRequired() != null) { + notificationSettings.setSendOnConnectionUpdateActionRequired(inputNotificationSettings.getSendOnConnectionUpdateActionRequired()); + } + if (inputNotificationSettings.getSendOnSyncDisabled() != null) { + notificationSettings.setSendOnSyncDisabled(inputNotificationSettings.getSendOnSyncDisabled()); + } + if (inputNotificationSettings.getSendOnSyncDisabledWarning() != null) { + notificationSettings.setSendOnSyncDisabledWarning(inputNotificationSettings.getSendOnSyncDisabledWarning()); + } + } + return notificationSettings; + } + public WorkspaceReadList listWorkspaces() throws JsonValidationException, IOException { final List reads = configRepository.listStandardWorkspaces(false).stream() .map(WorkspacesHandler::buildWorkspaceRead) diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/WorkspacesHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/WorkspacesHandlerTest.java index d3f368dc723..0b9b5c89313 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/WorkspacesHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/WorkspacesHandlerTest.java @@ -156,6 +156,37 @@ private io.airbyte.api.model.generated.NotificationSettings generateApiNotificat .webhook(FAILURE_NOTIFICATION_WEBHOOK))); } + private io.airbyte.api.model.generated.NotificationSettings generateApiNotificationSettingsWithDefaultValue() { + return new io.airbyte.api.model.generated.NotificationSettings() + .sendOnFailure( + new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of(io.airbyte.api.model.generated.NotificationType.SLACK)) + .slackConfiguration(new io.airbyte.api.model.generated.SlackNotificationConfiguration() + .webhook(FAILURE_NOTIFICATION_WEBHOOK))) + .sendOnSuccess(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of())) + .sendOnConnectionUpdate(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnConnectionUpdateActionRequired(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnSyncDisabled(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnSyncDisabledWarning(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)); + } + + private io.airbyte.api.model.generated.NotificationSettings generateDefaultApiNotificationSettings() { + return new io.airbyte.api.model.generated.NotificationSettings() + .sendOnSuccess(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of())) + .sendOnFailure(new io.airbyte.api.model.generated.NotificationItem().notificationType(List.of())) + .sendOnConnectionUpdate(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnConnectionUpdateActionRequired(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnSyncDisabled(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)) + .sendOnSyncDisabledWarning(new io.airbyte.api.model.generated.NotificationItem().addNotificationTypeItem( + io.airbyte.api.model.generated.NotificationType.CUSTOMERIO)); + } + @Test void testCreateWorkspace() throws JsonValidationException, IOException, ConfigNotFoundException { workspace.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS); @@ -190,13 +221,46 @@ void testCreateWorkspace() throws JsonValidationException, IOException, ConfigNo .anonymousDataCollection(false) .securityUpdates(false) .notifications(List.of(generateApiNotification())) - .notificationSettings(generateApiNotificationSettings()) + .notificationSettings(generateApiNotificationSettingsWithDefaultValue()) .defaultGeography(GEOGRAPHY_US) .webhookConfigs(List.of(new WebhookConfigRead().id(uuid).name(TEST_NAME))); assertEquals(expectedRead, actualRead); } + @Test + void testCreateWorkspaceWithMinimumInput() throws JsonValidationException, IOException, ConfigNotFoundException { + when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(false))).thenReturn(workspace); + + final UUID uuid = UUID.randomUUID(); + when(uuidSupplier.get()).thenReturn(uuid); + + configRepository.writeStandardWorkspaceNoSecrets(workspace); + + final WorkspaceCreate workspaceCreate = new WorkspaceCreate() + .name(NEW_WORKSPACE) + .email(TEST_EMAIL); + + final WorkspaceRead actualRead = workspacesHandler.createWorkspace(workspaceCreate); + final WorkspaceRead expectedRead = new WorkspaceRead() + .workspaceId(actualRead.getWorkspaceId()) + .customerId(actualRead.getCustomerId()) + .email(TEST_EMAIL) + .name(NEW_WORKSPACE) + .slug(actualRead.getSlug()) + .initialSetupComplete(false) + .displaySetupWizard(false) + .news(false) + .anonymousDataCollection(false) + .securityUpdates(false) + .notifications(List.of()) + .notificationSettings(generateDefaultApiNotificationSettings()) + .defaultGeography(GEOGRAPHY_AUTO) + .webhookConfigs(Collections.emptyList()); + + assertEquals(expectedRead, actualRead); + } + @Test void testCreateWorkspaceDuplicateSlug() throws JsonValidationException, IOException, ConfigNotFoundException { when(configRepository.getWorkspaceBySlugOptional(any(String.class), eq(true))) @@ -231,7 +295,7 @@ void testCreateWorkspaceDuplicateSlug() throws JsonValidationException, IOExcept .anonymousDataCollection(false) .securityUpdates(false) .notifications(Collections.emptyList()) - .notificationSettings(new io.airbyte.api.model.generated.NotificationSettings()) + .notificationSettings(generateDefaultApiNotificationSettings()) .defaultGeography(GEOGRAPHY_AUTO) .webhookConfigs(Collections.emptyList()); @@ -626,7 +690,7 @@ void testWorkspaceIsWrittenThroughSecretsWriter() throws JsonValidationException .anonymousDataCollection(false) .securityUpdates(false) .notifications(List.of(generateApiNotification())) - .notificationSettings(generateApiNotificationSettings()) + .notificationSettings(generateApiNotificationSettingsWithDefaultValue()) .defaultGeography(GEOGRAPHY_US) .webhookConfigs(Collections.emptyList()); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index d37acf18449..a88057d383e 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -368,6 +368,7 @@ public void writeStandardWorkspaceNoSecrets(final StandardWorkspace workspace) t .set(WORKSPACE.DISPLAY_SETUP_WIZARD, workspace.getDisplaySetupWizard()) .set(WORKSPACE.TOMBSTONE, workspace.getTombstone() != null && workspace.getTombstone()) .set(WORKSPACE.NOTIFICATIONS, JSONB.valueOf(Jsons.serialize(workspace.getNotifications()))) + .set(WORKSPACE.NOTIFICATION_SETTINGS, JSONB.valueOf(Jsons.serialize(workspace.getNotificationSettings()))) .set(WORKSPACE.FIRST_SYNC_COMPLETE, workspace.getFirstCompletedSync()) .set(WORKSPACE.FEEDBACK_COMPLETE, workspace.getFeedbackDone()) .set(WORKSPACE.CREATED_AT, timestamp) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfill.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfill.java new file mode 100644 index 00000000000..42ccbf3046e --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfill.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import static io.airbyte.db.instance.DatabaseConstants.WORKSPACE_TABLE; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterators; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.Notification; +import io.airbyte.config.Notification.NotificationType; +import io.airbyte.config.NotificationItem; +import io.airbyte.config.NotificationSettings; +import io.airbyte.config.SlackNotificationConfiguration; +import java.util.List; +import java.util.UUID; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Set default value for sendOnSuccess/sendOnFailure for notificationColumn under workspace table. + * Backfill for null values for notificationColumn under workspace caused by a bug. + */ +public class V0_50_1_001__NotificationSettingsBackfill extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_50_1_001__NotificationSettingsBackfill.class); + + private static final Field ID_COLUMN = DSL.field("id", SQLDataType.UUID); + private static final Field NOTIFICATION_COLUMN = DSL.field("notifications", SQLDataType.JSONB); + private static final Field NOTIFICATION_SETTINGS_COLUMN = DSL.field("notification_settings", SQLDataType.JSONB); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + backfillNotificationSettings(ctx); + } + + static void backfillNotificationSettings(final DSLContext ctx) { + final var workspaceWithNotificationSettings = ctx.select(ID_COLUMN, NOTIFICATION_COLUMN) + .from(WORKSPACE_TABLE) + .stream() + .toList(); + + workspaceWithNotificationSettings.forEach(workspaceRecord -> { + final UUID workspaceId = workspaceRecord.getValue(ID_COLUMN); + if (workspaceRecord.get(NOTIFICATION_COLUMN) == null) { + // This case does not exist in prod, but adding check to satisfy testing requirements. + LOGGER.warn("Workspace {} does not have notification column", workspaceId); + return; + } + final var originalNotificationList = + Jsons.deserialize(workspaceRecord.get(NOTIFICATION_COLUMN).data(), new TypeReference>() {}); + var notificationSettings = new NotificationSettings(); + // By default the following notifactions are all sent via emails. At this moment customers do not + // have an option to turn + // it off. + notificationSettings.setSendOnConnectionUpdateActionRequired(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO))); + notificationSettings.setSendOnConnectionUpdate(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO))); + notificationSettings.setSendOnSyncDisabled(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO))); + notificationSettings.setSendOnSyncDisabledWarning(new NotificationItem().withNotificationType(List.of(NotificationType.CUSTOMERIO))); + + // By default we do not send sendOnSuccess or sendOnFailure notifications. + notificationSettings.setSendOnSuccess(new NotificationItem().withNotificationType(List.of())); + notificationSettings.setSendOnFailure(new NotificationItem().withNotificationType(List.of())); + + if (!originalNotificationList.isEmpty()) { + final var originalNotification = Iterators.getOnlyElement(originalNotificationList.listIterator()); + final NotificationType notificationType = originalNotification.getNotificationType(); + final SlackNotificationConfiguration slackConfiguration = originalNotification.getSlackConfiguration(); + + if (originalNotification.getSendOnFailure()) { + notificationSettings + .withSendOnFailure(new NotificationItem().withNotificationType(List.of(notificationType)).withSlackConfiguration(slackConfiguration)); + } + if (originalNotification.getSendOnSuccess()) { + notificationSettings + .withSendOnSuccess(new NotificationItem().withNotificationType(List.of(notificationType)).withSlackConfiguration(slackConfiguration)); + } + } + + ctx.update(DSL.table(WORKSPACE_TABLE)) + .set(NOTIFICATION_SETTINGS_COLUMN, JSONB.valueOf(Jsons.serialize(notificationSettings))) + .where(ID_COLUMN.eq(workspaceId)) + .execute(); + }); + } + +} diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfillTest.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfillTest.java new file mode 100644 index 00000000000..33312cb3bdd --- /dev/null +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_50_1_001__NotificationSettingsBackfillTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.Notification.NotificationType; +import io.airbyte.config.NotificationItem; +import io.airbyte.config.NotificationSettings; +import io.airbyte.config.SlackNotificationConfiguration; +import io.airbyte.db.factory.FlywayFactory; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; +import io.airbyte.db.instance.development.DevDatabaseMigrator; +import java.util.List; +import java.util.UUID; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.jooq.DSLContext; +import org.jooq.JSONB; +import org.jooq.Record; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.skyscreamer.jsonassert.JSONAssert; + +@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") +class V0_50_1_001__NotificationSettingsBackfillTest extends AbstractConfigsDatabaseTest { + + @BeforeEach + void beforeEach() { + final Flyway flyway = + FlywayFactory.create(dataSource, "V0_50_1_001__NotificationSettingsBackfillTest", ConfigsDatabaseMigrator.DB_IDENTIFIER, + ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); + final ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway); + + final BaseJavaMigration previousMigration = new V0_50_1_001__NotificationSettingsBackfill(); + final DevDatabaseMigrator devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator, previousMigration.getVersion()); + devConfigsDbMigrator.createBaseline(); + } + + @Test + void testMigrateEmptyValues() throws Exception { + final DSLContext ctx = getDslContext(); + + // Insert data to workspace + + final UUID workspaceId = UUID.randomUUID(); + + ctx.insertInto(DSL.table("workspace")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("slug"), + DSL.field("initial_setup_complete"), + DSL.field("notifications")) + .values( + workspaceId, + "name1", + "default", + true, + JSONB.valueOf("[]")) + .execute(); + + V0_50_1_001__NotificationSettingsBackfill.backfillNotificationSettings(ctx); + + final String result = fetchNotificationSettingsData(ctx, workspaceId); + + final NotificationSettings expectedNotification = new NotificationSettings() + .withSendOnSuccess(new NotificationItem() + .withNotificationType(List.of())) + .withSendOnFailure(new NotificationItem() + .withNotificationType(List.of())) + .withSendOnConnectionUpdate(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnConnectionUpdateActionRequired(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnSyncDisabled(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnSyncDisabledWarning(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))); + + String expectedNotificationJson = Jsons.serialize(expectedNotification); + + JSONAssert.assertEquals(expectedNotificationJson, result, /* strict= */ true); + } + + @Test + void testMigrateSlackConfigs() throws Exception { + final DSLContext ctx = getDslContext(); + + // Insert data to workspace + final UUID workspaceId = UUID.randomUUID(); + + ctx.insertInto(DSL.table("workspace")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("slug"), + DSL.field("initial_setup_complete"), + DSL.field("notifications")) + .values( + workspaceId, + "name1", + "default", + true, + JSONB.valueOf( + "[{\"sendOnFailure\": true, \"sendOnSuccess\": false, \"notificationType\": \"slack\", \"slackConfiguration\": {\"webhook\": \"https://hooks.slack.com/testtesttest\"}}]")) + .execute(); + + V0_50_1_001__NotificationSettingsBackfill.backfillNotificationSettings(ctx); + + final String result = fetchNotificationSettingsData(ctx, workspaceId); + final NotificationSettings expectedNotification = new NotificationSettings() + .withSendOnSuccess(new NotificationItem() + .withNotificationType(List.of())) + .withSendOnFailure(new NotificationItem() + .withNotificationType(List.of(NotificationType.SLACK)) + .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook("https://hooks.slack.com/testtesttest"))) + .withSendOnConnectionUpdate(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnConnectionUpdateActionRequired(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnSyncDisabled(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))) + .withSendOnSyncDisabledWarning(new NotificationItem() + .withNotificationType(List.of(NotificationType.CUSTOMERIO))); + + String expectedNotificationJson = Jsons.serialize(expectedNotification); + + JSONAssert.assertEquals(expectedNotificationJson, result, /* strict= */ true); + } + + protected static String fetchNotificationSettingsData(final DSLContext ctx, final UUID id) { + final Record record = ctx.fetchOne(DSL.select(DSL.field("notification_settings", JSONB.class)) + .from("workspace") + .where(DSL.field("id").eq(id))); + + return record.get("notification_settings", JSONB.class).data(); + } + +} diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobNotifier.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobNotifier.java index 6625356ab21..4a2e2c834b2 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobNotifier.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobNotifier.java @@ -226,6 +226,11 @@ protected NotificationClient getNotificationClient(final Notification notificati private void sendNotification(final NotificationItem notificationItem, ThrowingFunction executeNotification) { + if (notificationItem == null) { + // Note: we may be able to implement a log notifier to log notification message only. + LOGGER.info("No notification item found for the desired notification event found. Skipping notification."); + return; + } final List notificationClients = getNotificationClientsFromNotificationItem(notificationItem); for (final NotificationClient notificationClient : notificationClients) { try {