Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27217 Revisit the DumpReplicationQueues tool #4810

Merged
merged 2 commits into from
Nov 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +35,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
Expand All @@ -40,28 +44,33 @@
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
* TODO: reimplement this tool
* <p/>
* Provides information about the existing states of replication, replication peers and queues.
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
* Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
* usage by the replication queues (note: can be overestimated). This version of the tool has been
* reimplemented so that it obtains its information from the replication table.
*/
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {
Expand Down Expand Up @@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
System.err.println("General Options:");
System.err.println(" -h|--h|--help Show this help and exit.");
System.err.println(" --distributed Poll each RS and print its own replication queue. "
+ "Default only polls ZooKeeper");
+ "Default only polls replication table.");
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
+ " It could be overestimated if replicating to multiple peers."
+ " --distributed flag is also needed.");
Expand All @@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();

ZKWatcher zkw =
new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
new WarnOnlyAbortable(), true);

try {
// Our zk watcher
LOG.info("Our Quorum: " + zkw.getQuorum());
List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
if (replicatedTableCFs.isEmpty()) {
LOG.info("No tables with a configured replication peer were found.");
Expand All @@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
LOG.info("Found [--distributed], will poll each RegionServer.");
Set<String> peerIds =
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
System.out.print("Dumping replication znodes via ZooKeeper:");
System.out.println(ZKDump.getReplicationZnodesDump(zkw));
// use replication table instead
System.out.println("Dumping replication info via replication table.");
System.out.println(dumpReplicationViaTable(connection, conf));
}
return (0);
} catch (IOException e) {
return (-1);
} finally {
zkw.close();
connection.close();
}
}

/**
 * Builds a textual dump of the replication state: the replication peers (listed through the
 * Admin API), the stored offsets of every replication queue, and the hfile references of every
 * peer (both read from the {@link ReplicationQueueStorage}).
 * <p/>
 * The dump info format is as follows:
 *
 * <pre>
 * peers:
 * peers/1: zk1:2181:/hbase
 * peers/1/peer-state: ENABLED
 * rs:
 * rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
 * rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
 * hfile-refs:
 * hfile-refs/1/hfile1,hfile2
 * hfile-refs/2/hfile3,hfile4
 * </pre>
 *
 * NOTE(review): the second segment of each "rs/" line is the key of the queue's offsets map,
 * while the sample above shows a server name in that position; confirm which is intended.
 * @param connection cluster connection used to obtain the queue storage and a short-lived Admin
 * @param conf       configuration handed to the replication storage factory
 * @return the dump, one entry per line
 * @throws ReplicationException declared for the queue storage calls
 * @throws IOException          declared for the Admin calls
 */
public String dumpReplicationViaTable(Connection connection, Configuration conf)
  throws ReplicationException, IOException {
  StringBuilder sb = new StringBuilder();
  ReplicationQueueStorage queueStorage =
    ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

  String peersKey = "peers";
  sb.append(peersKey).append(": ").append("\n");
  // The Admin is only needed to list the peers; close it right away instead of leaking it.
  List<ReplicationPeerDescription> repPeerDescs;
  try (Admin admin = connection.getAdmin()) {
    repPeerDescs = admin.listReplicationPeers();
  }
  for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
    String peerId = repPeerDesc.getPeerId();
    sb.append(peersKey).append("/").append(peerId).append(": ")
      .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
    sb.append(peersKey).append("/").append(peerId).append("/peer-state: ")
      .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
  }

  String rsKey = "rs";
  sb.append(rsKey).append(": ").append("\n");
  for (ReplicationQueueData repQueueData : queueStorage.listAllQueues()) {
    String peerId = repQueueData.getId().getPeerId();
    // One line per offsets-map entry: rs/<map key>/<peer id>/<wal>: <offset>
    for (Map.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
      .entrySet()) {
      ReplicationGroupOffset groupOffset = entry.getValue();
      sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
        .append(groupOffset.getWal()).append(": ").append(groupOffset.getOffset()).append("\n");
    }
  }

  String hfileKey = "hfile-refs";
  sb.append(hfileKey).append(": ").append("\n");
  for (String peerId : queueStorage.getAllPeersFromHFileRefsQueue()) {
    List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
    sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
      .append("\n");
  }

  return sb.toString();
}

public String dumpReplicationSummary() {
StringBuilder sb = new StringBuilder();
if (!deletedQueues.isEmpty()) {
Expand Down Expand Up @@ -294,80 +348,111 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
return sb.toString();
}

public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
Configuration conf) throws Exception {
StringBuilder sb = new StringBuilder();
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

Set<ServerName> liveRegionServers =
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();

// queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
// Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
// zkw.getZNodePaths().rsZNode)
// .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
//
// Loops each peer on each RS and dumps the queues
// List<ServerName> regionservers = queueStorage.getListOfReplicators();
// if (regionservers == null || regionservers.isEmpty()) {
// return sb.toString();
// }
// for (ServerName regionserver : regionservers) {
// List<String> queueIds = queueStorage.getAllQueues(regionserver);
// if (!liveRegionServers.contains(regionserver)) {
// deadRegionServers.add(regionserver.getServerName());
// }
// for (String queueId : queueIds) {
// ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
// List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
// Collections.sort(wals);
// if (!peerIds.contains(queueInfo.getPeerId())) {
// deletedQueues.add(regionserver + "/" + queueId);
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
// } else {
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
// }
// }
// }
List<ServerName> regionServers = queueStorage.listAllReplicators();
if (regionServers == null || regionServers.isEmpty()) {
return sb.toString();
}
for (ServerName regionServer : regionServers) {
List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);

if (!liveRegionServers.contains(regionServer)) {
deadRegionServers.add(regionServer.getServerName());
}
for (ReplicationQueueId queueId : queueIds) {
List<String> tmpWals = new ArrayList<>();
// wals
AbstractFSWALProvider
.getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
.map(Path::toString).forEach(tmpWals::add);

// old wals
AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
queueId.getServerWALsBelongTo(), URLEncoder
.encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
.stream().map(Path::toString).forEach(tmpWals::add);

Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
// filter out the wal files that should replicate
List<String> wals = new ArrayList<>();
for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
ReplicationGroupOffset offset = entry.getValue();
for (String wal : tmpWals) {
if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
wals.add(wal);
}
}
}
Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
if (!peerIds.contains(queueId.getPeerId())) {
deletedQueues.add(regionServer + "/" + queueId);
sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
} else {
sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
}
}
}
return sb.toString();
}

private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
boolean hdfs) throws Exception {
private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
throws Exception {
StringBuilder sb = new StringBuilder();

List<ServerName> deadServers;

sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
deadServers = queueInfo.getDeadRegionServers();
if (deadServers.isEmpty()) {
sb.append(" No dead RegionServers found in this queue." + "\n");
sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
sb.append(" Queue id: " + queueId + "\n");
sb.append(" PeerID: " + queueId.getPeerId() + "\n");
sb.append(" Recovered: " + queueId.isRecovered() + "\n");
// In new version, we only record the first dead RegionServer in queueId.
if (queueId.getSourceServerName().isPresent()) {
sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
} else {
sb.append(" Dead RegionServers: " + deadServers + "\n");
sb.append(" No dead RegionServer found in this queue." + "\n");
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

for (String wal : wals) {
// long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
// sb.append(" Replication position for " + wal + ": "
// + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());

for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
String walGroup = entry.getKey();
ReplicationGroupOffset offset = entry.getValue();
for (String wal : wals) {
long position = 0;
if (offset.getWal().equals(wal)) {
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
position = offset.getOffset();
}
sb.append(
" Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
if (position == 0) {
sb.append("0 (not started or nothing to replicate)");
} else if (position > 0) {
sb.append(position);
}
sb.append("\n");
}
}

if (hdfs) {
FileSystem fs = FileSystem.get(getConf());
sb.append(" Total size of WALs on HDFS for this queue: "
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
}
return sb.toString();
}

/**
* return total size in bytes from a list of WALs
*/
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
throws IOException {
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
long size = 0;
FileStatus fileStatus;

Expand All @@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server
totalSizeOfWALs += size;
return size;
}

/**
 * {@link Abortable} that never aborts: an abort request is only logged (reason at WARN, cause at
 * DEBUG) and {@link #isAborted()} always answers {@code false}.
 */
private static class WarnOnlyAbortable implements Abortable {
  /**
   * Logs the abort request and otherwise ignores it.
   * @param why reason given for the abort
   * @param e   cause of the abort; may be {@code null}
   */
  @Override
  public void abort(String why, Throwable e) {
    LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why);
    if (LOG.isDebugEnabled()) {
      // String.valueOf tolerates a null cause, whereas e.toString() would throw a
      // NullPointerException from inside the abort handler.
      LOG.debug(String.valueOf(e), e);
    }
  }

  /** Always {@code false}: this implementation never enters the aborted state. */
  @Override
  public boolean isAborted() {
    return false;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,26 @@ public static List<Path> getArchivedWALFiles(Configuration conf, ServerName serv
return archivedWalFiles;
}

/**
 * Lists the regular files found directly inside the WAL directory of the given server, i.e.
 * {@code <WAL root>/<HREGION_LOGDIR_NAME>/<serverName>}. Sub-directories are not descended into
 * and are not returned.
 * @param c          configuration used to resolve the WAL root directory and its file system
 * @param serverName server whose WAL directory is listed
 * @return the paths of the files in that directory; empty when the directory does not exist
 * @throws IOException if the file system cannot be obtained or listed
 */
public static List<Path> getWALFiles(Configuration c, ServerName serverName) throws IOException {
  Path logsRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
  FileSystem walFs = logsRoot.getFileSystem(c);
  List<Path> found = new ArrayList<>();
  Path serverWalDir = new Path(logsRoot, serverName.toString());

  FileStatus[] children;
  try {
    children = walFs.listStatus(serverWalDir);
  } catch (FileNotFoundException e) {
    // A missing directory is not an error here: report it and return the empty list.
    LOG.info("WAL dir {} not exists", serverWalDir);
    return found;
  }

  for (FileStatus child : children) {
    if (!child.isFile()) {
      continue;
    }
    found.add(child.getPath());
  }
  return found;
}

/**
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
* this method ignores the format of the logfile component. Current format: [base directory for
Expand Down
Loading