Skip to content

Commit

Permalink
fixed: system can run out of memory with high events rate (#358)
Browse files Browse the repository at this point in the history
* fixed: system can run out of memory when events are generated at a higher rate than they are processed

The most likely scenario is logging from recurring failures that report errors at a high rate without any throttling, combined with distributed hosts that throttle the processing rate of events gathered from subsystems via CommandHandler.DequeueEvents.

Other changes:

- added the log type to the "max 2 messages every 5 minutes" message to make it clearer that the limit applies per type of message
  • Loading branch information
freddyrios authored Dec 17, 2024
1 parent 3751dc7 commit ac1bdf4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
10 changes: 5 additions & 5 deletions CA_DataUploaderLib/BaseSensorBox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,16 @@ private async Task ReadSensors(MCUBoard board, string boxName, SensorSample.Inpu
}
}

void LowFrequencyLogInfo<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, args, ref lastLogInfoTime, ref logInfoSkipped);
void LowFrequencyLogError<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, args, ref lastLogErrorTime, ref logErrorSkipped);
void LowFrequencyMultilineMessage<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, args, ref lastMultilineMessageTime, ref multilineMessageSkipped);
void LowFrequencyLogInfo<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, "info", args, ref lastLogInfoTime, ref logInfoSkipped);
void LowFrequencyLogError<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, "error", args, ref lastLogErrorTime, ref logErrorSkipped);
void LowFrequencyMultilineMessage<T>(Action<T, string> logAction, T args) => LowFrequencyLog(logAction, "multiline", args, ref lastMultilineMessageTime, ref multilineMessageSkipped);

void LowFrequencyLog<T>(Action<T, string> logAction, T args, ref long lastLogTime, ref int logSkipped)
void LowFrequencyLog<T>(Action<T, string> logAction, string logType, T args, ref long lastLogTime, ref int logSkipped)
{
if (lastLogTime != 0 && _cmd.Time.GetElapsedTime(lastLogTime).TotalMinutes < 5)
{
if (logSkipped++ == 0)
logAction(args, $"{Environment.NewLine}Skipping further messages for this board (max 2 messages every 5 minutes)");
logAction(args, $"{Environment.NewLine}Skipping further messages for this board (max 2 {logType} messages every 5 minutes)");
return;
}

Expand Down
18 changes: 11 additions & 7 deletions CA_DataUploaderLib/CommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public sealed class CommandHandler : IDisposable
private readonly TaskCompletionSource _runningTaskTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly bool _isMultipi;
private readonly List<ChannelWriter<DataVector>> _receivedVectorsWriters = [];
private readonly Queue<EventFiredArgs> _locallyFiredEvents = new();

private readonly Channel<EventFiredArgs> _locallyFiredEvents = Channel.CreateBounded<EventFiredArgs>(
new BoundedChannelOptions(200) { FullMode = BoundedChannelFullMode.DropOldest });
/// <remarks>
/// This method is not thread safe, so in general call it from the main thread/async flow before the program cycles are started.
/// The only additional cycle that is allowed to call it is the decision cycle *before* the first decision.
Expand Down Expand Up @@ -280,20 +280,24 @@ public void OnNewVectorReceived(DataVector args)
public void FireCustomEvent(string msg, DateTime timespan, byte eventType)
{
EventFired?.Invoke(this, new EventFiredArgs(msg, eventType, timespan));
lock (_locallyFiredEvents)
_locallyFiredEvents.Enqueue(new EventFiredArgs(msg, eventType, timespan));
_locallyFiredEvents.Writer.TryWrite(new(msg, eventType, timespan));
}

/// <summary>
/// Returns a (new) list of dequeued events.
/// </summary>
/// <param name="max">The maximum number of events to dequeue.</param>
/// <remarks>
/// If events are created at a higher rate than the host processes them, some events will never be returned by this method.
/// Subsystems normally throttle the error generation rate, but if a code path that does not throttle
/// sees repeating errors that cause more than 200 events to be pending, the oldest events are dropped/skipped.
/// </remarks>
public List<EventFiredArgs>? DequeueEvents(int max = int.MaxValue)
{
List<EventFiredArgs>? list = null; // delayed initialization to avoid creating lists when there is no data.
lock (_locallyFiredEvents)
for (int i = 0; i < max && _locallyFiredEvents.TryDequeue(out var e); i++)
(list ??= []).Add(e);
var reader = _locallyFiredEvents.Reader;
for (int i = 0; i < max && reader.TryRead(out var e); i++)
(list ??= []).Add(e);
return list;
}

Expand Down

0 comments on commit ac1bdf4

Please sign in to comment.