diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d1f187458a95..d0ea8c53092e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1447,9 +1447,18 @@ public enum OperationStatusCode { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + /** + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter; see SplitWALManager. + */ + @Deprecated public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated"; - public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true; + /** + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0. + */ + @Deprecated + public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = false; public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java index 6be1131bfb82..443c8d2e32c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java @@ -25,9 +25,14 @@ /** * Counters kept by the distributed WAL split log process. * Used by master and regionserver packages. + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager */ +@Deprecated @InterfaceAudience.Private public class SplitLogCounters { + private SplitLogCounters() {} + //Spnager counters public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder(); public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java index dd4eb935a081..ca07fcb1ee33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java @@ -31,7 +31,10 @@ * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of * this class. Used by regionserver and master packages. *

Immutable + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager */ +@Deprecated @InterfaceAudience.Private public class SplitLogTask { private final ServerName originServer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java index 8682c91c850d..33d8f2ca779d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java @@ -40,8 +40,11 @@ * Methods required for task life circle:
* {@link #checkTaskStillAvailable(String)} Check that task is still there
* {@link #checkTasks()} check for unassigned tasks and resubmit them + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager */ @InterfaceAudience.Private +@Deprecated public interface SplitLogManagerCoordination { /** * Detail class that shares data between coordination and split log manager diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index ad74015490b7..a9fae4640d5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -1,5 +1,4 @@ - /** - * + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,16 +17,14 @@ */ package org.apache.hadoop.hbase.coordination; import java.util.concurrent.atomic.LongAdder; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.SplitLogTask; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; - +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -44,7 +41,10 @@ *

* Important methods for WALSplitterHandler:
* splitting task has completed. + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager */ +@Deprecated @InterfaceAudience.Private public interface SplitLogWorkerCoordination { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index ba73d53258b5..323e5752ace9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -25,7 +25,10 @@ /** * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter (see SplitWALManager) which doesn't use this zk-based coordinator. */ +@Deprecated @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class ZkCoordinatedStateManager implements CoordinatedStateManager { protected ZKWatcher watcher; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fc29a381d47f..c470acd60356 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.locking.LockProcedure; @@ -2436,6 +2437,12 @@ public ClearDeadServersResponse clearDeadServers(RpcController controller, @Override public ReportProcedureDoneResponse reportProcedureDone(RpcController controller, ReportProcedureDoneRequest request) throws ServiceException { + // Check the Master is up and ready for duty before progressing. Remote side will keep trying. + try { + this.master.checkServiceStarted(); + } catch (ServerNotRunningYetException snrye) { + throw new ServiceException(snrye); + } request.getResultList().forEach(result -> { if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) { master.remoteProcedureCompleted(result.getProcId()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 9d015f38978d..6001c8f9a98f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -44,7 +43,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -55,6 +53,9 @@ public class MasterWalManager { private static final Logger LOG = LoggerFactory.getLogger(MasterWalManager.class); + /** + * Filter *in* WAL files that are for the hbase:meta Region. + */ final static PathFilter META_FILTER = new PathFilter() { @Override public boolean accept(Path p) { @@ -62,6 +63,9 @@ public boolean accept(Path p) { } }; + /** + * Filter *out* WAL files that are for the hbase:meta Region; i.e. return user-space WALs only. + */ @VisibleForTesting public final static PathFilter NON_META_FILTER = new PathFilter() { @Override @@ -81,10 +85,19 @@ public boolean accept(Path p) { // The Path to the old logs dir private final Path oldLogDir; + private final Path rootDir; // create the split log lock private final Lock splitLogLock = new ReentrantLock(); + + /** + * Superseded by {@link SplitWALManager}; i.e. procedure-based WAL splitting rather than + * 'classic' zk-coordinated WAL splitting. + * @deprecated since 2.3.0 and 3.0.0 to be removed in 4.0.0; replaced by {@link SplitWALManager}. + * @see SplitWALManager + */ + @Deprecated private final SplitLogManager splitLogManager; // Is the fileystem ok? @@ -102,7 +115,6 @@ public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterS this.rootDir = rootDir; this.services = services; this.splitLogManager = new SplitLogManager(services, conf); - this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); } @@ -204,7 +216,7 @@ public FileStatus[] getWALDirPaths(final PathFilter filter) throws IOException { */ @Deprecated public Set getFailedServersFromLogFolders() throws IOException { - boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", + boolean retrySplitting = !conf.getBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); Set serverNames = new HashSet<>(); @@ -361,11 +373,13 @@ public void splitLog(final Set serverNames, PathFilter filter) throw } /** - * For meta region open and closed normally on a server, it may leave some meta - * WAL in the server's wal dir. Since meta region is no long on this server, - * The SCP won't split those meta wals, just leaving them there. So deleting - * the wal dir will fail since the dir is not empty. Actually We can safely achive those - * meta log and Archiving the meta log and delete the dir. + * The hbase:meta region may OPEN and CLOSE without issue on a server and then move elsewhere. + * On CLOSE, the WAL for the hbase:meta table may not be archived yet (the WAL is only needed if + * hbase:meta did not close cleanly). Since the hbase:meta region is no longer on this server, + * the ServerCrashProcedure won't split these leftover hbase:meta WALs, just leaving them in + * the WAL splitting dir. If we try to delete the WAL splitting dir for the server, the delete + * fails since the dir is not totally empty. We can safely archive these hbase:meta logs; then + * the WAL dir can be deleted.
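+ * <p>A sketch of the calling sequence this enables, as done by ServerCrashProcedure's cleanup
+ * (variable names hypothetical; error handling elided):
+ * <pre>
+ * // Archive any leftover hbase:meta WALs first so the subsequent dir delete can succeed.
+ * masterWalManager.archiveMetaLog(serverName);
+ * splitWALManager.deleteWALDir(serverName);
+ * </pre>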
* @param serverName the server to archive meta log */ public void archiveMetaLog(final ServerName serverName) { @@ -396,6 +410,4 @@ public void archiveMetaLog(final ServerName serverName) { LOG.warn("Failed archiving meta log for server " + serverName, ie); } } - - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java index caf43247e5c9..9d4550c5eb0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java @@ -159,6 +159,9 @@ public long getNumWALFiles() { @Override public Map> getTableSpaceUtilization() { + if (master == null) { + return Collections.emptyMap(); + } QuotaObserverChore quotaChore = master.getQuotaObserverChore(); if (quotaChore == null) { return Collections.emptyMap(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 7ed0d9a0c960..3e0c7460eaf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -86,7 +86,10 @@ * again. If a task is resubmitted then there is a risk that old "delete task" * can delete the re-submission. * @see SplitWALManager for an alternate implementation based on Procedures. + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager. */ +@Deprecated @InterfaceAudience.Private public class SplitLogManager { private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index 76407e039545..9ff84dc942e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,35 +16,36 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.master; - import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; - import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -78,15 +79,17 @@ public class SplitWALManager { private final Path rootDir; private final FileSystem fs; private final Configuration conf; + private final Path walArchiveDir; - public SplitWALManager(MasterServices master) { + public SplitWALManager(MasterServices master) throws IOException { this.master = master; this.conf = master.getConfiguration(); this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); this.rootDir = master.getMasterFileSystem().getWALRootDir(); + // TODO: This should be the WAL FS, not the Master FS? 
this.fs = master.getMasterFileSystem().getFileSystem(); - + this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); } public List splitWALs(ServerName crashedServer, boolean splitMeta) @@ -117,14 +120,24 @@ private Path getWALSplitDir(ServerName serverName) { return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); } - public void deleteSplitWAL(String wal) throws IOException { - fs.delete(new Path(wal), false); + /** + * Archive processed WAL + */ + public void archive(String wal) throws IOException { + WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); } public void deleteWALDir(ServerName serverName) throws IOException { Path splitDir = getWALSplitDir(serverName); - if (!fs.delete(splitDir, false)) { - LOG.warn("Failed delete {}", splitDir); + try { + if (!fs.delete(splitDir, false)) { + LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); + } + } catch (PathIsNotEmptyDirectoryException e) { + FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir); + LOG.warn("PathIsNotEmptyDirectoryException {}", + Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); + throw e; } } @@ -197,7 +210,11 @@ public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { this.maxSplitTasks = maxSplitTasks; this.master = master; this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); - this.master.getServerManager().registerListener(this); + // ServerManager might be null in a test context where we are mocking; allow for this + ServerManager sm = this.master.getServerManager(); + if (sm != null) { + sm.registerListener(this); + } } public synchronized Optional acquire() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index a6ebbaac35d2..17606c340665 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,10 +16,8 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.master.procedure; - import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -47,7 +45,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; @@ -154,8 +151,8 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) break; case SERVER_CRASH_SPLIT_META_LOGS: if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, - DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { - splitMetaLogs(env); + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + zkCoordinatedSplitMetaLogs(env); setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); } else { am.getRegionStates().metaLogSplitting(serverName); @@ -164,8 +161,7 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) } break; case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR: - if(isSplittingDone(env, true)){ - cleanupSplitDir(env); + if (isSplittingDone(env, true)) { setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); am.getRegionStates().metaLogSplit(serverName); } else { @@ -195,7 +191,7 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) case SERVER_CRASH_SPLIT_LOGS: if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { - splitLogs(env); + zkCoordinatedSplitLogs(env); setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); } else { am.getRegionStates().logSplitting(this.serverName); @@ -256,19 +252,27 @@ List getRegionsOnCrashedServer(MasterProcedureEnv env) { private void cleanupSplitDir(MasterProcedureEnv env) { SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); try { + if (!this.carryingMeta) { + // If we are NOT carrying hbase:meta, check for any left-over hbase:meta WAL files from an + // old hbase:meta tenancy on this server; clean these up, if any, before trying to remove + // the WAL directory of this server or we will fail. See archiveMetaLog comment for more + // details on this condition. + env.getMasterServices().getMasterWalManager().archiveMetaLog(this.serverName); + } splitWALManager.deleteWALDir(serverName); } catch (IOException e) { - LOG.warn("Remove WAL directory of server {} failed, ignore...", serverName, e); + LOG.warn("Remove WAL directory for {} failed, ignore...{}", serverName, e.getMessage()); } } private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) { - LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta); SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); try { - return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0; + int wals = splitWALManager.getWALsToSplit(serverName, splitMeta).size(); + LOG.debug("Check if {} WAL splitting is done?
wals={}, meta={}", serverName, wals, splitMeta); + return wals == 0; } catch (IOException e) { - LOG.warn("get filelist of serverName {} failed, retry...", serverName, e); + LOG.warn("Get WALs of {} failed, retry...", serverName, e); return false; } } @@ -293,7 +297,12 @@ private boolean isDefaultMetaRegion(RegionInfo hri) { return hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri); } - private void splitMetaLogs(MasterProcedureEnv env) throws IOException { + /** + * Split hbase:meta logs using 'classic' zk-based coordination. + * Superseded by procedure-based WAL splitting. + * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean) + */ + private void zkCoordinatedSplitMetaLogs(MasterProcedureEnv env) throws IOException { LOG.debug("Splitting meta WALs {}", this); MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); AssignmentManager am = env.getMasterServices().getAssignmentManager(); @@ -303,7 +312,12 @@ private void splitMetaLogs(MasterProcedureEnv env) throws IOException { LOG.debug("Done splitting meta WALs {}", this); } - private void splitLogs(final MasterProcedureEnv env) throws IOException { + /** + * Split logs using 'classic' zk-based coordination. + * Superseded by procedure-based WAL splitting. + * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean) + */ + private void zkCoordinatedSplitLogs(final MasterProcedureEnv env) throws IOException { LOG.debug("Splitting WALs {}", this); MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); AssignmentManager am = env.getMasterServices().getAssignmentManager(); @@ -333,14 +347,12 @@ void updateProgress(boolean updateState) { currentRunningState = getCurrentState(); } int childrenLatch = getChildrenLatch(); - status.setStatus(msg + " current State " + currentRunningState - + (childrenLatch > 0 ? "; remaining num of running child procedures = " + childrenLatch : "")); + status.setStatus(msg + " current State " + currentRunningState + (childrenLatch > 0? + "; remaining num of running child procedures = " + childrenLatch: "")); } @Override - protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) - throws IOException { + protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException { // Can't rollback.
throw new UnsupportedOperationException("unhandled state=" + state); } @@ -424,7 +436,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) int size = state.getRegionsOnCrashedServerCount(); if (size > 0) { this.regionsOnCrashedServer = new ArrayList<>(size); - for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: state.getRegionsOnCrashedServerList()) { + for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: + state.getRegionsOnCrashedServerList()) { this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index c829e51e0890..54607e613697 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -96,7 +96,7 @@ public Optional remoteCallBuild(Maste protected void complete(MasterProcedureEnv env, Throwable error) { if (error == null) { try { - env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath); + env.getMasterServices().getSplitWALManager().archive(walPath); } catch (IOException e) { LOG.warn("Failed split of {}; ignore...", walPath, e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 0457f90d3369..04ef115ed735 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -66,7 +66,10 @@ * the absence of a global lock there is a unavoidable race here - a worker might have just finished * its task when it is stripped of its ownership. Here we rely on the idempotency of the log * splitting task for correctness + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALRemoteProcedure */ +@Deprecated @InterfaceAudience.Private public class SplitLogWorker implements Runnable { @@ -181,8 +184,8 @@ static Status splitLog(String filename, CancelableProgressable p, Configuration SplitLogWorkerCoordination splitLogWorkerCoordination = server.getCoordinatedStateManager() == null ? 
null : server.getCoordinatedStateManager().getSplitLogWorkerCoordination(); - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, p, - sequenceIdChecker, splitLogWorkerCoordination, factory, server)) { + if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, + p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java index d6009e388fa7..ff39531c11ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java @@ -38,7 +38,10 @@ /** * Handles log splitting a wal * Used by the zk-based distributed log splitting. Created by ZKSplitLogWorkerCoordination. - */ + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter, see SplitWALManager + */ +@Deprecated @InterfaceAudience.Private public class WALSplitterHandler extends EventHandler { private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java index 2df1a38ce2e3..d392366ff5da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +16,6 @@ * limitations under the License. */ package org.apache.hadoop.hbase.wal; - import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -57,9 +56,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -87,9 +84,6 @@ private WALSplitUtil() { * the splitLogFile() part. If the master crashes then this function might get called multiple * times. *
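+ * <p>A minimal sketch of the expected call (caller and error handling elided):
+ * <pre>
+ * // Archives the split WAL (to the .corrupt dir if it was marked corrupt) and
+ * // deletes the WAL's splitting staging directory; safe if invoked again.
+ * WALSplitUtil.finishSplitLogFile(logfile, conf);
+ * </pre>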

- * @param logfile - * @param conf - * @throws IOException */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { Path walDir = CommonFSUtils.getWALRootDir(conf); @@ -100,20 +94,9 @@ public static void finishSplitLogFile(String logfile, Configuration conf) throws } else { walPath = new Path(walDir, logfile); } - finishSplitLogFile(walDir, oldLogDir, walPath, conf); - } - - static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath, - Configuration conf) throws IOException { - List processedLogs = new ArrayList<>(); - List corruptedLogs = new ArrayList<>(); FileSystem walFS = walDir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) { - corruptedLogs.add(walPath); - } else { - processedLogs.add(walPath); - } - archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf); + boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS); + archive(walPath, corrupt, oldLogDir, walFS, conf); Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); walFS.delete(stagingDir, true); } @@ -122,40 +105,40 @@ static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath, * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation */ - private static void archiveWALs(final List corruptedWALs, final List processedWALs, - final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException { - final Path corruptDir = - new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); - if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { - LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", - corruptDir); - } - if (!walFS.mkdirs(corruptDir)) { - LOG.info("Unable to mkdir {}", corruptDir); - } - walFS.mkdirs(oldWALDir); - - // this method can get restarted or called multiple times for archiving - // the same log files. - for (Path corruptedWAL : corruptedWALs) { - Path p = new Path(corruptDir, corruptedWAL.getName()); - if (walFS.exists(corruptedWAL)) { - if (!walFS.rename(corruptedWAL, p)) { - LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p); - } else { - LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p); - } + static void archive(final Path wal, final boolean corrupt, final Path oldWALDir, + final FileSystem walFS, final Configuration conf) throws IOException { + Path dir; + Path target; + if (corrupt) { + dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); + if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { + LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir); } + target = new Path(dir, wal.getName()); + } else { + dir = oldWALDir; + target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal); } + mkdir(walFS, dir); + moveWAL(walFS, wal, target); + } - for (Path p : processedWALs) { - Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p); - if (walFS.exists(p)) { - if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) { - LOG.warn("Unable to move {} to {}", p, newPath); - } else { - LOG.info("Archived processed log {} to {}", p, newPath); - } + private static void mkdir(FileSystem fs, Path dir) throws IOException { + if (!fs.mkdirs(dir)) { + LOG.warn("Failed mkdir {}", dir); + } + } + + /** + * Move WAL. 
Used to move processed WALs to archive or bad WALs to corrupt WAL dir. + * WAL may have already been moved; makes allowance. + */ + public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException { + if (fs.exists(p)) { + if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) { + LOG.warn("Failed move of {} to {}", p, targetDir); + } else { + LOG.info("Moved {} to {}", p, targetDir); } } } @@ -172,7 +155,6 @@ private static void archiveWALs(final List corruptedWALs, final List * @param tmpDirName of the directory used to sideline old recovered edits file * @param conf configuration * @return Path to file into which to dump split log edits. - * @throws IOException */ @SuppressWarnings("deprecation") @VisibleForTesting @@ -217,7 +199,7 @@ private static String getTmpRecoveredEditsFileName(String fileName) { /** * Get the completed recovered edits file path, renaming it to be by last edit in the file from * its first edit. Then we could use the name to skip recovered edits when doing - * {@link HRegion#replayRecoveredEditsIfAny}. + * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask). * @return dstPath take file's last edit log seq num as the name */ static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { @@ -304,7 +286,6 @@ public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region, * @param walFS WAL FileSystem used to retrieving split edits files. * @param regionDir WAL region dir to look for recovered edits files under. * @return Files in passed regionDir as a sorted set. - * @throws IOException */ public static NavigableSet getSplitEditFilesSorted(final FileSystem walFS, final Path regionDir) throws IOException { @@ -350,7 +331,6 @@ public boolean accept(Path p) { * @param fs the file system used to rename bad edits file. * @param edits Edits file to move aside. * @return The name of the moved aside file. - * @throws IOException */ public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) throws IOException { @@ -453,9 +433,9 @@ public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mut } private final ClientProtos.MutationProto.MutationType type; - public final Mutation mutation; - public final long nonceGroup; - public final long nonce; + @SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation; + @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup; + @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce; @Override public int compareTo(final MutationReplay d) { @@ -484,12 +464,9 @@ public ClientProtos.MutationProto.MutationType getType() { /** * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & * WALEdit from the passed in WALEntry - * @param entry - * @param cells * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances * extracted from the passed in WALEntry. 
* @return list of Pair&lt;MutationType, Mutation&gt; to be replayed - * @throws IOException */ public static List getMutationsFromWALEntry(AdminProtos.WALEntry entry, CellScanner cells, Pair logEntry, Durability durability) throws IOException { @@ -517,7 +494,9 @@ public static List getMutationsFromWALEntry(AdminProtos.WALEntry throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } Cell cell = cells.current(); - if (val != null) val.add(cell); + if (val != null) { + val.add(cell); + } boolean isNewRowOrType = previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() @@ -576,8 +555,6 @@ public static List getMutationsFromWALEntry(AdminProtos.WALEntry * @param tableName the table name * @param encodedRegionName the encoded region name * @param familyName the column family name - * @param seqId the sequence id which used to generate file name - * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name * @return Path to recovered.hfiles directory of the region's column family. */ static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index d55a89e499f3..303f428212c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.wal; -import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,6 +29,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import edu.umd.cs.findbugs.annotations.Nullable; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -63,24 +62,26 @@ import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; /** * Split RegionServer WAL files. Splits the WAL into new files, * one per region, to be picked up on Region reopen. Deletes the split WAL when finished. - * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or - * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable, - * LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for - * entry-point. + * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or + * use static helper methods.
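+ * <p>A usage sketch (arguments as in the package-private constructor below; the static
+ * helpers wrap this same pattern):
+ * <pre>
+ * WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
+ * SplitWALResult result = splitter.splitWAL(walFileStatus, null); // null = no cancel check
+ * </pre>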
*/ @InterfaceAudience.Private public class WALSplitter { private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); + public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors"; - /** By default we retry errors in splitting, rather than skipping. */ + /** + * By default we retry errors in splitting, rather than skipping. + */ public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; // Parameters for split process - protected final Path walDir; + protected final Path walRootDir; protected final FileSystem walFS; protected final Configuration conf; final Path rootDir; @@ -100,8 +101,6 @@ public class WALSplitter { private final WALFactory walFactory; - private MonitoredTask status; - // For checking the latest flushed sequence id protected final LastSequenceId sequenceIdChecker; @@ -132,17 +131,28 @@ public class WALSplitter { public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; public final static String SPLIT_WAL_WRITER_THREADS = - "hbase.regionserver.hlog.splitlog.writer.threads"; + "hbase.regionserver.hlog.splitlog.writer.threads"; + + private final int numWriterThreads; + private final long bufferSize; + private final boolean splitWriterCreationBounded; + private final boolean hfile; + private final boolean skipErrors; + + WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, + FileSystem walFS, Path rootDir, FileSystem rootFS) { + this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null); + } @VisibleForTesting - WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, - Path rootDir, FileSystem rootFS, LastSequenceId idChecker, + WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, + FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { this.conf = HBaseConfiguration.create(conf); String codecClassName = - conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); - this.walDir = walDir; + this.walRootDir = walRootDir; this.walFS = walFS; this.rootDir = rootDir; this.rootFS = rootFS; @@ -150,32 +160,17 @@ public class WALSplitter { this.splitLogWorkerCoordination = splitLogWorkerCoordination; this.rsServices = rsServices; this.walFactory = factory; - PipelineController controller = new PipelineController(); this.tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - - // if we limit the number of writers opened for sinking recovered edits - boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); - boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); - long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); - int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); - - if (splitToHFile) { - entryBuffers = new BoundedEntryBuffers(controller, bufferSize); - outputSink = - new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads); - } else if (splitWriterCreationBounded) { - entryBuffers = new BoundedEntryBuffers(controller, bufferSize); - outputSink = - new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); - } else { 
- entryBuffers = new EntryBuffers(controller, bufferSize); - outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); - } + this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); + this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); + this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); + this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); + this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT); } - WALFactory getWalFactory(){ + WALFactory getWalFactory() { return this.walFactory; } @@ -193,6 +188,9 @@ Map> getRegionMaxSeqIdInStores() { /** * Splits a WAL file. + * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests. + * Not used by new procedure-based WAL splitter. + * * @return false if it is interrupted by the progress-able. */ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, @@ -201,9 +199,12 @@ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem w RegionServerServices rsServices) throws IOException { Path rootDir = CommonFSUtils.getRootDir(conf); FileSystem rootFS = rootDir.getFileSystem(conf); - WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, - splitLogWorkerCoordination, rsServices); - return s.splitLogFile(logfile, reporter); + WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, + splitLogWorkerCoordination, rsServices); + // splitWAL returns a data structure with whether split is finished and if the file is corrupt. + // We don't need to propagate corruption flag here because it is propagated by the + // SplitLogWorkerCoordination. + return splitter.splitWAL(logfile, reporter).isFinished(); } /** @@ -214,85 +215,123 @@ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem w * @return List of output files created by the split. 
*/ @VisibleForTesting - public static List split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS, + public static List split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { Path rootDir = CommonFSUtils.getRootDir(conf); FileSystem rootFS = rootDir.getFileSystem(conf); - final FileStatus[] logfiles = - SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); + WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS); + final FileStatus[] wals = + SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null); List splits = new ArrayList<>(); - if (ArrayUtils.isNotEmpty(logfiles)) { - for (FileStatus logfile : logfiles) { - WALSplitter s = - new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null); - if (s.splitLogFile(logfile, null)) { - finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf); - if (s.outputSink.splits != null) { - splits.addAll(s.outputSink.splits); + if (ArrayUtils.isNotEmpty(wals)) { + for (FileStatus wal: wals) { + SplitWALResult splitWALResult = splitter.splitWAL(wal, null); + if (splitWALResult.isFinished()) { + WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf); + if (splitter.outputSink.splits != null) { + splits.addAll(splitter.outputSink.splits); } } } } - if (!walFS.delete(logDir, true)) { - throw new IOException("Unable to delete src dir: " + logDir); + if (!walFS.delete(walsDir, true)) { + throw new IOException("Unable to delete src dir " + walsDir); } return splits; } /** - * WAL splitting implementation, splits one log file. - * @param logfile should be an actual log file. + * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). + * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} to see + * if the WAL is corrupt. + */ + static final class SplitWALResult { + private final boolean finished; + private final boolean corrupt; + + private SplitWALResult(boolean finished, boolean corrupt) { + this.finished = finished; + this.corrupt = corrupt; + } + + public boolean isFinished() { + return finished; + } + + public boolean isCorrupt() { + return corrupt; + } + } + + /** + * Setup the output sinks and entry buffers ahead of splitting WAL. + */ + private void createOutputSinkAndEntryBuffers() { + PipelineController controller = new PipelineController(); + if (this.hfile) { + this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); + this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, + this.entryBuffers, this.numWriterThreads); + } else if (this.splitWriterCreationBounded) { + this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); + this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, + this.entryBuffers, this.numWriterThreads); + } else { + this.entryBuffers = new EntryBuffers(controller, this.bufferSize); + this.outputSink = new RecoveredEditsOutputSink(this, controller, + this.entryBuffers, this.numWriterThreads); + } + } + + /** + * WAL splitting implementation, splits one WAL file. + * @param walStatus should be for an actual WAL file.
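+ * @return a {@link SplitWALResult} saying whether we finished and whether the WAL is
+ * corrupt; for example (caller sketch, names hypothetical):
+ * <pre>
+ * SplitWALResult result = splitter.splitWAL(walStatus, cancel);
+ * if (result.isFinished() &amp;&amp; !result.isCorrupt()) {
+ *   // Done with this WAL; safe to archive it.
+ * }
+ * </pre>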
*/ @VisibleForTesting - boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException { - Preconditions.checkState(status == null); - Preconditions.checkArgument(logfile.isFile(), - "passed in file status is for something other than a regular file."); - boolean isCorrupted = false; - boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", - SPLIT_SKIP_ERRORS_DEFAULT); + SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException { + Path wal = walStatus.getPath(); + Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString()); + boolean corrupt = false; int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); - Path logPath = logfile.getPath(); boolean outputSinkStarted = false; - boolean progressFailed = false; + boolean cancelled = false; int editsCount = 0; int editsSkipped = 0; - - status = TaskMonitor.get().createStatus( - "Splitting log file " + logfile.getPath() + "into a temporary staging area."); + MonitoredTask status = + TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area."); status.enableStatusJournal(true); - Reader logFileReader = null; - this.fileBeingSplit = logfile; + Reader walReader = null; + this.fileBeingSplit = walStatus; long startTS = EnvironmentEdgeManager.currentTime(); + long length = walStatus.getLen(); + String lengthStr = StringUtils.humanSize(length); + createOutputSinkAndEntryBuffers(); try { - long logLength = logfile.getLen(); - LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength), - logLength); - status.setStatus("Opening log file " + logPath); - if (reporter != null && !reporter.progress()) { - progressFailed = true; - return false; + String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + " bytes)"; + LOG.info(logStr); + status.setStatus(logStr); + if (cancel != null && !cancel.progress()) { + cancelled = true; + return new SplitWALResult(false, corrupt); } - logFileReader = getReader(logfile, skipErrors, reporter); - if (logFileReader == null) { - LOG.warn("Nothing to split in WAL={}", logPath); - return true; + walReader = getReader(walStatus, this.skipErrors, cancel); + if (walReader == null) { + LOG.warn("Nothing in {}; empty?", wal); + return new SplitWALResult(true, corrupt); } - long openCost = EnvironmentEdgeManager.currentTime() - startTS; - LOG.info("Open WAL={} cost {} ms", logPath, openCost); + LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS); int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); int numOpenedFilesLastCheck = 0; - outputSink.setReporter(reporter); + outputSink.setReporter(cancel); outputSink.setStatus(status); outputSink.startWriterThreads(); outputSinkStarted = true; Entry entry; - Long lastFlushedSequenceId = -1L; startTS = EnvironmentEdgeManager.currentTime(); - while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) { + while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); String encodedRegionNameAsStr = Bytes.toString(region); - lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); + Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); if (lastFlushedSequenceId == null) { if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))) { @@ -301,8 +340,7 @@ boolean
splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits // will get skipped by the seqId check below. // See more details at https://issues.apache.org/jira/browse/HBASE-24189 - LOG.info("{} no longer available in the FS. Skipping all edits for this region.", - encodedRegionNameAsStr); + LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr); lastFlushedSequenceId = Long.MAX_VALUE; } else { if (sequenceIdChecker != null) { @@ -315,7 +353,7 @@ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); lastFlushedSequenceId = ids.getLastFlushedSequenceId(); if (LOG.isDebugEnabled()) { - LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + TextFormat.shortDebugString(ids)); } } @@ -344,9 +382,9 @@ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) + " edits, skipped " + editsSkipped + " edits."; status.setStatus("Split " + countsStr); - if (reporter != null && !reporter.progress()) { - progressFailed = true; - return false; + if (cancel != null && !cancel.progress()) { + cancelled = true; + return new SplitWALResult(false, corrupt); } } } @@ -355,68 +393,64 @@ iie.initCause(ie); throw iie; } catch (CorruptedLogFileException e) { - LOG.warn("Could not parse, corrupted WAL={}", logPath, e); - if (splitLogWorkerCoordination != null) { + LOG.warn("Could not parse, corrupt WAL={}", wal, e); + // If splitLogWorkerCoordination, then it's old-school zk-coordinated splitting, so update + // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component. + if (this.splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); - } else { - // for tests only - ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); + splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS); } - isCorrupted = true; + corrupt = true; } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; throw e; } finally { - final String log = "Finishing writing output logs and closing down"; + final String log = "Finishing writing output for " + wal + " so closing down"; LOG.debug(log); status.setStatus(log); try { - if (null != logFileReader) { - logFileReader.close(); + if (null != walReader) { + walReader.close(); } } catch (IOException exception) { - LOG.warn("Could not close WAL reader", exception); + LOG.warn("Could not close {} reader", wal, exception); } try { if (outputSinkStarted) { - // Set progress_failed to true as the immediate following statement will reset its value - // when close() throws exception, progress_failed has the right value - progressFailed = true; - progressFailed = outputSink.close() == null; + // Set cancelled to true as the immediate following statement will reset its value.
+ // If close() throws an exception, cancelled will have the right value + cancelled = true; + cancelled = outputSink.close() == null; } } finally { long processCost = EnvironmentEdgeManager.currentTime() - startTS; // See if length got updated post lease recovery String msg = "Processed " + editsCount + " edits across " + - outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost + - " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" + - StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() + - ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed; + outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost + + " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + + ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled; LOG.info(msg); status.markComplete(msg); if (LOG.isDebugEnabled()) { - LOG.debug("WAL split completed for {} , Journal Log: {}", logPath, - status.prettyPrintJournal()); + LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal()); } } } - return !progressFailed; + return new SplitWALResult(!cancelled, corrupt); } - private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName) - throws IOException { - Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName); - return this.rootFS.exists(regionDirPath); + private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException { + return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region)); } /** * Create a new {@link Reader} for reading logs to split. + * @return Returns null if file has length zero or file can't be found. */ - private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) + protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel) throws IOException, CorruptedLogFileException { - Path path = file.getPath(); - long length = file.getLen(); + Path path = walStatus.getPath(); + long length = walStatus.getLen(); Reader in; // Check for possibly empty file. With appends, currently Hadoop reports a @@ -427,9 +461,9 @@ private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgress } try { - RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter); + RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel); try { - in = getReader(path, reporter); + in = getReader(path, cancel); } catch (EOFException e) { if (length <= 0) { // TODO should we ignore an empty, not-last log file if skip.errors @@ -451,8 +485,8 @@ private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgress if (!skipErrors || e instanceof InterruptedIOException) { throw e; // Don't mark the file corrupted if interrupted, or not skipErrors } - throw new CorruptedLogFileException("skipErrors=true Could not open wal " - + path + " ignoring", e); + throw new CorruptedLogFileException("skipErrors=true; could not open " + path + + ", skipping", e); } return in; } @@ -463,14 +497,14 @@ private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) return in.next(); } catch (EOFException eof) { // truncated files are expected if a RS crashes (see HBASE-2643) - LOG.info("EOF from wal {}. 
Continuing.", path); + LOG.info("EOF from {}; continuing.", path); return null; } catch (IOException e) { // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { - LOG.warn("Parse exception from wal {}. Continuing", path, e); + LOG.warn("Parse exception from {}; continuing", path, e); return null; } if (!skipErrors) { @@ -493,7 +527,7 @@ protected WALProvider.Writer createWriter(Path logfile) throws IOException { * Create a new {@link Reader} for reading logs to split. * @return new Reader instance, caller should close */ - protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { + private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { return walFactory.createReader(walFS, curLogFile, reporter); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index f5d5d41e3429..b8f284ebe914 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,17 +19,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -43,18 +34,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.LongAdder; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.StartMiniClusterOption; @@ -67,7 +54,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import 
org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; @@ -77,12 +63,10 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.junit.After; import org.junit.AfterClass; @@ -93,7 +77,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** @@ -176,83 +159,6 @@ public void after() throws Exception { ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); } - @Test - public void testRecoveredEdits() throws Exception { - conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal - startCluster(NUM_RS); - - int numLogLines = 10000; - SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); - // turn off load balancing to prevent regions from moving around otherwise - // they will consume recovered.edits - master.balanceSwitch(false); - FileSystem fs = master.getMasterFileSystem().getFileSystem(); - - List rsts = cluster.getLiveRegionServerThreads(); - - Path rootdir = CommonFSUtils.getRootDir(conf); - - int numRegions = 50; - try (Table t = installTable(numRegions)) { - List regions = null; - HRegionServer hrs = null; - for (int i = 0; i < NUM_RS; i++) { - hrs = rsts.get(i).getRegionServer(); - regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); - // At least one RS will have >= to average number of regions. - if (regions.size() >= numRegions / NUM_RS) { - break; - } - } - Path logDir = new Path(rootdir, - AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); - - LOG.info("#regions = " + regions.size()); - Iterator it = regions.iterator(); - while (it.hasNext()) { - RegionInfo region = it.next(); - if (region.getTable().getNamespaceAsString() - .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { - it.remove(); - } - } - - makeWAL(hrs, regions, numLogLines, 100); - - slm.splitLogDistributed(logDir); - - int count = 0; - for (RegionInfo hri : regions) { - @SuppressWarnings("deprecation") - Path editsdir = WALSplitUtil - .getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf, - tableName, hri.getEncodedName())); - LOG.debug("Checking edits dir " + editsdir); - FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - if (WALSplitUtil.isSequenceIdFile(p)) { - return false; - } - return true; - } - }); - LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()). 
- collect(Collectors.joining(","))); - assertTrue("Edits dir should have more than a one file", files.length > 1); - for (int i = 0; i < files.length; i++) { - int c = countWAL(files[i].getPath(), fs, conf); - count += c; - } - LOG.info(count + " edits in " + files.length + " recovered edits files."); - } - - // check that the log file is moved - assertFalse(fs.exists(logDir)); - assertEquals(numLogLines, count); - } - } - @Test public void testMasterStartsUpWithLogSplittingWork() throws Exception { conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); @@ -303,71 +209,6 @@ public boolean evaluate() throws Exception { } } - /** - * The original intention of this test was to force an abort of a region server and to make sure - * that the failure path in the region servers is properly evaluated. But it is difficult to - * ensure that the region server doesn't finish the log splitting before it aborts. Also now, - * there is this code path where the master will preempt the region server when master detects - * that the region server has aborted. - * @throws Exception - */ - // Was marked flaky before Distributed Log Replay cleanup. - @Test - public void testWorkerAbort() throws Exception { - LOG.info("testWorkerAbort"); - startCluster(3); - int numLogLines = 10000; - SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); - FileSystem fs = master.getMasterFileSystem().getFileSystem(); - - List rsts = cluster.getLiveRegionServerThreads(); - HRegionServer hrs = findRSToKill(false); - Path rootdir = CommonFSUtils.getRootDir(conf); - final Path logDir = new Path(rootdir, - AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); - - try (Table t = installTable(40)) { - makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); - - new Thread() { - @Override - public void run() { - try { - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); - } catch (InterruptedException e) { - } - for (RegionServerThread rst : rsts) { - rst.getRegionServer().abort("testing"); - break; - } - } - }.start(); - FileStatus[] logfiles = fs.listStatus(logDir); - TaskBatch batch = new TaskBatch(); - slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); - // waitForCounter but for one of the 2 counters - long curt = System.currentTimeMillis(); - long waitTime = 80000; - long endt = curt + waitTime; - while (curt < endt) { - if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + - tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + - tot_wkr_preempt_task.sum()) == 0) { - Thread.sleep(100); - curt = System.currentTimeMillis(); - } else { - assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + - tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + - tot_wkr_preempt_task.sum())); - return; - } - } - fail("none of the following counters went up in " + waitTime + " milliseconds - " + - "tot_wkr_task_resigned, tot_wkr_task_err, " + - "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); - } - } - @Test public void testThreeRSAbort() throws Exception { LOG.info("testThreeRSAbort"); @@ -411,6 +252,11 @@ public String explainFailure() throws Exception { @Test public void testDelayedDeleteOnFailure() throws Exception { + if (!this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + // This test depends on zk coordination.... 
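The guard added here bails out of the test when procedure-based splitting (now the default) is active, so the test reports as passed rather than skipped; the early return follows below. A possible alternative, shown only as a sketch against the test class's existing conf field and not what this patch does, is JUnit's Assume, which surfaces the skip in test reports:

```java
import static org.junit.Assume.assumeTrue;

import org.apache.hadoop.hbase.HConstants;
import org.junit.Test;

// Sketch only. assumeTrue() aborts the test as "skipped" (instead of
// silently green) when zk-coordinated WAL splitting is not enabled.
@Test
public void testDelayedDeleteOnFailure() throws Exception {
  assumeTrue("requires deprecated zk-coordinated WAL splitting",
    conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
      HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK));
  // ... rest of the test body unchanged ...
}
```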
+ return; + } LOG.info("testDelayedDeleteOnFailure"); startCluster(1); final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); @@ -504,8 +350,9 @@ private Table installTable(int nrs, int existingRegions) throws Exception { NavigableSet regions = HBaseTestingUtility.getAllOnlineRegions(cluster); LOG.debug("Verifying only catalog region is assigned\n"); if (regions.size() != 1) { - for (String oregion : regions) + for (String oregion : regions) { LOG.debug("Region still online: " + oregion); + } } assertEquals(1 + existingRegions, regions.size()); LOG.debug("Enabling table\n"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index 2d0156590d4f..8c949092526b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,9 +16,9 @@ * limitations under the License. */ package org.apache.hadoop.hbase.master.assignment; - +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.mockito.ArgumentMatchers.any; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.MasterWalManager; import org.apache.hadoop.hbase.master.MockNoopMasterServices; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.SplitWALManager; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; @@ -60,9 +61,7 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; @@ -79,6 +78,7 @@ public class MockMasterServices extends MockNoopMasterServices { private final MasterFileSystem fileSystemManager; private final MasterWalManager walManager; + private final SplitWALManager splitWALManager; private final AssignmentManager assignmentManager; private final TableStateManager tableStateManager; @@ -100,6 +100,10 @@ public MockMasterServices(Configuration conf, Superusers.initialize(conf); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); + this.splitWALManager = + conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)? + null: new SplitWALManager(this); + // Mock an AM. 
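Before the assignment-manager mock below, note the SplitWALManager wiring just added: a null manager signals "use the legacy zk-coordinated SplitLogManager path". A hedged sketch of that selection rule follows; whether the real SplitWALManager constructor throws IOException, and its exact parameter type, are assumptions here, since the patch only shows new SplitWALManager(this) from a MasterServices implementor.

```java
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

final class SplitterSelection {
  // Hedged sketch mirroring the MockMasterServices wiring above: null means
  // "fall back to the deprecated zk-coordinated splitter".
  static SplitWALManager choose(Configuration conf, MasterServices master) throws IOException {
    return conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
      ? null : new SplitWALManager(master);
  }
}
```

Exposing the manager through the getSplitWALManager() override (see the end of this file's diff) lets procedures ask the master which splitting mode is in effect without re-reading configuration.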
this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { @Override @@ -358,4 +362,8 @@ private static MultiResponse buildMultiResponse(MultiRequest req) { } return builder.build(); } + + @Override public SplitWALManager getSplitWALManager() { + return splitWALManager; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java index fd1e53356798..63b611d378f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java @@ -77,7 +77,7 @@ public void testCleanupMetaWAL() throws Exception { Path walPath = new Path(fs.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME); for (FileStatus status : CommonFSUtils.listStatus(fs.getFileSystem(), walPath)) { if (status.getPath().toString().contains(SPLITTING_EXT)) { - fail("Should not have splitting wal dir here:" + status); + fail("Splitting WAL dir should have been cleaned up: " + status); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 7e93932323fc..f36c5b9d1f38 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; @@ -84,7 +83,7 @@ public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); - conf.setBoolean("hbase.hlog.split.skip.errors", true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir()); } @@ -168,21 +167,14 @@ private void testSecureWALInternal(boolean offheap) throws IOException, FileNotF wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); assertFalse(true); } catch (IOException ioe) { - // expected IOE + System.out.println("Expected ioe " + ioe.getMessage()); } FileStatus[] listStatus = fs.listStatus(walPath.getParent()); Path rootdir = CommonFSUtils.getRootDir(conf); - try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null); - s.splitLogFile(listStatus[0], null); - Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), - "corrupt"); - assertTrue(fs.exists(file)); - // assertFalse("log splitting should have failed", true); - } catch (IOException ioe) { - assertTrue("WAL should have been sidelined", false); - } + WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null); + WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null); + assertTrue(swr.isCorrupt()); wals.close(); } @@ -219,7 +211,7 @@ public void 
testSecureWALReaderOnWAL() throws Exception { Path rootdir = CommonFSUtils.getRootDir(conf); try { WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null); - s.splitLogFile(listStatus[0], null); + s.splitWAL(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); assertTrue(!fs.exists(file)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 43cf81fae6fd..5f22b454697e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Method; @@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; @@ -91,12 +91,10 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Joiner; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -105,18 +103,9 @@ */ @Category({RegionServerTests.class, LargeTests.class}) public class TestWALSplit { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALSplit.class); - - { - // Uncomment the following lines if more verbosity is needed for - // debugging (see HBASE-12285 for details). - //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); - //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); - //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); - } private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); private static Configuration conf; @@ -143,7 +132,6 @@ public class TestWALSplit { private static final byte[] VALUE = Bytes.toBytes("v1"); private static final String WAL_FILE_PREFIX = "wal.dat."; private static List REGIONS = new ArrayList<>(); - private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors"; private static String ROBBER; private static String ZOMBIE; private static String [] GROUP = new String [] {"supergroup"}; @@ -223,8 +211,6 @@ public void tearDown() throws Exception { /** * Simulates splitting a WAL out from under a regionserver that is still trying to write it. * Ensures we do not lose edits. 
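The hunks that follow replace the bare "hbase.hlog.split.skip.errors" string literal with the WALSplitter.SPLIT_SKIP_ERRORS_KEY constant. A minimal sketch of the constant in use, assuming it is public (the patch only shows same-package references):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALSplitter;

public final class SkipErrorsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same "hbase.hlog.split.skip.errors" property as before, but referenced
    // through a single constant so call sites cannot drift apart.
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    System.out.println(conf.getBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false));
  }
}
```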
- * @throws IOException - * @throws InterruptedException */ @Test public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { @@ -553,7 +539,7 @@ public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOExcept @Test public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); generateWALs(Integer.MAX_VALUE); corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true); @@ -562,7 +548,7 @@ public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOExceptio @Test public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); generateWALs(Integer.MAX_VALUE); corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); @@ -571,7 +557,7 @@ public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException @Test public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); generateWALs(Integer.MAX_VALUE); corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false); @@ -587,7 +573,7 @@ public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOExce @Test public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); List failureTypes = Arrays .asList(FaultyProtobufLogReader.FailureType.values()).stream() .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList()); @@ -654,14 +640,14 @@ private Set splitCorruptWALs(final FaultyProtobufLogReader.FailureType f @Test (expected = IOException.class) public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); } @Test public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); try { splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); } catch (IOException e) { @@ -673,7 +659,7 @@ public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() private void ignoreCorruption(final Corruptions corruption, final int entryCount, final int expectedCount) throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); final String REGION = "region__1"; REGIONS.clear(); @@ -698,7 +684,7 @@ private void ignoreCorruption(final Corruptions corruption, final int entryCount in.close(); // should not have stored the EOF files as corrupt - FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); + FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? 
fs.listStatus(CORRUPTDIR): new FileStatus[0]; assertEquals(0, archivedLogs.length); } @@ -717,7 +703,7 @@ public void testCorruptWALTrailer() throws IOException { @Test public void testLogsGetArchivedAfterSplit() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); generateWALs(-1); useDifferentDFSClient(); WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); @@ -793,7 +779,7 @@ public void testSplitDeletedRegion() throws IOException { @Test public void testIOEOnOutputThread() throws Exception { - conf.setBoolean(HBASE_SKIP_ERRORS, false); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); generateWALs(-1); useDifferentDFSClient(); @@ -847,7 +833,7 @@ public void run() { t.setDaemon(true); t.start(); try { - logSplitter.splitLogFile(logfiles[largestLogFile], null); + logSplitter.splitWAL(logfiles[largestLogFile], null); fail("Didn't throw!"); } catch (IOException ioe) { assertTrue(ioe.toString().contains("Injected")); @@ -944,7 +930,7 @@ public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null, - null, wals, null); + Mockito.mock(SplitLogWorkerCoordination.class), wals, null); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -1002,9 +988,7 @@ private void doTestThreading(final int numFakeEdits, makeRegionDirs(regions); // Create a splitter that reads and writes the data without touching disk - WALSplitter logSplitter = - new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { - + WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) { /* Produce a mock writer that doesn't write anywhere */ @Override protected Writer createWriter(Path logfile) throws IOException { @@ -1039,8 +1023,8 @@ public Void answer(InvocationOnMock invocation) { /* Produce a mock reader that generates fake entries */ @Override - protected Reader getReader(Path curLogFile, CancelableProgressable reporter) - throws IOException { + protected Reader getReader(FileStatus file, boolean skipErrors, + CancelableProgressable reporter) throws IOException, CorruptedLogFileException { Reader mockReader = Mockito.mock(Reader.class); Mockito.doAnswer(new Answer() { int index = 0; @@ -1064,7 +1048,7 @@ public Entry answer(InvocationOnMock invocation) throws Throwable { } }; - logSplitter.splitLogFile(fs.getFileStatus(logPath), null); + logSplitter.splitWAL(fs.getFileStatus(logPath), null); // Verify number of written edits per region Map outputCounts = logSplitter.outputSink.getOutputCounts(); @@ -1119,7 +1103,7 @@ public void testSplitLogFileMultipleRegions() throws IOException { @Test public void testSplitLogFileFirstLineCorruptionLog() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); + conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); generateWALs(1, 10, -1); FileStatus logfile = fs.listStatus(WALDIR)[0]; @@ -1175,7 +1159,7 @@ protected Writer createWriter(Path logfile) } }; try{ - logSplitter.splitLogFile(logfiles[0], null); + logSplitter.splitWAL(logfiles[0], null); } catch (IOException e) { LOG.info(e.toString(), e); fail("Throws IOException when spliting " diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java 
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index c7b45fe9b656..bb02af3788aa 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -221,8 +221,8 @@ public static void setMetaLocation(ZKWatcher zookeeper, ServerName serverName, i LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required"); return; } - LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}", replicaId, - serverName); + LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId, + serverName, state); // Make the MetaRegionServer pb and then get its bytes and save this as // the znode content. MetaRegionServer pbrsr = MetaRegionServer.newBuilder() diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index f93307010a7f..63e2857b0e9c 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -32,7 +32,10 @@ /** * Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed * splitting of WAL logs. + * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based + * distributed WAL splitter; see SplitWALManager. */ +@Deprecated @InterfaceAudience.Private public final class ZKSplitLog { private static final Logger LOG = LoggerFactory.getLogger(ZKSplitLog.class);
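Taken together with the HConstants and ZKSplitLog deprecations, the flipped default now selects procedure-based splitting everywhere, and the zk flag is only an opt-in escape hatch until 4.0.0. A hedged sketch of a pre-upgrade check an operator might run; the check is illustrative, not an HBase-provided tool:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public final class SplitModeCheck {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Deprecated escape hatch: forces the zk-coordinated splitter; removed in 4.0.0.
    boolean zk = conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
      HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK);
    System.out.println(zk
      ? "Still opted into deprecated zk-coordinated WAL splitting; migrate before 4.0.0"
      : "Using procedure-based WAL splitting (SplitWALManager)");
  }
}
```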