-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SlowConsumingPressureMonitor #2873
Conversation
Discussed with @jason-bragg offline, decided to take a simpler but less absolute approach. will shelve this commit. But still update the new approach to the same PR. |
65587d8
to
93e2deb
Compare
switched to a simpler approach, added one programmatic option to configure EventHubQueueCache with the new monitor. another option can be through EventHubStreamProviderSettings |
|
||
private IEventHubQueueCache CreateQueueCache(string partition, IStreamQueueCheckpointer<string> checkpointer, Logger log) | ||
{ | ||
var bufferPool = new FixedSizeObjectPool<FixedSizeBuffer>(adapterSettings.CacheSizeMb, () => new FixedSizeBuffer(1 << 20)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make it blatantly obvious, maybe have a const for this value called Megabyte
or BlockSize
with a comment saying it's 1MB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea. will do it !
@@ -162,7 +228,7 @@ internal class AveragingCachePressureMonitor | |||
public AveragingCachePressureMonitor(double flowControlThreshold, Logger logger) | |||
{ | |||
this.flowControlThreshold = flowControlThreshold; | |||
this.logger = logger.GetSubLogger("flowcontrol", "-"); | |||
this.logger = logger.GetSubLogger("flowcontrol-averaging-cache-pressure", "-"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
As we're moving more towards an asp.net logging model, I suggest using the typename here rather than custom name.
Also, delilimiter argument is optional, and "-" is default, so it should not be needed.
so:
this.logger = logger.GetSubLogger(GetType().Name);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
bool IsUnderPressure(DateTime utcNow); | ||
} | ||
|
||
internal class AggregatedCachePressureMonitor : List<ICachePressureMonitor>, ICachePressureMonitor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest AggregatedCachePressureMonitor be public and placed in it's own file.
|
||
namespace ServiceBus.Tests.TestStreamProviders | ||
{ | ||
public class EHStreamProviderWithSlowConsumingPressureDetecting : PersistentStreamProvider<EHStreamProviderWithSlowConsumingPressureDetecting.AdapterFactory> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest test run against default stream provider, configured to use SlowConsumingPressureMonitor, rather than custom adapter. That is the preferred user scenario, so the one we should ensure works and is maintained going forward.
@@ -140,7 +140,20 @@ public virtual void Init(IProviderConfiguration providerCfg, string providerName | |||
{ | |||
var bufferPool = new FixedSizeObjectPool<FixedSizeBuffer>(adapterSettings.CacheSizeMb, () => new FixedSizeBuffer(1 << 20)); | |||
var timePurge = new TimePurgePredicate(adapterSettings.DataMinTimeInCache, adapterSettings.DataMaxAgeInCache); | |||
CacheFactory = (partition,checkpointer,cacheLogger) => new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager); | |||
if (adapterSettings.SlowConsumingMonitorThreshold > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We tend to use nullable values for optional settings. Suggest SlowConsumingMonitorThreshold be nullable rather than using it's value to detect if it's set or not.
if (adapterSettings.SlowConsumingMonitorThreshold > 0) | ||
{ | ||
CacheFactory = (partition, checkpointer, cacheLogger) => | ||
{ var cache = new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest minimizing code in conditional statement. Only thing that depend on the condition is if the SlowConsumingPressureMonitor is added. so
var cache = new .....
if(..)
{
add monitor
}
return cache;
@@ -148,7 +159,62 @@ public bool TryGetNextMessage(object cursorObj, out IBatchContainer message) | |||
|
|||
} | |||
|
|||
internal class AveragingCachePressureMonitor | |||
public class SlowConsumingPressureMonitor : ICachePressureMonitor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest AveragingCachePressureMonitor be broke into it's own file, and added in the default cache creation logic, like other ICachePressureMonitors.
all good suggestions. will address them |
don't feel pressure that you need to finish reviewing in one pass. I'm totally fine if you finish the review in less than 4 pass. ;) jk |
06db2fa
to
68d77e0
Compare
} | ||
|
||
[Fact, TestCategory("EventHub"), TestCategory("Streaming"), TestCategory("Functional")] | ||
public async Task EHSlowConsuming_ShouldFavorSlowConsumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jdom I think I need to skip this test if azure connection string is not set up correctly in Jenkins?
I got this when running in Jenkins:
MESSAGE:
System.AggregateException : One or more errors occurred.\r\n---- System.TypeInitializationException : The type initializer for 'ServiceBus.Tests.SlowConsumingTests.EHSlowConsumingTests' threw an exception.\r\n-------- System.ArgumentNullException : Value cannot be null.\r\nParameter name: dataConnectionString\r\n---- The following constructor parameters did not have matching fixture data: Fixture fixture
+++++++++++++++++++
STACK TRACE:
----- Inner Stack Trace #1 (System.TypeInitializationException) ----- at ServiceBus.Tests.SlowConsumingTests.EHSlowConsumingTests.Fixture.CreateTestCluster() in D:\j\workspace\netfx-functio---224c30ba\test\ServiceBus.Tests\SlowConsumingTests\EHSlowConsumingTests.cs:line 57 at TestExtensions.BaseTestClusterFixture..ctor() in D:\j\workspace\netfx-functio---224c30ba\test\TestExtensions\BaseClusterFixture.cs:line 30 ----- Inner Stack Trace ----- at Orleans.ServiceBus.Providers.EventHubCheckpointerSettings..ctor(String dataConnectionString, String table, String checkpointNamespace, Nullable`1 persistInterval) in D:\j\workspace\netfx-functio---224c30ba\src\OrleansServiceBus\Providers\Streams\EventHub\ICheckpointerSettings.cs:line 65 at ServiceBus.Tests.SlowConsumingTests.EHSlowConsumingTests..cctor() in D:\j\workspace\netfx-functio---224c30ba\test\ServiceBus.Tests\SlowConsumingTests\EHSlowConsumingTests.cs:line 45 ----- Inner Stack Trace #2 (Xunit.Sdk.TestClassException) -----
This test pass when I have connection string set up locally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's likely the Eventhub connection string. We don't run eh tests in Jenkins yet (to my knowledge).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, although we should start running EH tests in functionals in VSO, but skip the tests when the connection string is not set. I added some info on how to do it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I agree . We should set up the EH related config in VSO and start to run EH tests.
Although this test do run successfully in VSO. so I think some EH related config is set up in VSO. But I'm not sure how much of the EH related config is set up in VSO. where can I check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, configuration is already set up in VSO, it's just that currently none of the EH tests are marked as Functional, so they just don't run because of that. I'd start adding these tests using the Skippable approach, and then we can look at adding more of the other EH tests using the same approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do. I can add skippable to this test. A lot of EH tests run successfully locally. I think we can star add them to VSO and skip them in Jenkins.
{ | ||
bool isUnderPressure = false; | ||
|
||
this.ForEach(monitor => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Link statement would be much cleaner.
return this.Any(monitor => monitor.IsUnderPressure(utcNow);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
CacheFactory = (partition,checkpointer,cacheLogger) => new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager); | ||
CacheFactory = (partition, checkpointer, cacheLogger) => | ||
{ | ||
var cache = new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We generally try to keep lambdas small, when they get large, we tend to put them into their own call. This may be better as a CreateCacheFactory(..) function.
namespace ServiceBus.Tests.SlowConsumingTests | ||
{ | ||
[Collection(TestEnvironmentFixture.DefaultCollection)] | ||
public class EHSlowConsumingTests : OrleansTestingBase, IClassFixture<EHSlowConsumingTests.Fixture> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For targeted tests of the persistent stream provider infrastructure (kinda like this), we tend to use the generator stream provider, as it allows better control of the data flow as well as eliminates external dependencies like EventHub. We can test this using EH, I'm just suggesting that the stream generator may be better in the long run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test is also testing if the monitor config can be set correctly through EventHubProviderSettings class, so I'd prefer use real EventHubStreamProvider
@xiazen this failure is a real one (not timing related, which we are fixing): https://ci.dot.net/job/dotnet_orleans/job/master/job/netfx-functional_prtest/10/testReport/junit/ServiceBus.Tests.SlowConsumingTests/EHSlowConsumingTests/EHSlowConsuming_ShouldFavorSlowConsumer/ |
/// <summary> | ||
/// Default flow control threshold | ||
/// </summary> | ||
public static readonly double DefaultThreashold = 1 / 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo in threshold
@@ -17,6 +17,37 @@ public class EventHubStreamProviderSettings | |||
public string StreamProviderName { get; } | |||
|
|||
/// <summary> | |||
/// SlowConsumingMonitorFlowControlThresholdName | |||
/// </summary> | |||
public const string SlowConsumingMonitorFlowControlThresholdName = "SlowConsumingMonitorFlowControlThreshold"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nameof(SlowConsumingMonitorFlowControlThreshold)
Avoid hardcoding strings in several places in this class. We were probably doing this before we upgraded to c# 6, but there's no reason to do more hardcoding now
/// </summary> | ||
/// <param name="checkPeriod"></param> | ||
/// <param name="logger"></param> | ||
public SlowConsumingPressureMonitor(TimeSpan checkPeriod, Logger logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkPeriod is a confusing name. It took me some time by reading and re-reading the code to understand what it does, because it's not the time period between checks.
Maybe it should be pressure window size? Or something else not related to check time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will think of a better name. PressureWindowPeriod? size is too general for me. let me think
{ | ||
[TestCategory("EventHub"), TestCategory("Streaming")] | ||
[Collection(TestEnvironmentFixture.DefaultCollection)] | ||
class EHSlowConsumingTests : OrleansTestingBase, IClassFixture<EHSlowConsumingTests.Fixture> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this class dead code? I don't see any tests here, and it's named the same as the other tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow I didn't notice this ! there's another EHSlowConsumingTests.cs on the top... That's the real one. I will delete this one. Thanks for noticing
|
||
private readonly Fixture fixture; | ||
|
||
public class Fixture : BaseTestClusterFixture |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should override CheckPreconditionsOrThrow and throw SkipException if the connection string is null.
Then in the test class' constructor you must call fixture.EnsurePreconditionsMet()
Also mark all of the tests with SkippableFact.
See usages of EnsurePreconditionsMet if you want some examples.
LGTM |
@jdom the Dispose() method was called. That's why I added that Boolean check. maybe there's a bug somewhere or I used OrleansTestBase in an incorrectly way? |
// If we changed state, log | ||
if (isUnderPressure != wasUnderPressure) | ||
{ | ||
logger.Info(isUnderPressure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the info level version of this log message should be in the aggregate pressure monitor, and the logging in the specific monitors should be verbose.
The reason for this is that a specific monitor may become healthy, but reading will still be off until all monitors are healthy. So having per monitor logging will make it very hard for users to decipher, from logs, if a queue should be reading or not. These details may be useful in verbose logging, as they would indicate what monitor is stopping reading, but this level of detail is less important, imo, than the overall flow control state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do . valid concern.
if (this.isUnderPressure != underPressure) | ||
{ | ||
this.isUnderPressure = underPressure; | ||
logger.Verbose(this.isUnderPressure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about logging as Info when under pressure, and as Verbose when not? Otherwise the aggregating monitor might not log the necessary information, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It logs only when there is a state change.
under pressure => not under pressure or not under pressure => under pressure.
In both cases, this is not necessarily authoritative of the behavior, so users shouldn't care about this under normal logging conditions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ouch. I submitted a commit to address this. but under a second thought, I think Jason might be right. So I reverted it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, on a state change makes more sense. But still it would be nice to know the details for when an individual monitor starts throttling. Maybe in the individual monitor, log as Info only when it transitions to under pressure when it wasn't (but not necessarily log as Info when it goes back to not under pressure, at that point the aggregated monitor logger can be enough).
this.wasUnderPressure = underPressure; | ||
} | ||
|
||
return this.wasUnderPressure; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this logic, monitor will stay underPressure for a whole window if a slow consumer appears, while biggestPressureInCurrentWindow will be reset to 0. So at the end of this window, if still underPressure, then underPressure stays for another whole window. Otherwise, monitor become notUnderPressure.
run through real life case,
monitor will be notUnderPressure at the beginning-> a slow consumer appears ->monitor become underPressure, and stay at underPressure for a whole window -> window ends, if slow consumer cleared out in this window, monitor become notUnderPressure.
welcome to run through more scenario with me, and let me know if this algorithm is conservative enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or if you want something even more conservative, such as :
public bool IsUnderPressure(DateTime utcNow)
{
//if any pressure contribution in current period is bigger than flowControlThreshold
//we see the cache is under pressure
bool underPressure = this.biggestPressureInCurrentPeriod > this.FlowControlThreshold;
if (underPressure)
{
//if under pressure, extend the nextCheckedTime, make sure wasUnderPressure is true for a whole window
this.wasUnderPressure = underPressure;
this.nextCheckedTime = utcNow + this.PressureWindowSize;
logger.Verbose($"Ingesting messages too fast. Throttling message reading. BiggestPressureInCurrentPeriod: {biggestPressureInCurrentPeriod}, Threshold: {FlowControlThreshold}");
this.biggestPressureInCurrentPeriod = 0;
}
else
{
if (this.nextCheckedTime < utcNow)
{
//at the end of each check period, reset biggestPressureInCurrentPeriod
this.nextCheckedTime = utcNow + this.PressureWindowSize;
this.biggestPressureInCurrentPeriod = 0;
this.wasUnderPressure = underPressure;
if(!this.wasUnderPressure)
logger.Verbose($"Message ingestion is healthy. BiggestPressureInCurrentPeriod: {biggestPressureInCurrentPeriod}, Threshold: {FlowControlThreshold}");
}
}
return this.wasUnderPressure;
}
isUnderPressure will be true until slow consumer catch up to above threshold, and be healthy for a whole window
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the logic works. It immediately stops reading if there is a cursor past the threshold, but if currently under pressure or no cursor is past the threshold, it only checks again every window.
@dotnet-bot test this please |
functionals do succeed for me in vso . |
@dotnet-bot test netfx-functional |
/// </summary> | ||
public class SlowConsumingPressureMonitor : ICachePressureMonitor | ||
{ | ||
private static TimeSpan DefaultPressureWindowSize = TimeSpan.FromMinutes(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defaults should probably be public, so users can specify them in configurations if they want to be explicit.
mostly finished coding, still need to test it. submit for early feedback