diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 1834be32c..7c7b7d5e0 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,26 +1,5 @@ - -- [ ] **Breaking change?** (if so, please describe the impact and migration path for existing applications:) - **What changes did you make?** (Give an overview) **Is there anything you'd like reviewers to focus on?** - -**How Has This Been Tested?** (put an "X" next to an item) - -- [ ] No need to -- [ ] Manually(please, describe, when necessary) -- [ ] Unit checks -- [ ] Integration checks -- [ ] Covered by existing automation - - -**Checklist** (put an "X" next to an item, otherwise PR will fail) -- [ ] I have performed a self-review of my own code -- [ ] I have commented my code, particularly in hard-to-understand areas -- [ ] I have made corresponding changes to the documentation (e.g. **ENVIRONMENT VARIABLES**) -- [ ] I have added tests that prove my fix is effective or that my feature works -- [ ] New and existing unit tests pass locally with my changes -- [ ] Any dependent changes have been merged - -Check out [Contributing](https://github.com/opendatadiscovery/odd-platform/blob/main/CONTRIBUTING.md) and [Code of Conduct](https://github.com/opendatadiscovery/odd-platform/blob/main/CODE_OF_CONDUCT.md) \ No newline at end of file +**How to test** \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4ce2134b1..b0b740bce 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,7 +11,7 @@ r2dbc-pool = '0.8.8.RELEASE' jooq = '3.16.6' jooq-codegen = '3.14.9' flyway-core = '8.5.10' -psql-driver = '42.3.4' +psql-driver = '42.4.0' javax-validation = '2.0.1.Final' javax-annotation = '1.3.2' jackson-annotations = '2.13.2' diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java index 489f3de8a..676d966fa 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/ODDPlatformConfiguration.java @@ -1,6 +1,7 @@ package org.opendatadiscovery.oddplatform.config; import org.opendatadiscovery.oddplatform.config.properties.MetricExporterProperties; +import org.opendatadiscovery.oddplatform.notification.NotificationsProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; @@ -9,6 +10,6 @@ @Configuration @EnableTransactionManagement @EnableWebFluxSecurity -@EnableConfigurationProperties({MetricExporterProperties.class}) +@EnableConfigurationProperties({MetricExporterProperties.class, NotificationsProperties.class}) public class ODDPlatformConfiguration { } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/R2DBCConfiguration.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/R2DBCConfiguration.java index 93bd4245f..8fc400f77 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/R2DBCConfiguration.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/config/R2DBCConfiguration.java @@ -11,7 +11,6 @@ @Configuration public class R2DBCConfiguration { - @Bean public ConnectionFactory connectionFactory(final DataSourceProperties dataSourceProperties) { final String r2dbcUrl = dataSourceProperties.getUrl().replace("jdbc", "r2dbc"); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java new file mode 100644 index 000000000..12eb5b5af --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriber.java @@ -0,0 +1,150 @@ +package org.opendatadiscovery.oddplatform.notification; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jooq.Table; +import org.opendatadiscovery.oddplatform.model.Tables; +import org.opendatadiscovery.oddplatform.notification.NotificationsProperties.WalProperties; +import org.opendatadiscovery.oddplatform.notification.processor.PostgresWALMessageProcessor; +import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; +import org.postgresql.PGConnection; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; + +@RequiredArgsConstructor +@Slf4j +public class NotificationSubscriber extends Thread { + private static final String PG_REPLICATION_OUTPUT_PLUGIN = "pgoutput"; + + private final WalProperties walProperties; + + private final PGConnectionFactory connectionFactory; + private final PostgresWALMessageDecoder messageDecoder; + private final PostgresWALMessageProcessor messageProcessor; + + @Override + public void run() { + final Properties replicationSlotOptions = new Properties(); + replicationSlotOptions.putAll(Map.of( + "proto_version", "1", + "publication_names", walProperties.getPublicationName() + )); + + while (true) { + final Connection replicationConnection = connectionFactory.getConnection(true); + + try { + replicationConnection.createStatement() + .execute("SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId())); + + log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId()); + + final PGConnection pgReplicationConnection = replicationConnection.unwrap(PGConnection.class); + + registerReplicationSlot(replicationConnection, pgReplicationConnection); + registerPublication(replicationConnection, Tables.ALERT); + + final ChainedLogicalStreamBuilder streamBuilder = + pgReplicationConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(walProperties.getReplicationSlotName()) + .withSlotOptions(replicationSlotOptions); + + try (final PGReplicationStream stream = streamBuilder.start()) { + while (true) { + final ByteBuffer buffer = stream.readPending(); + + if (buffer == null) { + TimeUnit.MILLISECONDS.sleep(10L); + continue; + } + + log.debug("processing LSN: {}", stream.getLastReceiveLSN()); + + messageDecoder.decode(buffer) + .ifPresent(messageProcessor::process); + + stream.setAppliedLSN(stream.getLastReceiveLSN()); + stream.setFlushedLSN(stream.getLastReceiveLSN()); + } + } + } catch (final Exception e) { + log.error("Error occurred while subscribing: {}", e); + } finally { + try { + replicationConnection.close(); + } catch (final SQLException e) { + log.error("Error while trying to close JDBC replication connection", e); + } + } + + log.debug("Released a lock, waiting 10 seconds for next iteration"); + try { + TimeUnit.SECONDS.sleep(10L); + } catch (final InterruptedException e) { + log.error("Error while sleeping", e); + throw new RuntimeException(e); + } + } + } + + private void registerReplicationSlot(final Connection connection, + final PGConnection replicationConnection) throws SQLException { + final String existsQuery = "SELECT EXISTS (SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?)"; + + final PreparedStatement statement = connection.prepareStatement(existsQuery); + statement.setString(1, walProperties.getReplicationSlotName()); + + try (final ResultSet resultSet = statement.executeQuery()) { + resultSet.next(); + if (!resultSet.getBoolean(1)) { + log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName()); + replicationConnection.getReplicationAPI() + .createReplicationSlot() + .logical() + .withSlotName(walProperties.getReplicationSlotName()) + .withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN) + .make(); + } + } + + log.debug("Replication slot {} registed", walProperties.getReplicationSlotName()); + } + + private void registerPublication(final Connection connection, final Table targetTable) throws SQLException { + if (targetTable == null) { + throw new IllegalArgumentException("targetTable cannot be null"); + } + + final String existsQuery = "SELECT EXISTS (SELECT oid FROM pg_publication WHERE pubname = ?)"; + + final PreparedStatement existsStatement = connection.prepareStatement(existsQuery); + existsStatement.setString(1, walProperties.getPublicationName()); + + try (final ResultSet resultSet = existsStatement.executeQuery()) { + resultSet.next(); + if (!resultSet.getBoolean(1)) { + // PostgreSQL tables are always in a schema, so we don't need to check targetTable.getSchema() for null + final String tableName = + String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName()); + + log.debug("Creating publication with name {} for table {}", + walProperties.getPublicationName(), tableName); + + connection.createStatement().execute( + "CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName)); + } + } + + log.debug("Publication {} registered", walProperties.getPublicationName()); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java new file mode 100644 index 000000000..712a51c1b --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationSubscriberStarter.java @@ -0,0 +1,34 @@ +package org.opendatadiscovery.oddplatform.notification; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.notification.processor.AlertNotificationMessageProcessor; +import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class NotificationSubscriberStarter { + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + private final PGConnectionFactory pgConnectionFactory; + private final PostgresWALMessageDecoder messageDecoder; + private final NotificationsProperties notificationsProperties; + private final AlertNotificationMessageProcessor messageProcessor; + + @EventListener(ApplicationReadyEvent.class) + public void runLeaderElection() { + if (notificationsProperties.isEnabled()) { + log.debug("Notification subscription is enabled, starting WAL parser"); + executorService.submit(new NotificationSubscriber( + notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor)); + } + + log.debug("Notification subscription is disabled"); + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java new file mode 100644 index 000000000..1c6a68ed2 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/NotificationsProperties.java @@ -0,0 +1,19 @@ +package org.opendatadiscovery.oddplatform.notification; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("notifications") +@Data +public class NotificationsProperties { + private boolean enabled; + private String webhookUrl; + private WalProperties wal; + + @Data + public static class WalProperties { + private int advisoryLockId; + private String replicationSlotName; + private String publicationName; + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java new file mode 100644 index 000000000..6f8b39035 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/PGConnectionFactory.java @@ -0,0 +1,43 @@ +package org.opendatadiscovery.oddplatform.notification; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; +import lombok.RequiredArgsConstructor; +import org.postgresql.PGProperty; +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.jdbc.CannotGetJdbcConnectionException; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class PGConnectionFactory { + private final DataSourceProperties dataSourceProperties; + + public Connection getConnection() { + return getConnection(false); + } + + public Connection getConnection(final boolean replicationMode) { + final String url = dataSourceProperties.getUrl(); + + final Properties props = new Properties(); + + PGProperty.USER.set(props, dataSourceProperties.getUsername()); + PGProperty.PASSWORD.set(props, dataSourceProperties.getPassword()); + if (replicationMode) { + PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "13.2"); + PGProperty.REPLICATION.set(props, "database"); + PGProperty.PREFER_QUERY_MODE.set(props, "simple"); + } + + try { + return DriverManager.getConnection(url, props); + } catch (final SQLException ex) { + throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex); + } catch (final IllegalStateException ex) { + throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage()); + } + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java new file mode 100644 index 000000000..d9354a251 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertNotificationMessageProcessor.java @@ -0,0 +1,20 @@ +package org.opendatadiscovery.oddplatform.notification.processor; + +import lombok.RequiredArgsConstructor; +import org.opendatadiscovery.oddplatform.notification.NotificationsProperties; +import org.opendatadiscovery.oddplatform.notification.processor.webhook.WebhookSender; +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class AlertNotificationMessageProcessor implements PostgresWALMessageProcessor { + private final WebhookSender webhookSender; + private final NotificationsProperties notificationsProperties; + private final NotificationMessageBuilder messageBuilder; + + @Override + public void process(final DecodedWALMessage message) { + webhookSender.send(notificationsProperties.getWebhookUrl(), messageBuilder.build(message)); + } +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java new file mode 100644 index 000000000..b34d8cdb0 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/AlertSlackNotificationMessageBuilder.java @@ -0,0 +1,99 @@ +package org.opendatadiscovery.oddplatform.notification.processor; + +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; +import org.springframework.stereotype.Component; + +import static org.opendatadiscovery.oddplatform.model.Tables.ALERT; + +@Component +public class AlertSlackNotificationMessageBuilder implements NotificationMessageBuilder { + private static final String INSERT_TEMPLATE = """ + :warning: + *Warning!* Alert has just been created! + + *Alert type*: + %s + + *Description*: + %s + + *Data Entity ODDRN*: + %s + + *Alert ID*: + %d + + """; + + private static final String UPDATE_TEMPLATE_WITH_UPDATED_BY = """ + %s + Alert status has just been updated + + *Alert type*: + %s + + *Description*: + %s + + *Data Entity ODDRN*: + %s + + *New status*: + %s + + *Updated By*: + %s + + *Alert ID*: + %d + + """; + + private static final String UPDATE_TEMPLATE_WITHOUT_UPDATED_BY = """ + %s + Alert status has just been updated + + *Alert type*: + %s + + *Description*: + %s + + *Data Entity ODDRN*: + %s + + *New status*: + %s + + *Alert ID*: + %d + + """; + + @Override + public String build(final DecodedWALMessage message) { + final long alertId = Long.parseLong(message.columns().get(ALERT.ID.getName()).valueAsString()); + final String dataEntityOddrn = message.columns().get(ALERT.DATA_ENTITY_ODDRN.getName()).valueAsString(); + final String alertDescription = message.columns().get(ALERT.DESCRIPTION.getName()).valueAsString(); + final String alertType = message.columns().get(ALERT.TYPE.getName()).valueAsString(); + + switch (message.operation()) { + case INSERT: + return INSERT_TEMPLATE.formatted(alertType, alertDescription, dataEntityOddrn, alertId); + case UPDATE: + final String updatedBy = message.columns().get(ALERT.STATUS_UPDATED_BY.getName()).valueAsString(); + final String status = message.columns().get(ALERT.STATUS.getName()).valueAsString(); + + final String emoji = "RESOLVED".equals(status) ? ":white_check_mark:" : ":warning:"; + + return updatedBy != null + ? UPDATE_TEMPLATE_WITH_UPDATED_BY.formatted(emoji, alertType, alertDescription, + dataEntityOddrn, status, updatedBy, alertId) + : UPDATE_TEMPLATE_WITHOUT_UPDATED_BY.formatted(emoji, alertType, alertDescription, dataEntityOddrn, + status, alertId); + default: + throw new IllegalArgumentException( + "Unsupported operation for building an alert message: %s".formatted(message.operation())); + } + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java new file mode 100644 index 000000000..3a6336d7f --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/NotificationMessageBuilder.java @@ -0,0 +1,7 @@ +package org.opendatadiscovery.oddplatform.notification.processor; + +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; + +public interface NotificationMessageBuilder { + String build(final DecodedWALMessage message); +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java new file mode 100644 index 000000000..02f8346b3 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/PostgresWALMessageProcessor.java @@ -0,0 +1,7 @@ +package org.opendatadiscovery.oddplatform.notification.processor; + +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage; + +public interface PostgresWALMessageProcessor { + void process(final DecodedWALMessage message); +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java new file mode 100644 index 000000000..595bb018e --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/SlackWebhookSender.java @@ -0,0 +1,33 @@ +package org.opendatadiscovery.oddplatform.notification.processor.webhook; + +import java.net.URI; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; + +@Component +@Slf4j +public class SlackWebhookSender implements WebhookSender { + private final WebClient webClient = WebClient.create(); + + @Override + public void send(final String webhookUrl, final String message) { + webClient.post() + .uri(URI.create(webhookUrl)) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(new SlackMessage(message).toJsonString())) + .retrieve() + .bodyToMono(String.class) + .block(); + } + + private record SlackMessage(String message) { + public String toJsonString() { + return JSONSerDeUtils.serializeJson(Map.of("text", message)); + } + } +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java new file mode 100644 index 000000000..31009c0ca --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/processor/webhook/WebhookSender.java @@ -0,0 +1,5 @@ +package org.opendatadiscovery.oddplatform.notification.processor.webhook; + +public interface WebhookSender { + void send(final String webhookUrl, final String message); +} diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java new file mode 100644 index 000000000..45d37efb9 --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/DecodedWALMessage.java @@ -0,0 +1,13 @@ +package org.opendatadiscovery.oddplatform.notification.wal; + +import java.util.Map; + +public record DecodedWALMessage(int relationId, Operation operation, Map columns) { + public enum Operation { + INSERT, + UPDATE, + } + + public record Column(String name, String type, String valueAsString) { + } +} \ No newline at end of file diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java new file mode 100644 index 000000000..fb3b7f62a --- /dev/null +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/notification/wal/PostgresWALMessageDecoder.java @@ -0,0 +1,235 @@ +package org.opendatadiscovery.oddplatform.notification.wal; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opendatadiscovery.oddplatform.notification.PGConnectionFactory; +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage.Column; +import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage.Operation; +import org.springframework.stereotype.Component; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toMap; + +@Component +@RequiredArgsConstructor +@Slf4j +public class PostgresWALMessageDecoder { + private static final int COLUMN_NAME_ID = 4; + private static final int COLUMN_TYPE_ID = 6; + + // Value list should be an ordered one to preserve column order coming from RELATION messages + private final Map> tableColumns = new HashMap<>(); + + private final PGConnectionFactory pgConnectionFactory; + + // Returns DecodedWALMessage in case of INSERT and UPDATE messages, otherwise returns empty Optional + public Optional decode(final ByteBuffer buffer) { + final MessageType messageType = MessageType.forType((char) buffer.get()); + + log.debug("Received message type {}", messageType); + + switch (messageType) { + case RELATION: + handleRelationMessage(buffer); + return Optional.empty(); + case INSERT: + return Optional.of(decodeInsertMessage(buffer)); + case UPDATE: + return Optional.of(decodeUpdateMessage(buffer)); + default: + return Optional.empty(); + } + } + + private void handleRelationMessage(final ByteBuffer buffer) { + try (final Connection metadataConnection = pgConnectionFactory.getConnection()) { + final int relationId = buffer.getInt(); + final String schemaName = readString(buffer); + final String tableName = readString(buffer); + // skipping replica identity id for redundancy + buffer.get(); + final short columnCount = buffer.getShort(); + + log.debug("Event: {}, RelationId: {}, Columns: {}", MessageType.RELATION, relationId, columnCount); + log.debug("Schema: '{}', Table: '{}'", schemaName, tableName); + + final DatabaseMetaData databaseMetadata = metadataConnection.getMetaData(); + final List columns = new LinkedList<>(); + + try (final ResultSet rs = databaseMetadata.getColumns(null, schemaName, tableName, null)) { + while (rs.next()) { + final String name = rs.getString(COLUMN_NAME_ID); + final String type = rs.getString(COLUMN_TYPE_ID); + + columns.add(new ColumnMeta(name, type)); + } + } + tableColumns.put(relationId, columns); + } catch (final Exception e) { + log.error("Error occurred while handling RELATION message: {}", e.getMessage()); + throw new RuntimeException(e); + } + } + + private DecodedWALMessage decodeInsertMessage(final ByteBuffer buffer) { + final int relationId = buffer.getInt(); + + // Skipping tuple type char. + // Must be "N" for inserts + buffer.get(); + + final List columnMeta = tableColumns.get(relationId); + if (columnMeta == null) { + throw new RuntimeException("No column meta for relation ID %d".formatted(relationId)); + } + + return readTupleDataForColumns(buffer, columnMeta) + .stream() + .collect(collectingAndThen( + toMap(Column::name, identity()), + columns -> new DecodedWALMessage(relationId, Operation.INSERT, columns) + )); + } + + private DecodedWALMessage decodeUpdateMessage(final ByteBuffer buffer) { + final int relationId = buffer.getInt(); + + final char tupleType = (char) buffer.get(); + + final List columnMeta = tableColumns.get(relationId); + + if (columnMeta == null) { + throw new RuntimeException("No column meta for relation ID %d".formatted(relationId)); + } + + // K = Identifies the following TupleData submessage as a key + // O = Identifies the following TupleData submessage as an old tuple + // Skipping as we don't need old tuple data at the moment + if ('O' == tupleType || 'K' == tupleType) { + skipColumnTupleData(buffer); + + // Skipping the 'N' tuple type + buffer.get(); + } + + return readTupleDataForColumns(buffer, columnMeta) + .stream() + .collect(collectingAndThen( + toMap(Column::name, identity()), + columns -> new DecodedWALMessage(relationId, Operation.UPDATE, columns) + )); + } + + private List readTupleDataForColumns(final ByteBuffer buffer, + final List columnMetaList) { + final List columns = new ArrayList<>(); + + final short numberOfColumns = buffer.getShort(); + + for (short i = 0; i < numberOfColumns; ++i) { + final TupleDataSubMessageType tupleDataSubMessageType = + TupleDataSubMessageType.forType((char) buffer.get()); + + final ColumnMeta columnMeta = columnMetaList.get(i); + + switch (tupleDataSubMessageType) { + case TEXT -> + columns.add(new Column(columnMeta.name(), columnMeta.type(), readColumnValueAsString(buffer))); + case NULL -> columns.add(new Column(columnMeta.name(), columnMeta.type(), null)); + case UNCHANGED -> log.warn("Column: {}, Value: UNCHANGED", columnMeta.name()); + default -> throw new IllegalArgumentException( + "Unknown tuple data sub message type: %s".formatted(tupleDataSubMessageType)); + } + } + + return columns; + } + + private void skipColumnTupleData(final ByteBuffer buffer) { + final short numberOfColumns = buffer.getShort(); + + for (short i = 0; i < numberOfColumns; ++i) { + final TupleDataSubMessageType tupleDataSubMessageType = + TupleDataSubMessageType.forType((char) buffer.get()); + + if (tupleDataSubMessageType == TupleDataSubMessageType.TEXT) { + readColumnValueAsString(buffer); + } + } + } + + public enum MessageType { + RELATION, + BEGIN, + COMMIT, + INSERT, + UPDATE, + DELETE, + TYPE, + ORIGIN, + TRUNCATE, + LOGICAL_DECODING_MESSAGE; + + public static MessageType forType(final char type) { + return switch (type) { + case 'R' -> RELATION; + case 'B' -> BEGIN; + case 'C' -> COMMIT; + case 'I' -> INSERT; + case 'U' -> UPDATE; + case 'D' -> DELETE; + case 'Y' -> TYPE; + case 'O' -> ORIGIN; + case 'T' -> TRUNCATE; + case 'M' -> LOGICAL_DECODING_MESSAGE; + default -> throw new IllegalArgumentException("Unsupported message type: " + type); + }; + } + } + + public enum TupleDataSubMessageType { + TEXT, + UNCHANGED, + NULL; + + public static TupleDataSubMessageType forType(final char type) { + return switch (type) { + case 't' -> TEXT; + case 'u' -> UNCHANGED; + case 'n' -> NULL; + default -> throw new IllegalArgumentException("Unsupported sub-message type: " + type); + }; + } + } + + private static String readString(final ByteBuffer buffer) { + final StringBuilder sb = new StringBuilder(); + byte b; + while ((b = buffer.get()) != 0) { + sb.append((char) b); + } + return sb.toString(); + } + + private static String readColumnValueAsString(final ByteBuffer buffer) { + final int length = buffer.getInt(); + final byte[] value = new byte[length]; + buffer.get(value, 0, length); + return new String(value, StandardCharsets.UTF_8); + } + + record ColumnMeta(String name, String type) { + } +} diff --git a/odd-platform-api/src/main/resources/application.yml b/odd-platform-api/src/main/resources/application.yml index 1dc8f5b86..5bdcaf8b3 100644 --- a/odd-platform-api/src/main/resources/application.yml +++ b/odd-platform-api/src/main/resources/application.yml @@ -61,6 +61,14 @@ metrics: enabled: false otlp-endpoint: http://localhost:4317 +notifications: + enabled: false + webhook_url: + wal: + advisory-lock-id: 100 + replication-slot-name: odd_platform_replication_slot + publication-name: odd_platform_publication_alert + management: endpoints: enabled-by-default: false @@ -88,4 +96,4 @@ logging: org.jooq.tools.LoggerListener: info io.r2dbc.postgresql.QUERY: info io.r2dbc.postgresql.PARAM: info - + org.opendatadiscovery.oddplatform.notification: info \ No newline at end of file diff --git a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/controller/AlertControllerTest.java b/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/controller/AlertControllerTest.java deleted file mode 100644 index 45ed4c8ef..000000000 --- a/odd-platform-api/src/test/java/org/opendatadiscovery/oddplatform/controller/AlertControllerTest.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.opendatadiscovery.oddplatform.controller; - -import org.junit.jupiter.api.BeforeEach; - -/** - * Test for the {@link AlertController}. - * - * @author matmalik on 10.12.2021 - */ -class AlertControllerTest { - - @BeforeEach - void setUp() { - } -} \ No newline at end of file