-
Notifications
You must be signed in to change notification settings - Fork 20
/
DelayLine.java
150 lines (133 loc) · 5.32 KB
/
DelayLine.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/
package sirius.kernel.async;
import sirius.kernel.di.std.Part;
import sirius.kernel.di.std.Register;
import sirius.kernel.health.Counter;
import sirius.kernel.health.metrics.Metric;
import sirius.kernel.health.metrics.MetricProvider;
import sirius.kernel.health.metrics.MetricsCollector;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Permits to delay the execution of a task by a certain amount of seconds.
 * <p>
 * Provides a background queue, which is checked about every 500ms. All tasks
 * whose delay has expired will be invoked in the given executor.
 * <p>
 * Note that this should not be used with large delays, but rather for short durations (e.g. less than 60 seconds,
 * although this depends heavily on the use case, as there is no inherent limit other than tasks won't survive a
 * system restart).
 */
@Register(classes = {DelayLine.class, BackgroundLoop.class, MetricProvider.class})
public class DelayLine extends BackgroundLoop implements MetricProvider {

    /**
     * Bundles a queued task with the name of the executor to run it in and the timestamp
     * (in milliseconds, as reported by {@link System#currentTimeMillis()}) from which on it is due.
     */
    private static class WaitingTask implements Runnable {
        final long timeout;
        final String executor;
        final Runnable task;

        WaitingTask(String executor, long timeout, Runnable task) {
            this.executor = executor;
            this.timeout = timeout;
            this.task = task;
        }

        @Override
        public void run() {
            task.run();
        }
    }

    /**
     * Contains all tasks which are waiting for their delay to expire.
     * <p>
     * Entries are kept in insertion order, which is <b>not</b> necessarily the order in which they become due.
     * All access is guarded by synchronizing on the list itself.
     */
    private final List<WaitingTask> waitingTasks = new ArrayList<>();

    /**
     * Counts the tasks which were handed over to their executor. Reported via {@link #gather(MetricsCollector)}.
     */
    private final Counter backgroundTasks = new Counter();

    @Part
    private Tasks tasks;

    /**
     * Queues the given task to be called after roughly the number of seconds given here.
     *
     * @param executor       the executor to execute the task in. Use {@link Tasks#DEFAULT} if no other appropriate
     *                       pool is available.
     * @param delayInSeconds the number to wait in seconds. Note that the delay can be a bit longer, depending on the
     *                       system load. A value of zero or less makes the task due on the next check of the queue.
     * @param task           the task to execute. Note that the {@link CallContext} isn't transferred to the task being
     *                       invoked. Use {@link #forkDelayed(String, long, Runnable)} if you need the context.
     */
    public void callDelayed(@Nonnull String executor, long delayInSeconds, @Nonnull Runnable task) {
        WaitingTask waitingTask = new WaitingTask(executor, computeTimeout(delayInSeconds), task);
        synchronized (waitingTasks) {
            waitingTasks.add(waitingTask);
        }
    }

    /**
     * Computes the timestamp at which a task with the given delay becomes due.
     * <p>
     * {@link TimeUnit#toMillis(long)} saturates at {@link Long#MAX_VALUE} for huge delays. Blindly adding the
     * current time to that would wrap around to a negative timestamp and make the task due immediately,
     * therefore the sum is saturated as well.
     *
     * @param delayInSeconds the requested delay in seconds
     * @return the due timestamp in milliseconds
     */
    private static long computeTimeout(long delayInSeconds) {
        long now = System.currentTimeMillis();
        long delayInMillis = TimeUnit.SECONDS.toMillis(delayInSeconds);
        if (delayInMillis > 0 && now > Long.MAX_VALUE - delayInMillis) {
            return Long.MAX_VALUE;
        }

        return now + delayInMillis;
    }

    /**
     * Queues the given task to be called after roughly the number of seconds given here.
     * <p>
     * In contrast to {@link #callDelayed(String, long, Runnable)}, this will preserve the {@link CallContext} when
     * invoking the {@code task}: the context which is current when this method is called is installed while the
     * task runs, and the context of the executing thread is restored afterwards.
     *
     * @param executor       the executor to execute the task in. Use {@link Tasks#DEFAULT} if no other appropriate
     *                       pool is available.
     * @param delayInSeconds the number to wait in seconds. Note that the delay can be a bit longer, depending on the
     *                       system load.
     * @param task           the task to execute
     */
    public void forkDelayed(@Nonnull String executor, long delayInSeconds, @Nonnull Runnable task) {
        CallContext currentContext = CallContext.getCurrent();
        callDelayed(executor, delayInSeconds, () -> {
            CallContext backup = CallContext.getCurrent();
            try {
                CallContext.setCurrent(currentContext);
                task.run();
            } finally {
                CallContext.setCurrent(backup);
            }
        });
    }

    @Nonnull
    @Override
    public String getName() {
        return "delay-line";
    }

    /**
     * Returns 2, which matches the documented check interval of roughly 500ms (two runs per second).
     */
    @Override
    public double maxCallFrequency() {
        return 2;
    }

    /**
     * Hands all tasks whose delay has expired over to their executor.
     * <p>
     * The whole queue is inspected, as tasks are stored in insertion order and a task with a long delay must not
     * hold back tasks with a shorter delay which were queued after it.
     *
     * @return a short status message if at least one task was scheduled, <tt>null</tt> otherwise
     */
    @Override
    protected String doWork() throws Exception {
        List<WaitingTask> dueTasks = removeDueTasks(System.currentTimeMillis());
        if (dueTasks.isEmpty()) {
            return null;
        }

        // The tasks are started outside of the lock so that callers of callDelayed are never blocked by an
        // executor, and so that starting a task can never interfere with the iteration over the queue.
        int processed = 0;
        try {
            for (WaitingTask dueTask : dueTasks) {
                // Counted before the hand-over: a task for which start fails is dropped and not retried.
                processed++;
                tasks.executor(dueTask.executor).start(dueTask);
                backgroundTasks.inc();
            }
        } finally {
            // If starting a task failed, the tasks which were not touched yet stay queued for the next run.
            if (processed < dueTasks.size()) {
                synchronized (waitingTasks) {
                    waitingTasks.addAll(dueTasks.subList(processed, dueTasks.size()));
                }
            }
        }

        return "Re-Scheduled Tasks: " + dueTasks.size();
    }

    /**
     * Removes and returns all tasks which are due at the given point in time.
     *
     * @param now the current timestamp in milliseconds
     * @return the removed tasks in the order in which they were queued
     */
    private List<WaitingTask> removeDueTasks(long now) {
        List<WaitingTask> dueTasks = new ArrayList<>();
        synchronized (waitingTasks) {
            Iterator<WaitingTask> iter = waitingTasks.iterator();
            while (iter.hasNext()) {
                WaitingTask next = iter.next();
                if (next.timeout <= now) {
                    iter.remove();
                    dueTasks.add(next);
                }
            }
        }

        return dueTasks;
    }

    /**
     * Reports the number of tasks handed over to executors (only once at least one task was counted) and the
     * current length of the queue.
     *
     * @param collector the collector to report the metrics to
     */
    @Override
    public void gather(MetricsCollector collector) {
        if (backgroundTasks.getCount() > 0) {
            collector.differentialMetric("kernel_delay_line_tasks",
                                         "delay-line-tasks",
                                         "Delay-Line Tasks",
                                         backgroundTasks.getCount(),
                                         Metric.UNIT_PER_MIN);
        }

        long length;
        synchronized (waitingTasks) {
            length = waitingTasks.size();
        }
        collector.metric("kernel_delay_line_length", "delay-line-length", "Delay-Line Length", length, null);
    }
}