-
Notifications
You must be signed in to change notification settings - Fork 0
/
EntityActionScheduler.cs
149 lines (128 loc) · 4.82 KB
/
EntityActionScheduler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Yarp.ReverseProxy.Utilities;
namespace Yarp.ReverseProxy.Service.Management
{
/// <summary>
/// Periodically invokes specified actions on registered entities.
/// </summary>
/// <remarks>
/// It creates a separate <see cref="Timer"/> for each registration which is considered
/// reasonably efficient because .NET already maintains a process-wide managed timer queue.
/// There are 2 scheduling modes supported: run once and infinite run. In "run once" mode,
/// an entity gets unscheduled after the respective timer fired for the first time whereas
/// in "infinite run" entities get repeatedly rescheduled until either they are explicitly removed
/// or the <see cref="EntityActionScheduler{T}"/> instance is disposed.
/// </remarks>
internal class EntityActionScheduler<T> : IDisposable
{
private readonly ConcurrentDictionary<T, SchedulerEntry> _entries = new ConcurrentDictionary<T, SchedulerEntry>();
private readonly Func<T, Task> _action;
private readonly bool _runOnce;
private readonly ITimerFactory _timerFactory;
private readonly TimerCallback _timerCallback;
private int _isStarted;
public EntityActionScheduler(Func<T, Task> action, bool autoStart, bool runOnce, ITimerFactory timerFactory)
{
_action = action ?? throw new ArgumentNullException(nameof(action));
_runOnce = runOnce;
_timerFactory = timerFactory ?? throw new ArgumentNullException(nameof(timerFactory));
_timerCallback = async o => await Run(o);
_isStarted = autoStart ? 1 : 0;
}
public void Dispose()
{
foreach(var entry in _entries.Values)
{
entry.Timer.Dispose();
}
}
public void Start()
{
if (Interlocked.CompareExchange(ref _isStarted, 1, 0) != 0)
{
return;
}
foreach (var entry in _entries.Values)
{
entry.Timer.Change(entry.Period, Timeout.Infinite);
}
}
public void ScheduleEntity(T entity, TimeSpan period)
{
var entry = new SchedulerEntry(entity, (long)period.TotalMilliseconds, _timerCallback, Volatile.Read(ref _isStarted) == 1, _timerFactory);
var added = _entries.TryAdd(entity, entry);
Debug.Assert(added);
}
public void ChangePeriod(T entity, TimeSpan newPeriod)
{
if (_entries.TryGetValue(entity, out var entry))
{
entry.ChangePeriod((long)newPeriod.TotalMilliseconds, Volatile.Read(ref _isStarted) == 1);
}
}
public void UnscheduleEntity(T entity)
{
if (_entries.TryRemove(entity, out var entry))
{
entry.Timer.Dispose();
}
}
public bool IsScheduled(T entity)
{
return _entries.ContainsKey(entity);
}
private async Task Run(object entryObj)
{
var entry = (SchedulerEntry)entryObj;
if (_runOnce)
{
UnscheduleEntity(entry.Entity);
}
await _action(entry.Entity);
// Check if the entity is still scheduled.
if (_entries.ContainsKey(entry.Entity))
{
entry.SetTimer();
}
}
private class SchedulerEntry
{
private long _period;
public SchedulerEntry(T entity, long period, TimerCallback timerCallback, bool autoStart, ITimerFactory timerFactory)
{
Entity = entity;
_period = period;
Timer = timerFactory.CreateTimer(timerCallback, this, autoStart ? period : Timeout.Infinite, Timeout.Infinite);
}
public T Entity { get; }
public long Period => _period;
public ITimer Timer { get; }
public void ChangePeriod(long newPeriod, bool resetTimer)
{
Interlocked.Exchange(ref _period, newPeriod);
if (resetTimer)
{
SetTimer();
}
}
public void SetTimer()
{
try
{
Timer.Change(Interlocked.Read(ref _period), Timeout.Infinite);
}
catch (ObjectDisposedException)
{
// It can be thrown if the timer has been already disposed.
// Just suppress it.
}
}
}
}
}