async VerifyReplication recompares
Hernan Gelaf-Romer committed Jul 31, 2023
1 parent 0bbc8d1 commit 1ef20e9
Showing 4 changed files with 591 additions and 36 deletions.
@@ -19,7 +19,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -30,7 +35,6 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -46,6 +50,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -55,12 +60,12 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -84,6 +89,11 @@ public class VerifyReplication extends Configured implements Tool {

public final static String NAME = "verifyrep";
private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
private static ThreadPoolExecutor reCompareExecutor = null;
int reCompareTries = 0;
int reCompareBackoffExponent = 0;
int reCompareThreads = 0;
int sleepMsBeforeReCompare = 0;
long startTime = 0;
long endTime = Long.MAX_VALUE;
int batch = -1;
@@ -94,7 +104,6 @@ public class VerifyReplication extends Configured implements Tool {
String peerId = null;
String peerQuorumAddress = null;
String rowPrefixes = null;
int sleepMsBeforeReCompare = 0;
boolean verbose = false;
boolean includeDeletedCells = false;
// Source table snapshot name
@@ -124,7 +133,12 @@ public enum Counters {
BADROWS,
ONLY_IN_SOURCE_TABLE_ROWS,
ONLY_IN_PEER_TABLE_ROWS,
CONTENT_DIFFERENT_ROWS
CONTENT_DIFFERENT_ROWS,
RECOMPARES,
MAIN_THREAD_RECOMPARES,
SOURCE_ROW_CHANGED,
PEER_ROW_CHANGED,
FAILED_RECOMPARE
}

private Connection sourceConnection;
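
The new counters (RECOMPARES, MAIN_THREAD_RECOMPARES, SOURCE_ROW_CHANGED, PEER_ROW_CHANGED, FAILED_RECOMPARE) surface recompare activity alongside the existing GOODROWS/BADROWS totals. As a point of reference, a minimal sketch of reading them from a completed job handle; the RecompareCounterReport class and its report method are hypothetical helpers, not part of this commit, and rely only on the standard Job.getCounters()/findCounter(Enum) API.

import java.io.IOException;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public final class RecompareCounterReport {
  // Hypothetical helper: summarize the new recompare counters once the
  // verifyrep job has completed.
  public static void report(Job completedJob) throws IOException {
    Counters counters = completedJob.getCounters();
    long recompares =
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue();
    long mainThread =
      counters.findCounter(VerifyReplication.Verifier.Counters.MAIN_THREAD_RECOMPARES).getValue();
    long failed =
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue();
    long badRows =
      counters.findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue();
    System.out.printf("recompares=%d mainThreadRecompares=%d failedRecompares=%d badRows=%d%n",
      recompares, mainThread, failed, badRows);
  }
}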
@@ -133,6 +147,9 @@ public enum Counters {
private Table replicatedTable;
private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable;
private Scan tableScan;
private int reCompareTries;
private int reCompareBackoffExponent;
private int sleepMsBeforeReCompare;
private String delimiter = "";
private boolean verbose = false;
@@ -150,7 +167,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
if (sleepMsBeforeReCompare > 0) {
reCompareTries = Math.max(reCompareTries, 1);
}
delimiter = conf.get(NAME + ".delimiter", "");
verbose = conf.getBoolean(NAME + ".verbose", false);
batch = conf.getInt(NAME + ".batch", -1);
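
For reference, a minimal sketch (not part of this commit) of the raw configuration keys the Verifier reads during setup. The class name and all values below are illustrative only; in the normal CLI flow, createSubmittableJob populates these keys from the corresponding --recompare* flags.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class RecompareConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Keys mirror NAME + ".<setting>" with NAME = "verifyrep"; values are examples.
    conf.setInt("verifyrep.recompareTries", 3);
    conf.setInt("verifyrep.recompareBackoffExponent", 1);
    conf.setInt("verifyrep.sleepMsBeforeReCompare", 100);
    conf.setInt("verifyrep.recompareThreads", 4);
    System.out.println("recompareTries=" + conf.getInt("verifyrep.recompareTries", 0));
  }
}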
@@ -179,9 +201,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
if (versions >= 0) {
scan.readVersions(versions);
}
int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
sourceConnection = ConnectionFactory.createConnection(conf);
sourceTable = sourceConnection.getTable(tableName);
tableScan = scan;

final InputSplit tableSplit = context.getInputSplit();

@@ -226,7 +251,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
while (true) {
if (currentCompareRowInPeerTable == null) {
// reach the region end of peer table, row only in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break;
}
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
@@ -240,55 +265,77 @@
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
}
} catch (Exception e) {
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
currentCompareRowInPeerTable);
}
currentCompareRowInPeerTable = replicatedScanner.next();
break;
} else if (rowCmpRet < 0) {
// row only exists in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break;
} else {
// row only exists in peer table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next();
}
}
}

private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
if (sleepMsBeforeReCompare > 0) {
Threads.sleep(sleepMsBeforeReCompare);
try {
Result sourceResult = sourceTable.get(new Get(row.getRow()));
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
Result.compareResults(sourceResult, replicatedResult, false);
if (!sourceResult.isEmpty()) {
context.getCounter(Counters.GOODROWS).increment(1);
if (verbose) {
LOG.info("Good row key (with recompare): " + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
}
return;
} catch (Exception e) {
LOG.error("recompare fail after sleep, rowkey=" + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
@SuppressWarnings("FutureReturnValueIgnored")
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
Result replicatedRow) {
byte[] rowKey = getRow(row, replicatedRow);
if (reCompareTries == 0) {
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
return;
}

VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);

if (reCompareExecutor == null) {
runnable.run();
return;
}
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow())
+ delimiter);

reCompareExecutor.submit(runnable);
}

@Override
protected void cleanup(Context context) {
if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
reCompareExecutor.shutdown();
try {
boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
if (!terminated) {
List<Runnable> queue = reCompareExecutor.shutdownNow();
for (Runnable runnable : queue) {
((VerifyReplicationRecompareRunnable) runnable).fail();
}

terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);

if (!terminated) {
int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
LOG.warn("Found {} possible recompares still running in the executable"
+ " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
context.getCounter(Counters.BADROWS).increment(activeCount);
context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Failed to await executor termination in cleanup", e);
}
}
if (replicatedScanner != null) {
try {
while (currentCompareRowInPeerTable != null) {
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next();
}
@@ -424,6 +471,10 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);

conf.setInt(NAME + ".recompareTries", reCompareTries);
conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
conf.setInt(NAME + ".recompareThreads", reCompareThreads);

// Set Snapshot specific parameters
if (peerSnapshotName != null) {
conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
@@ -491,6 +542,15 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
return job;
}

protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
if (sourceResult != null) {
return sourceResult.getRow();
} else if (replicatedResult != null) {
return replicatedResult.getRow();
}
throw new RuntimeException("Both sourceResult and replicatedResult are null!");
}

private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
String[] rowPrefixArray = rowPrefixes.split(",");
@@ -575,11 +635,20 @@ public boolean doCommandLine(final String[] args) {
continue;
}

final String sleepToReCompareKey = "--recomparesleep=";
final String deprecatedSleepToReCompareKey = "--recomparesleep=";
final String sleepToReCompareKey = "--recompareSleep=";
if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
+ " Use --recompareSleep instead.");
sleepMsBeforeReCompare =
Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
continue;
}
if (cmd.startsWith(sleepToReCompareKey)) {
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
continue;
}

final String verboseKey = "--verbose";
if (cmd.startsWith(verboseKey)) {
verbose = true;
@@ -628,6 +697,25 @@ public boolean doCommandLine(final String[] args) {
continue;
}

final String reCompareThreadArgs = "--recompareThreads=";
if (cmd.startsWith(reCompareThreadArgs)) {
reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
continue;
}

final String reCompareTriesKey = "--recompareTries=";
if (cmd.startsWith(reCompareTriesKey)) {
reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
continue;
}

final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
if (cmd.startsWith(reCompareBackoffExponentKey)) {
reCompareBackoffExponent =
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
continue;
}

if (cmd.startsWith("--")) {
printUsage("Invalid argument '" + cmd + "'");
return false;
@@ -704,7 +792,8 @@ private static void printUsage(final String errorMsg) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: verifyrep [--starttime=X]"
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
+ "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
+ "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
+ "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
+ "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
@@ -720,8 +809,14 @@ private static void printUsage(final String errorMsg) {
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println(" delimiter the delimiter used in display around rowkey");
System.err.println(" recomparesleep milliseconds to sleep before recompare row, "
System.err.println(" recompareSleep milliseconds to sleep before recompare row, "
+ "default value is 0 which disables the recompare.");
System.err.println(" recompareThreads number of threads to run recompares in");
System.err.println(" recompareTries number of recompare attempts before incrementing "
+ "the BADROWS counter. Defaults to 1 recompare");
System.out.println(" recompareBackoffExponent exponential multiplier to increase "
+ "recompareSleep after each recompare attempt, "
+ "default value is 0 which results in a constant sleep time");
System.err.println(" verbose logs row keys of good rows");
System.err.println(" peerTableName Peer Table Name");
System.err.println(" sourceSnapshotName Source Snapshot Name");
@@ -788,6 +883,27 @@ private static void printUsage(final String errorMsg) {
+ "2181:/cluster-b \\\n" + " TestTable");
}

private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
if (maxThreads == 0) {
return null;
}

return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
buildRejectedReComparePolicy(context));
}

private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
return new CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
LOG.debug("Re-comparison execution rejected. Running in main thread.");
context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
// will run in the current thread
super.rejectedExecution(runnable, e);
}
};
}
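
The executor above uses zero core threads, a SynchronousQueue for direct handoff, and a CallerRunsPolicy, so once all maxThreads workers are busy a submitted recompare runs on the submitting map thread instead of being queued or dropped; that fallback is what MAIN_THREAD_RECOMPARES counts. Below is a self-contained sketch of the same pattern using only JDK classes; the pool size of 2, the five tasks, and the 200 ms sleep are arbitrary.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

public final class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    // Zero core threads, at most 2 workers, direct handoff via SynchronousQueue.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 2, 1L, TimeUnit.SECONDS,
      new SynchronousQueue<>(), new CallerRunsPolicy());
    // With both workers busy, some submissions are rejected by the pool and
    // executed by CallerRunsPolicy on the caller ("main") thread.
    for (int i = 0; i < 5; i++) {
      final int task = i;
      pool.submit(() -> {
        System.out.println("task " + task + " ran on " + Thread.currentThread().getName());
        try {
          Thread.sleep(200); // keep workers busy so later submits overflow to the caller
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}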

@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();