HBASE-26874 VerifyReplication recompare async #5051
Conversation
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
if (verbose) {
  LOG.info("Good row key (with recompare): " + delimiter
    + Bytes.toStringBinary(row.getRow()) + delimiter);
LOG.info("Got an exception during recompare for rowkey=" + Bytes.toStringBinary(row),
We have slf4j here, so we can use {} placeholders in all of these new LOG calls.
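As a minimal sketch of why the {} form matters: it defers string building until the logger knows the level is enabled, whereas concatenation builds the message unconditionally. The `format()` helper below is a hypothetical stand-in for slf4j's substitution, just to keep the demo self-contained; real code would simply pass the placeholders to the existing slf4j `LOG`.

```java
public class PlaceholderDemo {
    static int formatCalls = 0;

    // Hypothetical stand-in for slf4j's {} substitution.
    static String format(String template, Object... args) {
        formatCalls++;
        StringBuilder sb = new StringBuilder();
        int from = 0, argIdx = 0, at;
        while ((at = template.indexOf("{}", from)) >= 0 && argIdx < args.length) {
            sb.append(template, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    static boolean infoEnabled = false;

    static void info(String template, Object... args) {
        if (infoEnabled) {
            // Formatting happens only when the level is actually enabled.
            System.out.println(format(template, args));
        }
    }

    public static void main(String[] args) {
        // Concatenation would build the full string even with info disabled;
        // the placeholder form skips the work entirely.
        info("Good row key (with recompare): {}{}{}", "|", "rowkey1", "|");
        System.out.println(formatCalls); // 0: info is disabled, nothing was formatted
    }
}
```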
...mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
return new CallerRunsPolicy() {
  @Override
  public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
    LOG.info("Re-comparison execution rejected. Running in main thread.");
Might want to make this debug. In general it's best to avoid info logs in M/R jobs, because they can unexpectedly slow down jobs if they start spamming.
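For context on why this log line can fire on the hot path: with a CallerRunsPolicy, tasks rejected by a saturated pool run synchronously on the submitting (mapper) thread. A standalone sketch of that behavior, with an illustrative class name:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // Returns true when a rejected task executed on the submitting thread.
    static boolean rejectedTaskRunsInCaller() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        // Occupy the single worker thread...
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });
        // ...and fill the one queue slot.
        pool.execute(() -> { });

        // This submission is rejected, so CallerRunsPolicy runs it right
        // here, synchronously, on the caller's thread.
        Thread caller = Thread.currentThread();
        final Thread[] ranOn = new Thread[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread());

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return ranOn[0] == caller;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectedTaskRunsInCaller()); // true
    }
}
```

Because the rejected recompare blocks the mapper thread, any per-rejection logging happens inline with map progress, which is why debug level is the safer choice here.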
  + Bytes.toStringBinary(row.getRow()) + delimiter);
if (failCounter != null) {
  context.getCounter(failCounter).increment(1);
  LOG.debug(failCounter + " for rowkey=" + Bytes.toStringBinary(row));
Wrap this in LOG.isDebugEnabled(), since converting bytes to StringBinary takes CPU.
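A self-contained illustration of the guard. The `toStringBinary` below is a simplified, instrumented stand-in for HBase's `Bytes.toStringBinary`, used only so the demo can count how often the conversion actually runs:

```java
public class DebugGuardDemo {
    static boolean debugEnabled = false;
    static int conversions = 0;

    // Simplified stand-in for HBase's Bytes.toStringBinary, instrumented to
    // count how often the (relatively expensive) conversion runs.
    static String toStringBinary(byte[] bytes) {
        conversions++;
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (b >= ' ' && b < 0x7f) {
                sb.append((char) b);
            } else {
                sb.append(String.format("\\x%02X", b));
            }
        }
        return sb.toString();
    }

    // The guarded pattern from the review comment: the conversion only runs
    // when debug logging is actually enabled.
    static void logFail(String counterName, byte[] rowKey) {
        if (debugEnabled) {
            System.out.println(counterName + " for rowkey=" + toStringBinary(rowKey));
        }
    }

    public static void main(String[] args) {
        logFail("FAILED_RECOMPARE", new byte[] { 65, 0 }); // no output, no conversion
        debugEnabled = true;
        logFail("FAILED_RECOMPARE", new byte[] { 65, 0 }); // converts and prints
    }
}
```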
// we use a static retry/backoff schedule for this part because
// we have an opportunity to rerun the fetching again when we retry the
// higher level recompare as well.
private static final int FETCH_LATEST_ROWS_TRIES = 3;
Taking another look at this. In terms of upstreaming, I feel like we might want to simplify this. I don't love having something unconfigurable like this, since it's just begging for someone to push another update later to make it configurable. I also think it's a little harder for a new user to reason about a design that uses two retries like we have (a high-level and a lower-level retry).
Rather than have 2 retries, can we make do with 1? That would require some modifications to the code below. It would also probably mean configuring our internal wrapper job with a higher number of retries than we have today.
So basically I think we'd want to have the 1 loop in RecompareRunnable, and maybe change fetchLatestRows to return a boolean which causes us to retry again in the main loop.
Does that make sense?
Yeah that makes sense, I can implement these changes 👍
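The single-loop shape being discussed could look roughly like this (names and method shape are hypothetical, not the actual patch): fetchLatestRows reports success via a boolean, and one loop in the runnable covers both re-fetching and re-comparing, with a fixed backoff between attempts.

```java
import java.util.function.BooleanSupplier;

public class SingleLoopRecompare {
    // Returns the number of attempts used on success, or -1 if tries were
    // exhausted (the caller would then increment FAILED_RECOMPARE/BADROWS).
    static int recompare(BooleanSupplier fetchLatestRowsAndCompare,
                         int reCompareTries, long backoffMs) throws InterruptedException {
        for (int attempt = 1; attempt <= reCompareTries; attempt++) {
            if (fetchLatestRowsAndCompare.getAsBoolean()) {
                return attempt; // rows fetched and verified
            }
            if (attempt < reCompareTries) {
                Thread.sleep(backoffMs); // fixed backoff between attempts
            }
        }
        return -1; // exhausted retries
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(recompare(() -> true, 3, 0L));  // 1: matched first try
        System.out.println(recompare(() -> false, 2, 0L)); // -1: never matched
    }
}
```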
LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
}

private class RecompareRunnable implements Runnable {
The other thing I'm thinking is whether we can split this out to an upper level class, i.e. its own class named VerifyReplicationRecompareRunnable. I just think this job is complicated and long, so adding an entire inner class here is a non-trivial addition to the complexity. It'd also make it possible to write unit tests just against the recompare process, without having to worry about somehow running the full MR job and trying to cause inconsistencies.
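A rough skeleton of that extraction, with collaborators injected so a unit test can drive the recompare path directly; the constructor arguments here are stand-ins for the real table and counter dependencies, not the shape of the final class:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class VerifyReplicationRecompareRunnable implements Runnable {
    private final BooleanSupplier recompareOnce; // true = rows now match
    private final Runnable onFailure;            // e.g. increment a fail counter
    private final int maxTries;

    public VerifyReplicationRecompareRunnable(BooleanSupplier recompareOnce,
                                              Runnable onFailure, int maxTries) {
        this.recompareOnce = recompareOnce;
        this.onFailure = onFailure;
        this.maxTries = maxTries;
    }

    @Override
    public void run() {
        for (int i = 0; i < maxTries; i++) {
            if (recompareOnce.getAsBoolean()) {
                return; // verified on recompare
            }
        }
        onFailure.run(); // exhausted tries without a match
    }

    public static void main(String[] args) {
        AtomicInteger failures = new AtomicInteger();
        new VerifyReplicationRecompareRunnable(() -> false, failures::incrementAndGet, 2).run();
        System.out.println(failures.get()); // 1
    }
}
```

Because the dependencies are injected, a test can simulate persistent inconsistencies without running the MR job at all.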
Looking into those issues ^
^ looking into these failures
🎊 +1 overall
This message was automatically generated.
Looks great! I had a few super minor things, and then 1 change to how we set up the default state. We should be good to go after these.
...mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -150,6 +167,8 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
    throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    reCompareTries = conf.getInt(NAME + ".recompareTries", 1);
Actually I'm realizing we might want to make this dynamic:
- From the perspective of backwards compatibility, by default there were no retries, so a default value of 0 makes sense.
- Previously, the way to specify retries was to set sleepMsBeforeReCompare, and it would always just do 1 retry.

So I think we should default this to 0, and then below add:

if (sleepMsBeforeReCompare > 0) {
  reCompareTries = Math.max(reCompareTries, 1);
}

Meaning, we've now aligned these values to make sense together. If someone explicitly configures more retries, they'll get them. Otherwise they will get 1 retry if they specified sleepMsBeforeReCompare, or 0 if not, like it used to work.
Then, way below where you currently decide whether to create a runnable or not, you should base it on reCompareTries == 0 rather than sleepMsBeforeReCompare == 0.
I think this makes more intuitive sense, since it bases the logic on what's actually happening: whether there are retries or not, rather than what the backoff is for those retries. The other benefit is that if someone wanted to try 3 times with zero backoff, currently they can't do that, but in the new setup they could.
good point, I think those changes make sense!
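The backwards-compatible default described above can be sketched as a tiny helper. The method name is illustrative; in the job this logic would live where the conf is read in map():

```java
public class RecompareConfigSketch {
    static int resolveReCompareTries(int configuredTries, long sleepMsBeforeReCompare) {
        int reCompareTries = configuredTries; // new default: 0, i.e. no retries
        if (sleepMsBeforeReCompare > 0) {
            // Legacy behavior: specifying a sleep implied exactly one retry.
            reCompareTries = Math.max(reCompareTries, 1);
        }
        return reCompareTries;
    }

    public static void main(String[] args) {
        System.out.println(resolveReCompareTries(0, 0));   // 0: old no-retry default
        System.out.println(resolveReCompareTries(0, 100)); // 1: legacy sleep option
        System.out.println(resolveReCompareTries(3, 0));   // 3: explicit retries win
    }
}
```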
    }
  }
} catch (InterruptedException e) {
  LOG.error("fail to await executor termination in cleanup", e);
i think we need to at least increment BADROWS/FAILED_RECOMPARE here, or otherwise throw an exception. I think throwing an exception probably makes sense, since we have no idea what the state of pending recompares was
opted for throwing b/c as you mentioned we don't really have any idea what's going on within the executor at the time
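The agreed cleanup behavior could look roughly like this (method shape is illustrative, not the actual patch): if the wait for pending recompares is interrupted, rethrow instead of just logging, because we no longer know whether outstanding recompares ran.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CleanupSketch {
    static void awaitRecompares(ExecutorService executor, long timeoutSecs) throws IOException {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
                throw new IOException("Recompare executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            InterruptedIOException iioe =
                new InterruptedIOException("Interrupted awaiting pending recompares");
            iioe.initCause(e);
            throw iioe;
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> { });
        awaitRecompares(pool, 10);
        System.out.println(pool.isTerminated()); // true
    }
}
```

Throwing here fails the task outright, which is the safe outcome given the unknown state of the executor.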
if (cmd.startsWith(sleepToReCompareKey)) {
  sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
  sleepMsBeforeReCompare =
    Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
i think you meant to substring sleepToReCompareKey here. I realize it's the same length, but just to avoid errors/confusion in the future.
ah you're totally right, this was my bad
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
    Result replicatedRow) {
  byte[] rowKey = getRow(row, replicatedRow);
  if (sleepMsBeforeReCompare == 0) {
change this to recompareTries == 0
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Hey team. This commit hit the repo without the Jira ID in the commit title. Please revert and recommit with a corrected title. Thanks!
Ugh!!! I don't know how that happened; I usually edit the commit message before merging to make sure. Will fix.
This reverts commit f6c5dbe.
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This reverts commit 2b3d322.
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>