diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index f29f79eb9b40..92f8178a197e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -32,7 +32,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -71,7 +70,6 @@
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
-import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
@@ -2587,35 +2585,18 @@ public MasterProtos.BypassProcedureResponse bypassProcedure(RpcController contro
   public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure(
       RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request)
       throws ServiceException {
-    List<HBaseProtos.ServerName> serverNames = request.getServerNameList();
     List<Long> pids = new ArrayList<>();
-    try {
-      for (HBaseProtos.ServerName sn: serverNames) {
-        ServerName serverName = ProtobufUtil.toServerName(sn);
-        LOG.info("{} schedule ServerCrashProcedure for {}",
-            this.master.getClientIdAuditPrefix(), serverName);
-        if (shouldSubmitSCP(serverName)) {
-          final boolean containsMetaWALs = containMetaWals(serverName);
-          long pid = this.master.getServerManager().expireServer(serverName,
-              new Function<ServerName, Long>() {
-                @Override
-                public Long apply(ServerName serverName) {
-                  ProcedureExecutor<MasterProcedureEnv> procExec =
-                      master.getMasterProcedureExecutor();
-                  return procExec.submitProcedure(
-                      new HBCKServerCrashProcedure(procExec.getEnvironment(),
-                          serverName, true, containsMetaWALs));
-                }
-              });
-          pids.add(pid);
-        } else {
-          pids.add(Procedure.NO_PROC_ID);
-        }
+    for (HBaseProtos.ServerName sn: request.getServerNameList()) {
+      ServerName serverName = ProtobufUtil.toServerName(sn);
+      LOG.info("{} schedule ServerCrashProcedure for {}",
+        this.master.getClientIdAuditPrefix(), serverName);
+      if (shouldSubmitSCP(serverName)) {
+        pids.add(this.master.getServerManager().expireServer(serverName, true));
+      } else {
+        pids.add(Procedure.NO_PROC_ID);
       }
-      return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
     }
+    return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index f94f25251d10..75ebfabb18c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -33,7 +33,6 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -562,32 +561,18 @@ private List<String> getRegionServersInZK(final ZKWatcher zkw)
 
   /**
    * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
-   * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
-   *         many reasons including the fact that its this server that is going down or we already
-   *         have queued an SCP for this server or SCP processing is currently disabled because we
-   *         are in startup phase).
+   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
+   *         not (could happen for many reasons including the fact that its this server that is
+   *         going down or we already have queued an SCP for this server or SCP processing is
+   *         currently disabled because we are in startup phase).
    */
-  public boolean expireServer(final ServerName serverName) {
-    return expireServer(serverName, new Function<ServerName, Long>() {
-      @Override
-      public Long apply(ServerName serverName) {
-        return master.getAssignmentManager().submitServerCrash(serverName, true);
-      }
-    }) != Procedure.NO_PROC_ID;
-  }
+  @VisibleForTesting // Redo test so we can make this protected.
+  public synchronized long expireServer(final ServerName serverName) {
+    return expireServer(serverName, false);
+  }
 
-  /**
-   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
-   * Used when expireServer is externally invoked by hbck2.
-   * @param function Takes ServerName and returns pid. See default implementation which queues
-   *                 an SCP via the AssignmentManager.
-   * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
-   *         many reasons including the fact that its this server that is going down or we already
-   *         have queued an SCP for this server or SCP processing is currently disabled because we
-   *         are in startup phase).
-   */
-  synchronized long expireServer(final ServerName serverName,
-      Function<ServerName, Long> function) {
+  synchronized long expireServer(final ServerName serverName, boolean force) {
     // THIS server is going down... can't handle our own expiration.
     if (serverName.equals(master.getServerName())) {
       if (!(master.isAborted() || master.isStopped())) {
@@ -612,10 +597,7 @@ synchronized long expireServer(final ServerName serverName,
       return Procedure.NO_PROC_ID;
     }
     LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
-    long pid = function.apply(serverName);
-    if (pid <= 0) {
-      return Procedure.NO_PROC_ID;
-    }
+    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
     // Tell our listeners that a server was removed
     if (!this.listeners.isEmpty()) {
       this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 7541c5f89f0c..d1585c7c1c53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
+import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
@@ -1490,15 +1491,21 @@ public int getNumRegionsOpened() {
     return 0;
   }
 
-  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
-    boolean carryingMeta;
-    long pid;
+  /**
+   * Usually run by the Master in reaction to server crash during normal processing.
+   * Can also be invoked via external RPC to effect repair; in the latter case,
+   * the 'force' flag is set so we push through the SCP though context may indicate
+   * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
+   * may still have references in hbase:meta to 'Unknown Servers' -- servers that
+   * are not online or in dead servers list, etc.)
+   * @param force Set if the request came in externally over RPC (via hbck2). Force means
+   *              run the SCP even if it seems as though there might be an outstanding
+   *              SCP running.
+   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
+   */
+  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
+    // May be an 'Unknown Server' so handle case where serverNode is null.
     ServerStateNode serverNode = regionStates.getServerNode(serverName);
-    if (serverNode == null) {
-      LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
-      return -1;
-    }
-
     // Remove the in-memory rsReports result
     synchronized (rsReports) {
       rsReports.remove(serverName);
@@ -1508,26 +1515,43 @@ public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
     // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
     // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
     // sure that, the region list fetched by SCP will not be changed any more.
-    serverNode.writeLock().lock();
+    if (serverNode != null) {
+      serverNode.writeLock().lock();
+    }
+    boolean carryingMeta;
+    long pid;
     try {
       ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
       carryingMeta = isCarryingMeta(serverName);
-      if (!serverNode.isInState(ServerState.ONLINE)) {
-        LOG.info(
-          "Skip to add SCP for {} with meta= {}, " +
-            "since there should be a SCP is processing or already done for this server node",
-          serverName, carryingMeta);
-        return -1;
+      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
+        LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
+        return Procedure.NO_PROC_ID;
       } else {
-        serverNode.setState(ServerState.CRASHED);
-        pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
-          serverName, shouldSplitWal, carryingMeta));
-        LOG.info(
-          "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
-          serverName, carryingMeta, pid);
+        MasterProcedureEnv mpe = procExec.getEnvironment();
+        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
+        // HBCKSCP scours Master in-memory state AND hbase:meta for references to
+        // serverName just-in-case. An SCP that is scheduled when the server is
+        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
+        ServerState oldState = null;
+        if (serverNode != null) {
+          oldState = serverNode.getState();
+          serverNode.setState(ServerState.CRASHED);
+        }
+
+        if (force) {
+          pid = procExec.submitProcedure(
+            new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
+        } else {
+          pid = procExec.submitProcedure(
+            new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
+        }
+        LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
+          serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
       }
     } finally {
-      serverNode.writeLock().unlock();
+      if (serverNode != null) {
+        serverNode.writeLock().unlock();
+      }
     }
     return pid;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index f245500ce886..b58de83eb4ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -714,6 +714,9 @@ public void removeServer(final ServerName serverName) {
     serverMap.remove(serverName);
   }
 
+  /**
+   * @return Pertinent ServerStateNode or NULL if none found.
+   */
   @VisibleForTesting
   public ServerStateNode getServerNode(final ServerName serverName) {
     return serverMap.get(serverName);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
index 11883db86cbf..33f6b1a07d84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
@@ -33,12 +33,9 @@
  */
 @InterfaceAudience.Private
 public class ServerStateNode implements Comparable<ServerStateNode> {
-
   private final Set<RegionStateNode> regions;
   private final ServerName serverName;
-
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
   private volatile ServerState state = ServerState.ONLINE;
 
   public ServerStateNode(ServerName serverName) {
@@ -120,6 +117,7 @@ public boolean equals(final Object other) {
 
   @Override
   public String toString() {
-    return String.format("ServerStateNode(%s)", getServerName());
+    return getServerName() + "/" + getState() + "/regionCount=" + this.regions.size() +
+      "/lock=" + this.lock;
   }
 }
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/HBCKServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/HBCKServerCrashProcedure.java
index 1e53a78d90c9..874dcde19861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/HBCKServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/HBCKServerCrashProcedure.java
@@ -81,12 +81,15 @@ List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
       LOG.warn("Failed get of all regions; continuing", ioe);
     }
     if (ps == null || ps.isEmpty()) {
+      LOG.warn("No regions found in hbase:meta");
       return ris;
     }
     List<RegionInfo> aggregate = ris == null || ris.isEmpty()? new ArrayList<>():
       new ArrayList<>(ris);
+    int before = aggregate.size();
     ps.stream().filter(p -> p.getSecond() != null && p.getSecond().equals(getServerName())).
       forEach(p -> aggregate.add(p.getFirst()));
+    LOG.info("Found {} mentions of {} in hbase:meta", aggregate.size() - before, getServerName());
     return aggregate;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java
index 1577af84b511..2e18c1602819 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java
@@ -109,7 +109,8 @@ public void test() throws Exception {
       ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
     assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
     assertFalse("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
-      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
+        Procedure.NO_PROC_ID);
 
     // Wait the SCP to finish
     SCP_LATCH.countDown();
@@ -117,7 +118,8 @@ public void test() throws Exception {
 
     assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
         SERVER_FOR_TEST,
-      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
+        Procedure.NO_PROC_ID);
     serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
       .getServerNode(SERVER_FOR_TEST);
     assertNull("serverNode should be deleted after SCP finished", serverNode);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
index 73b940aae672..61d29ec6553a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -304,7 +304,7 @@ protected void sendTransitionReport(final ServerName serverName,
 
   protected void doCrash(final ServerName serverName) {
     this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
-    this.am.submitServerCrash(serverName, false/* No WALs here */);
+    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
     // add a new server to avoid killing all the region servers which may hang the UTs
     ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
    newRsAdded++;
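
For reviewers, a short usage sketch (not part of the patch) of how the reworked entry points compose after this change. The HMaster handle and the server name below are illustrative placeholders, and the two-argument expireServer(ServerName, boolean) stays package-private, so external callers reach the force path only through the scheduleServerCrashProcedure RPC used by hbck2.

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.Procedure;

final class ExpireServerSketch {
  // Sketch only: `master` would come from e.g. HBaseTestingUtility in a same-package test.
  static long demo(HMaster master) {
    // Placeholder region server name.
    ServerName serverName = ServerName.valueOf("example-rs.local", 16020, 1588230740L);

    // Normal crash handling (force=false): schedules a plain ServerCrashProcedure.
    // Returns Procedure.NO_PROC_ID if nothing was queued (this is the master itself,
    // the server is already being processed, or SCP handling is disabled during startup).
    long pid = master.getServerManager().expireServer(serverName);
    assert pid == Procedure.NO_PROC_ID || pid > 0;

    // hbck2 repair path (force=true): what scheduleServerCrashProcedure() now does for each
    // requested server name. With force set, submitServerCrash() schedules an
    // HBCKServerCrashProcedure even when the ServerStateNode is missing ('Unknown Server')
    // or is not ONLINE.
    return master.getAssignmentManager().submitServerCrash(serverName, true, true);
  }
}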