Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26874 VerifyReplication recompare async #5051

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -30,7 +35,6 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
Expand All @@ -46,6 +50,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
Expand All @@ -55,12 +60,12 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
Expand All @@ -84,6 +89,11 @@ public class VerifyReplication extends Configured implements Tool {

public final static String NAME = "verifyrep";
private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
private static ThreadPoolExecutor reCompareExecutor = null;
int reCompareTries = 0;
int reCompareBackoffExponent = 0;
int reCompareThreads = 0;
int sleepMsBeforeReCompare = 0;
long startTime = 0;
long endTime = Long.MAX_VALUE;
int batch = -1;
Expand All @@ -94,7 +104,6 @@ public class VerifyReplication extends Configured implements Tool {
String peerId = null;
String peerQuorumAddress = null;
String rowPrefixes = null;
int sleepMsBeforeReCompare = 0;
boolean verbose = false;
boolean includeDeletedCells = false;
// Source table snapshot name
Expand Down Expand Up @@ -124,7 +133,12 @@ public enum Counters {
BADROWS,
ONLY_IN_SOURCE_TABLE_ROWS,
ONLY_IN_PEER_TABLE_ROWS,
CONTENT_DIFFERENT_ROWS
CONTENT_DIFFERENT_ROWS,
RECOMPARES,
MAIN_THREAD_RECOMPARES,
SOURCE_ROW_CHANGED,
PEER_ROW_CHANGED,
FAILED_RECOMPARE
bbeaudreault marked this conversation as resolved.
Show resolved Hide resolved
}

private Connection sourceConnection;
Expand All @@ -133,6 +147,9 @@ public enum Counters {
private Table replicatedTable;
private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable;
private Scan tableScan;
private int reCompareTries;
private int reCompareBackoffExponent;
private int sleepMsBeforeReCompare;
private String delimiter = "";
private boolean verbose = false;
Expand All @@ -150,7 +167,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
if (sleepMsBeforeReCompare > 0) {
reCompareTries = Math.max(reCompareTries, 1);
}
delimiter = conf.get(NAME + ".delimiter", "");
verbose = conf.getBoolean(NAME + ".verbose", false);
batch = conf.getInt(NAME + ".batch", -1);
Expand Down Expand Up @@ -179,9 +201,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
if (versions >= 0) {
scan.readVersions(versions);
}
int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
bbeaudreault marked this conversation as resolved.
Show resolved Hide resolved
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
sourceConnection = ConnectionFactory.createConnection(conf);
sourceTable = sourceConnection.getTable(tableName);
tableScan = scan;

final InputSplit tableSplit = context.getInputSplit();

Expand Down Expand Up @@ -226,7 +251,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
while (true) {
if (currentCompareRowInPeerTable == null) {
// reach the region end of peer table, row only in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break;
}
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
Expand All @@ -240,55 +265,77 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
}
} catch (Exception e) {
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
currentCompareRowInPeerTable);
}
currentCompareRowInPeerTable = replicatedScanner.next();
break;
} else if (rowCmpRet < 0) {
// row only exists in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break;
} else {
// row only exists in peer table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next();
}
}
}

private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
if (sleepMsBeforeReCompare > 0) {
Threads.sleep(sleepMsBeforeReCompare);
try {
Result sourceResult = sourceTable.get(new Get(row.getRow()));
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
Result.compareResults(sourceResult, replicatedResult, false);
if (!sourceResult.isEmpty()) {
context.getCounter(Counters.GOODROWS).increment(1);
if (verbose) {
LOG.info("Good row key (with recompare): " + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
}
return;
} catch (Exception e) {
LOG.error("recompare fail after sleep, rowkey=" + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
@SuppressWarnings("FutureReturnValueIgnored")
bbeaudreault marked this conversation as resolved.
Show resolved Hide resolved
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
Result replicatedRow) {
byte[] rowKey = getRow(row, replicatedRow);
if (reCompareTries == 0) {
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
return;
}

VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);

if (reCompareExecutor == null) {
runnable.run();
return;
}
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow())
+ delimiter);

reCompareExecutor.submit(runnable);
}

@Override
protected void cleanup(Context context) {
if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
reCompareExecutor.shutdown();
try {
boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
if (!terminated) {
List<Runnable> queue = reCompareExecutor.shutdownNow();
for (Runnable runnable : queue) {
((VerifyReplicationRecompareRunnable) runnable).fail();
}

terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);

if (!terminated) {
int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
LOG.warn("Found {} possible recompares still running in the executable"
+ " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
context.getCounter(Counters.BADROWS).increment(activeCount);
context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Failed to await executor termination in cleanup", e);
}
}
if (replicatedScanner != null) {
try {
while (currentCompareRowInPeerTable != null) {
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next();
}
Expand Down Expand Up @@ -424,6 +471,10 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);

conf.setInt(NAME + ".recompareTries", reCompareTries);
conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
conf.setInt(NAME + ".recompareThreads", reCompareThreads);

// Set Snapshot specific parameters
if (peerSnapshotName != null) {
conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
Expand Down Expand Up @@ -491,6 +542,15 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
return job;
}

/**
 * Resolves the row key to report for a pair of compared results.
 * <p>
 * The source-side result takes precedence; the replicated result is consulted only when the
 * source-side result is {@code null}.
 * @param sourceResult     result for the source side of the comparison, may be {@code null}
 * @param replicatedResult result for the replicated (peer) side, may be {@code null}
 * @return the value of {@code getRow()} on the first non-null argument
 * @throws RuntimeException if both arguments are {@code null}
 */
protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
  Result keyHolder = sourceResult != null ? sourceResult : replicatedResult;
  if (keyHolder == null) {
    throw new RuntimeException("Both sourceResult and replicatedResult are null!");
  }
  return keyHolder.getRow();
}

private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
String[] rowPrefixArray = rowPrefixes.split(",");
Expand Down Expand Up @@ -575,11 +635,20 @@ public boolean doCommandLine(final String[] args) {
continue;
}

final String sleepToReCompareKey = "--recomparesleep=";
final String deprecatedSleepToReCompareKey = "--recomparesleep=";
final String sleepToReCompareKey = "--recompareSleep=";
if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
+ " Use --recompareSleep instead.");
sleepMsBeforeReCompare =
Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
continue;
}
if (cmd.startsWith(sleepToReCompareKey)) {
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
continue;
}

final String verboseKey = "--verbose";
if (cmd.startsWith(verboseKey)) {
verbose = true;
Expand Down Expand Up @@ -628,6 +697,25 @@ public boolean doCommandLine(final String[] args) {
continue;
}

final String reCompareThreadArgs = "--recompareThreads=";
if (cmd.startsWith(reCompareThreadArgs)) {
reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
continue;
}

final String reCompareTriesKey = "--recompareTries=";
if (cmd.startsWith(reCompareTriesKey)) {
reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
continue;
}

final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
if (cmd.startsWith(reCompareBackoffExponentKey)) {
reCompareBackoffExponent =
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
continue;
}

if (cmd.startsWith("--")) {
printUsage("Invalid argument '" + cmd + "'");
return false;
Expand Down Expand Up @@ -704,7 +792,8 @@ private static void printUsage(final String errorMsg) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: verifyrep [--starttime=X]"
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
+ "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
+ "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
+ "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
+ "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
Expand All @@ -720,8 +809,14 @@ private static void printUsage(final String errorMsg) {
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println(" delimiter the delimiter used in display around rowkey");
System.err.println(" recomparesleep milliseconds to sleep before recompare row, "
System.err.println(" recompareSleep milliseconds to sleep before recompare row, "
+ "default value is 0 which disables the recompare.");
System.err.println(" recompareThreads number of threads to run recompares in");
System.err.println(" recompareTries number of recompare attempts before incrementing "
+ "the BADROWS counter. Defaults to 1 recompare");
System.out.println(" recompareBackoffExponent exponential multiplier to increase "
+ "recompareSleep after each recompare attempt, "
+ "default value is 0 which results in a constant sleep time");
System.err.println(" verbose logs row keys of good rows");
System.err.println(" peerTableName Peer Table Name");
System.err.println(" sourceSnapshotName Source Snapshot Name");
Expand Down Expand Up @@ -788,6 +883,27 @@ private static void printUsage(final String errorMsg) {
+ "2181:/cluster-b \\\n" + " TestTable");
}

/**
 * Builds the thread pool used to run recompares off the calling thread.
 * <p>
 * The pool has no core threads, grows up to {@code maxThreads}, lets idle threads expire after
 * one second and hands tasks over through a {@link SynchronousQueue}, so nothing is ever queued:
 * a task is either picked up by a worker thread or passed to the rejection policy built by
 * {@code buildRejectedReComparePolicy}.
 * @param maxThreads maximum number of recompare threads; zero or a negative value means no
 *                   executor is created
 * @param context    mapper context handed to the rejection policy
 * @return the executor, or {@code null} when {@code maxThreads} is not positive
 */
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
  if (maxThreads <= 0) {
    if (maxThreads < 0) {
      // ThreadPoolExecutor rejects a non-positive maximum pool size with an
      // IllegalArgumentException, so treat a negative setting like 0 instead of failing the task.
      LOG.warn("Ignoring negative recompare thread count {}, no recompare executor is created",
        maxThreads);
    }
    return null;
  }

  return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
    buildRejectedReComparePolicy(context));
}

/**
 * Builds the rejection handler for the recompare executor.
 * <p>
 * A rejected recompare is counted under {@code MAIN_THREAD_RECOMPARES} and then executed
 * synchronously in the thread that tried to submit it.
 * @param context mapper context used to increment the counter
 * @return a {@link CallerRunsPolicy} that counts and runs every rejected task
 */
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
  return new CallerRunsPolicy() {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
      LOG.debug("Re-comparison execution rejected. Running in main thread.");
      context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
      // Run the task directly rather than delegating to CallerRunsPolicy, which discards the
      // task once the executor has been shut down. cleanup() shuts the executor down and then
      // still submits recompares for rows left over in the peer table; those must be executed,
      // not silently dropped.
      runnable.run();
    }
  };
}

@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Expand Down
Loading