-
Notifications
You must be signed in to change notification settings - Fork 20
/
AsyncExecutor.java
168 lines (150 loc) · 5.99 KB
/
AsyncExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/
package sirius.kernel.async;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import sirius.kernel.commons.Explain;
import sirius.kernel.commons.Strings;
import sirius.kernel.health.Average;
import sirius.kernel.health.Counter;
import sirius.kernel.health.Exceptions;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Represents an executor used by sirius to schedule background tasks.
* <p>
* Instances of this class are created and managed by {@link Tasks}. This class is only made public so it can be
* accessed for statistical reasons like ({@link #getBlocked()} or {@link #getDropped()}.
*/
public class AsyncExecutor extends ThreadPoolExecutor implements RejectedExecutionHandler {
private final String category;
private final Counter blocked = new Counter();
private final Counter dropped = new Counter();
protected Counter executed = new Counter();
protected Average duration = new Average();
private static final long DEFAULT_KEEP_ALIVE_TIME = 10;
AsyncExecutor(String category, int poolSize, int queueLength) {
super(poolSize, poolSize, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, createWorkQueue(queueLength));
this.category = category;
setThreadFactory(new ThreadFactoryBuilder().setNameFormat(category + "-%d").build());
setRejectedExecutionHandler(this);
}
private static BlockingQueue<Runnable> createWorkQueue(int queueLength) {
// Create queue with the given max. queue length
if (queueLength > 0) {
return new LinkedBlockingQueue<>(queueLength);
}
// Create a queue which will not hold any elements (no work queue)
if (queueLength < 0) {
return new SynchronousQueue<>();
}
// Create an unbounded queue
return new LinkedBlockingQueue<>();
}
@Override
@SuppressWarnings("PatternVariableCanBeUsed")
@Explain("We don't use a pattern here, as the instanceof is negated")
public void rejectedExecution(Runnable originalTask, ThreadPoolExecutor executor) {
try {
// If the executor is used by another Java framework, we cannot recover anything and
// simply try to execute while blocking the caller...
if (!(originalTask instanceof ExecutionBuilder.TaskWrapper)) {
originalTask.run();
blocked.inc();
return;
}
ExecutionBuilder.TaskWrapper wrappedTask = (ExecutionBuilder.TaskWrapper) originalTask;
// Otherwise we invoke the drop handler if present or otherwise also attempt a
// blocking execution.
if (wrappedTask.dropHandler != null) {
wrappedTask.drop();
dropped.inc();
} else if (wrappedTask.synchronizer == null) {
CallContext current = CallContext.getCurrent();
try {
wrappedTask.run();
} finally {
CallContext.setCurrent(current);
}
blocked.inc();
} else {
Exceptions.handle()
.to(Tasks.LOG)
.withSystemErrorMessage(
"The execution of a frequency scheduled task '%s' (%s) synchronized on '%s' "
+ "was rejected by: %s - Aborting!",
wrappedTask.runnable,
wrappedTask.runnable.getClass(),
wrappedTask.synchronizer,
category)
.handle();
}
} catch (Exception t) {
Exceptions.handle(Tasks.LOG, t);
}
}
@Override
public String toString() {
return Strings.apply("%s - Active: %d, Queued: %d, Executed: %d, Blocked: %d, Rejected: %d",
category,
getActiveCount(),
getQueue().size(),
executed.getCount(),
blocked.getCount(),
dropped.getCount());
}
/**
* Returns the category this executor was created for.
*
* @return the category of tasks this executor runs.
*/
public String getCategory() {
return category;
}
/**
* The number of tasks which were executed by this executor
*
* @return the number of tasks executed so far
*/
public long getExecuted() {
return executed.getCount();
}
/**
* The average duration of a task in milliseconds.
*
* @return the average execution time of a task in milliseconds
*/
public double getAverageDuration() {
return duration.getAvg();
}
/**
* The number of tasks which were executed by blocking the caller due to system overload conditions.
* <p>
* A system overload occurs if all available tasks are busy and the queue of this executor reached its limit.
*
* @return the number of blocking task executions so far.
*/
public long getBlocked() {
return blocked.getCount();
}
/**
* The number of tasks dropped due to system overload conditions.
* <p>
* A system overload occurs if all available tasks are busy and the queue of this executor reached its limit. If
* a task has a <tt>dropHandler</tt> attached, the handler is informed and the task is not executed by simply
* deleted.
*
* @return the number of dropped tasks so far.
*/
public long getDropped() {
return dropped.getCount();
}
}