Background primary determiner #121 #171

Merged: 18 commits, Oct 16, 2022 (changes shown from 17 commits)
16 changes: 15 additions & 1 deletion build.gradle
@@ -51,13 +51,27 @@ dependencies {

test {
testLogging.showStandardStreams = false // set to true for debug purposes
useJUnitPlatform()
useJUnitPlatform() {
excludeTags 'cluster'
}
dependsOn checkstyleMain, checkstyleTest, pmdMain, pmdTest, spotbugsMain, spotbugsTest
maxParallelForks = 2 // try to set a higher value to speed up the local build
finalizedBy jacocoTestReport
finalizedBy jacocoTestCoverageVerification
}

tasks.register('clusterTest', Test) {
description = 'Runs tests on PostgreSQL cluster.'
group = 'verification'
useJUnitPlatform() {
includeTags 'cluster'
}
maxParallelForks = 1 // important!
mustRunAfter test
}

build.dependsOn clusterTest

jar {
manifest {
attributes("Implementation-Title": project.name,
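
Note on the build.gradle change: the regular test task now excludes JUnit 5 tests tagged 'cluster', and the new single-forked clusterTest task (wired into the build via build.dependsOn clusterTest) runs only those tests. A minimal sketch of how a test class would opt into that task is shown below; the class and assertion are hypothetical, only the tag name comes from build.gradle.

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

// Hypothetical example: a test annotated with @Tag("cluster") is skipped by
// './gradlew test' and picked up only by './gradlew clusterTest'.
@Tag("cluster")
class ExampleClusterAwareTest {

    @Test
    void runsOnlyInClusterTestTask() {
        assertThat(2 + 2).isEqualTo(4);
    }
}
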
1 change: 1 addition & 0 deletions config/pmd/pmd.xml
@@ -75,6 +75,7 @@

<rule ref="category/java/multithreading.xml">
<exclude name="UseConcurrentHashMap"/>
<exclude name="DoNotUseThreads"/>
</rule>

<rule ref="category/java/performance.xml"/>
9 changes: 8 additions & 1 deletion config/spotbugs/exclude.xml
@@ -31,6 +31,13 @@
</Match>
<Match>
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
<Class name="io.github.mfvanek.pg.support.PostgreSqlContainerWrapper"/>
<Or>
<Class name="io.github.mfvanek.pg.support.PostgreSqlContainerWrapper"/>
<Class name="io.github.mfvanek.pg.e2e.PostgresSqlClusterWrapper"/>
</Or>
</Match>
<Match>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION,ODR_OPEN_DATABASE_RESOURCE"/>
<Class name="io.github.mfvanek.pg.connection.HighAvailabilityPgConnectionUnitTest"/>
</Match>
</FindBugsFilter>
io/github/mfvanek/pg/connection/HighAvailabilityPgConnectionImpl.java
@@ -10,26 +10,46 @@

package io.github.mfvanek.pg.connection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;

/**
* Implementation of a connection to a high availability cluster (with set of primary host and replicas).
*
* @author Ivan Vakhrushev
* @author Alexey Antipin
* @see HighAvailabilityPgConnection
*/
public class HighAvailabilityPgConnectionImpl implements HighAvailabilityPgConnection {

// TODO Bad design. Need logic here to deal with failover/switch-over in real cluster.
// As a possible solution - cache connectionToPrimary for a short period of time (e.g. 1 minute)
private final PgConnection connectionToPrimary;
private static final Logger LOGGER = LoggerFactory.getLogger(HighAvailabilityPgConnectionImpl.class);
private static final long DEFAULT_PRIMARY_REFRESH_INTERVAL_MILLISECONDS = 30_000L;

private final AtomicReference<PgConnection> cachedConnectionToPrimary = new AtomicReference<>();
private final Set<PgConnection> connectionsToAllHostsInCluster;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final PrimaryHostDeterminer primaryHostDeterminer;

private HighAvailabilityPgConnectionImpl(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster) {
this.connectionToPrimary = Objects.requireNonNull(connectionToPrimary, "connectionToPrimary");
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster,
@Nonnull final PrimaryHostDeterminer primaryHostDeterminer) {
this.primaryHostDeterminer = Objects.requireNonNull(primaryHostDeterminer);
Objects.requireNonNull(connectionToPrimary, "connectionToPrimary");
final Set<PgConnection> defensiveCopy = new HashSet<>(
Objects.requireNonNull(connectionsToAllHostsInCluster, "connectionsToAllHostsInCluster"));
PgConnectionValidators.shouldContainsConnectionToPrimary(connectionToPrimary, defensiveCopy);
this.cachedConnectionToPrimary.set(connectionToPrimary);
this.connectionsToAllHostsInCluster = Collections.unmodifiableSet(defensiveCopy);
}

@@ -39,7 +59,7 @@ private HighAvailabilityPgConnectionImpl(@Nonnull final PgConnection connectionT
@Override
@Nonnull
public PgConnection getConnectionToPrimary() {
return connectionToPrimary;
return cachedConnectionToPrimary.get();
}

/**
@@ -51,14 +71,67 @@ public Set<PgConnection> getConnectionsToAllHostsInCluster() {
return connectionsToAllHostsInCluster;
}

/**
* Constructs a {@code HighAvailabilityPgConnection} object with the given {@code PgConnection}.
*
* @param connectionToPrimary connection to the primary host in the single-node cluster.
* @return {@code HighAvailabilityPgConnection}
*/
@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary) {
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, Collections.singleton(connectionToPrimary));
return of(connectionToPrimary, Collections.singleton(connectionToPrimary));
}

/**
* Constructs a {@code HighAvailabilityPgConnection} object with the given connections to primary and replicas.
*
* @param connectionToPrimary connection to the primary host in the cluster.
* @param connectionsToAllHostsInCluster connections to all replicas in the cluster.
* @return {@code HighAvailabilityPgConnection}
*/
@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster) {
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster);
return of(connectionToPrimary, connectionsToAllHostsInCluster, DEFAULT_PRIMARY_REFRESH_INTERVAL_MILLISECONDS);
}

/**
* Constructs a {@code HighAvailabilityPgConnection} object with the given connections to primary and replicas and a refresh interval.
*
* @param connectionToPrimary connection to the primary host in the cluster.
* @param connectionsToAllHostsInCluster connections to all replicas in the cluster.
* @param primaryRefreshIntervalMilliseconds time interval in milliseconds to refresh connection to the primary host.
* @return {@code HighAvailabilityPgConnection}
*/
@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster,
final long primaryRefreshIntervalMilliseconds) {
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
final HighAvailabilityPgConnectionImpl highAvailabilityPgConnection = new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
highAvailabilityPgConnection.startPrimaryUpdater(primaryRefreshIntervalMilliseconds);
return highAvailabilityPgConnection;
}

private void startPrimaryUpdater(final long primaryRefreshIntervalMilliseconds) {
if (this.getConnectionsToAllHostsInCluster().size() > 1) {
executorService.scheduleWithFixedDelay(this::updateConnectionToPrimary, primaryRefreshIntervalMilliseconds, primaryRefreshIntervalMilliseconds, TimeUnit.MILLISECONDS);
} else {
LOGGER.debug("Single node. There's no point to monitor primary node.");
}
}

@SuppressWarnings("checkstyle:IllegalCatch")
private void updateConnectionToPrimary() {
connectionsToAllHostsInCluster.forEach(pgConnection -> {
try {
if (primaryHostDeterminer.isPrimary(pgConnection)) {
cachedConnectionToPrimary.set(pgConnection);
LOGGER.debug("Current primary is {}", pgConnection.getHost().getPgUrl());
}
} catch (Exception e) {
LOGGER.warn("Exception during primary detection for host {}", pgConnection.getHost(), e);
}
});
}
}
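
With this change the class caches the connection to the primary in an AtomicReference and, whenever more than one host is configured, refreshes it on a single-threaded scheduler every primaryRefreshIntervalMilliseconds (30 seconds by default). A minimal usage sketch follows; how the two PgConnection instances are obtained is assumed, only the three-argument of(...) factory comes from this diff.

import io.github.mfvanek.pg.connection.HighAvailabilityPgConnection;
import io.github.mfvanek.pg.connection.HighAvailabilityPgConnectionImpl;
import io.github.mfvanek.pg.connection.PgConnection;

import java.util.Arrays;
import java.util.Collection;

class HaConnectionUsageSketch {

    // 'primary' and 'replica' are assumed to be already-built PgConnection instances.
    static HighAvailabilityPgConnection connect(final PgConnection primary, final PgConnection replica) {
        final Collection<PgConnection> allHosts = Arrays.asList(primary, replica);
        // Refresh the cached connection to the primary every 5 seconds
        // instead of the default 30 seconds.
        return HighAvailabilityPgConnectionImpl.of(primary, allHosts, 5_000L);
    }
}
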
io/github/mfvanek/pg/checks/cluster/UnusedIndexesCheckOnClusterTest.java
@@ -10,11 +10,6 @@

package io.github.mfvanek.pg.checks.cluster;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.github.mfvanek.pg.checks.predicates.FilterIndexesByNamePredicate;
import io.github.mfvanek.pg.common.maintenance.DatabaseCheckOnCluster;
import io.github.mfvanek.pg.common.maintenance.Diagnostic;
@@ -23,14 +18,12 @@
import io.github.mfvanek.pg.model.index.UnusedIndex;
import io.github.mfvanek.pg.statistics.maintenance.StatisticsMaintenanceOnHost;
import io.github.mfvanek.pg.support.DatabaseAwareTestBase;
import io.github.mfvanek.pg.support.LogsCaptor;
import io.github.mfvanek.pg.utils.ClockHolder;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.util.Arrays;
@@ -44,30 +37,10 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@SuppressWarnings("PMD.ExcessiveImports")
class UnusedIndexesCheckOnClusterTest extends DatabaseAwareTestBase {

private static Logger logger;
private static ListAppender<ILoggingEvent> logAppender;

private final DatabaseCheckOnCluster<UnusedIndex> check = new UnusedIndexesCheckOnCluster(getHaPgConnection());

@BeforeAll
static void init() {
final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
logger = context.getLogger(UnusedIndexesCheckOnCluster.class);
logAppender = new ListAppender<>();
logAppender.start();
logger.addAppender(logAppender);
}

@BeforeEach
void setUp() {
logger.setLevel(Level.INFO);
logAppender.clearAllFilters();
logAppender.list.clear();
}

@Test
void shouldSatisfyContract() {
assertThat(check.getType()).isEqualTo(UnusedIndex.class);
@@ -76,12 +49,14 @@ void shouldSatisfyContract() {

@Test
void checkOnClusterShouldLogResetStatisticsData() {
assertThat(check.check())
.isEmpty();

assertThat(logAppender.list)
.hasSize(1)
.allMatch(l -> l.getMessage().contains("reset"));
try (LogsCaptor logsCaptor = new LogsCaptor(UnusedIndexesCheckOnCluster.class)) {
assertThat(check.check())
.isEmpty();

assertThat(logsCaptor.getLogs())
.hasSize(1)
.allMatch(l -> l.getMessage().contains("reset"));
}
}

@ParameterizedTest
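
The test no longer wires logback appenders by hand; that setup moved into the reusable io.github.mfvanek.pg.support.LogsCaptor helper used above in a try-with-resources block. The helper itself is not part of this diff, so the sketch below is only an illustration of what such a captor could look like, based on how it is used here and on the imports that were removed; everything except the logback and SLF4J APIs is assumed and the real class may differ.

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.slf4j.LoggerFactory;

import java.util.List;

// Illustrative sketch only; the actual LogsCaptor in io.github.mfvanek.pg.support may differ.
class LogsCaptorSketch implements AutoCloseable {

    private final Logger logger;
    private final ListAppender<ILoggingEvent> appender = new ListAppender<>();

    LogsCaptorSketch(final Class<?> loggerClass) {
        final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        this.logger = context.getLogger(loggerClass);
        this.logger.setLevel(Level.INFO);
        this.appender.start();
        this.logger.addAppender(appender);
    }

    List<ILoggingEvent> getLogs() {
        return appender.list;
    }

    @Override
    public void close() {
        logger.detachAppender(appender);
        appender.stop();
        appender.list.clear();
    }
}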