Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Slack notification message and fix leakages #825

Merged
merged 9 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions docker/injector/inject.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import glob
import json
import os
import requests
import time
from typing import Union, Dict, Any, Tuple, List

import requests

REACH_TRIES_NUMBER = 20
APP_PATH = os.getenv("APP_PATH") or "."

platform_host_url = os.environ["PLATFORM_HOST_URL"]
print(f"Platform host url: {platform_host_url}")


def read_sample_json(json_filename: str) -> Tuple[str, Dict[str, Any]]:
with open(json_filename, "r") as f:
Expand Down Expand Up @@ -45,9 +47,6 @@ def inject_data(data: Dict[str, Any], token: str):
raise Exception(f"Couldn't inject data for {data['data_source_oddrn']}")


platform_host_url = os.environ["PLATFORM_HOST_URL"]
print(f"Platform host url: {platform_host_url}")

data_sources_grouped = {ds["oddrn"]: ds for ds in read_datasources_json()}

ingestion_samples_grouped = {
Expand Down
Empty file modified docker/injector/start.sh
100644 → 100755
Empty file.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ springfox-core = '3.0.0'
mapstruct = '1.4.2.Final'
opentelemetry = '1.6.0'
opentelemetry-alpha = '1.6.0-alpha'
slack-api-model = '1.24.0'
lombok = '1.18.24'
testcontainers = '1.17.1'
slf4j-api = '1.7.30'
Expand Down Expand Up @@ -71,6 +72,7 @@ opentelemetry-api = { module = 'io.opentelemetry:opentelemetry-api', version.ref
opentelemetry-api-metrics = { module = 'io.opentelemetry:opentelemetry-api-metrics', version.ref = 'opentelemetry-alpha' }
opentelemetry-sdk-metrics = { module = 'io.opentelemetry:opentelemetry-sdk-metrics', version.ref = 'opentelemetry-alpha' }
opentelemetry-exporter-otlp-metrics = { module = 'io.opentelemetry:opentelemetry-exporter-otlp-metrics', version.ref = 'opentelemetry-alpha' }
slack-api-model = { module = 'com.slack.api:slack-api-model', version.ref = 'slack-api-model' }
lombok = { module = 'org.projectlombok:lombok', version.ref = 'lombok' }
slf4j-api = { module = 'org.slf4j:slf4j-api', version.ref = 'slf4j-api' }
jul-to-slf4j = { module = 'org.slf4j:jul-to-slf4j', version.ref = 'slf4j-api' }
Expand Down
1 change: 1 addition & 0 deletions odd-platform-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation libs.jackson.annotations
implementation libs.jetbrains.annotations
implementation libs.mapstruct
implementation libs.slack.api.model
compileOnly libs.lombok

annotationProcessor libs.lombok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.opendatadiscovery.oddplatform.config;

import org.opendatadiscovery.oddplatform.config.properties.MetricExporterProperties;
import org.opendatadiscovery.oddplatform.notification.NotificationsProperties;
import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import static java.util.function.Function.identity;

Expand Down Expand Up @@ -38,6 +39,13 @@ public enum DataEntityTypeDto {
@Getter
private final int id;

public String resolveName() {
return Arrays.stream(name().split("_"))
.map(String::toLowerCase)
.map(StringUtils::capitalize)
.collect(Collectors.joining(" "));
}

public static Optional<DataEntityTypeDto> findById(final int id) {
return Optional.ofNullable(MAP.get(id));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,29 @@
package org.opendatadiscovery.oddplatform.dto.alert;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import static java.util.function.Function.identity;

@RequiredArgsConstructor
public enum AlertTypeEnum {
DISTRIBUTION_ANOMALY,
BACKWARDS_INCOMPATIBLE_SCHEMA,
FAILED_DQ_TEST,
FAILED_JOB
DISTRIBUTION_ANOMALY("Distribution anomaly"),
BACKWARDS_INCOMPATIBLE_SCHEMA("Backwards incompatible schema"),
FAILED_DQ_TEST("Failed data quality test run"),
FAILED_JOB("Failed job run");

@Getter
private final String description;

private static final Map<String, AlertTypeEnum> DICT = Arrays
.stream(values())
.collect(Collectors.toMap(AlertTypeEnum::name, identity()));

public static Optional<AlertTypeEnum> getByName(final String name) {
return Optional.ofNullable(DICT.get(name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Table;
import org.opendatadiscovery.oddplatform.model.Tables;
import org.opendatadiscovery.oddplatform.notification.NotificationsProperties.WalProperties;
import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties.WalProperties;
import org.opendatadiscovery.oddplatform.notification.dto.DecodedWALMessage;
import org.opendatadiscovery.oddplatform.notification.exception.NotificationSubscriberException;
import org.opendatadiscovery.oddplatform.notification.processor.PostgresWALMessageProcessor;
import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder;
import org.postgresql.PGConnection;
Expand All @@ -38,14 +42,11 @@ public void run() {
"publication_names", walProperties.getPublicationName()
));

while (true) {
while (!Thread.interrupted()) {
final Connection replicationConnection = connectionFactory.getConnection(true);

try {
replicationConnection.createStatement()
.execute("SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId()));

log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId());
acquireAdvisoryLock(replicationConnection);

final PGConnection pgReplicationConnection = replicationConnection.unwrap(PGConnection.class);

Expand All @@ -61,6 +62,12 @@ public void run() {

try (final PGReplicationStream stream = streamBuilder.start()) {
while (true) {
if (Thread.interrupted()) {
log.warn("Notification subscriber thread interrupted while processing WAL messages");
Thread.currentThread().interrupt();
return;
}

final ByteBuffer buffer = stream.readPending();

if (buffer == null) {
Expand All @@ -70,15 +77,21 @@ public void run() {

log.debug("processing LSN: {}", stream.getLastReceiveLSN());

messageDecoder.decode(buffer)
.ifPresent(messageProcessor::process);
final Optional<DecodedWALMessage> decodedMessage = messageDecoder.decode(buffer);

if (decodedMessage.isPresent()) {
messageProcessor.process(decodedMessage.get());
}

stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new NotificationSubscriberException(e);
} catch (final Exception e) {
log.error("Error occurred while subscribing: {}", e);
log.error("Error occurred while subscribing", e);
} finally {
try {
replicationConnection.close();
Expand All @@ -91,33 +104,43 @@ public void run() {
try {
TimeUnit.SECONDS.sleep(10L);
} catch (final InterruptedException e) {
log.error("Error while sleeping", e);
throw new RuntimeException(e);
Thread.currentThread().interrupt();
throw new NotificationSubscriberException(e);
DementevNikita marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

private void acquireAdvisoryLock(final Connection replicationConnection) throws SQLException {
try (final Statement advisoryLockStatement = replicationConnection.createStatement()) {
advisoryLockStatement.execute(
"SELECT pg_advisory_lock(%d)".formatted(walProperties.getAdvisoryLockId()));
}

log.debug("Acquired advisory lock on id = {}", walProperties.getAdvisoryLockId());
}

private void registerReplicationSlot(final Connection connection,
final PGConnection replicationConnection) throws SQLException {
final String existsQuery = "SELECT EXISTS (SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?)";

final PreparedStatement statement = connection.prepareStatement(existsQuery);
statement.setString(1, walProperties.getReplicationSlotName());

try (final ResultSet resultSet = statement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName());
replicationConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(walProperties.getReplicationSlotName())
.withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN)
.make();
try (final PreparedStatement statement = connection.prepareStatement(existsQuery)) {
statement.setString(1, walProperties.getReplicationSlotName());

try (final ResultSet resultSet = statement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
log.debug("Creating replication slot with name {}", walProperties.getReplicationSlotName());
replicationConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName(walProperties.getReplicationSlotName())
.withOutputPlugin(PG_REPLICATION_OUTPUT_PLUGIN)
.make();
}
}
}

log.debug("Replication slot {} registed", walProperties.getReplicationSlotName());
log.debug("Replication slot {} registered", walProperties.getReplicationSlotName());
}

private void registerPublication(final Connection connection, final Table<?> targetTable) throws SQLException {
Expand All @@ -127,24 +150,25 @@ private void registerPublication(final Connection connection, final Table<?> tar

final String existsQuery = "SELECT EXISTS (SELECT oid FROM pg_publication WHERE pubname = ?)";

final PreparedStatement existsStatement = connection.prepareStatement(existsQuery);
existsStatement.setString(1, walProperties.getPublicationName());
try (final PreparedStatement existsStatement = connection.prepareStatement(existsQuery)) {
existsStatement.setString(1, walProperties.getPublicationName());

try (final ResultSet resultSet = existsStatement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
// PostgreSQL tables are always in a schema, so we don't need to check targetTable.getSchema() for null
final String tableName =
String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName());
try (final ResultSet resultSet = existsStatement.executeQuery()) {
resultSet.next();
if (!resultSet.getBoolean(1)) {
// PostgreSQL tables are always in a schema,
// so we don't need to check targetTable.getSchema() for null
final String tableName =
String.format("%s.%s", targetTable.getSchema().getName(), targetTable.getName());

log.debug("Creating publication with name {} for table {}",
walProperties.getPublicationName(), tableName);
log.debug("Creating publication with name {} for table {}",
walProperties.getPublicationName(), tableName);

connection.createStatement().execute(
"CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName));
connection.createStatement().execute(
"CREATE PUBLICATION %s FOR TABLE %s".formatted(walProperties.getPublicationName(), tableName));
}
}
}

log.debug("Publication {} registered", walProperties.getPublicationName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opendatadiscovery.oddplatform.notification.config.NotificationsProperties;
import org.opendatadiscovery.oddplatform.notification.processor.AlertNotificationMessageProcessor;
import org.opendatadiscovery.oddplatform.notification.wal.PostgresWALMessageDecoder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(value = "notifications.enabled", havingValue = "true")
@RequiredArgsConstructor
@Slf4j
public class NotificationSubscriberStarter {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ExecutorService executorService = Executors.newSingleThreadExecutor(
r -> new Thread(r, "notification-subscriber-thread")
);

private final PGConnectionFactory pgConnectionFactory;
private final PostgresWALMessageDecoder messageDecoder;
Expand All @@ -23,12 +28,8 @@ public class NotificationSubscriberStarter {

@EventListener(ApplicationReadyEvent.class)
public void runLeaderElection() {
if (notificationsProperties.isEnabled()) {
log.debug("Notification subscription is enabled, starting WAL parser");
executorService.submit(new NotificationSubscriber(
notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor));
}

log.debug("Notification subscription is disabled");
log.debug("Notification subscription is enabled, starting WAL parser");
executorService.submit(new NotificationSubscriber(
notificationsProperties.getWal(), pgConnectionFactory, messageDecoder, messageProcessor));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Connection getConnection(final boolean replicationMode) {
PGProperty.USER.set(props, dataSourceProperties.getUsername());
PGProperty.PASSWORD.set(props, dataSourceProperties.getPassword());
if (replicationMode) {
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "13.2");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "11.0");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
}
Expand Down
Loading