Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture alert information and send it via webhooks #806

Merged
merged 10 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 1 addition & 22 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,5 @@
<!-- ignore-task-list-start -->
- [ ] **Breaking change?** (if so, please describe the impact and migration path for existing applications:)
<!-- ignore-task-list-end -->
**What changes did you make?** (Give an overview)

**Is there anything you'd like reviewers to focus on?**


**How Has This Been Tested?** (put an "X" next to an item)
<!-- ignore-task-list-start -->
- [ ] No need to
- [ ] Manually(please, describe, when necessary)
- [ ] Unit checks
- [ ] Integration checks
- [ ] Covered by existing automation
<!-- ignore-task-list-end -->

**Checklist** (put an "X" next to an item, otherwise PR will fail)
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation (e.g. **ENVIRONMENT VARIABLES**)
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged

Check out [Contributing](https://github.com/opendatadiscovery/odd-platform/blob/main/CONTRIBUTING.md) and [Code of Conduct](https://github.com/opendatadiscovery/odd-platform/blob/main/CODE_OF_CONDUCT.md)
**How to test**
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ r2dbc-pool = '0.8.8.RELEASE'
jooq = '3.16.6'
jooq-codegen = '3.14.9'
flyway-core = '8.5.10'
psql-driver = '42.3.4'
psql-driver = '42.4.0'
javax-validation = '2.0.1.Final'
javax-annotation = '1.3.2'
jackson-annotations = '2.13.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opendatadiscovery.oddplatform.config;

import org.opendatadiscovery.oddplatform.config.properties.MetricExporterProperties;
import org.opendatadiscovery.oddplatform.notification.NotificationsProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
Expand All @@ -9,6 +10,6 @@
@Configuration
@EnableTransactionManagement
@EnableWebFluxSecurity
@EnableConfigurationProperties({MetricExporterProperties.class})
@EnableConfigurationProperties({MetricExporterProperties.class, NotificationsProperties.class})
public class ODDPlatformConfiguration {
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

@Configuration
public class R2DBCConfiguration {

@Bean
public ConnectionFactory connectionFactory(final DataSourceProperties dataSourceProperties) {
final String r2dbcUrl = dataSourceProperties.getUrl().replace("jdbc", "r2dbc");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package org.opendatadiscovery.oddplatform.notification;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Table;
import org.opendatadiscovery.oddplatform.model.Tables;
import org.opendatadiscovery.oddplatform.notification.NotificationsProperties.WalProperties;
import org.opendatadiscovery.oddplatform.notification.processor.PostgresWALMessageProcessor;
import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder;
import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;

@RequiredArgsConstructor
@Slf4j
public class NotificationSubscriber extends Thread {
private static final String PG_REPLICATION_OUTPUT_PLUGIN = "pgoutput";

private final WalProperties walProperties;

private final PGConnectionFactory connectionFactory;
private final PostgresWALMessageDecoder messageDecoder;
private final PostgresWALMessageProcessor messageProcessor;

@Override
public void run() {
final Properties replicationSlotOptions = new Properties();
replicationSlotOptions.putAll(Map.of(
"proto_version", "1",
"publication_names", walProperties.getPublicationName()
));

while (true) {
final Connection replicationConnection = connectionFactory.getConnection(true);

try {
replicationConnection.createStatement()
.execute("SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId()));

log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId());

final PGConnection pgReplicationConnection = replicationConnection.unwrap(PGConnection.class);

registerReplicationSlot(replicationConnection, pgReplicationConnection);
registerPublication(replicationConnection, Tables.ALERT);

final ChainedLogicalStreamBuilder streamBuilder =
pgReplicationConnection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(walProperties.getReplicationSlotName())
.withSlotOptions(replicationSlotOptions);

try (final PGReplicationStream stream = streamBuilder.start()) {
while (true) {
final ByteBuffer buffer = stream.readPending();

if (buffer == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}

log.debug("processing LSN: {}", stream.getLastReceiveLSN());

messageDecoder.decode(buffer)
.ifPresent(messageProcessor::process);

stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
}
}
} catch (final Exception e) {
log.error("Error occurred while subscribing: {}", e);
} finally {
try {
replicationConnection.close();
} catch (final SQLException e) {
log.error("Error while trying to close JDBC replication connection", e);
}
}

log.debug("Released a lock, waiting 10 seconds for next iteration");
try {
TimeUnit.SECONDS.sleep(10L);
} catch (final InterruptedException e) {
log.error("Error while sleeping", e);
throw new RuntimeException(e);
}
}
}

private void registerReplicationSlot(final Connection connection,
final PGConnection replicationConnection) throws SQLException {
final String existsQuery = "SELECT EXISTS (SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?)";

final PreparedStatement statement = connection.prepareStatement(existsQuery);
statement.setString(1, walProperties.getReplicationSlotName());

try (final ResultSet resultSet = statement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName());
replicationConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(walProperties.getReplicationSlotName())
.withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN)
.make();
}
}

log.debug("Replication slot {} registed", walProperties.getReplicationSlotName());
}

private void registerPublication(final Connection connection, final Table<?> targetTable) throws SQLException {
if (targetTable == null) {
throw new IllegalArgumentException("targetTable cannot be null");
}

final String existsQuery = "SELECT EXISTS (SELECT oid FROM pg_publication WHERE pubname = ?)";

final PreparedStatement existsStatement = connection.prepareStatement(existsQuery);
existsStatement.setString(1, walProperties.getPublicationName());

try (final ResultSet resultSet = existsStatement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
// PostgreSQL tables are always in a schema, so we don't need to check targetTable.getSchema() for null
final String tableName =
String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName());

log.debug("Creating publication with name {} for table {}",
walProperties.getPublicationName(), tableName);

connection.createStatement().execute(
"CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName));
}
}

log.debug("Publication {} registered", walProperties.getPublicationName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.opendatadiscovery.oddplatform.notification;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opendatadiscovery.oddplatform.notification.processor.AlertNotificationMessageProcessor;
import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationSubscriberStarter {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();

private final PGConnectionFactory pgConnectionFactory;
private final PostgresWALMessageDecoder messageDecoder;
private final NotificationsProperties notificationsProperties;
private final AlertNotificationMessageProcessor messageProcessor;

@EventListener(ApplicationReadyEvent.class)
public void runLeaderElection() {
if (notificationsProperties.isEnabled()) {
log.debug("Notification subscription is enabled, starting WAL parser");
executorService.submit(new NotificationSubscriber(
notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor));
}

log.debug("Notification subscription is disabled");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opendatadiscovery.oddplatform.notification;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("notifications")
@Data
public class NotificationsProperties {
private boolean enabled;
private String webhookUrl;
private WalProperties wal;

@Data
public static class WalProperties {
private int advisoryLockId;
private String replicationSlotName;
private String publicationName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.opendatadiscovery.oddplatform.notification;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import lombok.RequiredArgsConstructor;
import org.postgresql.PGProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class PGConnectionFactory {
private final DataSourceProperties dataSourceProperties;

public Connection getConnection() {
return getConnection(false);
}

public Connection getConnection(final boolean replicationMode) {
final String url = dataSourceProperties.getUrl();

final Properties props = new Properties();

PGProperty.USER.set(props, dataSourceProperties.getUsername());
PGProperty.PASSWORD.set(props, dataSourceProperties.getPassword());
if (replicationMode) {
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "13.2");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
}

try {
return DriverManager.getConnection(url, props);
} catch (final SQLException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
} catch (final IllegalStateException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opendatadiscovery.oddplatform.notification.processor;

import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.notification.NotificationsProperties;
import org.opendatadiscovery.oddplatform.notification.processor.webhook.WebhookSender;
import org.opendatadiscovery.oddplatform.notification.wal.DecodedWALMessage;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class AlertNotificationMessageProcessor implements PostgresWALMessageProcessor {
private final WebhookSender webhookSender;
private final NotificationsProperties notificationsProperties;
private final NotificationMessageBuilder messageBuilder;

@Override
public void process(final DecodedWALMessage message) {
webhookSender.send(notificationsProperties.getWebhookUrl(), messageBuilder.build(message));
}
}
Loading