From 8fad5beb411fa21abe7e0d12a87d875ddef91c36 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Mon, 18 Jul 2022 17:40:05 +0300 Subject: [PATCH 1/9] Implementation --- docker/injector/inject.py | 9 +- docker/injector/start.sh | 0 gradle/libs.versions.toml | 2 + odd-platform-api/build.gradle | 1 + .../config/ODDPlatformConfiguration.java | 2 +- .../oddplatform/dto/alert/AlertTypeEnum.java | 29 ++- .../notification/NotificationSubscriber.java | 91 +++++---- .../NotificationSubscriberStarter.java | 5 +- .../notification/PGConnectionFactory.java | 2 +- .../config/NotificationConfiguration.java | 76 ++++++++ .../{ => config}/NotificationsProperties.java | 2 +- .../dto/AlertNotificationMessage.java | 35 ++++ .../notification/dto/DecodedWALMessage.java | 23 +++ .../notification/dto/NotificationMessage.java | 5 + .../NotificationSubscriberException.java | 15 ++ .../AlertNotificationMessageProcessor.java | 31 ++- .../AlertSlackNotificationMessageBuilder.java | 99 ---------- .../processor/NotificationMessageBuilder.java | 7 - .../PostgresWALMessageProcessor.java | 2 +- .../processor/message/EmojiUtils.java | 18 ++ .../processor/message/MarkdownUtils.java | 18 ++ .../SlackNotificationMessageGenerator.java | 77 ++++++++ .../processor/webhook/SlackWebhookSender.java | 33 ---- .../processor/webhook/WebhookSender.java | 5 - .../sender/NotificationSender.java | 9 + .../sender/SlackNotificationSender.java | 65 +++++++ .../sender/WebhookNotificationSender.java | 31 +++ .../AlertNotificationMessageTranslator.java | 177 ++++++++++++++++++ .../NotificationMessageTranslator.java | 8 + .../notification/wal/DecodedWALMessage.java | 13 -- .../wal/PostgresWALMessageDecoder.java | 9 +- .../repository/util/JooqRecordHelper.java | 38 ++-- .../oddplatform/utils/JSONSerDeUtils.java | 9 + .../oddplatform/utils/Pair.java | 10 +- .../src/main/resources/application.yml | 13 +- 35 files changed, 734 insertions(+), 235 deletions(-) mode change 100644 => 100755 docker/injector/start.sh create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java rename odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/{ => config}/NotificationsProperties.java (87%) create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/DecodedWALMessage.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/NotificationMessage.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSubscriberException.java delete mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java delete mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java delete mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java delete mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/NotificationMessageTranslator.java delete mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java diff --git a/docker/injector/inject.py b/docker/injector/inject.py index 7997b46ff..31e4e5329 100644 --- a/docker/injector/inject.py +++ b/docker/injector/inject.py @@ -1,14 +1,16 @@ import glob import json import os +import requests import time from typing import Union, Dict, Any, Tuple, List -import requests - REACH_TRIES_NUMBER = 20 APP_PATH = os.getenv("APP_PATH") or "." +platform_host_url = os.environ["PLATFORM_HOST_URL"] +print(f"Platform host url: {platform_host_url}") + def read_sample_json(json_filename: str) -> Tuple[str, Dict[str, Any]]: with open(json_filename, "r") as f: @@ -45,9 +47,6 @@ def inject_data(data: Dict[str, Any], token: str): raise Exception(f"Couldn't inject data for {data['data_source_oddrn']}") -platform_host_url = os.environ["PLATFORM_HOST_URL"] -print(f"Platform host url: {platform_host_url}") - data_sources_grouped = {ds["oddrn"]: ds for ds in read_datasources_json()} ingestion_samples_grouped = { diff --git a/docker/injector/start.sh b/docker/injector/start.sh old mode 100644 new mode 100755 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b0b740bce..52c6112f4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -23,6 +23,7 @@ springfox-core = '3.0.0' mapstruct = '1.4.2.Final' opentelemetry = '1.6.0' opentelemetry-alpha = '1.6.0-alpha' +slack-api-model = '1.24.0' lombok = '1.18.24' testcontainers = '1.17.1' slf4j-api = '1.7.30' @@ -71,6 +72,7 @@ opentelemetry-api = { module = 'io.opentelemetry:opentelemetry-api', version.ref opentelemetry-api-metrics = { module = 'io.opentelemetry:opentelemetry-api-metrics', version.ref = 'opentelemetry-alpha' } opentelemetry-sdk-metrics = { module = 'io.opentelemetry:opentelemetry-sdk-metrics', version.ref = 'opentelemetry-alpha' } opentelemetry-exporter-otlp-metrics = { module = 'io.opentelemetry:opentelemetry-exporter-otlp-metrics', version.ref = 'opentelemetry-alpha' } +slack-api-model = { module = 'com.slack.api:slack-api-model', version.ref = 'slack-api-model' } lombok = { module = 'org.projectlombok:lombok', version.ref = 'lombok' } slf4j-api = { module = 'org.slf4j:slf4j-api', version.ref = 'slf4j-api' } jul-to-slf4j = { module = 'org.slf4j:jul-to-slf4j', version.ref = 'slf4j-api' } diff --git a/odd-platform-api/build.gradle b/odd-platform-api/build.gradle index 3abbc3c5d..a2a9e6507 100644 --- a/odd-platform-api/build.gradle +++ b/odd-platform-api/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation libs.jackson.annotations implementation libs.jetbrains.annotations implementation libs.mapstruct + implementation libs.slack.api.model compileOnly libs.lombok annotationProcessor libs.lombok diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java index 676d966fa..a33fe0006 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java @@ -1,7 +1,7 @@ package org.opendatadiscovery.oddplatform.config; import org.opendatadiscovery.oddplatform.config.properties.MetricExporterProperties; -import org.opendatadiscovery.oddplatform.notification.NotificationsProperties; +import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/alert/AlertTypeEnum.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/alert/AlertTypeEnum.java index f2116802f..0bce97b00 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/alert/AlertTypeEnum.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/alert/AlertTypeEnum.java @@ -1,8 +1,29 @@ package org.opendatadiscovery.oddplatform.dto.alert; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import static java.util.function.Function.identity; + +@RequiredArgsConstructor public enum AlertTypeEnum { - DISTRIBUTION_ANOMALY, - BACKWARDS_INCOMPATIBLE_SCHEMA, - FAILED_DQ_TEST, - FAILED_JOB + DISTRIBUTION_ANOMALY("Distribution anomaly"), + BACKWARDS_INCOMPATIBLE_SCHEMA("Backwards incompatible schema"), + FAILED_DQ_TEST("Failed data quality test run"), + FAILED_JOB("Failed job run"); + + @Getter + private final String description; + + private static final Map DICT = Arrays + .stream(values()) + .collect(Collectors.toMap(AlertTypeEnum::name, identity())); + + public static Optional getByName(final String name) { + return Optional.ofNullable(DICT.get(name)); + } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java index 12eb5b5af..51889ef14 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java @@ -5,6 +5,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -12,7 +13,8 @@ import lombok.extern.slf4j.Slf4j; import org.jooq.Table; import org.opendatadiscovery.oddplatform.model.Tables; -import org.opendatadiscovery.oddplatform.notification.NotificationsProperties.WalProperties; +import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties.WalProperties; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSubscriberException; import org.opendatadiscovery.oddplatform.notification.processor.PostgresWALMessageProcessor; import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; import org.postgresql.PGConnection; @@ -38,14 +40,11 @@ public void run() { "publication_names", walProperties.getPublicationName() )); - while (true) { + while (!Thread.interrupted()) { final Connection replicationConnection = connectionFactory.getConnection(true); try { - replicationConnection.createStatement() - .execute("SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId())); - - log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId()); + acquireAdvisoryLock(replicationConnection); final PGConnection pgReplicationConnection = replicationConnection.unwrap(PGConnection.class); @@ -61,6 +60,12 @@ public void run() { try (final PGReplicationStream stream = streamBuilder.start()) { while (true) { + if (Thread.interrupted()) { + log.warn("Notification subscriber thread interrupted while processing WAL messages"); + Thread.currentThread().interrupt(); + return; + } + final ByteBuffer buffer = stream.readPending(); if (buffer == null) { @@ -77,8 +82,11 @@ public void run() { stream.setFlushedLSN(stream.getLastReceiveLSN()); } } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new NotificationSubscriberException(e); } catch (final Exception e) { - log.error("Error occurred while subscribing: {}", e); + log.error("Error occurred while subscribing", e); } finally { try { replicationConnection.close(); @@ -91,33 +99,43 @@ public void run() { try { TimeUnit.SECONDS.sleep(10L); } catch (final InterruptedException e) { - log.error("Error while sleeping", e); - throw new RuntimeException(e); + Thread.currentThread().interrupt(); + throw new NotificationSubscriberException(e); } } } + private void acquireAdvisoryLock(final Connection replicationConnection) throws SQLException { + try (final Statement advisoryLockStatement = replicationConnection.createStatement()) { + advisoryLockStatement.execute( + "SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId())); + } + + log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId()); + } + private void registerReplicationSlot(final Connection connection, final PGConnection replicationConnection) throws SQLException { final String existsQuery = "SELECT EXISTS (SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?)"; - final PreparedStatement statement = connection.prepareStatement(existsQuery); - statement.setString(1, walProperties.getReplicationSlotName()); - - try (final ResultSet resultSet = statement.executeQuery()) { - resultSet.next(); - if (!resultSet.getBoolean(1)) { - log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName()); - replicationConnection.getReplicationAPI() - .createReplicationSlot() - .logical() - .withSlotName(walProperties.getReplicationSlotName()) - .withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN) - .make(); + try (final PreparedStatement statement = connection.prepareStatement(existsQuery)) { + statement.setString(1, walProperties.getReplicationSlotName()); + + try (final ResultSet resultSet = statement.executeQuery()) { + resultSet.next(); + if (!resultSet.getBoolean(1)) { + log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName()); + replicationConnection.getReplicationAPI() + .createReplicationSlot() + .logical() + .withSlotName(walProperties.getReplicationSlotName()) + .withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN) + .make(); + } } } - log.debug("Replication slot {} registed", walProperties.getReplicationSlotName()); + log.debug("Replication slot {} registered", walProperties.getReplicationSlotName()); } private void registerPublication(final Connection connection, final Table targetTable) throws SQLException { @@ -127,24 +145,25 @@ private void registerPublication(final Connection connection, final Table tar final String existsQuery = "SELECT EXISTS (SELECT oid FROM pg_publication WHERE pubname = ?)"; - final PreparedStatement existsStatement = connection.prepareStatement(existsQuery); - existsStatement.setString(1, walProperties.getPublicationName()); + try (final PreparedStatement existsStatement = connection.prepareStatement(existsQuery)) { + existsStatement.setString(1, walProperties.getPublicationName()); - try (final ResultSet resultSet = existsStatement.executeQuery()) { - resultSet.next(); - if (!resultSet.getBoolean(1)) { - // PostgreSQL tables are always in a schema, so we don't need to check targetTable.getSchema() for null - final String tableName = - String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName()); + try (final ResultSet resultSet = existsStatement.executeQuery()) { + resultSet.next(); + if (!resultSet.getBoolean(1)) { + // PostgreSQL tables are always in a schema, + // so we don't need to check targetTable.getSchema() for null + final String tableName = + String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName()); - log.debug("Creating publication with name {} for table {}", - walProperties.getPublicationName(), tableName); + log.debug("Creating publication with name {} for table {}", + walProperties.getPublicationName(), tableName); - connection.createStatement().execute( - "CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName)); + connection.createStatement().execute( + "CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName)); + } } } - log.debug("Publication {} registered", walProperties.getPublicationName()); } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java index 712a51c1b..6a5f04891 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java @@ -4,6 +4,7 @@ import java.util.concurrent.Executors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties; import org.opendatadiscovery.oddplatform.notification.processor.AlertNotificationMessageProcessor; import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -14,7 +15,9 @@ @RequiredArgsConstructor @Slf4j public class NotificationSubscriberStarter { - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + r -> new Thread(r, "notification-subscriber-thread") + ); private final PGConnectionFactory pgConnectionFactory; private final PostgresWALMessageDecoder messageDecoder; diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java index 6f8b39035..000d5a179 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java @@ -27,7 +27,7 @@ public Connection getConnection(final boolean replicationMode) { PGProperty.USER.set(props, dataSourceProperties.getUsername()); PGProperty.PASSWORD.set(props, dataSourceProperties.getPassword()); if (replicationMode) { - PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "13.2"); + PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "11.0"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java new file mode 100644 index 000000000..cb5268954 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java @@ -0,0 +1,76 @@ +package org.opendatadiscovery.oddplatform.notification.config; + +import java.net.URI; +import java.net.URL; +import lombok.extern.slf4j.Slf4j; +import org.jooq.DSLContext; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.processor.message.SlackNotificationMessageGenerator; +import org.opendatadiscovery.oddplatform.notification.sender.NotificationSender; +import org.opendatadiscovery.oddplatform.notification.sender.SlackNotificationSender; +import org.opendatadiscovery.oddplatform.notification.sender.WebhookNotificationSender; +import org.opendatadiscovery.oddplatform.notification.translator.AlertNotificationMessageTranslator; +import org.opendatadiscovery.oddplatform.notification.translator.NotificationMessageTranslator; +import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.client.WebClient; + +@Configuration +@ConditionalOnProperty(value = "notifications.enabled", havingValue = "true") +@Slf4j +public class NotificationConfiguration { + @Bean + public WebClient webClient() { + return WebClient.create(); + } + + @Bean + @ConditionalOnProperty(name = "notifications.receivers.slack.url") + public NotificationSender slackNotificationSender( + @Value("${notifications.receivers.slack.url}") final URI slackWebhookUrl, + final WebClient webClient, + final SlackNotificationMessageGenerator messageGenerator + ) { + if (slackWebhookUrl.toString().isEmpty()) { + throw new IllegalArgumentException("Slack webhook URL is empty"); + } + + return new SlackNotificationSender(webClient, slackWebhookUrl, messageGenerator); + } + + @Bean + @ConditionalOnProperty(name = "notifications.receivers.webhook.url") + public NotificationSender webhookNotificationSender( + @Value("${notifications.receivers.webhook.url}") final URI webhookUrl, + final WebClient webClient + ) { + if (webhookUrl.toString().isEmpty()) { + throw new IllegalArgumentException("Webhook URL is empty"); + } + + return new WebhookNotificationSender(webClient, webhookUrl); + } + + @Bean + public NotificationMessageTranslator alertNotificationMessageTranslator( + @Value("${notifications.message.downstream-entities-depth}") final int downstreamEntitiesDepth, + final DSLContext dslContext, + final JooqRecordHelper jooqRecordHelper + ) { + if (downstreamEntitiesDepth < 0) { + throw new IllegalArgumentException("Downstream entities depth is negative"); + } + + return new AlertNotificationMessageTranslator(dslContext, jooqRecordHelper, downstreamEntitiesDepth); + } + + @Bean + public SlackNotificationMessageGenerator slackNotificationMessageGenerator( + @Value("${notifications.receivers.slack.platform-base-url}") final URL platformBaseUrl + ) { + return new SlackNotificationMessageGenerator(platformBaseUrl); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationsProperties.java similarity index 87% rename from odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java rename to odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationsProperties.java index 1c6a68ed2..6f1ee0102 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationsProperties.java @@ -1,4 +1,4 @@ -package org.opendatadiscovery.oddplatform.notification; +package org.opendatadiscovery.oddplatform.notification.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java new file mode 100644 index 000000000..36e1aabd2 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java @@ -0,0 +1,35 @@ +package org.opendatadiscovery.oddplatform.notification.dto; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Set; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; +import org.opendatadiscovery.oddplatform.utils.Pair; + +@NoArgsConstructor +@AllArgsConstructor +@Builder +@Getter +@ToString +public class AlertNotificationMessage extends NotificationMessage { + private String alertDescription; + private AlertTypeEnum alertType; + private AlertEventType eventType; + private LocalDateTime eventAt; + private AlertedDataEntity dataEntity; + private List downstream; + + public record AlertedDataEntity(long id, String name, Set> owners) { + } + + public enum AlertEventType { + CREATED, + RESOLVED, + REOPENED + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/DecodedWALMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/DecodedWALMessage.java new file mode 100644 index 000000000..91198faaf --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/DecodedWALMessage.java @@ -0,0 +1,23 @@ +package org.opendatadiscovery.oddplatform.notification.dto; + +import java.util.Map; + +public record DecodedWALMessage(int relationId, Operation operation, Map columns) { + public enum Operation { + INSERT, + UPDATE, + } + + public record Column(String name, String type, String valueAsString) { + } + + public String getColumnValue(final String columnName) { + final Column column = this.columns.get(columnName); + + if (column == null) { + throw new IllegalArgumentException("Column %s has not been found".formatted(columnName)); + } + + return column.valueAsString(); + } +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/NotificationMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/NotificationMessage.java new file mode 100644 index 000000000..df85af065 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/NotificationMessage.java @@ -0,0 +1,5 @@ +package org.opendatadiscovery.oddplatform.notification.dto; + +public abstract class NotificationMessage { +} + diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSubscriberException.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSubscriberException.java new file mode 100644 index 000000000..4c0e43b1a --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSubscriberException.java @@ -0,0 +1,15 @@ +package org.opendatadiscovery.oddplatform.notification.exception; + +public class NotificationSubscriberException extends RuntimeException { + public NotificationSubscriberException(final String message) { + super(message); + } + + public NotificationSubscriberException(final String message, final Throwable cause) { + super(message, cause); + } + + public NotificationSubscriberException(final Throwable cause) { + super(cause); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java index d9354a251..80c0bf02e 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -1,20 +1,37 @@ package org.opendatadiscovery.oddplatform.notification.processor; +import java.util.List; import lombok.RequiredArgsConstructor; -import org.opendatadiscovery.oddplatform.notification.NotificationsProperties; -import org.opendatadiscovery.oddplatform.notification.processor.webhook.WebhookSender; -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; +import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.translator.NotificationMessageTranslator; +import org.opendatadiscovery.oddplatform.notification.sender.NotificationSender; import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor +@Slf4j public class AlertNotificationMessageProcessor implements PostgresWALMessageProcessor { - private final WebhookSender webhookSender; - private final NotificationsProperties notificationsProperties; - private final NotificationMessageBuilder messageBuilder; + private final List> notificationSenders; + private final NotificationMessageTranslator messageEnricher; @Override public void process(final DecodedWALMessage message) { - webhookSender.send(notificationsProperties.getWebhookUrl(), messageBuilder.build(message)); + final AlertNotificationMessage notificationMessage = messageEnricher.translate(message); + + notificationSenders.forEach(sender -> sendMessage(sender, notificationMessage)); + } + + private void sendMessage(final NotificationSender notificationSender, + final AlertNotificationMessage message) { + log.debug("Sending notification message via {}: {}", notificationSender.receiverId(), message); + + try { + notificationSender.send(message); + } catch (final Exception e) { + log.error(String.format( + "Error occurred while sending notification via %s", notificationSender.receiverId()), e); + } } } \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java deleted file mode 100644 index b34d8cdb0..000000000 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.opendatadiscovery.oddplatform.notification.processor; - -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; -import org.springframework.stereotype.Component; - -import static org.opendatadiscovery.oddplatform.model.Tables.ALERT; - -@Component -public class AlertSlackNotificationMessageBuilder implements NotificationMessageBuilder { - private static final String INSERT_TEMPLATE = """ - :warning: - *Warning!* Alert has just been created! - - *Alert type*: - %s - - *Description*: - %s - - *Data Entity ODDRN*: - %s - - *Alert ID*: - %d - - """; - - private static final String UPDATE_TEMPLATE_WITH_UPDATED_BY = """ - %s - Alert status has just been updated - - *Alert type*: - %s - - *Description*: - %s - - *Data Entity ODDRN*: - %s - - *New status*: - %s - - *Updated By*: - %s - - *Alert ID*: - %d - - """; - - private static final String UPDATE_TEMPLATE_WITHOUT_UPDATED_BY = """ - %s - Alert status has just been updated - - *Alert type*: - %s - - *Description*: - %s - - *Data Entity ODDRN*: - %s - - *New status*: - %s - - *Alert ID*: - %d - - """; - - @Override - public String build(final DecodedWALMessage message) { - final long alertId = Long.parseLong(message.columns().get(ALERT.ID.getName()).valueAsString()); - final String dataEntityOddrn = message.columns().get(ALERT.DATA_ENTITY_ODDRN.getName()).valueAsString(); - final String alertDescription = message.columns().get(ALERT.DESCRIPTION.getName()).valueAsString(); - final String alertType = message.columns().get(ALERT.TYPE.getName()).valueAsString(); - - switch (message.operation()) { - case INSERT: - return INSERT_TEMPLATE.formatted(alertType, alertDescription, dataEntityOddrn, alertId); - case UPDATE: - final String updatedBy = message.columns().get(ALERT.STATUS_UPDATED_BY.getName()).valueAsString(); - final String status = message.columns().get(ALERT.STATUS.getName()).valueAsString(); - - final String emoji = "RESOLVED".equals(status) ? ":white_check_mark:" : ":warning:"; - - return updatedBy != null - ? UPDATE_TEMPLATE_WITH_UPDATED_BY.formatted(emoji, alertType, alertDescription, - dataEntityOddrn, status, updatedBy, alertId) - : UPDATE_TEMPLATE_WITHOUT_UPDATED_BY.formatted(emoji, alertType, alertDescription, dataEntityOddrn, - status, alertId); - default: - throw new IllegalArgumentException( - "Unsupported operation for building an alert message: %s".formatted(message.operation())); - } - } -} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java deleted file mode 100644 index 3a6336d7f..000000000 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.opendatadiscovery.oddplatform.notification.processor; - -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; - -public interface NotificationMessageBuilder { - String build(final DecodedWALMessage message); -} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java index 02f8346b3..12d8c8567 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java @@ -1,6 +1,6 @@ package org.opendatadiscovery.oddplatform.notification.processor; -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; public interface PostgresWALMessageProcessor { void process(final DecodedWALMessage message); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java new file mode 100644 index 000000000..390d3f9ea --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java @@ -0,0 +1,18 @@ +package org.opendatadiscovery.oddplatform.notification.processor.message; + +public class EmojiUtils { + private EmojiUtils() { + } + + public static String exclamationEmoji(final String text) { + return String.format(":exclamation: %s", text); + } + + public static String userEmoji(final String text) { + return String.format(":bust_in_silhouette: %s", text); + } + + public static String linkEmoji(final String text) { + return String.format(":link: %s", text); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java new file mode 100644 index 000000000..5ed75455a --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java @@ -0,0 +1,18 @@ +package org.opendatadiscovery.oddplatform.notification.processor.message; + +public class MarkdownUtils { + private MarkdownUtils() { + } + + public static String bold(final String text) { + return String.format("*%s*", text); + } + + public static String italic(final String text) { + return String.format("_%s_", text); + } + + public static String buildLink(final String text, final String url) { + return String.format("<%s|%s>", url, text); + } +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java new file mode 100644 index 000000000..2e5f7fe87 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -0,0 +1,77 @@ +package org.opendatadiscovery.oddplatform.notification.processor.message; + +import com.slack.api.model.block.LayoutBlock; +import com.slack.api.model.block.SectionBlock; +import java.net.URL; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; + +import static com.slack.api.model.block.Blocks.divider; +import static com.slack.api.model.block.Blocks.section; +import static com.slack.api.model.block.composition.BlockCompositions.markdownText; +import static java.util.stream.Collectors.joining; +import static org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertedDataEntity; +import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.exclamationEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.linkEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.userEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.bold; +import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.buildLink; +import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.italic; + +@RequiredArgsConstructor +public class SlackNotificationMessageGenerator { + private final static DateTimeFormatter MESSAGE_DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final URL platformBaseUrl; + + public List generate(final AlertNotificationMessage message) { + final AlertedDataEntity dataEntity = message.getDataEntity(); + final String eventAt = message.getEventAt().format(MESSAGE_DATETIME_FORMAT); + + final SectionBlock header = section(c -> c.text(markdownText( + buildLink(bold(dataEntity.name()), buildDataEntityUrl(dataEntity.id())) + "\n" + italic(eventAt)))); + + final SectionBlock alertBody = section(c -> c.text(markdownText( + exclamationEmoji(bold(message.getAlertType().getDescription()) + "\n" + message.getAlertDescription())))); + + final SectionBlock owners = section(c -> c.text(markdownText( + userEmoji(extractOwners(dataEntity))))); + + final ArrayList blocks = new ArrayList<>(List.of(header, alertBody, owners)); + + if (!message.getDownstream().isEmpty()) { + final String downstreamMarkdownText = + message.getDownstream().stream().map(this::generateMarkdownTextForDataEntity).collect(joining("\n")); + + blocks.addAll(List.of( + divider(), + section(c -> c.text(markdownText(linkEmoji(bold("Affected data entities:"))))), + section(c -> c.text(markdownText(downstreamMarkdownText))) + )); + } + + return blocks; + } + + private String extractOwners(final AlertedDataEntity dataEntity) { + return dataEntity.owners() + .stream() + .map(owner -> String.format("@%s (%s)", owner.getLeft(), owner.getRight())) + .collect(joining(", ")); + } + + private String generateMarkdownTextForDataEntity(final AlertedDataEntity dataEntity) { + final String text = buildLink(dataEntity.name(), buildDataEntityUrl(dataEntity.id())); + + return dataEntity.owners().isEmpty() + ? text + : String.format("%s: %s", text, extractOwners(dataEntity)); + } + + private String buildDataEntityUrl(final long id) { + return String.format("%s/dataentities/%d/overview", platformBaseUrl, id); + } +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java deleted file mode 100644 index 595bb018e..000000000 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.opendatadiscovery.oddplatform.notification.processor.webhook; - -import java.net.URI; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; - -@Component -@Slf4j -public class SlackWebhookSender implements WebhookSender { - private final WebClient webClient = WebClient.create(); - - @Override - public void send(final String webhookUrl, final String message) { - webClient.post() - .uri(URI.create(webhookUrl)) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(new SlackMessage(message).toJsonString())) - .retrieve() - .bodyToMono(String.class) - .block(); - } - - private record SlackMessage(String message) { - public String toJsonString() { - return JSONSerDeUtils.serializeJson(Map.of("text", message)); - } - } -} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java deleted file mode 100644 index 31009c0ca..000000000 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.opendatadiscovery.oddplatform.notification.processor.webhook; - -public interface WebhookSender { - void send(final String webhookUrl, final String message); -} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java new file mode 100644 index 000000000..4147a3a9a --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java @@ -0,0 +1,9 @@ +package org.opendatadiscovery.oddplatform.notification.sender; + +import org.opendatadiscovery.oddplatform.notification.dto.NotificationMessage; + +public interface NotificationSender { + void send(final T message); + + String receiverId(); +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java new file mode 100644 index 000000000..01e477685 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java @@ -0,0 +1,65 @@ +package org.opendatadiscovery.oddplatform.notification.sender; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.slack.api.model.block.LayoutBlock; +import java.net.URI; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.processor.message.SlackNotificationMessageGenerator; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; + +@RequiredArgsConstructor +@Slf4j +public class SlackNotificationSender implements NotificationSender { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .registerModules(new JavaTimeModule()) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + + private final WebClient webClient; + private final URI slackWebhookUrl; + private final SlackNotificationMessageGenerator messageBuilder; + + @Override + public void send(final AlertNotificationMessage message) { + final List slackMessage = messageBuilder.generate(message); + + webClient.post() + .uri(slackWebhookUrl) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(serializePayload(new SlackMessage(slackMessage)))) + .retrieve() + .bodyToMono(String.class) + .block(); + } + + @Override + public String receiverId() { + return "Slack"; + } + + private String serializePayload(final SlackMessage payload) { + if (payload == null) { + return "{}"; + } + + try { + return OBJECT_MAPPER.writeValueAsString(payload); + } catch (final JsonProcessingException e) { + throw new IllegalArgumentException("Couldn't serialize Slack payload", e); + } + } + + private record SlackMessage(List blocks) { + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java new file mode 100644 index 000000000..8bf23f3c0 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java @@ -0,0 +1,31 @@ +package org.opendatadiscovery.oddplatform.notification.sender; + +import java.net.URI; +import lombok.RequiredArgsConstructor; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; + +@RequiredArgsConstructor +public class WebhookNotificationSender implements NotificationSender { + private final WebClient webClient; + private final URI webhookUrl; + + @Override + public void send(final AlertNotificationMessage message) { + webClient.post() + .uri(webhookUrl) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(JSONSerDeUtils.serializeJson(message))) + .retrieve() + .bodyToMono(String.class) + .block(); + } + + @Override + public String receiverId() { + return "Generic webhook"; + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java new file mode 100644 index 000000000..9a48d260a --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java @@ -0,0 +1,177 @@ +package org.opendatadiscovery.oddplatform.notification.translator; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.sql.Timestamp; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jooq.CommonTableExpression; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Name; +import org.jooq.Record; +import org.opendatadiscovery.oddplatform.api.contract.model.AlertType; +import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; +import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertEventType; +import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertedDataEntity; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage.Operation; +import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper; +import org.opendatadiscovery.oddplatform.utils.Pair; + +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toSet; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.jsonArrayAgg; +import static org.jooq.impl.DSL.jsonEntry; +import static org.jooq.impl.DSL.jsonObject; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.val; +import static org.opendatadiscovery.oddplatform.model.Tables.ALERT; +import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY; +import static org.opendatadiscovery.oddplatform.model.Tables.LINEAGE; +import static org.opendatadiscovery.oddplatform.model.Tables.OWNER; +import static org.opendatadiscovery.oddplatform.model.Tables.OWNERSHIP; +import static org.opendatadiscovery.oddplatform.model.Tables.ROLE; + +@Slf4j +@RequiredArgsConstructor +public class AlertNotificationMessageTranslator implements NotificationMessageTranslator { + private static final String OWNERS_FIELD_ALIAS = "owners"; + + private final DSLContext dslContext; + private final JooqRecordHelper jooqRecordHelper; + private final int downstreamEntitiesDepth; + + @Override + public AlertNotificationMessage translate(final DecodedWALMessage message) { + final String alertDescription = message.getColumnValue(ALERT.DESCRIPTION.getName()); + final String dataEntityOddrn = message.getColumnValue(ALERT.DATA_ENTITY_ODDRN.getName()); + final String status = message.getColumnValue(ALERT.STATUS.getName()); + final AlertEventType eventType = resolveAlertEventType(message.operation(), status); + + final String eventAtString = AlertEventType.CREATED.equals(eventType) + ? message.getColumnValue(ALERT.CREATED_AT.getName()) + : message.getColumnValue(ALERT.STATUS_UPDATED_AT.getName()); + + final String alertTypeString = message.getColumnValue(ALERT.TYPE.getName()); + + return AlertNotificationMessage.builder() + .alertDescription(alertDescription) + .eventAt(Timestamp.valueOf(eventAtString).toLocalDateTime()) + .alertType(resolveAlertType(alertTypeString)) + .eventType(resolveAlertEventType(message.operation(), status)) + .dataEntity(fetchAlertedDataEntity(dataEntityOddrn)) + .downstream(fetchDownstream(dataEntityOddrn)) + .build(); + } + + private AlertTypeEnum resolveAlertType(final String alertTypeName) { + return AlertTypeEnum.getByName(alertTypeName) + .orElseThrow(() -> new IllegalArgumentException("Invalid alert type: %s".formatted(alertTypeName))); + } + + private AlertedDataEntity fetchAlertedDataEntity(final String dataEntityOddrn) { + final List entities = fetchAlertedDataEntities(List.of(dataEntityOddrn)); + + if (entities.isEmpty()) { + throw new IllegalStateException( + "Couldn't find data entity with oddrn %s despite the foreign key constraint".formatted( + dataEntityOddrn)); + } + + if (entities.size() > 1) { + throw new IllegalStateException( + "Select query with data entity oddrn %s returned more than one result".formatted(dataEntityOddrn)); + } + + return entities.get(0); + } + + // TODO: replace pair with an object + private List fetchAlertedDataEntities(final Collection oddrns) { + final List records = dslContext + .select(DATA_ENTITY.ID, DATA_ENTITY.INTERNAL_NAME, DATA_ENTITY.EXTERNAL_NAME) + .select(jsonArrayAgg(jsonObject( + jsonEntry("left", OWNER.NAME), + jsonEntry("right", ROLE.NAME)) + ).as(OWNERS_FIELD_ALIAS)) + .from(DATA_ENTITY) + .leftJoin(OWNERSHIP).on(OWNERSHIP.DATA_ENTITY_ID.eq(DATA_ENTITY.ID)) + .leftJoin(OWNER).on(OWNER.ID.eq(OWNERSHIP.OWNER_ID)) + .leftJoin(ROLE).on(ROLE.ID.eq(OWNERSHIP.ROLE_ID)) + .where(DATA_ENTITY.ODDRN.in(oddrns)) + .groupBy(DATA_ENTITY.ID, DATA_ENTITY.INTERNAL_NAME, DATA_ENTITY.EXTERNAL_NAME) + .fetchStream() + .toList(); + + return records.stream().map(this::mapAlertedEntityRecord).toList(); + } + + private List fetchDownstream(final String rootDataEntityOddrn) { + if (downstreamEntitiesDepth == 0) { + return emptyList(); + } + + final Name cteName = name("recursive_downstream"); + + final Field startDepth = val(1).as(field("depth", Integer.class)); + final Field depthField = field("%s.depth".formatted(cteName.toString()), Integer.class); + final Field childOddrnField = field("%s.child_oddrn".formatted(cteName.toString()), String.class); + + final CommonTableExpression cte = cteName.as(dslContext + .select(LINEAGE.fields()) + .select(startDepth) + .from(LINEAGE) + .where(LINEAGE.PARENT_ODDRN.eq(rootDataEntityOddrn)) + .unionAll( + dslContext + .select(LINEAGE.fields()) + .select(depthField.add(1)) + .from(LINEAGE) + .join(cteName).on(LINEAGE.PARENT_ODDRN.eq(childOddrnField)) + .where(depthField.lessThan(downstreamEntitiesDepth + 1)) + )); + + final Set downstreamEntitiesOddrns = dslContext.withRecursive(cte) + .selectDistinct(cte.field(LINEAGE.PARENT_ODDRN), cte.field(LINEAGE.CHILD_ODDRN)) + .from(cte.getName()) + .fetchStreamInto(LineagePojo.class) + .flatMap(lp -> Stream.of(lp.getParentOddrn(), lp.getChildOddrn())) + .filter(oddrn -> !oddrn.equals(rootDataEntityOddrn)) + .collect(toSet()); + + if (downstreamEntitiesOddrns.isEmpty()) { + return emptyList(); + } + + return fetchAlertedDataEntities(downstreamEntitiesOddrns); + } + + private AlertEventType resolveAlertEventType(final Operation operation, final String status) { + return switch (operation) { + case INSERT -> AlertEventType.CREATED; + case UPDATE -> "OPEN".equals(status) ? AlertEventType.REOPENED : AlertEventType.RESOLVED; + }; + } + + private AlertedDataEntity mapAlertedEntityRecord(final Record record) { + final String name = record.get(DATA_ENTITY.INTERNAL_NAME) == null + ? record.get(DATA_ENTITY.EXTERNAL_NAME) + : record.get(DATA_ENTITY.INTERNAL_NAME); + + final Set> owners = jooqRecordHelper + .extractAggRelation(record, OWNERS_FIELD_ALIAS, new TypeReference>() { + }) + .stream() + .filter(p -> p.getRight() != null && p.getLeft() != null) + .collect(toSet()); + + return new AlertedDataEntity(record.get(DATA_ENTITY.ID), name, owners); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/NotificationMessageTranslator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/NotificationMessageTranslator.java new file mode 100644 index 000000000..2782b60ec --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/NotificationMessageTranslator.java @@ -0,0 +1,8 @@ +package org.opendatadiscovery.oddplatform.notification.translator; + +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.dto.NotificationMessage; + +public interface NotificationMessageTranslator { + T translate(final DecodedWALMessage message); +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java deleted file mode 100644 index 45d37efb9..000000000 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.opendatadiscovery.oddplatform.notification.wal; - -import java.util.Map; - -public record DecodedWALMessage(int relationId, Operation operation, Map columns) { - public enum Operation { - INSERT, - UPDATE, - } - - public record Column(String name, String type, String valueAsString) { - } -} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java index fb3b7f62a..8980614ae 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java @@ -14,8 +14,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opendatadiscovery.oddplatform.notification.PGConnectionFactory; -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage.Column; -import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage.Operation; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage.Column; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage.Operation; import org.springframework.stereotype.Component; import static java.util.function.Function.identity; @@ -114,8 +115,8 @@ private DecodedWALMessage decodeUpdateMessage(final ByteBuffer buffer) { throw new RuntimeException("No column meta for relation ID %d".formatted(relationId)); } - // K = Identifies the following TupleData submessage as a key - // O = Identifies the following TupleData submessage as an old tuple + // K = Identifies the following TupleData sub-message as a key + // O = Identifies the following TupleData sub-message as an old tuple // Skipping as we don't need old tuple data at the moment if ('O' == tupleType || 'K' == tupleType) { skipColumnTupleData(buffer); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java index ae7aa419a..d2481d189 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java @@ -1,11 +1,13 @@ package org.opendatadiscovery.oddplatform.repository.util; +import com.fasterxml.jackson.core.type.TypeReference; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.jooq.DSLContext; @@ -22,19 +24,14 @@ public class JooqRecordHelper { private final DSLContext dslContext; - public Set extractAggRelation(final Record r, final String fieldName, final Class fieldPojoClass) { - final Set set; - try { - set = r.get(fieldName, Set.class); - } catch (final IllegalArgumentException e) { - return emptySet(); - } + public Set extractAggRelation(final Record r, + final String fieldName, + final TypeReference typeReference) { + return extractAggRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, typeReference)); + } - return set - .stream() - .map(t -> JSONSerDeUtils.deserializeJson(t, fieldPojoClass)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + public Set extractAggRelation(final Record r, final String fieldName, final Class fieldPojoClass) { + return extractAggRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, fieldPojoClass)); } public

P extractRelation(final Record r, final Table relationTable, final Class

pojoClass) { @@ -66,4 +63,21 @@ public Record remapCte(final Record r, final String cteName, final Table targ record.fromMap(remappedValues); return record; } + + private Set extractAggRelation(final Record r, + final String fieldName, + final Function deserializer) { + final Set set; + + try { + set = r.get(fieldName, Set.class); + } catch (final IllegalArgumentException e) { + return emptySet(); + } + + return set.stream() + .map(deserializer) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/JSONSerDeUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/JSONSerDeUtils.java index f91a8d952..456ad2ad8 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/JSONSerDeUtils.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/JSONSerDeUtils.java @@ -26,6 +26,15 @@ public static T deserializeJson(final String data, final Class clazz) { } } + @Nullable + public static T deserializeJson(final Object data, final TypeReference typeReference) { + if (data == null || typeReference == null) { + return null; + } + + return OBJECT_MAPPER.convertValue(data, typeReference); + } + @Nullable public static T deserializeJson(final Object data, final Class clazz) { if (data == null || clazz == null) { diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java index db035a577..aeaf14ac4 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java @@ -1,14 +1,20 @@ package org.opendatadiscovery.oddplatform.utils; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; -import lombok.RequiredArgsConstructor; @Data -@RequiredArgsConstructor public final class Pair { private final L left; private final R right; + @JsonCreator + public Pair(@JsonProperty("left") L left, @JsonProperty("right") R right) { + this.left = left; + this.right = right; + } + public static Pair of(final L left, final R right) { return new Pair<>(left, right); } diff --git a/odd-platform-api/src/main/resources/application.yml b/odd-platform-api/src/main/resources/application.yml index 5bdcaf8b3..dda35921a 100644 --- a/odd-platform-api/src/main/resources/application.yml +++ b/odd-platform-api/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: datasource: - url: jdbc:postgresql://127.0.0.1:5432/odd-platform + url: jdbc:postgresql://127.0.0.1:6432/odd-platform username: odd-platform password: odd-platform-password jooq: @@ -63,11 +63,18 @@ metrics: notifications: enabled: false - webhook_url: + message: + downstream-entities-depth: 1 wal: advisory-lock-id: 100 replication-slot-name: odd_platform_replication_slot publication-name: odd_platform_publication_alert +# receivers: +# slack: +# url: +# platform-base-url: +# webhook: +# url: management: endpoints: @@ -96,4 +103,4 @@ logging: org.jooq.tools.LoggerListener: info io.r2dbc.postgresql.QUERY: info io.r2dbc.postgresql.PARAM: info - org.opendatadiscovery.oddplatform.notification: info \ No newline at end of file + org.opendatadiscovery.oddplatform.notification: debug \ No newline at end of file From ed35af59abe2abe3f4d0dc3edcb003f9df972663 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Tue, 19 Jul 2022 11:50:11 +0300 Subject: [PATCH 2/9] Replace Pair<> with DTO --- .../notification/dto/AlertNotificationMessage.java | 3 +-- .../notification/dto/OwnershipPair.java | 7 +++++++ .../message/SlackNotificationMessageGenerator.java | 2 +- .../AlertNotificationMessageTranslator.java | 14 ++++++-------- .../opendatadiscovery/oddplatform/utils/Pair.java | 10 ++-------- 5 files changed, 17 insertions(+), 19 deletions(-) create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/OwnershipPair.java diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java index 36e1aabd2..142b06f1b 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java @@ -9,7 +9,6 @@ import lombok.NoArgsConstructor; import lombok.ToString; import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; -import org.opendatadiscovery.oddplatform.utils.Pair; @NoArgsConstructor @AllArgsConstructor @@ -24,7 +23,7 @@ public class AlertNotificationMessage extends NotificationMessage { private AlertedDataEntity dataEntity; private List downstream; - public record AlertedDataEntity(long id, String name, Set> owners) { + public record AlertedDataEntity(long id, String name, Set owners) { } public enum AlertEventType { diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/OwnershipPair.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/OwnershipPair.java new file mode 100644 index 000000000..6eb5c00b7 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/OwnershipPair.java @@ -0,0 +1,7 @@ +package org.opendatadiscovery.oddplatform.notification.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record OwnershipPair(@JsonProperty("owner_name") String ownerName, + @JsonProperty("role_name") String roleName) { +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java index 2e5f7fe87..9d96783d0 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -59,7 +59,7 @@ public List generate(final AlertNotificationMessage message) { private String extractOwners(final AlertedDataEntity dataEntity) { return dataEntity.owners() .stream() - .map(owner -> String.format("@%s (%s)", owner.getLeft(), owner.getRight())) + .map(owner -> String.format("@%s (%s)", owner.ownerName(), owner.roleName())) .collect(joining(", ")); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java index 9a48d260a..a486ac263 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java @@ -13,7 +13,6 @@ import org.jooq.Field; import org.jooq.Name; import org.jooq.Record; -import org.opendatadiscovery.oddplatform.api.contract.model.AlertType; import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; @@ -21,8 +20,8 @@ import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertedDataEntity; import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage.Operation; +import org.opendatadiscovery.oddplatform.notification.dto.OwnershipPair; import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper; -import org.opendatadiscovery.oddplatform.utils.Pair; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toSet; @@ -93,13 +92,12 @@ private AlertedDataEntity fetchAlertedDataEntity(final String dataEntityOddrn) { return entities.get(0); } - // TODO: replace pair with an object private List fetchAlertedDataEntities(final Collection oddrns) { final List records = dslContext .select(DATA_ENTITY.ID, DATA_ENTITY.INTERNAL_NAME, DATA_ENTITY.EXTERNAL_NAME) .select(jsonArrayAgg(jsonObject( - jsonEntry("left", OWNER.NAME), - jsonEntry("right", ROLE.NAME)) + jsonEntry("owner_name", OWNER.NAME), + jsonEntry("role_name", ROLE.NAME)) ).as(OWNERS_FIELD_ALIAS)) .from(DATA_ENTITY) .leftJoin(OWNERSHIP).on(OWNERSHIP.DATA_ENTITY_ID.eq(DATA_ENTITY.ID)) @@ -165,11 +163,11 @@ private AlertedDataEntity mapAlertedEntityRecord(final Record record) { ? record.get(DATA_ENTITY.EXTERNAL_NAME) : record.get(DATA_ENTITY.INTERNAL_NAME); - final Set> owners = jooqRecordHelper - .extractAggRelation(record, OWNERS_FIELD_ALIAS, new TypeReference>() { + final Set owners = jooqRecordHelper + .extractAggRelation(record, OWNERS_FIELD_ALIAS, new TypeReference() { }) .stream() - .filter(p -> p.getRight() != null && p.getLeft() != null) + .filter(o -> o.ownerName() != null && o.roleName() != null) .collect(toSet()); return new AlertedDataEntity(record.get(DATA_ENTITY.ID), name, owners); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java index aeaf14ac4..db035a577 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/utils/Pair.java @@ -1,20 +1,14 @@ package org.opendatadiscovery.oddplatform.utils; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; +import lombok.RequiredArgsConstructor; @Data +@RequiredArgsConstructor public final class Pair { private final L left; private final R right; - @JsonCreator - public Pair(@JsonProperty("left") L left, @JsonProperty("right") R right) { - this.left = left; - this.right = right; - } - public static Pair of(final L left, final R right) { return new Pair<>(left, right); } From a643ab159b8bc045614bf047c84ee70522a85b79 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Tue, 19 Jul 2022 12:54:52 +0300 Subject: [PATCH 3/9] Minor improvements --- .../processor/AlertNotificationMessageProcessor.java | 4 ++-- .../processor/message/SlackNotificationMessageGenerator.java | 2 +- odd-platform-api/src/main/resources/application.yml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java index 80c0bf02e..7159696c8 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -14,11 +14,11 @@ @Slf4j public class AlertNotificationMessageProcessor implements PostgresWALMessageProcessor { private final List> notificationSenders; - private final NotificationMessageTranslator messageEnricher; + private final NotificationMessageTranslator messageTranslator; @Override public void process(final DecodedWALMessage message) { - final AlertNotificationMessage notificationMessage = messageEnricher.translate(message); + final AlertNotificationMessage notificationMessage = messageTranslator.translate(message); notificationSenders.forEach(sender -> sendMessage(sender, notificationMessage)); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java index 9d96783d0..6d41815a7 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -40,7 +40,7 @@ public List generate(final AlertNotificationMessage message) { final SectionBlock owners = section(c -> c.text(markdownText( userEmoji(extractOwners(dataEntity))))); - final ArrayList blocks = new ArrayList<>(List.of(header, alertBody, owners)); + final List blocks = new ArrayList<>(List.of(header, alertBody, owners)); if (!message.getDownstream().isEmpty()) { final String downstreamMarkdownText = diff --git a/odd-platform-api/src/main/resources/application.yml b/odd-platform-api/src/main/resources/application.yml index dda35921a..79d42d96e 100644 --- a/odd-platform-api/src/main/resources/application.yml +++ b/odd-platform-api/src/main/resources/application.yml @@ -70,7 +70,7 @@ notifications: replication-slot-name: odd_platform_replication_slot publication-name: odd_platform_publication_alert # receivers: -# slack: +# slack: # url: # platform-base-url: # webhook: From 7ac5b78f5ad0628d4b74be4976fe7c7aaff46ed4 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 11:27:45 +0300 Subject: [PATCH 4/9] Replace WebClient with HttpClient --- .../notification/NotificationSubscriber.java | 9 +++-- .../config/NotificationConfiguration.java | 13 ++++---- .../NotificationSenderException.java | 28 ++++++++++++++++ .../AlertNotificationMessageProcessor.java | 24 +++++++------- .../PostgresWALMessageProcessor.java | 5 ++- .../sender/AbstractNotificationSender.java | 31 +++++++++++++++++ .../sender/NotificationSender.java | 3 +- .../sender/SlackNotificationSender.java | 33 +++++++++++-------- .../sender/WebhookNotificationSender.java | 27 ++++++++------- 9 files changed, 122 insertions(+), 51 deletions(-) create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSenderException.java create mode 100644 odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/AbstractNotificationSender.java diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java index 51889ef14..f51e7b77e 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java @@ -7,6 +7,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; @@ -14,6 +15,7 @@ import org.jooq.Table; import org.opendatadiscovery.oddplatform.model.Tables; import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties.WalProperties; +import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; import org.opendatadiscovery.oddplatform.notification.exception.NotificationSubscriberException; import org.opendatadiscovery.oddplatform.notification.processor.PostgresWALMessageProcessor; import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; @@ -75,8 +77,11 @@ public void run() { log.debug("processing LSN: {}", stream.getLastReceiveLSN()); - messageDecoder.decode(buffer) - .ifPresent(messageProcessor::process); + final Optional decodedMessage = messageDecoder.decode(buffer); + + if (decodedMessage.isPresent()) { + messageProcessor.process(decodedMessage.get()); + } stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java index cb5268954..ae81ea022 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java @@ -2,6 +2,7 @@ import java.net.URI; import java.net.URL; +import java.net.http.HttpClient; import lombok.extern.slf4j.Slf4j; import org.jooq.DSLContext; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; @@ -23,35 +24,35 @@ @Slf4j public class NotificationConfiguration { @Bean - public WebClient webClient() { - return WebClient.create(); + public HttpClient httpClient() { + return HttpClient.newHttpClient(); } @Bean @ConditionalOnProperty(name = "notifications.receivers.slack.url") public NotificationSender slackNotificationSender( @Value("${notifications.receivers.slack.url}") final URI slackWebhookUrl, - final WebClient webClient, + final HttpClient httpClient, final SlackNotificationMessageGenerator messageGenerator ) { if (slackWebhookUrl.toString().isEmpty()) { throw new IllegalArgumentException("Slack webhook URL is empty"); } - return new SlackNotificationSender(webClient, slackWebhookUrl, messageGenerator); + return new SlackNotificationSender(httpClient, slackWebhookUrl, messageGenerator); } @Bean @ConditionalOnProperty(name = "notifications.receivers.webhook.url") public NotificationSender webhookNotificationSender( @Value("${notifications.receivers.webhook.url}") final URI webhookUrl, - final WebClient webClient + final HttpClient httpClient ) { if (webhookUrl.toString().isEmpty()) { throw new IllegalArgumentException("Webhook URL is empty"); } - return new WebhookNotificationSender(webClient, webhookUrl); + return new WebhookNotificationSender(httpClient, webhookUrl); } @Bean diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSenderException.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSenderException.java new file mode 100644 index 000000000..3a26d3dcd --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/exception/NotificationSenderException.java @@ -0,0 +1,28 @@ +package org.opendatadiscovery.oddplatform.notification.exception; + +import lombok.Getter; + +@Getter +public class NotificationSenderException extends Exception { + private final String notificationReceiverId; + + public NotificationSenderException(final String message, + final String notificationReceiverId) { + super(message); + + this.notificationReceiverId = notificationReceiverId; + } + + public NotificationSenderException(final String message, + final String notificationReceiverId, + final Throwable t) { + super(message, t); + + this.notificationReceiverId = notificationReceiverId; + } + + @Override + public String getMessage() { + return String.format("Notification sender %s: %s", notificationReceiverId, super.getMessage()); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java index 7159696c8..993bd122f 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -5,8 +5,9 @@ import lombok.extern.slf4j.Slf4j; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; -import org.opendatadiscovery.oddplatform.notification.translator.NotificationMessageTranslator; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; import org.opendatadiscovery.oddplatform.notification.sender.NotificationSender; +import org.opendatadiscovery.oddplatform.notification.translator.NotificationMessageTranslator; import org.springframework.stereotype.Component; @Component @@ -17,21 +18,18 @@ public class AlertNotificationMessageProcessor implements PostgresWALMessageProc private final NotificationMessageTranslator messageTranslator; @Override - public void process(final DecodedWALMessage message) { + public void process(final DecodedWALMessage message) throws InterruptedException { final AlertNotificationMessage notificationMessage = messageTranslator.translate(message); - notificationSenders.forEach(sender -> sendMessage(sender, notificationMessage)); - } - - private void sendMessage(final NotificationSender notificationSender, - final AlertNotificationMessage message) { - log.debug("Sending notification message via {}: {}", notificationSender.receiverId(), message); + for (final NotificationSender notificationSender : notificationSenders) { + log.debug("Sending notification message via {}: {}", notificationSender.receiverId(), message); - try { - notificationSender.send(message); - } catch (final Exception e) { - log.error(String.format( - "Error occurred while sending notification via %s", notificationSender.receiverId()), e); + try { + notificationSender.send(notificationMessage); + } catch (final NotificationSenderException e) { + log.error(String.format( + "Error occurred while sending notification via %s", notificationSender.receiverId()), e); + } } } } \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java index 12d8c8567..16c4061da 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java @@ -1,7 +1,10 @@ package org.opendatadiscovery.oddplatform.notification.processor; +import java.io.NotSerializableException; import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; public interface PostgresWALMessageProcessor { - void process(final DecodedWALMessage message); + void process(final DecodedWALMessage message) + throws InterruptedException, NotificationSenderException, NotSerializableException; } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/AbstractNotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/AbstractNotificationSender.java new file mode 100644 index 000000000..def2018f9 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/AbstractNotificationSender.java @@ -0,0 +1,31 @@ +package org.opendatadiscovery.oddplatform.notification.sender; + +import java.io.IOException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import lombok.RequiredArgsConstructor; +import org.opendatadiscovery.oddplatform.notification.dto.NotificationMessage; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; +import org.springframework.http.HttpStatus; + +@RequiredArgsConstructor +public abstract class AbstractNotificationSender implements NotificationSender { + private final HttpClient httpClient; + + protected void sendAndValidate( + final HttpRequest httpRequest + ) throws NotificationSenderException, InterruptedException { + final HttpResponse response; + try { + response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()); + } catch (final IOException e) { + throw new NotificationSenderException("Couldn't send HTTP request", receiverId(), e); + } + + if (response.statusCode() != HttpStatus.OK.value()) { + throw new NotificationSenderException( + "Notification sender response didn't complete with 200 status code", receiverId()); + } + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java index 4147a3a9a..3a3c82dad 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/NotificationSender.java @@ -1,9 +1,10 @@ package org.opendatadiscovery.oddplatform.notification.sender; import org.opendatadiscovery.oddplatform.notification.dto.NotificationMessage; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; public interface NotificationSender { - void send(final T message); + void send(final T message) throws InterruptedException, NotificationSenderException; String receiverId(); } \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java index 01e477685..cf0c021c6 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/SlackNotificationSender.java @@ -8,39 +8,44 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.slack.api.model.block.LayoutBlock; import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; import java.util.List; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; import org.opendatadiscovery.oddplatform.notification.processor.message.SlackNotificationMessageGenerator; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -@RequiredArgsConstructor @Slf4j -public class SlackNotificationSender implements NotificationSender { +public class SlackNotificationSender extends AbstractNotificationSender { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) .registerModules(new JavaTimeModule()) .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) .setSerializationInclusion(JsonInclude.Include.NON_NULL); - private final WebClient webClient; private final URI slackWebhookUrl; private final SlackNotificationMessageGenerator messageBuilder; + public SlackNotificationSender(final HttpClient httpClient, + final URI slackWebhookUrl, + final SlackNotificationMessageGenerator messageBuilder) { + super(httpClient); + + this.slackWebhookUrl = slackWebhookUrl; + this.messageBuilder = messageBuilder; + } + @Override - public void send(final AlertNotificationMessage message) { + public void send(final AlertNotificationMessage message) throws InterruptedException, NotificationSenderException { final List slackMessage = messageBuilder.generate(message); - webClient.post() + final HttpRequest request = HttpRequest.newBuilder() .uri(slackWebhookUrl) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(serializePayload(new SlackMessage(slackMessage)))) - .retrieve() - .bodyToMono(String.class) - .block(); + .POST(HttpRequest.BodyPublishers.ofString(serializePayload(new SlackMessage(slackMessage)))) + .build(); + + sendAndValidate(request); } @Override diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java index 8bf23f3c0..d0536891e 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/sender/WebhookNotificationSender.java @@ -1,27 +1,26 @@ package org.opendatadiscovery.oddplatform.notification.sender; import java.net.URI; -import lombok.RequiredArgsConstructor; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils; -import org.springframework.http.MediaType; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -@RequiredArgsConstructor -public class WebhookNotificationSender implements NotificationSender { - private final WebClient webClient; +public class WebhookNotificationSender extends AbstractNotificationSender { private final URI webhookUrl; + public WebhookNotificationSender(final HttpClient httpClient, final URI webhookUrl) { + super(httpClient); + this.webhookUrl = webhookUrl; + } + @Override - public void send(final AlertNotificationMessage message) { - webClient.post() + public void send(final AlertNotificationMessage message) throws InterruptedException, NotificationSenderException { + sendAndValidate(HttpRequest.newBuilder() .uri(webhookUrl) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(JSONSerDeUtils.serializeJson(message))) - .retrieve() - .bodyToMono(String.class) - .block(); + .POST(HttpRequest.BodyPublishers.ofString(JSONSerDeUtils.serializeJson(message))) + .build()); } @Override From 529c91de1ec31ec19bcb5ebde626ca44e79b7b9b Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 11:38:51 +0300 Subject: [PATCH 5/9] Remove unnecessary exceptions --- .../notification/processor/PostgresWALMessageProcessor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java index 16c4061da..8c53384a3 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java @@ -1,10 +1,7 @@ package org.opendatadiscovery.oddplatform.notification.processor; -import java.io.NotSerializableException; import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage; -import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; public interface PostgresWALMessageProcessor { - void process(final DecodedWALMessage message) - throws InterruptedException, NotificationSenderException, NotSerializableException; + void process(final DecodedWALMessage message) throws InterruptedException; } From e935882709ad916d74017c7564b3b56c57ecf231 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 11:42:00 +0300 Subject: [PATCH 6/9] Fix checkstyle errors --- .../message/SlackNotificationMessageGenerator.java | 2 +- .../oddplatform/repository/util/JooqRecordHelper.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java index 6d41815a7..477be0703 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -23,7 +23,7 @@ @RequiredArgsConstructor public class SlackNotificationMessageGenerator { - private final static DateTimeFormatter MESSAGE_DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter MESSAGE_DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private final URL platformBaseUrl; diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java index d2481d189..8b2a06796 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/util/JooqRecordHelper.java @@ -27,11 +27,11 @@ public class JooqRecordHelper { public Set extractAggRelation(final Record r, final String fieldName, final TypeReference typeReference) { - return extractAggRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, typeReference)); + return extractSetRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, typeReference)); } public Set extractAggRelation(final Record r, final String fieldName, final Class fieldPojoClass) { - return extractAggRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, fieldPojoClass)); + return extractSetRelation(r, fieldName, t -> JSONSerDeUtils.deserializeJson(t, fieldPojoClass)); } public

P extractRelation(final Record r, final Table relationTable, final Class

pojoClass) { @@ -64,7 +64,7 @@ public Record remapCte(final Record r, final String cteName, final Table targ return record; } - private Set extractAggRelation(final Record r, + private Set extractSetRelation(final Record r, final String fieldName, final Function deserializer) { final Set set; From e300b2576b3ece6de3fd8dee0c96aa13674e7ca9 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 13:24:52 +0300 Subject: [PATCH 7/9] Add more payload to the alert --- .../{EmojiUtils.java => SlackEmojiUtils.java} | 10 +++++-- .../SlackNotificationMessageGenerator.java | 30 ++++++++++++------- .../src/main/resources/application.yml | 4 +-- 3 files changed, 28 insertions(+), 16 deletions(-) rename odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/{EmojiUtils.java => SlackEmojiUtils.java} (68%) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java similarity index 68% rename from odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java rename to odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java index 390d3f9ea..544cb7722 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/EmojiUtils.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java @@ -1,13 +1,17 @@ package org.opendatadiscovery.oddplatform.notification.processor.message; -public class EmojiUtils { - private EmojiUtils() { - } +import lombok.experimental.UtilityClass; +@UtilityClass +public class SlackEmojiUtils { public static String exclamationEmoji(final String text) { return String.format(":exclamation: %s", text); } + public static String resolvedEmoji(final String text) { + return String.format(":white_check_mark: %s", text); + } + public static String userEmoji(final String text) { return String.format(":bust_in_silhouette: %s", text); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java index 477be0703..78b078342 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -1,7 +1,6 @@ package org.opendatadiscovery.oddplatform.notification.processor.message; import com.slack.api.model.block.LayoutBlock; -import com.slack.api.model.block.SectionBlock; import java.net.URL; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -13,13 +12,15 @@ import static com.slack.api.model.block.Blocks.section; import static com.slack.api.model.block.composition.BlockCompositions.markdownText; import static java.util.stream.Collectors.joining; +import static org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertEventType; import static org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertedDataEntity; -import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.exclamationEmoji; -import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.linkEmoji; -import static org.opendatadiscovery.oddplatform.notification.processor.message.EmojiUtils.userEmoji; import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.bold; import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.buildLink; import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.italic; +import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.exclamationEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.linkEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.resolvedEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.userEmoji; @RequiredArgsConstructor public class SlackNotificationMessageGenerator { @@ -31,16 +32,19 @@ public List generate(final AlertNotificationMessage message) { final AlertedDataEntity dataEntity = message.getDataEntity(); final String eventAt = message.getEventAt().format(MESSAGE_DATETIME_FORMAT); - final SectionBlock header = section(c -> c.text(markdownText( - buildLink(bold(dataEntity.name()), buildDataEntityUrl(dataEntity.id())) + "\n" + italic(eventAt)))); + final List blocks = new ArrayList<>(); - final SectionBlock alertBody = section(c -> c.text(markdownText( - exclamationEmoji(bold(message.getAlertType().getDescription()) + "\n" + message.getAlertDescription())))); + blocks.add(section(c -> c.text(markdownText( + buildLink(bold(dataEntity.name()), buildDataEntityUrl(dataEntity.id())) + "\n" + italic(eventAt))))); - final SectionBlock owners = section(c -> c.text(markdownText( - userEmoji(extractOwners(dataEntity))))); + if (AlertEventType.RESOLVED.equals(message.getEventType())) { + blocks.add(section(c -> c.text(markdownText(bold("This alert has been resolved"))))); + blocks.add(section(c -> c.text(markdownText(resolvedEmoji(constructAlertBody(message)))))); + } else { + blocks.add(section(c -> c.text(markdownText(exclamationEmoji(constructAlertBody(message)))))); + } - final List blocks = new ArrayList<>(List.of(header, alertBody, owners)); + blocks.add(section(c -> c.text(markdownText(userEmoji(extractOwners(dataEntity)))))); if (!message.getDownstream().isEmpty()) { final String downstreamMarkdownText = @@ -56,6 +60,10 @@ public List generate(final AlertNotificationMessage message) { return blocks; } + private String constructAlertBody(final AlertNotificationMessage message) { + return bold(message.getAlertType().getDescription()) + "\n" + message.getAlertDescription(); + } + private String extractOwners(final AlertedDataEntity dataEntity) { return dataEntity.owners() .stream() diff --git a/odd-platform-api/src/main/resources/application.yml b/odd-platform-api/src/main/resources/application.yml index 79d42d96e..ca1be0444 100644 --- a/odd-platform-api/src/main/resources/application.yml +++ b/odd-platform-api/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: datasource: - url: jdbc:postgresql://127.0.0.1:6432/odd-platform + url: jdbc:postgresql://127.0.0.1:5432/odd-platform username: odd-platform password: odd-platform-password jooq: @@ -103,4 +103,4 @@ logging: org.jooq.tools.LoggerListener: info io.r2dbc.postgresql.QUERY: info io.r2dbc.postgresql.PARAM: info - org.opendatadiscovery.oddplatform.notification: debug \ No newline at end of file + org.opendatadiscovery.oddplatform.notification: info \ No newline at end of file From f7aab1a654e6349cbceb4b51fcffda411d37f66e Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 21:46:57 +0300 Subject: [PATCH 8/9] Reformat alert payload --- .../oddplatform/dto/DataEntityTypeDto.java | 8 + .../config/NotificationConfiguration.java | 1 - .../dto/AlertNotificationMessage.java | 10 +- .../AlertNotificationMessageProcessor.java | 2 +- .../{MarkdownUtils.java => MrkdwnUtils.java} | 10 +- .../processor/message/SlackEmojiUtils.java | 8 - .../SlackNotificationMessageGenerator.java | 137 +++++++++++++----- .../AlertNotificationMessageTranslator.java | 44 +++++- 8 files changed, 160 insertions(+), 60 deletions(-) rename odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/{MarkdownUtils.java => MrkdwnUtils.java} (64%) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/DataEntityTypeDto.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/DataEntityTypeDto.java index 276dc6329..383f2bc80 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/DataEntityTypeDto.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/dto/DataEntityTypeDto.java @@ -6,6 +6,7 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import static java.util.function.Function.identity; @@ -38,6 +39,13 @@ public enum DataEntityTypeDto { @Getter private final int id; + public String resolveName() { + return Arrays.stream(name().split("_")) + .map(String::toLowerCase) + .map(StringUtils::capitalize) + .collect(Collectors.joining(" ")); + } + public static Optional findById(final int id) { return Optional.ofNullable(MAP.get(id)); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java index ae81ea022..d85946b9e 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/config/NotificationConfiguration.java @@ -17,7 +17,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.web.reactive.function.client.WebClient; @Configuration @ConditionalOnProperty(value = "notifications.enabled", havingValue = "true") diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java index 142b06f1b..d64bd7399 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/dto/AlertNotificationMessage.java @@ -3,11 +3,13 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Set; +import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; +import org.opendatadiscovery.oddplatform.dto.DataEntityTypeDto; import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; @NoArgsConstructor @@ -20,10 +22,16 @@ public class AlertNotificationMessage extends NotificationMessage { private AlertTypeEnum alertType; private AlertEventType eventType; private LocalDateTime eventAt; + private String updatedBy; private AlertedDataEntity dataEntity; private List downstream; - public record AlertedDataEntity(long id, String name, Set owners) { + public record AlertedDataEntity(long id, + String name, + @Nullable String dataSourceName, + @Nullable String namespaceName, + DataEntityTypeDto type, + Set owners) { } public enum AlertEventType { diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java index 993bd122f..2552dc08d 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -22,7 +22,7 @@ public void process(final DecodedWALMessage message) throws InterruptedException final AlertNotificationMessage notificationMessage = messageTranslator.translate(message); for (final NotificationSender notificationSender : notificationSenders) { - log.debug("Sending notification message via {}: {}", notificationSender.receiverId(), message); + log.debug("Sending notification message via {}: {}", notificationSender.receiverId(), notificationMessage); try { notificationSender.send(notificationMessage); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MrkdwnUtils.java similarity index 64% rename from odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java rename to odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MrkdwnUtils.java index 5ed75455a..a30e84271 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MarkdownUtils.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/MrkdwnUtils.java @@ -1,17 +1,13 @@ package org.opendatadiscovery.oddplatform.notification.processor.message; -public class MarkdownUtils { - private MarkdownUtils() { - } +import lombok.experimental.UtilityClass; +@UtilityClass +public class MrkdwnUtils { public static String bold(final String text) { return String.format("*%s*", text); } - public static String italic(final String text) { - return String.format("_%s_", text); - } - public static String buildLink(final String text, final String url) { return String.format("<%s|%s>", url, text); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java index 544cb7722..d9c9d871d 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackEmojiUtils.java @@ -11,12 +11,4 @@ public static String exclamationEmoji(final String text) { public static String resolvedEmoji(final String text) { return String.format(":white_check_mark: %s", text); } - - public static String userEmoji(final String text) { - return String.format(":bust_in_silhouette: %s", text); - } - - public static String linkEmoji(final String text) { - return String.format(":link: %s", text); - } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java index 78b078342..cd1146d5a 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/message/SlackNotificationMessageGenerator.java @@ -1,84 +1,143 @@ package org.opendatadiscovery.oddplatform.notification.processor.message; import com.slack.api.model.block.LayoutBlock; +import com.slack.api.model.block.SectionBlock; +import com.slack.api.model.block.composition.BlockCompositions; import java.net.URL; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; +import org.opendatadiscovery.oddplatform.notification.dto.OwnershipPair; +import static com.slack.api.model.block.Blocks.context; import static com.slack.api.model.block.Blocks.divider; +import static com.slack.api.model.block.Blocks.header; import static com.slack.api.model.block.Blocks.section; import static com.slack.api.model.block.composition.BlockCompositions.markdownText; +import static com.slack.api.model.block.composition.BlockCompositions.plainText; +import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.joining; import static org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertEventType; import static org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage.AlertedDataEntity; -import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.bold; -import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.buildLink; -import static org.opendatadiscovery.oddplatform.notification.processor.message.MarkdownUtils.italic; -import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.exclamationEmoji; -import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.linkEmoji; -import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.resolvedEmoji; -import static org.opendatadiscovery.oddplatform.notification.processor.message.SlackEmojiUtils.userEmoji; +import static org.opendatadiscovery.oddplatform.notification.processor.message.MrkdwnUtils.bold; +import static org.opendatadiscovery.oddplatform.notification.processor.message.MrkdwnUtils.buildLink; @RequiredArgsConstructor public class SlackNotificationMessageGenerator { - private static final DateTimeFormatter MESSAGE_DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private final URL platformBaseUrl; public List generate(final AlertNotificationMessage message) { final AlertedDataEntity dataEntity = message.getDataEntity(); - final String eventAt = message.getEventAt().format(MESSAGE_DATETIME_FORMAT); final List blocks = new ArrayList<>(); - blocks.add(section(c -> c.text(markdownText( - buildLink(bold(dataEntity.name()), buildDataEntityUrl(dataEntity.id())) + "\n" + italic(eventAt))))); + blocks.add(AlertEventType.RESOLVED.equals(message.getEventType()) + ? header(c -> c.text(plainText(":white_check_mark: Alert: " + message.getAlertType().getDescription()))) + : header(c -> c.text(plainText(":exclamation: Alert: " + message.getAlertType().getDescription())))); + + blocks.add(divider()); + blocks.add(section(c -> c.text( + markdownText(buildDataEntityLink(dataEntity) + "\n" + message.getAlertDescription())))); + + if (message.getUpdatedBy() != null) { + final String updatedByText = switch (message.getEventType()) { + case REOPENED -> "Reopened by"; + case RESOLVED -> "Resolved by"; + default -> throw new IllegalArgumentException( + "Not supported type for updated_by text building: %s".formatted(message.getEventType())); + }; + + blocks.add(section(c -> c.text(markdownText( + String.format("%s @%s", updatedByText, message.getUpdatedBy()))))); + } + + resolveInformationalContextSection(dataEntity).ifPresent(blocks::add); + resolveOwnerContextSection(dataEntity).ifPresent(blocks::add); + resolveDownstreamSections(message.getDownstream()).ifPresent(blocks::addAll); + + return blocks; + } + + private Optional resolveInformationalContextSection(final AlertedDataEntity dataEntity) { + if (dataEntity.namespaceName() == null && dataEntity.dataSourceName() == null) { + return Optional.empty(); + } + + final StringBuilder sb = new StringBuilder(); + if (dataEntity.dataSourceName() != null) { + sb.append("Data Source: "); + sb.append(dataEntity.dataSourceName()); + } + + if (dataEntity.namespaceName() != null) { + sb.append(", "); + sb.append("Namespace: "); + sb.append(dataEntity.namespaceName()); + } + + return Optional.of( + context(c -> c.elements(List.of(BlockCompositions.markdownText(cc -> cc.text(sb.toString())))))); + } + + private Optional resolveOwnerContextSection(final AlertedDataEntity dataEntity) { + if (dataEntity.owners().isEmpty()) { + return Optional.empty(); + } + + return Optional.of( + context(c -> c.elements(List.of(BlockCompositions.markdownText(extractOwners(dataEntity.owners()))))) + ); + } - if (AlertEventType.RESOLVED.equals(message.getEventType())) { - blocks.add(section(c -> c.text(markdownText(bold("This alert has been resolved"))))); - blocks.add(section(c -> c.text(markdownText(resolvedEmoji(constructAlertBody(message)))))); - } else { - blocks.add(section(c -> c.text(markdownText(exclamationEmoji(constructAlertBody(message)))))); + private Optional> resolveDownstreamSections(final List dataEntities) { + if (dataEntities.isEmpty()) { + return Optional.empty(); } - blocks.add(section(c -> c.text(markdownText(userEmoji(extractOwners(dataEntity)))))); + final ArrayList downstreamSectionBlocks = new ArrayList<>(); - if (!message.getDownstream().isEmpty()) { - final String downstreamMarkdownText = - message.getDownstream().stream().map(this::generateMarkdownTextForDataEntity).collect(joining("\n")); + downstreamSectionBlocks.add(divider()); + downstreamSectionBlocks.add(section(c -> c.text(markdownText(bold("Affected data entities"))))); - blocks.addAll(List.of( - divider(), - section(c -> c.text(markdownText(linkEmoji(bold("Affected data entities:"))))), - section(c -> c.text(markdownText(downstreamMarkdownText))) + final SectionBlock downstreamDataEntitiesSection = dataEntities.stream() + .map(this::buildDataEntityLink) + .collect(collectingAndThen( + joining(", "), + downstreamEntities -> section(c -> c.text(markdownText(downstreamEntities))) )); + + downstreamSectionBlocks.add(downstreamDataEntitiesSection); + + final Set owners = dataEntities.stream() + .flatMap(de -> de.owners().stream()) + .collect(Collectors.toSet()); + + if (!owners.isEmpty()) { + downstreamSectionBlocks.add( + context(c -> c.elements(List.of(markdownText(cc -> cc.text(extractOwners(owners))))))); } - return blocks; + return Optional.of(downstreamSectionBlocks); } - private String constructAlertBody(final AlertNotificationMessage message) { - return bold(message.getAlertType().getDescription()) + "\n" + message.getAlertDescription(); + private String buildDataEntityLink(final AlertedDataEntity dataEntity) { + return buildLink( + String.format("%s %s", dataEntity.type().resolveName(), bold(dataEntity.name())), + buildDataEntityUrl(dataEntity.id()) + ); } - private String extractOwners(final AlertedDataEntity dataEntity) { - return dataEntity.owners() + private String extractOwners(final Set ownershipPairs) { + return ownershipPairs .stream() .map(owner -> String.format("@%s (%s)", owner.ownerName(), owner.roleName())) .collect(joining(", ")); } - private String generateMarkdownTextForDataEntity(final AlertedDataEntity dataEntity) { - final String text = buildLink(dataEntity.name(), buildDataEntityUrl(dataEntity.id())); - - return dataEntity.owners().isEmpty() - ? text - : String.format("%s: %s", text, extractOwners(dataEntity)); - } - private String buildDataEntityUrl(final long id) { return String.format("%s/dataentities/%d/overview", platformBaseUrl, id); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java index a486ac263..7a622e04c 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/translator/AlertNotificationMessageTranslator.java @@ -13,6 +13,7 @@ import org.jooq.Field; import org.jooq.Name; import org.jooq.Record; +import org.opendatadiscovery.oddplatform.dto.DataEntityTypeDto; import org.opendatadiscovery.oddplatform.dto.alert.AlertTypeEnum; import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; import org.opendatadiscovery.oddplatform.notification.dto.AlertNotificationMessage; @@ -33,7 +34,9 @@ import static org.jooq.impl.DSL.val; import static org.opendatadiscovery.oddplatform.model.Tables.ALERT; import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY; +import static org.opendatadiscovery.oddplatform.model.Tables.DATA_SOURCE; import static org.opendatadiscovery.oddplatform.model.Tables.LINEAGE; +import static org.opendatadiscovery.oddplatform.model.Tables.NAMESPACE; import static org.opendatadiscovery.oddplatform.model.Tables.OWNER; import static org.opendatadiscovery.oddplatform.model.Tables.OWNERSHIP; import static org.opendatadiscovery.oddplatform.model.Tables.ROLE; @@ -42,6 +45,8 @@ @RequiredArgsConstructor public class AlertNotificationMessageTranslator implements NotificationMessageTranslator { private static final String OWNERS_FIELD_ALIAS = "owners"; + private static final String NAMESPACE_NAME_FIELD_ALIAS = "namespace_name"; + private static final String DATA_SOURCE_NAME_FIELD_ALIAS = "data_source_name"; private final DSLContext dslContext; private final JooqRecordHelper jooqRecordHelper; @@ -52,6 +57,7 @@ public AlertNotificationMessage translate(final DecodedWALMessage message) { final String alertDescription = message.getColumnValue(ALERT.DESCRIPTION.getName()); final String dataEntityOddrn = message.getColumnValue(ALERT.DATA_ENTITY_ODDRN.getName()); final String status = message.getColumnValue(ALERT.STATUS.getName()); + final String updatedBy = message.getColumnValue(ALERT.STATUS_UPDATED_BY.getName()); final AlertEventType eventType = resolveAlertEventType(message.operation(), status); final String eventAtString = AlertEventType.CREATED.equals(eventType) @@ -65,6 +71,7 @@ public AlertNotificationMessage translate(final DecodedWALMessage message) { .eventAt(Timestamp.valueOf(eventAtString).toLocalDateTime()) .alertType(resolveAlertType(alertTypeString)) .eventType(resolveAlertEventType(message.operation(), status)) + .updatedBy(updatedBy) .dataEntity(fetchAlertedDataEntity(dataEntityOddrn)) .downstream(fetchDownstream(dataEntityOddrn)) .build(); @@ -93,8 +100,15 @@ private AlertedDataEntity fetchAlertedDataEntity(final String dataEntityOddrn) { } private List fetchAlertedDataEntities(final Collection oddrns) { + final List> fields = List.of( + DATA_ENTITY.ID, DATA_ENTITY.INTERNAL_NAME, DATA_ENTITY.EXTERNAL_NAME, DATA_ENTITY.TYPE_ID, + DATA_SOURCE.NAME.as(DATA_SOURCE_NAME_FIELD_ALIAS), + NAMESPACE.NAME.as(NAMESPACE_NAME_FIELD_ALIAS) + ); + + // @formatter:off final List records = dslContext - .select(DATA_ENTITY.ID, DATA_ENTITY.INTERNAL_NAME, DATA_ENTITY.EXTERNAL_NAME) + .select(fields) .select(jsonArrayAgg(jsonObject( jsonEntry("owner_name", OWNER.NAME), jsonEntry("role_name", ROLE.NAME)) @@ -103,10 +117,16 @@ private List fetchAlertedDataEntities(final Collection owners = jooqRecordHelper .extractAggRelation(record, OWNERS_FIELD_ALIAS, new TypeReference() { }) @@ -170,6 +193,21 @@ private AlertedDataEntity mapAlertedEntityRecord(final Record record) { .filter(o -> o.ownerName() != null && o.roleName() != null) .collect(toSet()); - return new AlertedDataEntity(record.get(DATA_ENTITY.ID), name, owners); + final Integer typeId = record.get(DATA_ENTITY.TYPE_ID, Integer.class); + if (typeId == null) { + throw new IllegalStateException("Query returned null as a type id"); + } + + final DataEntityTypeDto entityType = DataEntityTypeDto.findById(typeId) + .orElseThrow(() -> new IllegalArgumentException("Query returned unknown type id: %d".formatted(typeId))); + + return new AlertedDataEntity( + record.get(DATA_ENTITY.ID), + name, + dataSourceName, + namespaceName, + entityType, + owners + ); } } From 0c843eb150fdf565673ce25156f40c98c47c5868 Mon Sep 17 00:00:00 2001 From: Nikita Dementev Date: Wed, 20 Jul 2022 21:54:32 +0300 Subject: [PATCH 9/9] Fix configuration --- .../notification/NotificationSubscriberStarter.java | 12 +++++------- .../processor/AlertNotificationMessageProcessor.java | 2 ++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java index 6a5f04891..62fdd85a4 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java @@ -7,11 +7,13 @@ import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties; import org.opendatadiscovery.oddplatform.notification.processor.AlertNotificationMessageProcessor; import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @Component +@ConditionalOnProperty(value = "notifications.enabled", havingValue = "true") @RequiredArgsConstructor @Slf4j public class NotificationSubscriberStarter { @@ -26,12 +28,8 @@ public class NotificationSubscriberStarter { @EventListener(ApplicationReadyEvent.class) public void runLeaderElection() { - if (notificationsProperties.isEnabled()) { - log.debug("Notification subscription is enabled, starting WAL parser"); - executorService.submit(new NotificationSubscriber( - notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor)); - } - - log.debug("Notification subscription is disabled"); + log.debug("Notification subscription is enabled, starting WAL parser"); + executorService.submit(new NotificationSubscriber( + notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor)); } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java index 2552dc08d..30c002dd5 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -8,9 +8,11 @@ import org.opendatadiscovery.oddplatform.notification.exception.NotificationSenderException; import org.opendatadiscovery.oddplatform.notification.sender.NotificationSender; import org.opendatadiscovery.oddplatform.notification.translator.NotificationMessageTranslator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component +@ConditionalOnProperty(value = "notifications.enabled", havingValue = "true") @RequiredArgsConstructor @Slf4j public class AlertNotificationMessageProcessor implements PostgresWALMessageProcessor {