-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-28292 Make Delay prefetch property to be dynamically configured #5605
Conversation
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
Show resolved
Hide resolved
@@ -20,6 +20,7 @@ | |||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; | |||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; | |||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY; | |||
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use wildcard imports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected
// change PREFETCH_DELAY_ENABLE_KEY from false to true | ||
conf.setInt(PREFETCH_DELAY, 40000); | ||
PrefetchExecutor.loadConfiguration(conf); | ||
assertTrue(getPrefetchDelay() == 40000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use assertEquals
instead for equation checks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected
public void testOnConfigurationChange() { | ||
// change PREFETCH_DELAY_ENABLE_KEY from false to true | ||
conf.setInt(PREFETCH_DELAY, 40000); | ||
PrefetchExecutor.loadConfiguration(conf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you use onConfigurationChange
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected
@@ -298,6 +299,44 @@ public void testPrefetchDoesntSkipRefs() throws Exception { | |||
}); | |||
} | |||
|
|||
@Test | |||
public void testOnConfigurationChange() { | |||
// change PREFETCH_DELAY_ENABLE_KEY from false to true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
Show resolved
Hide resolved
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
Show resolved
Hide resolved
import org.apache.hadoop.hbase.trace.TraceUtil; | ||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@InterfaceAudience.Private | ||
public final class PrefetchExecutor { | ||
public final class PrefetchExecutor implements PropagatingConfigurationObserver { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are we registering this observer to get notified by ConfigurationManager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added new class 'PrefetchExecutorNotifier' which implements PropagatingConfigurationObserver. It's instance can be registerd as an observer and in turn will call the static functions of PrefetchExecutor class.
} | ||
|
||
@Override | ||
public void onConfigurationChange(Configuration conf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please add info logging that config has changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
// For tests. Contains computed prefetch delay | ||
private static long computedPrefetchDelay; | ||
|
||
public static void request(Path path, boolean isInterrupted, Runnable runnable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the additional parameter? Just remove the future from the collection in the "interrupt" method.
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
Show resolved
Hide resolved
@@ -84,6 +87,7 @@ | |||
@Category({ IOTests.class, MediumTests.class }) | |||
public class TestPrefetch { | |||
private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class); | |||
protected PrefetchExecutorNotifier prefetchExecutorNotifier; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this global? It should be created within the context of each individual test, so better make it local on each test method.
@@ -106,10 +115,19 @@ public class TestPrefetch { | |||
@Before | |||
public void setUp() throws IOException { | |||
conf = TEST_UTIL.getConfiguration(); | |||
long var = conf.getInt(PREFETCH_DELAY, 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: useless line, please remove it.
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); | ||
fs = HFileSystem.get(conf); | ||
blockCache = BlockCacheFactory.createBlockCache(conf); | ||
cacheConf = new CacheConfig(conf, blockCache); | ||
prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); | ||
resetTiming(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this irrelevant for non delay related test? If so, remove it from here and put at the end of the related tests only.
readStoreFile(storeFile); | ||
assertTrue("Elapsed Time {} | Computed Prefetch Delay {}" | ||
+ getElapsedTime() + prefetchExecutorNotifier.getComputedPrefetchDelay(), | ||
getElapsedTime() >= prefetchExecutorNotifier.getComputedPrefetchDelay() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test is correct, as you call your endTimer function only after prefetch is guaranteed to be completed, it may yield true even if the start delay wasn't respected, but the prefetch execution time took longer to complete.
What I would like to assert here is that right after creating a reader, even if we wait for 5 or 10 secs, no prefetch is running as we set the delay to 60 secs.
public int getPrefetchDelay() { | ||
return PrefetchExecutor.getPrefetchDelay(); | ||
} | ||
public long getComputedPrefetchDelay() { return PrefetchExecutor.getComputedPrefetchDelay();} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PrefetchExecutor.getComputedPrefetchDelay()
is commented as test only purpose, so we should remove this call here and call it directly in the test.
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay"; | ||
|
||
|
||
private final HRegionServer server; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unused variable.
/* Visible for testing only */ | ||
@RestrictedApi(explanation = "Should only be called in tests", link = "", | ||
allowedOnPath = ".*/src/test/.*") | ||
public static long getComputedPrefetchDelay() {return computedPrefetchDelay;} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also explain this is a non thread safe variable that would not yield accurate values under real use case scenarios, where we have multiple readers getting created and calling PrefetchExecutor.request
concurrently.
@@ -78,7 +85,10 @@ public Thread newThread(Runnable r) { | |||
+ Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR | |||
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); | |||
|
|||
public static void request(Path path, Runnable runnable) { | |||
// For tests. Contains computed prefetch delay | |||
private static long computedPrefetchDelay; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to calculate and check this on the tests? I guess the configured delay is a guaranteed minimum. so we could just check 1) before that minimum no prefetch is running, 2) we check that once prefetch has completed it took at least the minimum time (specified in the delay).
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
Show resolved
Hide resolved
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
Show resolved
Hide resolved
prefetchFutures.forEach((k, v) -> { | ||
// Do not cancel the task which is about to complete | ||
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k); | ||
if (sf.getDelay(TimeUnit.MILLISECONDS) > prefetchDelayMillis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should test if it's > 0, no? That means the thread is still pending delay expiration and has not started to run yet, so can be re-scheduled at no cost.
public static void loadConfiguration(Configuration conf) { | ||
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000); | ||
prefetchFutures.forEach((k, v) -> { | ||
// Do not cancel the task which is about to complete |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not accurate. We are actually not cancelling running threads, but we have no idea if it's close to completion or not.
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still have some concerns wrt UT.
Please also address spotless issues.
private long startTime; | ||
private boolean measureTiming; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't add global variables that are only used by one specific test logic.
startTimer(); | ||
|
||
while (!reader.prefetchComplete()) { | ||
// Sleep for a bit | ||
Thread.sleep(1000); | ||
if (getComputeTiming()) { | ||
// After task is scheduled and before the delay expires, prefetch should not start | ||
// if prefetchFutures contains entry (which means it's not cancelled or completed) | ||
// and wait time remaining is below delay expiry watermark, it can be deduced that | ||
// the prefetch is not started yet. | ||
if (getElapsedTime() >= (conf.getLong(PREFETCH_DELAY, 1000))) { | ||
assertTrue("Prefetch should be started at this point", reader.prefetchStarted()); | ||
setComputeTiming(false); | ||
} else { | ||
assertFalse("Prefetch Should not start at this point", reader.prefetchStarted()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This version of readStoreFile enforces the prefetch execution and completion. This is relevant to some of the tests in this class, but definitely not for yours.
You should rather just move the reader creation line to your test method, then put your validation logic there. That would yield the following benefits:
- Avoid the need for the additional global variables that are only of concern of a single test method (anti pattern);
- Make your test easier to be understood by other readers;
- Make your test more cohesive, and quicker, as it doesn't need to wait for the whole prefetch completion.
- Keep existing version of readStoreFile more cohesive, as it doesn't implement additional variations of behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In prefetch executor, we further compute passed in delay using variation and a random
multiplier to get 'effective delay'. Hence, in the test, for delay of 25000 milli-secs
checking that prefetch is started after 20000 milli-sec and prefetch started after that.
However, prefetch should not start after configured delay.
In previous change set, I had introduced new class variable i.e. computedDealy which could
have helped to get the exact value however, later removed due to previous comments.
Tried to disable the PREFETCH_DELAY_VARIATION in the tests but looks like it's not possible.
resetTiming(); | ||
setComputeTiming(true); | ||
|
||
readStoreFile(storeFile); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need the whole logic of readStoreFile here. Please see my comment above.
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
} | ||
|
||
// Prefech threads started working but not completed yet | ||
assertFalse(reader.prefetchComplete()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we testing this? IMO, it's irrelevant for the feature being implemented here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assert is added to cover cases where prefetch is completed very fast. If this check fails then it would give clear idea during investigation. Also, this assertion will ensure the subsequent assert is tested after prefetch started and before it completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are actually introducing flakeyness here, which we don't want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will remove this assert.
// In prefetch executor, we further compute passed in delay using variation and a random | ||
// multiplier to get 'effective delay'. Hence, in the test, for delay of 25000 milli-secs | ||
// check that prefetch is started after 20000 milli-sec and prefetch started after that. | ||
// However, prefetch should not start after configured delay. | ||
if (reader.prefetchStarted()) { | ||
LOG.info("elapsed time {}, Delay {}", getElapsedTime(startTime), | ||
PrefetchExecutor.getPrefetchDelay()); | ||
assertTrue("Prefetch should start post configured delay", | ||
getElapsedTime(startTime) <= PrefetchExecutor.getPrefetchDelay()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really confusing. Why prefetch is supposed to start before the passed delay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In PrefetchExecutor class, passed in dealy is modified due to further computation.
if (prefetchDelayMillis > 0) {
delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
+ (prefetchDelayMillis * (prefetchDelayVariation / 2)
* ThreadLocalRandom.current().nextFloat()));
Prefetch delay variation and a random seed is used for this computation. This reduces the "effective delay" compared to passed in delay. The same is passed to schedule().
(Due to this, in one of the earlier patch sets, I had introduced new class variable to be able to compare against "effective delay")
Hence, in the test,
- set the delay to 25000
- ensurre that prefetch is not started till 20000
- In the window where prefetch is started and it's not complete, test that elapsed time is less than passed in prefetch delay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then just set the hbase.hfile.prefetch.delay.variation to 0 on your test. This way, you will have a deterministic delay of 25s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have already tried that. If the hbase.hfile.prefetch.delay.variation is set to 0 in the test, it does not take effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per the formula below, if you set it to 0, it should pick exactly the value of hbase.hfile.prefetch.delay
. So either your test is not setting the config properly, or the dynamic config logic isn't working properly.
delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) + (prefetchDelayMillis * (prefetchDelayVariation / 2) * ThreadLocalRandom.current().nextFloat()));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not also make it dynamic? Couldn't you just also set it in the PrefetchExecutor.loadConfiguration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For our initial requirement, making prefech delay alone dynamic is sufficient.
prefetchDelayVariation can be updated in the loadConfiguration but it will be useful for testing only, atleast for now. Trying it out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, this is needed in certain cases where the user is expecting the exact delay before prefetch starts. With the delay variation, this delay is not fixed anymore which means the prefetch can trigger earlier or later depending on the calculated variance. Hence, IMO, this is not only a test only change, but it has its own merit. My 2 cents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree but this property is not externalised to the user, probably can be provided as a workaround in such cases. As mentioned above, I am trying out the change.
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, +1.
|
||
/** Futures for tracking block prefetch activity */ | ||
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); | ||
public static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be private.
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); | ||
public static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); | ||
/** Runnables for resetting the prefetch activity */ | ||
public static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be private.
/** Executor pool shared among all HFiles for block prefetch */ | ||
private static final ScheduledExecutorService prefetchExecutorPool; | ||
public static final ScheduledExecutorService prefetchExecutorPool; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be private.
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
Any other concerns @petersomogyi , or would you think this is good to go now? |
Looks good to me! |
…apache#5605) Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Peter Somogyi <psomogyi@apache.org>
Make the prefetch delay configurable. The prefetch delay is associated to hbase.hfile.prefetch.delay configuration. There are some cases where configuring hbase.hfile.prefetch.delay would help in achieving better throughput.