Skip to content

Commit

Permalink
re-use VRRunnable
Browse files Browse the repository at this point in the history
  • Loading branch information
Hernan Gelaf-Romer committed Jul 28, 2023
1 parent 4535d3c commit 2a46c13
Showing 1 changed file with 5 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
Expand All @@ -61,7 +60,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
Expand Down Expand Up @@ -293,14 +291,15 @@ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Res
return;
}

VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);

if (reCompareExecutor == null) {
syncLogFailRowAndIncreaseCounter(context, counter, rowKey);
runnable.run();
return;
}

VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
reCompareExecutor.submit(runnable);
}

Expand Down Expand Up @@ -375,38 +374,6 @@ protected void cleanup(Context context) {
}
}
}

private void syncLogFailRowAndIncreaseCounter(Mapper.Context context, Counters counter,
byte[] row) {
int sleepMs = sleepMsBeforeReCompare;
int tries = 0;

while (++tries <= reCompareTries) {
context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1);
Threads.sleep(sleepMs);
try {
Result sourceResult = sourceTable.get(new Get(row));
Result replicatedResult = replicatedTable.get(new Get(row));
Result.compareResults(sourceResult, replicatedResult, false);
if (!sourceResult.isEmpty()) {
context.getCounter(Counters.GOODROWS).increment(1);
if (verbose) {
LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row)
+ delimiter);
}
}
return;
} catch (Exception e) {
context.getCounter(Counters.FAILED_RECOMPARE).increment(1);
LOG.error("recompare fail after sleep, rowkey=" + delimiter + Bytes.toStringBinary(row)
+ delimiter);
}
sleepMs = sleepMs * (2 ^ reCompareBackoffExponent);
}
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(row), delimiter);
}
}

private static Pair<ReplicationPeerConfig, Configuration>
Expand Down

0 comments on commit 2a46c13

Please sign in to comment.