Skip to content

Commit

Permalink
HBASE-28565 Make map reduce jobs accept connection uri when specifying peer cluster (#5972)
Browse files Browse the repository at this point in the history

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
Apache9 authored Jun 12, 2024
1 parent 317ad3c commit 9bdee6d
Show file tree
Hide file tree
Showing 16 changed files with 742 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -63,6 +65,11 @@ public class CopyTable extends Configured implements Tool {
String startRow = null;
String stopRow = null;
String dstTableName = null;
URI peerUri = null;
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
*/
@Deprecated
String peerAddress = null;
String families = null;
boolean allCells = false;
Expand All @@ -89,7 +96,7 @@ private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
return newDir;
}

private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
if (readingSnapshot) {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
Expand Down Expand Up @@ -166,7 +173,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
job.setNumReduceTasks(0);

if (bulkload) {
initCopyTableMapperReducerJob(job, scan);
initCopyTableMapperJob(job, scan);

// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
Expand All @@ -180,8 +187,15 @@ public Job createSubmittableJob(String[] args) throws IOException {
admin.getDescriptor((TableName.valueOf(dstTableName))));
}
} else {
initCopyTableMapperReducerJob(job, scan);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
initCopyTableMapperJob(job, scan);
if (peerUri != null) {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
} else if (peerAddress != null) {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
} else {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
}

}

return job;
Expand All @@ -195,7 +209,7 @@ private static void printUsage(final String errorMsg) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
+ "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
System.err.println();
System.err.println("Options:");
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
Expand All @@ -208,9 +222,12 @@ private static void printUsage(final String errorMsg) {
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
System.err.println(" versions number of cell versions to copy");
System.err.println(" new.name new table's name");
System.err.println(" peer.uri The URI of the peer cluster");
System.err.println(" peer.adr Address of the peer cluster given in the format");
System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client"
+ ".port:zookeeper.znode.parent");
System.err.println(" Do not take effect if peer.uri is specified");
System.err.println(" Deprecated, please use peer.uri instead");
System.err.println(" families comma-separated list of families to copy");
System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
System.err.println(" To keep the same name, just give \"cfName\"");
Expand Down Expand Up @@ -247,144 +264,149 @@ private boolean doCommandLine(final String[] args) {
printUsage(null);
return false;
}
try {
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage(null);
return false;
}

final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
startRow = cmd.substring(startRowArgKey.length());
continue;
}

final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
stopRow = cmd.substring(stopRowArgKey.length());
continue;
}

final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
continue;
}

final String endTimeArgKey = "--endtime=";
if (cmd.startsWith(endTimeArgKey)) {
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
continue;
}

final String batchArgKey = "--batch=";
if (cmd.startsWith(batchArgKey)) {
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
continue;
}

final String cacheRowArgKey = "--cacheRow=";
if (cmd.startsWith(cacheRowArgKey)) {
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
continue;
}
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage(null);
return false;
}

final String versionsArgKey = "--versions=";
if (cmd.startsWith(versionsArgKey)) {
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
continue;
}
final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
startRow = cmd.substring(startRowArgKey.length());
continue;
}

final String newNameArgKey = "--new.name=";
if (cmd.startsWith(newNameArgKey)) {
dstTableName = cmd.substring(newNameArgKey.length());
continue;
}
final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
stopRow = cmd.substring(stopRowArgKey.length());
continue;
}

final String peerAdrArgKey = "--peer.adr=";
if (cmd.startsWith(peerAdrArgKey)) {
peerAddress = cmd.substring(peerAdrArgKey.length());
continue;
}
final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
continue;
}

final String familiesArgKey = "--families=";
if (cmd.startsWith(familiesArgKey)) {
families = cmd.substring(familiesArgKey.length());
continue;
}
final String endTimeArgKey = "--endtime=";
if (cmd.startsWith(endTimeArgKey)) {
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
continue;
}

if (cmd.startsWith("--all.cells")) {
allCells = true;
continue;
}
final String batchArgKey = "--batch=";
if (cmd.startsWith(batchArgKey)) {
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
continue;
}

if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}
final String cacheRowArgKey = "--cacheRow=";
if (cmd.startsWith(cacheRowArgKey)) {
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
continue;
}

if (cmd.startsWith("--shuffle")) {
shuffle = true;
continue;
}
final String versionsArgKey = "--versions=";
if (cmd.startsWith(versionsArgKey)) {
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
continue;
}

if (cmd.startsWith("--snapshot")) {
readingSnapshot = true;
continue;
}
final String newNameArgKey = "--new.name=";
if (cmd.startsWith(newNameArgKey)) {
dstTableName = cmd.substring(newNameArgKey.length());
continue;
}

if (i == args.length - 1) {
if (readingSnapshot) {
snapshot = cmd;
} else {
tableName = cmd;
}
} else {
printUsage("Invalid argument '" + cmd + "'");
final String peerUriArgKey = "--peer.uri=";
if (cmd.startsWith(peerUriArgKey)) {
try {
peerUri = new URI(cmd.substring(peerUriArgKey.length()));
} catch (URISyntaxException e) {
LOG.error("Malformed peer uri specified: {}", cmd, e);
return false;
}
continue;
}
if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a peer address must be specified");
return false;

final String peerAdrArgKey = "--peer.adr=";
if (cmd.startsWith(peerAdrArgKey)) {
peerAddress = cmd.substring(peerAdrArgKey.length());
continue;
}
if ((endTime != 0) && (startTime > endTime)) {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false;

final String familiesArgKey = "--families=";
if (cmd.startsWith(familiesArgKey)) {
families = cmd.substring(familiesArgKey.length());
continue;
}

if (bulkload && peerAddress != null) {
printUsage("Remote bulkload is not supported!");
return false;
if (cmd.startsWith("--all.cells")) {
allCells = true;
continue;
}

if (readingSnapshot && peerAddress != null) {
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
return false;
if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}

if (readingSnapshot && dstTableName == null) {
printUsage("The --new.name=<table> for destination table should be "
+ "provided when copying data from snapshot .");
return false;
if (cmd.startsWith("--shuffle")) {
shuffle = true;
continue;
}

if (readingSnapshot && snapshot == null) {
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
return false;
if (cmd.startsWith("--snapshot")) {
readingSnapshot = true;
continue;
}

// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
if (i == args.length - 1) {
if (readingSnapshot) {
snapshot = cmd;
} else {
tableName = cmd;
}
} else {
printUsage("Invalid argument '" + cmd + "'");
return false;
}
} catch (Exception e) {
LOG.error("Failed to parse commandLine arguments", e);
printUsage("Can't start because " + e.getMessage());
}
if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a peer address must be specified");
return false;
}
if ((endTime != 0) && (startTime > endTime)) {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false;
}

if (bulkload && (peerUri != null || peerAddress != null)) {
printUsage("Remote bulkload is not supported!");
return false;
}

if (readingSnapshot && (peerUri != null || peerAddress != null)) {
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
return false;
}

if (readingSnapshot && dstTableName == null) {
printUsage("The --new.name=<table> for destination table should be "
+ "provided when copying data from snapshot .");
return false;
}

if (readingSnapshot && snapshot == null) {
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
return false;
}

// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
}
return true;
}

Expand All @@ -401,7 +423,9 @@ public static void main(String[] args) throws Exception {
@Override
public int run(String[] args) throws Exception {
Job job = createSubmittableJob(args);
if (job == null) return 1;
if (job == null) {
return 1;
}
if (!job.waitForCompletion(true)) {
LOG.info("Map-reduce job failed!");
if (bulkload) {
Expand Down
Loading

0 comments on commit 9bdee6d

Please sign in to comment.