Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background primary determiner #121 #171

Merged
merged 18 commits into from
Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,40 @@

package io.github.mfvanek.pg.connection;

import io.github.mfvanek.pg.utils.PgSqlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class HighAvailabilityPgConnectionImpl implements HighAvailabilityPgConnection {

// TODO Bad design. Need logic here to deal with failover/switch-over in real cluster.
// As a possible solution - cache connectionToPrimary for a short period of time (e.g. 1 minute)
private final PgConnection connectionToPrimary;
private static final Logger LOGGER = LoggerFactory.getLogger(HighAvailabilityPgConnectionImpl.class);

private final AtomicReference<PgConnection> cachedConnectionToPrimary = new AtomicReference<>();
private final Set<PgConnection> connectionsToAllHostsInCluster;
@SuppressWarnings("PMD.DoNotUseThreads")
Evreke marked this conversation as resolved.
Show resolved Hide resolved
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final PrimaryHostDeterminer primaryHostDeterminer;

private HighAvailabilityPgConnectionImpl(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster) {
this.connectionToPrimary = Objects.requireNonNull(connectionToPrimary, "connectionToPrimary");
Evreke marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster,
@Nonnull final PrimaryHostDeterminer primaryHostDeterminer) {
this.primaryHostDeterminer = Objects.requireNonNull(primaryHostDeterminer);
final Set<PgConnection> defensiveCopy = new HashSet<>(
Objects.requireNonNull(connectionsToAllHostsInCluster, "connectionsToAllHostsInCluster"));
PgConnectionValidators.shouldContainsConnectionToPrimary(connectionToPrimary, defensiveCopy);
this.cachedConnectionToPrimary.set(connectionToPrimary);
Evreke marked this conversation as resolved.
Show resolved Hide resolved
this.connectionsToAllHostsInCluster = Collections.unmodifiableSet(defensiveCopy);
}

Expand All @@ -39,7 +53,7 @@ private HighAvailabilityPgConnectionImpl(@Nonnull final PgConnection connectionT
@Override
@Nonnull
public PgConnection getConnectionToPrimary() {
return connectionToPrimary;
return cachedConnectionToPrimary.get();
}

/**
Expand All @@ -53,12 +67,46 @@ public Set<PgConnection> getConnectionsToAllHostsInCluster() {

@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary) {
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, Collections.singleton(connectionToPrimary));
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, Collections.singleton(connectionToPrimary), primaryHostDeterminer);
}

@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster) {
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster);
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
Evreke marked this conversation as resolved.
Show resolved Hide resolved
}

@Nonnull
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster,
@Nullable final Integer delaySeconds) {
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
final HighAvailabilityPgConnectionImpl highAvailabilityPgConnection = new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
Evreke marked this conversation as resolved.
Show resolved Hide resolved
if (highAvailabilityPgConnection.connectionsToAllHostsInCluster.size() > 1 && delaySeconds != null) {
Evreke marked this conversation as resolved.
Show resolved Hide resolved
highAvailabilityPgConnection.startPrimaryUpdater(delaySeconds);
}
return highAvailabilityPgConnection;
}

@SuppressWarnings("PMD.DoNotUseThreads")
Evreke marked this conversation as resolved.
Show resolved Hide resolved
private void startPrimaryUpdater(@Nonnull final Integer delaySeconds) {
Objects.requireNonNull(delaySeconds);
executorService.scheduleWithFixedDelay(this::updateConnectionToPrimary, delaySeconds, delaySeconds, TimeUnit.SECONDS);
}

private void updateConnectionToPrimary() {
connectionsToAllHostsInCluster.forEach(pgConnection -> {
try {
if (primaryHostDeterminer.isPrimary(pgConnection)) {
cachedConnectionToPrimary.set(pgConnection);
LOGGER.debug("Current primary is {}", pgConnection.getHost().getPgUrl());
}
} catch (PgSqlException e) {
LOGGER.debug("Exception during primary detection for host {} with message {}", pgConnection.getHost(), e.getMessage());
Evreke marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2019-2022. Ivan Vakhrushev and others.
* https://github.com/mfvanek/pg-index-health
*
* This file is a part of "pg-index-health" - a Java library for
* analyzing and maintaining indexes health in PostgreSQL databases.
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.pg.connection;

import io.github.mfvanek.pg.support.ClusterAwareTestBase;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayList;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for HighAvailabilityPgConnection on cluster.
*
* @author Alexey Antipin
* @since 0.6.2
*/
class HighAvailabilityPgConnectionClusterTest extends ClusterAwareTestBase {

@Test
void standbyBecomesPrimaryOnPrimaryDown() {
final PgConnection firstConnection = getFirstPgConnection();
final PgConnection secondConnection = getSecondPgConnection();

final ArrayList<PgConnection> pgConnections = new ArrayList<>();
pgConnections.add(firstConnection);
pgConnections.add(secondConnection);

final HighAvailabilityPgConnection haPgConnection = HighAvailabilityPgConnectionImpl.of(firstConnection, pgConnections, 5);

assertThat(haPgConnection.getConnectionToPrimary())
Evreke marked this conversation as resolved.
Show resolved Hide resolved
.as("First connection is primary")
.isEqualTo(firstConnection);
assertThat(haPgConnection.getConnectionToPrimary())
.as("Second connection is not primary")
.isNotEqualTo(secondConnection);

stopFirstContainer();

Awaitility
.await()
.atMost(Duration.ofSeconds(120))
.with()
.pollInterval(Duration.ofSeconds(2))
.until(() -> haPgConnection.getConnectionToPrimary().equals(secondConnection));

assertThat(haPgConnection.getConnectionToPrimary())
.as("Second connection is primary")
.isEqualTo(secondConnection);
assertThat(haPgConnection.getConnectionToPrimary())
Evreke marked this conversation as resolved.
Show resolved Hide resolved
.as("First connection is not primary")
.isNotEqualTo(firstConnection);

// TODO backward switch is not testable as on start container will get new port that is different from used in PgHostImpl.
// Probably caching and reusing firstMapped port in cluster extension is the solution.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.sql.DataSource;
Expand All @@ -41,8 +42,10 @@ final class PostgresSqlClusterWrapper {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlClusterWrapper.class);
private static final String IMAGE_NAME = "docker.io/bitnami/postgresql-repmgr";
private static final String IMAGE_TAG = preparePostgresBitnamiVersion();
private static final String PRIMARY_ALIAS = "pg-0";
private static final String STANDBY_ALIAS = "pg-1";
// REPMGR_NODE_NAME must end with a number, so aliases must also
// To avoid a ConflictException when starting the container, aliases must be unique if there is more than one instance of PostgresSqlClusterWrapper
private static final String PRIMARY_ALIAS = String.format("pg-%s-0", UUID.randomUUID());
Evreke marked this conversation as resolved.
Show resolved Hide resolved
private static final String STANDBY_ALIAS = String.format("pg-%s-1", UUID.randomUUID());

private final Network network;
private final JdbcDatabaseContainer<?> containerForPrimary;
Expand Down