From 2a46c130bd21495dda388bfd8256a0f7c86c35e1 Mon Sep 17 00:00:00 2001 From: Hernan Gelaf-Romer Date: Fri, 28 Jul 2023 15:46:35 -0400 Subject: [PATCH] re-use VRRunnable --- .../replication/VerifyReplication.java | 43 +++---------------- 1 file changed, 5 insertions(+), 38 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 3d6c4c266466..179d595f4447 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -61,7 +60,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; @@ -293,14 +291,15 @@ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Res return; } + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, + reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); + if (reCompareExecutor == null) { - syncLogFailRowAndIncreaseCounter(context, counter, rowKey); + runnable.run(); return; } - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, - row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, - reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); reCompareExecutor.submit(runnable); } @@ -375,38 +374,6 @@ protected void cleanup(Context context) { } } } - - private void syncLogFailRowAndIncreaseCounter(Mapper.Context context, Counters counter, - byte[] row) { - int sleepMs = sleepMsBeforeReCompare; - int tries = 0; - - while (++tries <= reCompareTries) { - context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1); - Threads.sleep(sleepMs); - try { - Result sourceResult = sourceTable.get(new Get(row)); - Result replicatedResult = replicatedTable.get(new Get(row)); - Result.compareResults(sourceResult, replicatedResult, false); - if (!sourceResult.isEmpty()) { - context.getCounter(Counters.GOODROWS).increment(1); - if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row) - + delimiter); - } - } - return; - } catch (Exception e) { - context.getCounter(Counters.FAILED_RECOMPARE).increment(1); - LOG.error("recompare fail after sleep, rowkey=" + delimiter + Bytes.toStringBinary(row) - + delimiter); - } - sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); - } - context.getCounter(counter).increment(1); - context.getCounter(Counters.BADROWS).increment(1); - LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(row), delimiter); - } } private static Pair