Skip to content

Commit

Permalink
Background primary determiner #121
Browse files Browse the repository at this point in the history
change: resolve comments
Signed-off-by: Antipin Alexey <evreke@gmail.com>
  • Loading branch information
Evreke committed Sep 30, 2022
1 parent a5a8166 commit f040c47
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

@SuppressWarnings("PMD.DoNotUseThreads")
public class HighAvailabilityPgConnectionImpl implements HighAvailabilityPgConnection {

private static final Logger LOGGER = LoggerFactory.getLogger(HighAvailabilityPgConnectionImpl.class);
private static final Integer DEFAULT_DELAY_SECONDS = 30;

private final AtomicReference<PgConnection> cachedConnectionToPrimary = new AtomicReference<>();
private final Set<PgConnection> connectionsToAllHostsInCluster;
@SuppressWarnings("PMD.DoNotUseThreads")
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final PrimaryHostDeterminer primaryHostDeterminer;

Expand All @@ -43,7 +44,7 @@ private HighAvailabilityPgConnectionImpl(@Nonnull final PgConnection connectionT
final Set<PgConnection> defensiveCopy = new HashSet<>(
Objects.requireNonNull(connectionsToAllHostsInCluster, "connectionsToAllHostsInCluster"));
PgConnectionValidators.shouldContainsConnectionToPrimary(connectionToPrimary, defensiveCopy);
this.cachedConnectionToPrimary.set(connectionToPrimary);
this.cachedConnectionToPrimary.set(Objects.requireNonNull(connectionToPrimary, "connectionToPrimary"));
this.connectionsToAllHostsInCluster = Collections.unmodifiableSet(defensiveCopy);
}

Expand Down Expand Up @@ -75,7 +76,9 @@ public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connec
public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connectionToPrimary,
@Nonnull final Collection<PgConnection> connectionsToAllHostsInCluster) {
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
return new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
final HighAvailabilityPgConnectionImpl highAvailabilityPgConnection = new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
highAvailabilityPgConnection.startPrimaryUpdater(DEFAULT_DELAY_SECONDS);
return highAvailabilityPgConnection;
}

@Nonnull
Expand All @@ -84,16 +87,21 @@ public static HighAvailabilityPgConnection of(@Nonnull final PgConnection connec
@Nullable final Integer delaySeconds) {
final PrimaryHostDeterminer primaryHostDeterminer = new PrimaryHostDeterminerImpl();
final HighAvailabilityPgConnectionImpl highAvailabilityPgConnection = new HighAvailabilityPgConnectionImpl(connectionToPrimary, connectionsToAllHostsInCluster, primaryHostDeterminer);
if (highAvailabilityPgConnection.connectionsToAllHostsInCluster.size() > 1 && delaySeconds != null) {
if (delaySeconds == null) {
highAvailabilityPgConnection.startPrimaryUpdater(DEFAULT_DELAY_SECONDS);
} else {
highAvailabilityPgConnection.startPrimaryUpdater(delaySeconds);
}
return highAvailabilityPgConnection;
}

@SuppressWarnings("PMD.DoNotUseThreads")
private void startPrimaryUpdater(@Nonnull final Integer delaySeconds) {
Objects.requireNonNull(delaySeconds);
executorService.scheduleWithFixedDelay(this::updateConnectionToPrimary, delaySeconds, delaySeconds, TimeUnit.SECONDS);
Objects.requireNonNull(delaySeconds, "delaySeconds");
if (this.getConnectionsToAllHostsInCluster().size() > 1) {
executorService.scheduleWithFixedDelay(this::updateConnectionToPrimary, delaySeconds, delaySeconds, TimeUnit.SECONDS);
} else {
LOGGER.debug("Single node. There's no point to monitor primary node.");
}
}

private void updateConnectionToPrimary() {
Expand All @@ -104,7 +112,7 @@ private void updateConnectionToPrimary() {
LOGGER.debug("Current primary is {}", pgConnection.getHost().getPgUrl());
}
} catch (PgSqlException e) {
LOGGER.debug("Exception during primary detection for host {} with message {}", pgConnection.getHost(), e.getMessage());
LOGGER.error("Exception during primary detection for host {} with message {}", pgConnection.getHost(), e.getMessage());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class HighAvailabilityPgConnectionClusterTest extends ClusterAwareTestBase {

@Test
void standbyBecomesPrimaryOnPrimaryDown() {
void standbyBecomesPrimaryOnPrimaryDownWithPredefinedDelay() {
final PgConnection firstConnection = getFirstPgConnection();
final PgConnection secondConnection = getSecondPgConnection();

Expand All @@ -40,8 +40,7 @@ void standbyBecomesPrimaryOnPrimaryDown() {

assertThat(haPgConnection.getConnectionToPrimary())
.as("First connection is primary")
.isEqualTo(firstConnection);
assertThat(haPgConnection.getConnectionToPrimary())
.isEqualTo(firstConnection)
.as("Second connection is not primary")
.isNotEqualTo(secondConnection);

Expand All @@ -56,12 +55,42 @@ void standbyBecomesPrimaryOnPrimaryDown() {

assertThat(haPgConnection.getConnectionToPrimary())
.as("Second connection is primary")
.isEqualTo(secondConnection);
assertThat(haPgConnection.getConnectionToPrimary())
.isEqualTo(secondConnection)
.as("First connection is not primary")
.isNotEqualTo(firstConnection);
}

@Test
void standbyBecomesPrimaryOnPrimaryDownWithDefaultDelay() {
startFirstContainer();
final PgConnection firstConnection = getFirstPgConnection();
final PgConnection secondConnection = getSecondPgConnection();

final ArrayList<PgConnection> pgConnections = new ArrayList<>();
pgConnections.add(firstConnection);
pgConnections.add(secondConnection);

final HighAvailabilityPgConnection haPgConnection = HighAvailabilityPgConnectionImpl.of(firstConnection, pgConnections);

assertThat(haPgConnection.getConnectionToPrimary())
.as("First connection is primary")
.isEqualTo(firstConnection)
.as("Second connection is not primary")
.isNotEqualTo(secondConnection);

stopFirstContainer();

// TODO backward switch is not testable as on start container will get new port that is different from used in PgHostImpl.
// Probably caching and reusing firstMapped port in cluster extension is the solution.
Awaitility
.await()
.atMost(Duration.ofSeconds(120))
.with()
.pollInterval(Duration.ofSeconds(2))
.until(() -> haPgConnection.getConnectionToPrimary().equals(secondConnection));

assertThat(haPgConnection.getConnectionToPrimary())
.as("Second connection is primary")
.isEqualTo(secondConnection)
.as("First connection is not primary")
.isNotEqualTo(firstConnection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ protected PgConnection getSecondPgConnection() {
protected void stopFirstContainer() {
POSTGRES_CLUSTER.stopFirstContainer();
}

protected void startFirstContainer() {
POSTGRES_CLUSTER.startFirstContainer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ final class PostgresSqlClusterWrapper {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlClusterWrapper.class);
private static final String IMAGE_NAME = "docker.io/bitnami/postgresql-repmgr";
private static final String IMAGE_TAG = preparePostgresBitnamiVersion();
// REPMGR_NODE_NAME must end with a number, so aliases must also
// To avoid a ConflictException when starting the container, aliases must be unique if there is more than one instance of PostgresSqlClusterWrapper
private static final String PRIMARY_ALIAS = String.format("pg-%s-0", UUID.randomUUID());
private static final String STANDBY_ALIAS = String.format("pg-%s-1", UUID.randomUUID());
private static final String PRIMARY_ALIAS;
private static final String STANDBY_ALIAS;

static {
// REPMGR_NODE_NAME must end with a number, so aliases must also
// To avoid a ConflictException when starting the container, aliases must be unique if there is more than one instance of PostgresSqlClusterWrapper
final UUID uuid = UUID.randomUUID();
PRIMARY_ALIAS = String.format("pg-%s-0", uuid);
STANDBY_ALIAS = String.format("pg-%s-1", uuid);
}

private final Network network;
private final JdbcDatabaseContainer<?> containerForPrimary;
Expand Down Expand Up @@ -111,6 +117,11 @@ public void stopFirstContainer() {
.until(() -> containerForStandBy.getLogs().contains("standby promoted to primary after"));
}

public void startFirstContainer() {
LOGGER.info("Starting first container");
containerForPrimary.start();
}

@Nonnull
private Map<String, String> primaryEnvVarsMap() {
final Map<String, String> envVarsMap = new HashMap<>();
Expand Down

0 comments on commit f040c47

Please sign in to comment.