HBASE-24157 Destination RSgroup aware export snapshot #1941

Draft · wants to merge 1 commit into base: master
ExportSnapshot.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
@@ -46,17 +49,27 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
@@ -73,6 +86,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -100,6 +114,7 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {

private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

private static final String RS_GROUP_ZNODE = "rsgroup"; // child of baseZNode holding rsgroup membership
private static final String MR_NUM_MAPS = "mapreduce.job.maps";
private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
@@ -110,6 +125,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
private static final String CONF_TARGET_ZK = "snapshot.export.target.zk";
private static final String CONF_TARGET_RSGROUP = "snapshot.export.target.rsgroup";
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
@@ -152,6 +169,10 @@ static final class Options {
"Number of mappers to use during the copy (mapreduce.job.maps).");
static final Option BANDWIDTH = new Option(null, "bandwidth", true,
"Limit bandwidth to this value in MB/second.");
static final Option TARGET_ZK = new Option(null, "targetZK", true,
"Target HBase ZooKeeper quorum, in the form zk1,zk2,zk3:port:/znode.");
static final Option TARGET_RSGROUP = new Option(null, "targetRSGroup", true,
"RSGroup on the target cluster whose servers are used as favored nodes.");
}
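
With the new options wired into the CLI, a driver along the following lines could exercise them; the endpoints, snapshot name, and group name are invented for illustration and are not part of this patch:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.util.ToolRunner;

public class ExportSnapshotDriver {
  public static void main(String[] args) throws Exception {
    // ExportSnapshot implements Tool, so ToolRunner can drive it directly.
    int ret = ToolRunner.run(HBaseConfiguration.create(), new ExportSnapshot(),
      new String[] {
        "--snapshot", "my_snapshot",              // snapshot to export
        "--copy-to", "hdfs://dest-nn:8020/hbase", // destination root dir
        "--targetZK", "zk1,zk2,zk3:2181:/hbase",  // new: target ZK quorum
        "--targetRSGroup", "dest_group"           // new: favored-node rsgroup
      });
    System.exit(ret);
  }
}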

// Export Map-Reduce Counters, to keep track of the progress
@@ -172,14 +193,18 @@ private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
private short filesMode;
private int bufferSize;

private FileSystem outputFs;
private HFileSystem outputFs;
private Path outputArchive;
private Path outputRoot;

private FileSystem inputFs;
private Path inputArchive;
private Path inputRoot;

// Target cluster ZK quorum and rsgroup; the rsgroup's servers are used as
// favored nodes when creating output files.
private String targetZK;
private String targetRSGroup;
private List<Address> rsgroupServers = new ArrayList<>();

private static Testing testing = new Testing();

@Override
@@ -196,6 +221,8 @@ public void setup(Context context) throws IOException {
filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
targetZK = conf.get(CONF_TARGET_ZK);
targetRSGroup = conf.get(CONF_TARGET_RSGROUP);

inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
@@ -209,7 +236,7 @@ public void setup(Context context) throws IOException {

try {
destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
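// Wrapping the output FileSystem in HFileSystem lets FSUtils.create (used in
// copyFile below) unwrap it to the underlying DistributedFileSystem, the
// layer that accepts favored-node hints.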
outputFs = FileSystem.get(outputRoot.toUri(), destConf);
outputFs = new HFileSystem(FileSystem.get(outputRoot.toUri(), destConf));
} catch (IOException e) {
throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
}
@@ -228,6 +255,41 @@ public void setup(Context context) throws IOException {
// task.
testing.injectedFailureCount = context.getTaskAttemptID().getId();
}

if (targetZK != null && targetRSGroup != null) {
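// Read the target rsgroup's membership directly from the destination
// cluster's ZooKeeper (with default paths, the znode is /hbase/rsgroup/<group>).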
Configuration targetConf = HBaseConfiguration.createClusterConf(new Configuration(), targetZK);
ZKWatcher watcher = new ZKWatcher(targetConf, NAME, null);
String rsGroupBasedPath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
try {
byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(rsGroupBasedPath, targetRSGroup));

if (data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
ByteArrayInputStream bis = new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(bis);
RSGroupInfo rsGroupInfo = new RSGroupInfo(proto.getName());
for (HBaseProtos.ServerName el : proto.getServersList()) {
rsGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
}
rsgroupServers = new ArrayList<>(rsGroupInfo.getServers());
}
} catch (KeeperException e) {
throw new IOException("Failed to read rsgroup " + targetRSGroup
+ " from target zk cluster " + targetZK, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while reading rsgroup " + targetRSGroup
+ " from target zk cluster " + targetZK, e);
} catch (DeserializationException e) {
throw new IOException("Failed to deserialize rsgroup " + targetRSGroup
+ " read from target zk cluster " + targetZK, e);
} finally {
watcher.close();
}
} else {
LOG.warn("No targetRSGroup or targetZK passed, exporting without any rsgroup awareness to destination");
}
}

@Override
@@ -309,10 +371,21 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,

try {
context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

// Ensure that the output folder is there and copy the file
createOutputPath(outputPath.getParent());
FSDataOutputStream out = outputFs.create(outputPath, true);

// Pick up to three of the target rsgroup's servers, at random, as favored
// nodes for this output file.
Collections.shuffle(rsgroupServers);
int numFavoredNodes = Math.min(rsgroupServers.size(), 3);
InetSocketAddress[] favoredNodes = new InetSocketAddress[numFavoredNodes];
for (int i = 0; i < numFavoredNodes; i++) {
Address address = rsgroupServers.get(i);
favoredNodes[i] = new InetSocketAddress(address.getHostname(), address.getPort());
}

LOG.info("FavoredNodes selected are " + Arrays.asList(favoredNodes));
FSDataOutputStream out = FSUtils.create(outputFs.getConf(), outputFs, outputPath, null, favoredNodes);
try {
copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
} finally {
@@ -801,7 +874,7 @@ public boolean nextKeyValue() {
private void runCopyJob(final Path inputRoot, final Path outputRoot,
final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
final String filesUser, final String filesGroup, final int filesMode,
final int mappers, final int bandwidthMB)
final int mappers, final int bandwidthMB, final String targetZK, final String targetRSGroup)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
@@ -817,6 +890,13 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot,
conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
conf.set(CONF_SNAPSHOT_NAME, snapshotName);
conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
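// Hand the target cluster settings to the mappers through the job conf;
// ExportMapper reads them back in setup().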
if (targetZK != null) {
conf.set(CONF_TARGET_ZK, targetZK);
}
if (targetRSGroup != null) {
conf.set(CONF_TARGET_RSGROUP, targetRSGroup);
}

String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
Job job = new Job(conf);
@@ -913,6 +993,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
private int bandwidthMB = Integer.MAX_VALUE;
private int filesMode = 0;
private int mappers = 0;
private String targetZK = null;
private String targetRSGroup = null;

@Override
protected void processOptions(CommandLine cmd) {
@@ -924,6 +1006,12 @@ protected void processOptions(CommandLine cmd) {
if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
}
if (cmd.hasOption(Options.TARGET_ZK.getLongOpt())) {
targetZK = cmd.getOptionValue(Options.TARGET_ZK.getLongOpt());
}
if (cmd.hasOption(Options.TARGET_RSGROUP.getLongOpt())) {
targetRSGroup = cmd.getOptionValue(Options.TARGET_RSGROUP.getLongOpt());
}
mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
@@ -1080,7 +1168,7 @@ public int doWork() throws IOException {
// by the HFileArchiver, since they have no references.
try {
runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
filesUser, filesGroup, filesMode, mappers, bandwidthMB);
filesUser, filesGroup, filesMode, mappers, bandwidthMB, targetZK, targetRSGroup);

LOG.info("Finalize the Snapshot Export");
if (!skipTmp) {
@@ -1139,6 +1227,8 @@ protected void printUsage() {
addOption(Options.CHMOD);
addOption(Options.MAPPERS);
addOption(Options.BANDWIDTH);
addOption(Options.TARGET_ZK);
addOption(Options.TARGET_RSGROUP);
}

public static void main(String[] args) {
TestExportSnapshot.java
@@ -40,15 +40,14 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -62,8 +61,7 @@
/**
* Test Export Snapshot Tool
*/
@Ignore // HBASE-24493
@Category({VerySlowMapReduceTests.class, LargeTests.class})
@Category({MapReduceTests.class, LargeTests.class})
public class TestExportSnapshot {

@ClassRule
@@ -97,12 +95,10 @@ public static void setUpBaseConf(Configuration conf) {
public static void setUpBeforeClass() throws Exception {
setUpBaseConf(TEST_UTIL.getConfiguration());
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.startMiniMapReduceCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniMapReduceCluster();
TEST_UTIL.shutdownMiniCluster();
}

@@ -325,7 +321,7 @@ private static Set<String> listFiles(final FileSystem fs, final Path root, final
return files;
}

private Path getHdfsDestinationDir() {
protected Path getHdfsDestinationDir() {
Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
LOG.info("HDFS export destination path: " + path);