Skip to content

Commit

Permalink
HBASE-27984 NPE in MigrateReplicationQueueFromZkToTableProcedure reco…
Browse files Browse the repository at this point in the history
…very (#5329)

Signed-off-by: GeorryHuang <huangzhuoyue@apache.org>
  • Loading branch information
Apache9 authored Jul 22, 2023
1 parent 2c92e6f commit 73ea43f
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
Expand Down Expand Up @@ -364,6 +365,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste

private RSGroupInfoManager rsGroupInfoManager;

private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

// manager of replication
private ReplicationPeerManager replicationPeerManager;

Expand Down Expand Up @@ -4106,6 +4110,11 @@ public ReplicationPeerManager getReplicationPeerManager() {
return replicationPeerManager;
}

@Override
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}

public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
getReplicationLoad(ServerName[] serverNames) {
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
Expand Down Expand Up @@ -361,6 +362,12 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId)
*/
ReplicationPeerManager getReplicationPeerManager();

/**
* Returns the {@link ReplicationLogCleanerBarrier}. It will be used at multiple places so we put
* it in MasterServices directly.
*/
ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier();

/**
* Returns the {@link SyncReplicationReplayWALManager}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected ReplicationPeerConfig getNewPeerConfig() {
@Override
protected void releaseLatch(MasterProcedureEnv env) {
if (cleanerDisabled) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
Expand All @@ -97,7 +97,7 @@ protected void releaseLatch(MasterProcedureEnv env) {
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
peerId, backoff / 1000));
Expand Down Expand Up @@ -142,7 +142,7 @@ protected void afterReplay(MasterProcedureEnv env) {
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
cleanerDisabled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void shutdownExecutorService() {

private void disableReplicationLogCleaner(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
// it is not likely that we can reach here as we will schedule this procedure immediately
// after master restarting, where ReplicationLogCleaner should have not started its first run
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
Expand All @@ -130,7 +130,7 @@ private void disableReplicationLogCleaner(MasterProcedureEnv env)
}

private void enableReplicationLogCleaner(MasterProcedureEnv env) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}

private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
Expand Down Expand Up @@ -224,7 +224,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
lockEntry = procLock.getLockEntry(getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", ioe);
+ " when trying to wake it up, aborting...", this, ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
Expand Down Expand Up @@ -304,7 +304,7 @@ protected void afterReplay(MasterProcedureEnv env) {
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
Expand Down Expand Up @@ -115,9 +114,6 @@ public class ReplicationPeerManager implements ConfigurationObserver {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

private final String clusterId;

private volatile Configuration conf;
Expand Down Expand Up @@ -725,10 +721,6 @@ public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
// queue for a given peer, that why we can use a String peerId as key instead of
// ReplicationQueueId.
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
private ReplicationLogCleanerBarrier barrier;
private ReplicationPeerManager rpm;
private Supplier<Set<ServerName>> getNotFullyDeadServers;

Expand All @@ -84,7 +85,7 @@ public void preClean() {
LOG.error("Error occurred while executing queueStorage.hasData()", e);
return;
}
canFilter = rpm.getReplicationLogCleanerBarrier().start();
canFilter = barrier.start();
if (canFilter) {
notFullyDeadServers = getNotFullyDeadServers.get();
peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
Expand All @@ -98,7 +99,7 @@ public void preClean() {
allQueueData = rpm.getQueueStorage().listAllQueues();
} catch (ReplicationException e) {
LOG.error("Can not list all replication queues, give up cleaning", e);
rpm.getReplicationLogCleanerBarrier().stop();
barrier.stop();
canFilter = false;
notFullyDeadServers = null;
peerIds = null;
Expand All @@ -122,7 +123,7 @@ public void preClean() {
@Override
public void postClean() {
if (canFilter) {
rpm.getReplicationLogCleanerBarrier().stop();
barrier.stop();
canFilter = false;
// release memory
notFullyDeadServers = null;
Expand Down Expand Up @@ -244,6 +245,7 @@ public void init(Map<String, Object> params) {
Object master = params.get(HMaster.MASTER);
if (master != null && master instanceof MasterServices) {
MasterServices m = (MasterServices) master;
barrier = m.getReplicationLogCleanerBarrier();
rpm = m.getReplicationPeerManager();
getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
Expand Down Expand Up @@ -524,4 +525,9 @@ public boolean replicationPeerModificationSwitch(boolean on) throws IOException
public boolean isReplicationPeerModificationEnabled() {
return false;
}

@Override
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ public void beforeTest() throws Exception {

masterServices = mock(MasterServices.class);
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
when(masterServices.getReplicationLogCleanerBarrier())
.thenReturn(new ReplicationLogCleanerBarrier());
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.getQueueStorage()).thenReturn(queueStorage);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ServerManager sm = mock(ServerManager.class);
when(masterServices.getServerManager()).thenReturn(sm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ public void testDisablePeerAndWaitStates() throws Exception {
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);

ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
ReplicationLogCleanerBarrier barrier =
UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier();
assertTrue(barrier.start());

ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class TestReplicationLogCleaner {
@Before
public void setUp() throws ReplicationException {
services = mock(MasterServices.class);
when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(services.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testNoConf() {

@Test
public void testCanNotFilter() {
assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
assertTrue(services.getReplicationLogCleanerBarrier().disable());
List<FileStatus> files = Arrays.asList(new FileStatus());
assertSame(Collections.emptyList(), runCleaner(cleaner, files));
}
Expand Down

0 comments on commit 73ea43f

Please sign in to comment.