diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index c215243ed783..a302271bdd86 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -110,6 +113,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+  private static final String CONF_TARGET_ZK = "snapshot.export.target.zk";
+  private static final String CONF_TARGET_RSGROUP = "snapshot.export.target.rsgroup";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
@@ -152,6 +157,10 @@ static final class Options {
       "Number of mappers to use during the copy (mapreduce.job.maps).");
     static final Option BANDWIDTH = new Option(null, "bandwidth", true,
       "Limit bandwidth to this value in MB/second.");
+    static final Option TARGET_ZK = new Option(null, "targetZK", true,
+      "Target HBase ZooKeeper quorum, in the format zk1,zk2,zk3:port:/znode.");
+    static final Option TARGET_RSGROUP = new Option(null, "targetRSGroup", true,
+      "RSGroup on the target cluster whose servers are supplied as favored nodes.");
   }
 
   // Export Map-Reduce Counters, to keep track of the progress
@@ -172,7 +181,11 @@ private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
+    private String targetZK = null;
+    private String targetRSGroup = null;
+    private List<Address> rsgroupServers = new ArrayList<>();
+
     private static Testing testing = new Testing();
 
     @Override
@@ -196,6 +209,8 @@ public void setup(Context context) throws IOException {
       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
+      targetZK = conf.get(CONF_TARGET_ZK);
+      targetRSGroup = conf.get(CONF_TARGET_RSGROUP);
       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
 
@@ -209,7 +224,7 @@ public void setup(Context context) throws IOException {
 
       try {
         destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
-        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
+        outputFs = new HFileSystem(FileSystem.get(outputRoot.toUri(), destConf));
       } catch (IOException e) {
         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
       }
@@ -228,6 +243,41 @@ public void setup(Context context) throws IOException {
         // task.
         testing.injectedFailureCount = context.getTaskAttemptID().getId();
       }
+
+
+      if (targetZK != null && targetRSGroup != null) {
+        Configuration targetConf = HBaseConfiguration.createClusterConf(new Configuration(), targetZK);
+        ZKWatcher watcher = new ZKWatcher(targetConf, NAME, null);
+        String rsGroupBasedPath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RSGroupZNodePaths.rsGroupZNode);
+        try {
+          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(rsGroupBasedPath, targetRSGroup));
+
+          if (data.length > 0) {
+            ProtobufUtil.expectPBMagicPrefix(data);
+            ByteArrayInputStream bis = new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+            RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(bis);
+            RSGroupInfo rsGroupInfo = new RSGroupInfo(proto.getName());
+            for (HBaseProtos.ServerName el : proto.getServersList()) {
+              rsGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
+            }
+            rsgroupServers = new ArrayList<>(rsGroupInfo.getServers());
+          }
+        } catch (KeeperException e) {
+          throw new IOException(
+            "Failed to set up zookeeper watcher for target zk cluster " + targetZK
+              + " and target rsgroup " + targetRSGroup + ", with exception " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+          throw new IOException(
+            "Failed to set up zookeeper watcher for target zk cluster " + targetZK
+              + " and target rsgroup " + targetRSGroup + ", with exception " + e.getMessage(), e);
+        } catch (DeserializationException e) {
+          throw new IOException(
+            "Failed to deserialize rsgroup information from zookeeper for target zk cluster " + targetZK
+              + " and target rsgroup " + targetRSGroup + ", with exception " + e.getMessage(), e);
+        }
+      } else {
+        LOG.warn("No targetZK or targetRSGroup passed, exporting to the destination without rsgroup awareness");
+      }
     }
 
     @Override
@@ -309,10 +359,21 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
 
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
-
         // Ensure that the output folder is there and copy the file
         createOutputPath(outputPath.getParent());
-        FSDataOutputStream out = outputFs.create(outputPath, true);
+
+        Collections.shuffle(rsgroupServers);
+        int size = rsgroupServers.size(), index = 0;
+        InetSocketAddress[] favoredNodes = new InetSocketAddress[size > 2 ? 3 : size];
+        for (Address address : this.rsgroupServers) {
+          if (index == 3) {
+            break;
+          }
+          favoredNodes[index++] = new InetSocketAddress(address.getHostname(), address.getPort());
+        }
+
+        LOG.info("FavoredNodes selected are " + Arrays.asList(favoredNodes));
+        FSDataOutputStream out = FSUtils.create(outputFs.getConf(), outputFs, outputPath, null, favoredNodes);
         try {
           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
         } finally {
@@ -801,7 +862,7 @@ public boolean nextKeyValue() {
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers, final int bandwidthMB)
+      final int mappers, final int bandwidthMB, final String targetZK, final String targetRSGroup)
       throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
@@ -817,6 +878,9 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot,
     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
+    if (targetZK != null) conf.set(CONF_TARGET_ZK, targetZK);
+    if (targetRSGroup != null) conf.set(CONF_TARGET_RSGROUP, targetRSGroup);
+
 
     String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
     Job job = new Job(conf);
@@ -913,6 +977,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
   private int bandwidthMB = Integer.MAX_VALUE;
   private int filesMode = 0;
   private int mappers = 0;
+  private String targetZK = null;
+  private String targetRSGroup = null;
 
   @Override
   protected void processOptions(CommandLine cmd) {
@@ -924,6 +990,12 @@ protected void processOptions(CommandLine cmd) {
     if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
       inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
     }
+    if (cmd.hasOption(Options.TARGET_ZK.getLongOpt())) {
+      targetZK = cmd.getOptionValue(Options.TARGET_ZK.getLongOpt());
+    }
+    if (cmd.hasOption(Options.TARGET_RSGROUP.getLongOpt())) {
+      targetRSGroup = cmd.getOptionValue(Options.TARGET_RSGROUP.getLongOpt());
+    }
     mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
     filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
     filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
@@ -1080,7 +1152,7 @@ public int doWork() throws IOException {
     // by the HFileArchiver, since they have no references.
     try {
       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
-        filesUser, filesGroup, filesMode, mappers, bandwidthMB);
+        filesUser, filesGroup, filesMode, mappers, bandwidthMB, targetZK, targetRSGroup);
 
       LOG.info("Finalize the Snapshot Export");
       if (!skipTmp) {
@@ -1139,6 +1211,8 @@ protected void printUsage() {
     addOption(Options.CHMOD);
     addOption(Options.MAPPERS);
     addOption(Options.BANDWIDTH);
+    addOption(Options.TARGET_ZK);
+    addOption(Options.TARGET_RSGROUP);
   }
 
   public static void main(String[] args) {
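
For reference, a minimal usage sketch of the two options this patch introduces. It is not part of the patch: the snapshot name, destination root, quorum string, and rsgroup name below are placeholders. It drives the patched tool through Hadoop's ToolRunner, which should be equivalent to invoking org.apache.hadoop.hbase.snapshot.ExportSnapshot from the hbase command line with the same arguments.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.util.ToolRunner;

public class ExportSnapshotToRSGroup {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // --targetZK and --targetRSGroup are the options added by this patch; all other values are
    // placeholders. With both set, the mapper reads the rsgroup's servers from the target
    // cluster's ZooKeeper and passes up to three of them as favored nodes for each HFile copy.
    int exitCode = ToolRunner.run(conf, new ExportSnapshot(), new String[] {
      "--snapshot", "my_snapshot",                 // placeholder snapshot name
      "--copy-to", "hdfs://target-nn:8020/hbase",  // placeholder destination root
      "--targetZK", "zk1,zk2,zk3:2181:/hbase",     // target cluster ZooKeeper quorum (new option)
      "--targetRSGroup", "export_group"            // placeholder rsgroup on the target cluster (new option)
    });
    System.exit(exitCode);
  }
}

If either option is omitted, the mapper logs a warning and falls back to the previous behavior, creating the output files without favored nodes.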