Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SlowConsumingPressureMonitor #2873

Merged
merged 8 commits into from
Apr 7, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/Orleans/Providers/IOrleansProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ public static int GetIntProperty(this IProviderConfiguration config, string key,
return config.Properties.TryGetValue(key, out s) ? int.Parse(s) : settingDefault;
}

public static bool TryGetDoubleProperty(this IProviderConfiguration config, string key, out double setting)
{
if (config == null)
{
throw new ArgumentNullException("config");
}
string s;
setting = 0;
return config.Properties.TryGetValue(key, out s) ? double.TryParse(s, out setting) : false;
}

public static string GetProperty(this IProviderConfiguration config, string key, string settingDefault)
{
if (config == null)
Expand Down Expand Up @@ -163,6 +174,17 @@ public static TimeSpan GetTimeSpanProperty(this IProviderConfiguration config, s
string s;
return config.Properties.TryGetValue(key, out s) ? TimeSpan.Parse(s) : settingDefault;
}

public static bool TryGetTimeSpanProperty(this IProviderConfiguration config, string key, out TimeSpan setting)
{
if (config == null)
{
throw new ArgumentNullException("config");
}
string s;
setting = TimeSpan.Zero;
return config.Properties.TryGetValue(key, out s) ? TimeSpan.TryParse(s, out setting) : false;
}
}

/// <summary>
Expand Down
4 changes: 4 additions & 0 deletions src/OrleansServiceBus/OrleansServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
</Compile>
<Compile Include="ErrorCode.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Providers\Streams\EventHub\CachePressureMonitors\AggregatedCachePressureMonitor.cs" />
<Compile Include="Providers\Streams\EventHub\CachePressureMonitors\AveragingCachePressureMonitor.cs" />
<Compile Include="Providers\Streams\EventHub\CachePressureMonitors\ICachePressureMonitor.cs" />
<Compile Include="Providers\Streams\EventHub\CachePressureMonitors\SlowConsumingPressureMonitor.cs" />
<Compile Include="Providers\Streams\EventHub\DefaultEventHubReceiverMonitor.cs" />
<Compile Include="Providers\Streams\EventHub\EventDataExtensions.cs" />
<Compile Include="Providers\Streams\EventHub\EventHubAdapterFactory.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.ServiceBus.Providers
{
/// <summary>
/// Aggregated cache pressure monitor
/// </summary>
public class AggregatedCachePressureMonitor : List<ICachePressureMonitor>, ICachePressureMonitor
{
/// <summary>
/// Record cache pressure to every monitor in this aggregated cache monitor group
/// </summary>
/// <param name="cachePressureContribution"></param>
public void RecordCachePressureContribution(double cachePressureContribution)
{
this.ForEach(monitor =>
{
monitor.RecordCachePressureContribution(cachePressureContribution);
});
}

/// <summary>
/// Add one monitor to this aggregated cache monitor group
/// </summary>
/// <param name="monitor"></param>
public void AddCachePressureMonitor(ICachePressureMonitor monitor)
{
this.Add(monitor);
}

/// <summary>
/// If any mornitor in this aggregated cache monitor group is under pressure, then return true
/// </summary>
/// <param name="utcNow"></param>
/// <returns></returns>
public bool IsUnderPressure(DateTime utcNow)
{
bool isUnderPressure = false;

this.ForEach(monitor =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Link statement would be much cleaner.

return this.Any(monitor => monitor.IsUnderPressure(utcNow);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

{
if (monitor.IsUnderPressure(utcNow))
{
isUnderPressure = true;
}
});
return isUnderPressure;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.ServiceBus.Providers
{
/// <summary>
/// Cache pressure monitor whose back pressure algorithm is based on averaging pressure value
/// over all pressure contribution
/// </summary>
public class AveragingCachePressureMonitor : ICachePressureMonitor
{
/// <summary>
/// Default flow control threshold
/// </summary>
public static readonly double DefaultThreashold = 1 / 3;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in threshold

private static readonly TimeSpan checkPeriod = TimeSpan.FromSeconds(2);
private readonly Logger logger;

private double accumulatedCachePressure;
private double cachePressureContributionCount;
private DateTime nextCheckedTime;
private bool isUnderPressure;
private double flowControlThreshold;

/// <summary>
/// Constructor
/// </summary>
/// <param name="logger"></param>
public AveragingCachePressureMonitor(Logger logger)
:this(DefaultThreashold, logger)
{ }

/// <summary>
/// Contructor
/// </summary>
/// <param name="flowControlThreshold"></param>
/// <param name="logger"></param>
public AveragingCachePressureMonitor(double flowControlThreshold, Logger logger)
{
this.flowControlThreshold = flowControlThreshold;
this.logger = logger.GetSubLogger(this.GetType().Name);
nextCheckedTime = DateTime.MinValue;
isUnderPressure = false;
}

public void RecordCachePressureContribution(double cachePressureContribution)
{
// Weight unhealthy contributions thrice as much as healthy ones.
// This is a crude compensation for the fact that healthy consumers wil consume more often than unhealthy ones.
double weight = cachePressureContribution < flowControlThreshold ? 1.0 : 3.0;
accumulatedCachePressure += cachePressureContribution * weight;
cachePressureContributionCount += weight;
}

public bool IsUnderPressure(DateTime utcNow)
{
if (nextCheckedTime < utcNow)
{
CalculatePressure();
nextCheckedTime = utcNow + checkPeriod;
}
return isUnderPressure;
}

private void CalculatePressure()
{
// if we don't have any contributions, don't change status
if (cachePressureContributionCount < 0.5)
{
// after 5 checks with no contributions, check anyway
cachePressureContributionCount += 0.1;
return;
}

double pressure = accumulatedCachePressure / cachePressureContributionCount;
bool wasUnderPressure = isUnderPressure;
isUnderPressure = pressure > flowControlThreshold;
// If we changed state, log
if (isUnderPressure != wasUnderPressure)
{
logger.Info(isUnderPressure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the info level version of this log message should be in the aggregate pressure monitor, and the logging in the specific monitors should be verbose.
The reason for this is that a specific monitor may become healthy, but reading will still be off until all monitors are healthy. So having per monitor logging will make it very hard for users to decipher, from logs, if a queue should be reading or not. These details may be useful in verbose logging, as they would indicate what monitor is stopping reading, but this level of detail is less important, imo, than the overall flow control state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do . valid concern.

? $"Ingesting messages too fast. Throttling message reading. AccumulatedCachePressure: {accumulatedCachePressure}, Contributions: {cachePressureContributionCount}, AverageCachePressure: {pressure}, Threshold: {flowControlThreshold}"
: $"Message ingestion is healthy. AccumulatedCachePressure: {accumulatedCachePressure}, Contributions: {cachePressureContributionCount}, AverageCachePressure: {pressure}, Threshold: {flowControlThreshold}");
}
cachePressureContributionCount = 0.0;
accumulatedCachePressure = 0.0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.ServiceBus.Providers
{
/// <summary>
/// Cache pressure monitor records pressure contribution to the cache, and determine if the cache is under pressure based on its
/// back pressure algorithm
/// </summary>
public interface ICachePressureMonitor
{
/// <summary>
/// Record cache pressure contribution to the monitor
/// </summary>
/// <param name="cachePressureContribution"></param>
void RecordCachePressureContribution(double cachePressureContribution);

/// <summary>
/// Determine if the monitor is under pressure
/// </summary>
/// <param name="utcNow"></param>
/// <returns></returns>
bool IsUnderPressure(DateTime utcNow);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.ServiceBus.Providers
{
/// <summary>
/// Pressure monitor which is in favor of the slow consumer in the cache
/// </summary>
public class SlowConsumingPressureMonitor : ICachePressureMonitor
{
private static TimeSpan DefaultCheckPeriod = TimeSpan.FromMinutes(1);
private const double DefaultFlowControlThreshold = 0.5;

/// <summary>
/// CheckPeriod
/// </summary>
public TimeSpan CheckPeriod { get; set; }
/// <summary>
/// FlowControlThreshold
/// </summary>
public double FlowControlThreshold { get; set; }

private readonly Logger logger;
private double biggestPressureInCurrentPeriod;
private DateTime nextCheckedTime;
private bool isUnderPressure;

/// <summary>
/// Constructor
/// </summary>
/// <param name="logger"></param>
public SlowConsumingPressureMonitor(Logger logger)
: this(DefaultFlowControlThreshold, DefaultCheckPeriod, logger)
{ }

/// <summary>
/// Constructor
/// </summary>
/// <param name="checkPeriod"></param>
/// <param name="logger"></param>
public SlowConsumingPressureMonitor(TimeSpan checkPeriod, Logger logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkPeriod is a confusing name. It took me some time by reading and re-reading the code to understand what it does, because it's not the time period between checks.
Maybe it should be pressure window size? Or something else not related to check time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will think of a better name. PressureWindowPeriod? size is too general for me. let me think

: this(DefaultFlowControlThreshold, checkPeriod, logger)
{
}

/// <summary>
/// Constructor
/// </summary>
/// <param name="flowControlThreshold"></param>
/// <param name="logger"></param>
public SlowConsumingPressureMonitor(double flowControlThreshold, Logger logger)
: this(flowControlThreshold, DefaultCheckPeriod, logger)
{
}

/// <summary>
/// Constructor
/// </summary>
/// <param name="flowControlThreshold"></param>
/// <param name="checkPeriod"></param>
/// <param name="logger"></param>
public SlowConsumingPressureMonitor(double flowControlThreshold, TimeSpan checkPeriod, Logger logger)
{
this.FlowControlThreshold = flowControlThreshold;
this.logger = logger.GetSubLogger(this.GetType().Name);
this.nextCheckedTime = DateTime.MinValue;
this.biggestPressureInCurrentPeriod = 0;
this.isUnderPressure = false;
this.CheckPeriod = checkPeriod;
}

public void RecordCachePressureContribution(double cachePressureContribution)
{
if (cachePressureContribution > biggestPressureInCurrentPeriod)
biggestPressureInCurrentPeriod = cachePressureContribution;
}

public bool IsUnderPressure(DateTime utcNow)
{
//if any pressure contribution in current period is bigger than flowControlThreshold
//we see the cache is under pressure
bool underPressure = this.biggestPressureInCurrentPeriod > this.FlowControlThreshold;
if (this.isUnderPressure != underPressure)
{
this.isUnderPressure = underPressure;
logger.Info(this.isUnderPressure
? $"Ingesting messages too fast. Throttling message reading. BiggestPressureInCurrentPeriod: {biggestPressureInCurrentPeriod}, Threshold: {FlowControlThreshold}"
: $"Message ingestion is healthy. BiggestPressureInCurrentPeriod: {biggestPressureInCurrentPeriod}, Threshold: {FlowControlThreshold}");
}

if (nextCheckedTime < utcNow)
{
//at the end of each check period, reset biggestPressureInCurrentPeriod
this.nextCheckedTime = utcNow + this.CheckPeriod;
this.biggestPressureInCurrentPeriod = 0;
}
return underPressure;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,27 @@ public virtual void Init(IProviderConfiguration providerCfg, string providerName
{
var bufferPool = new FixedSizeObjectPool<FixedSizeBuffer>(adapterSettings.CacheSizeMb, () => new FixedSizeBuffer(1 << 20));
var timePurge = new TimePurgePredicate(adapterSettings.DataMinTimeInCache, adapterSettings.DataMaxAgeInCache);
CacheFactory = (partition,checkpointer,cacheLogger) => new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager);
CacheFactory = (partition, checkpointer, cacheLogger) =>
{
var cache = new EventHubQueueCache(checkpointer, bufferPool, timePurge, cacheLogger, this.SerializationManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We generally try to keep lambdas small, when they get large, we tend to put them into their own call. This may be better as a CreateCacheFactory(..) function.

if (adapterSettings.AveragingCachePressureMonitorFlowControlThreshold.HasValue)
{
var avgMonitor = new AveragingCachePressureMonitor(adapterSettings.AveragingCachePressureMonitorFlowControlThreshold.Value, log);
cache.AddCachePressureMonitor(avgMonitor);
}
if (adapterSettings.SlowConsumingMonitorFlowControlCheckperiod.HasValue
|| adapterSettings.SlowConsumingMonitorFlowControlThreshold.HasValue)
{

var slowConsumeMonitor = new SlowConsumingPressureMonitor(log);
if (adapterSettings.SlowConsumingMonitorFlowControlThreshold.HasValue)
slowConsumeMonitor.FlowControlThreshold = adapterSettings.SlowConsumingMonitorFlowControlThreshold.Value;
if (adapterSettings.SlowConsumingMonitorFlowControlCheckperiod.HasValue)
slowConsumeMonitor.CheckPeriod = adapterSettings.SlowConsumingMonitorFlowControlCheckperiod.Value;
cache.AddCachePressureMonitor(slowConsumeMonitor);
}
return cache;
};
}

if (StreamFailureHandlerFactory == null)
Expand Down
Loading