Skip to content

Commit

Permalink
Fix RdbOnlyDumper failing before reconnect when the RDB offset is not continuous
Browse files Browse the repository at this point in the history
  • Loading branch information
sl_li committed Dec 30, 2024
1 parent 8990a86 commit 01c5773
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ctrip.xpipe.redis.keeper.exception.psync;

import com.ctrip.xpipe.redis.keeper.monitor.PsyncFailReason;

/**
 * A {@link PsyncRuntimeException} that wraps another psync failure and prefixes its
 * message with {@code "keeper tolerant:"}.
 *
 * <p>The wrapped exception is kept as the cause, and {@link #toReason()} reports the
 * cause's reason rather than this wrapper's.
 */
public class KeeperTolerantClosePsyncException extends PsyncRuntimeException {

    /**
     * @param e the underlying psync failure; stored as the cause of this exception
     */
    public KeeperTolerantClosePsyncException(PsyncRuntimeException e) {
        super("keeper tolerant:" + e.getMessage(), e);
    }

    /**
     * Delegates to the wrapped cause when it is a {@link PsyncRuntimeException};
     * otherwise falls back to the superclass reason.
     */
    @Override
    public PsyncFailReason toReason() {
        Throwable wrapped = getCause();
        if (!(wrapped instanceof PsyncRuntimeException)) {
            return super.toReason();
        }
        return ((PsyncRuntimeException) wrapped).toReason();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import com.ctrip.xpipe.redis.keeper.RedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.RedisMaster;
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.exception.psync.KeeperTolerantClosePsyncException;
import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncConnectMasterFailException;
import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException;
import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncRuntimeException;
import com.ctrip.xpipe.redis.keeper.exception.replication.UnexpectedReplIdException;
import com.ctrip.xpipe.redis.keeper.store.RdbOnlyReplicationStore;
import com.ctrip.xpipe.utils.VisibleForTesting;
Expand All @@ -24,6 +26,7 @@

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author wenchao.meng
Expand All @@ -47,6 +50,8 @@ enum REPL_STATE {

private REPL_STATE state;

private AtomicReference<Psync> currentPsync = new AtomicReference<>();

public RdbonlyRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster,
boolean tryRordb, boolean freshRdbNeeded,
NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduled,
Expand Down Expand Up @@ -120,14 +125,24 @@ protected void psyncFail(Throwable cause) {
/**
 * Creates the psync command for the next replication attempt.
 *
 * <p>The replication store is obtained through {@code tryGetReplicationStore()}. If that
 * fails, the failure is logged, the master connection is closed and a
 * {@link PsyncRuntimeException} carrying the original cause is thrown.
 *
 * <p>The psync is built only from the store returned by that call, never from the
 * {@code rdbOnlyReplicationStore} field directly. The created psync is remembered in
 * {@code currentPsync}.
 *
 * @return a {@code FreshRdbOnlyPsync} when the state is {@code FRESH_SYNC}, otherwise an
 *         {@code RdbOnlyPsync}; both observers are already registered on it
 * @throws PsyncRuntimeException if the replication store cannot be prepared
 */
protected Psync createPsync() {

    RdbOnlyReplicationStore replicationStore;
    try {
        replicationStore = tryGetReplicationStore();
    } catch (Throwable th) {
        logger.info("[createPsync][prepareReplicationStore][fail] close", th);
        disconnectWithMaster();
        throw new PsyncRuntimeException("prepare ReplicationStore fail", th);
    }

    // Construct exactly one psync per call, bound to the store resolved above.
    Psync psync;
    if (state.equals(REPL_STATE.FRESH_SYNC)) {
        psync = new FreshRdbOnlyPsync(clientPool, replicationStore, scheduled);
    } else {
        psync = new RdbOnlyPsync(clientPool, replicationStore, scheduled);
    }

    psync.addPsyncObserver(this);
    psync.addPsyncObserver(redisKeeperServer.createPsyncObserverForRdbOnlyRepl());
    currentPsync.set(psync);
    return psync;
}

Expand Down Expand Up @@ -175,11 +190,13 @@ protected void doOnFullSync(long masterRdbOffset) {
if (state.equals(REPL_STATE.NORMAL_SYNC)) {
state = REPL_STATE.FAIL_FOR_NOT_CONTINUE;
try {
logger.info("[retryOnceForRdbNotContinue][resetRdbStore]{}", dumpedRdbStore);
dumpedRdbStore = getRdbDumper().prepareRdbStore();
rdbOnlyReplicationStore = new RdbOnlyReplicationStore(dumpedRdbStore);
logger.info("[retryOnceForRdbNotContinue][resetRdbStore][{}:{}]{}", masterRdbOffset, firstAvailable, dumpedRdbStore);
currentPsync.get().future().setFailure(new KeeperTolerantClosePsyncException(
new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable)));
resetReplicationStore();
disconnectWithMaster();
} catch (Exception e) {
logger.info("[doOnFullSync][retryForNotContinue] fail", e);
dumpFail(new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable));
}
} else {
Expand All @@ -188,6 +205,34 @@ protected void doOnFullSync(long masterRdbOffset) {
}
}

/**
 * Drops the current dumped RDB store and the replication store built on it.
 *
 * <p>Both fields are cleared while holding this instance's monitor, so a later
 * {@code tryGetReplicationStore()} call finds them null and prepares new ones.
 */
private void resetReplicationStore() {
    synchronized (this) {
        this.dumpedRdbStore = null;
        this.rdbOnlyReplicationStore = null;
    }
}

/**
 * Returns the current RDB-only replication store, preparing a new one on demand.
 *
 * <p>The whole check-and-create sequence runs under this instance's monitor, the same
 * monitor {@code resetReplicationStore()} holds when it clears the fields. The field is
 * therefore never read outside the lock, and the method does not depend on
 * {@code rdbOnlyReplicationStore} being declared {@code volatile}.
 *
 * @return the existing store, or a newly created one backed by a freshly prepared RDB store
 * @throws IOException if the RDB dumper fails to prepare a new RDB store
 */
private synchronized RdbOnlyReplicationStore tryGetReplicationStore() throws IOException {
    if (null == rdbOnlyReplicationStore) {
        dumpedRdbStore = getRdbDumper().prepareRdbStore();
        rdbOnlyReplicationStore = new RdbOnlyReplicationStore(dumpedRdbStore);
    }
    return rdbOnlyReplicationStore;
}

/**
 * Handles a dump failure, ignoring failures marked as tolerant.
 *
 * <p>If {@code th} itself, or its direct cause, is a
 * {@link KeeperTolerantClosePsyncException}, the failure is only logged and not passed
 * to the superclass. Every other failure is forwarded to {@code super.dumpFail}.
 *
 * @param th the failure reported for the dump
 */
@Override
protected void dumpFail(Throwable th) {
    // instanceof is false for a null cause, so no separate null check is needed.
    boolean tolerant = th instanceof KeeperTolerantClosePsyncException
            || th.getCause() instanceof KeeperTolerantClosePsyncException;

    if (!tolerant) {
        super.dumpFail(th);
        return;
    }

    logger.info("[dumpFail][tolerant] {}", th.getMessage());
}

@Override
protected String getSimpleName() {
return "RdbRep";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest;
import com.ctrip.xpipe.redis.keeper.RedisMaster;
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncCommandFailException;
import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException;
import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor;
import com.ctrip.xpipe.redis.keeper.monitor.KeeperStats;
Expand Down Expand Up @@ -159,6 +160,14 @@ protected void scheduleReconnect(int timeMilli) {
}
};

Psync psync = rdbonlyRedisMasterReplication.createPsync();
Assert.assertTrue(psync instanceof RdbOnlyPsync);

psync.future().addListener(commandFuture -> {
if (!commandFuture.isSuccess()) {
rdbonlyRedisMasterReplication.dumpFail(new PsyncCommandFailException(commandFuture.cause()));
}
});
when(replicationStore.firstAvailableOffset()).thenReturn(120L);
rdbonlyRedisMasterReplication.onFullSync(100);
Assert.assertFalse(dumper.future().isDone());
Expand All @@ -167,6 +176,9 @@ protected void scheduleReconnect(int timeMilli) {
Assert.assertEquals(1, reconnectCnt.get());
Assert.assertFalse(dumper.future().isDone());

psync = rdbonlyRedisMasterReplication.createPsync();
Assert.assertTrue(psync instanceof FreshRdbOnlyPsync);

rdbonlyRedisMasterReplication.masterDisconnected(Mockito.mock(Channel.class));
Assert.assertEquals(1, reconnectCnt.get());
Assert.assertTrue(dumper.future().isDone());
Expand Down

0 comments on commit 01c5773

Please sign in to comment.