Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28565 Make map reduce jobs accept connection uri when specifyin… #5972

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -63,6 +65,11 @@ public class CopyTable extends Configured implements Tool {
String startRow = null;
String stopRow = null;
String dstTableName = null;
URI peerUri = null;
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
*/
@Deprecated
String peerAddress = null;
String families = null;
boolean allCells = false;
Expand All @@ -89,7 +96,7 @@ private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
return newDir;
}

private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
if (readingSnapshot) {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
Expand Down Expand Up @@ -166,7 +173,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
job.setNumReduceTasks(0);

if (bulkload) {
initCopyTableMapperReducerJob(job, scan);
initCopyTableMapperJob(job, scan);

// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
Expand All @@ -180,8 +187,15 @@ public Job createSubmittableJob(String[] args) throws IOException {
admin.getDescriptor((TableName.valueOf(dstTableName))));
}
} else {
initCopyTableMapperReducerJob(job, scan);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
initCopyTableMapperJob(job, scan);
if (peerUri != null) {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
} else if (peerAddress != null) {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
} else {
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
}

}

return job;
Expand All @@ -195,7 +209,7 @@ private static void printUsage(final String errorMsg) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
+ "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
System.err.println();
System.err.println("Options:");
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
Expand All @@ -208,9 +222,12 @@ private static void printUsage(final String errorMsg) {
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
System.err.println(" versions number of cell versions to copy");
System.err.println(" new.name new table's name");
System.err.println(" peer.uri The URI of the peer cluster");
System.err.println(" peer.adr Address of the peer cluster given in the format");
System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client"
+ ".port:zookeeper.znode.parent");
System.err.println(" Do not take effect if peer.uri is specified");
System.err.println(" Deprecated, please use peer.uri instead");
System.err.println(" families comma-separated list of families to copy");
System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
System.err.println(" To keep the same name, just give \"cfName\"");
Expand Down Expand Up @@ -247,144 +264,149 @@ private boolean doCommandLine(final String[] args) {
printUsage(null);
return false;
}
try {
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage(null);
return false;
}

final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
startRow = cmd.substring(startRowArgKey.length());
continue;
}

final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
stopRow = cmd.substring(stopRowArgKey.length());
continue;
}

final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
continue;
}

final String endTimeArgKey = "--endtime=";
if (cmd.startsWith(endTimeArgKey)) {
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
continue;
}

final String batchArgKey = "--batch=";
if (cmd.startsWith(batchArgKey)) {
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
continue;
}

final String cacheRowArgKey = "--cacheRow=";
if (cmd.startsWith(cacheRowArgKey)) {
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
continue;
}
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage(null);
return false;
}

final String versionsArgKey = "--versions=";
if (cmd.startsWith(versionsArgKey)) {
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
continue;
}
final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
startRow = cmd.substring(startRowArgKey.length());
continue;
}

final String newNameArgKey = "--new.name=";
if (cmd.startsWith(newNameArgKey)) {
dstTableName = cmd.substring(newNameArgKey.length());
continue;
}
final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
stopRow = cmd.substring(stopRowArgKey.length());
continue;
}

final String peerAdrArgKey = "--peer.adr=";
if (cmd.startsWith(peerAdrArgKey)) {
peerAddress = cmd.substring(peerAdrArgKey.length());
continue;
}
final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
continue;
}

final String familiesArgKey = "--families=";
if (cmd.startsWith(familiesArgKey)) {
families = cmd.substring(familiesArgKey.length());
continue;
}
final String endTimeArgKey = "--endtime=";
if (cmd.startsWith(endTimeArgKey)) {
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
continue;
}

if (cmd.startsWith("--all.cells")) {
allCells = true;
continue;
}
final String batchArgKey = "--batch=";
if (cmd.startsWith(batchArgKey)) {
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
continue;
}

if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}
final String cacheRowArgKey = "--cacheRow=";
if (cmd.startsWith(cacheRowArgKey)) {
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
continue;
}

if (cmd.startsWith("--shuffle")) {
shuffle = true;
continue;
}
final String versionsArgKey = "--versions=";
if (cmd.startsWith(versionsArgKey)) {
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
continue;
}

if (cmd.startsWith("--snapshot")) {
readingSnapshot = true;
continue;
}
final String newNameArgKey = "--new.name=";
if (cmd.startsWith(newNameArgKey)) {
dstTableName = cmd.substring(newNameArgKey.length());
continue;
}

if (i == args.length - 1) {
if (readingSnapshot) {
snapshot = cmd;
} else {
tableName = cmd;
}
} else {
printUsage("Invalid argument '" + cmd + "'");
final String peerUriArgKey = "--peer.uri=";
if (cmd.startsWith(peerUriArgKey)) {
try {
peerUri = new URI(cmd.substring(peerUriArgKey.length()));
} catch (URISyntaxException e) {
LOG.error("Malformed peer uri specified: {}", cmd, e);
return false;
}
continue;
}
if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a peer address must be specified");
return false;

final String peerAdrArgKey = "--peer.adr=";
if (cmd.startsWith(peerAdrArgKey)) {
peerAddress = cmd.substring(peerAdrArgKey.length());
continue;
}
if ((endTime != 0) && (startTime > endTime)) {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false;

final String familiesArgKey = "--families=";
if (cmd.startsWith(familiesArgKey)) {
families = cmd.substring(familiesArgKey.length());
continue;
}

if (bulkload && peerAddress != null) {
printUsage("Remote bulkload is not supported!");
return false;
if (cmd.startsWith("--all.cells")) {
allCells = true;
continue;
}

if (readingSnapshot && peerAddress != null) {
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
return false;
if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}

if (readingSnapshot && dstTableName == null) {
printUsage("The --new.name=<table> for destination table should be "
+ "provided when copying data from snapshot .");
return false;
if (cmd.startsWith("--shuffle")) {
shuffle = true;
continue;
}

if (readingSnapshot && snapshot == null) {
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
return false;
if (cmd.startsWith("--snapshot")) {
readingSnapshot = true;
continue;
}

// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
if (i == args.length - 1) {
if (readingSnapshot) {
snapshot = cmd;
} else {
tableName = cmd;
}
} else {
printUsage("Invalid argument '" + cmd + "'");
return false;
}
} catch (Exception e) {
LOG.error("Failed to parse commandLine arguments", e);
printUsage("Can't start because " + e.getMessage());
}
if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a peer address must be specified");
return false;
}
if ((endTime != 0) && (startTime > endTime)) {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false;
}

if (bulkload && (peerUri != null || peerAddress != null)) {
printUsage("Remote bulkload is not supported!");
return false;
}

if (readingSnapshot && (peerUri != null || peerAddress != null)) {
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
return false;
}

if (readingSnapshot && dstTableName == null) {
printUsage("The --new.name=<table> for destination table should be "
+ "provided when copying data from snapshot .");
return false;
}

if (readingSnapshot && snapshot == null) {
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
return false;
}

// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
}
return true;
}

Expand All @@ -401,7 +423,9 @@ public static void main(String[] args) throws Exception {
@Override
public int run(String[] args) throws Exception {
Job job = createSubmittableJob(args);
if (job == null) return 1;
if (job == null) {
return 1;
}
if (!job.waitForCompletion(true)) {
LOG.info("Map-reduce job failed!");
if (bulkload) {
Expand Down
Loading