Skip to content

Commit 465a0de

Browse files
committed
YARN-11656 RMStateStore event queue blocked
- fix style
1 parent 0654211 commit 465a0de

File tree

6 files changed

+85
-23
lines changed

6 files changed

+85
-23
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
* Dispatches {@link Event}s in a parallel thread.
4444
* The {@link Dispatcher#getEventHandler()} method can be used to post an event to the dispatcher.
4545
* The posted event will be separated based on the hashcode of the {@link Event#getLockKey()}.
46+
* If the getLockKey() method returns null,
47+
* then the first executor thread will be used as the default worker.
4648
* The {@link MultiDispatcherConfig} contains the information,
4749
* how many threads will be used for parallel execution.
4850
* The {@link MultiDispatcherExecutor} contains the worker threads, which handle the events.
@@ -65,7 +67,6 @@ public MultiDispatcher(String dispatcherName) {
6567
super("Dispatcher");
6668
this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase();
6769
this.log = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." + this.dispatcherName);
68-
this.metrics = new DispatcherEventMetricsImpl(this.dispatcherName);
6970
this.library = new MultiDispatcherLibrary();
7071
}
7172

@@ -74,6 +75,7 @@ protected void serviceInit(Configuration conf) throws Exception{
7475
super.serviceInit(conf);
7576
MultiDispatcherConfig config = new MultiDispatcherConfig(getConfig(), dispatcherName);
7677
workerExecutor = new MultiDispatcherExecutor(log, config, dispatcherName);
78+
workerExecutor.start();
7779
createMonitorThread(config);
7880
if (config.getMetricsEnabled()) {
7981
metrics = new DispatcherEventMetricsImpl(dispatcherName);
@@ -89,6 +91,9 @@ protected void serviceInit(Configuration conf) throws Exception{
8991

9092
@Override
9193
protected void serviceStop() throws Exception {
94+
if (monitorExecutor != null) {
95+
monitorExecutor.shutdown();
96+
}
9297
workerExecutor.stop();
9398
}
9499

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,45 @@ public MultiDispatcherConfig(Configuration configuration, String dispatcherName)
3232
this.prefix = String.format("yarn.dispatcher.multi-thread.%s.", dispatcherName);
3333
}
3434

35+
/**
36+
* How many executor threads should be created to handle the incoming events.
37+
* @return configured value, or default 4
38+
*/
3539
public int getDefaultPoolSize() {
3640
return super.getInt(prefix + "default-pool-size", 4);
3741
}
3842

43+
/**
44+
* Maximum size of the event queue of the executor threads.
45+
* If limit is reached then the queue#add method will block.
46+
* @return configured value, or default 1_000_000
47+
*/
3948
public int getQueueSize() {
40-
return super.getInt(prefix + "queue-size", 10_000_000);
49+
return super.getInt(prefix + "queue-size", 1_000_000);
4150
}
4251

52+
/**
53+
* How frequently the monitor thread should write the state of the dispatcher to the LOG.
54+
* If less than 1 this monitoring will be turned off.
55+
* @return configured value, or default 0
56+
*/
4357
public int getMonitorSeconds() {
44-
return super.getInt(prefix + "monitor-seconds", 30);
58+
return super.getInt(prefix + "monitor-seconds", 0);
4559
}
4660

61+
/**
62+
* How long the dispatcher should wait to drain all event queues of the workers,
63+
* after stop signal is received.
64+
* @return configured value, or default 60
65+
*/
4766
public int getGracefulStopSeconds() {
4867
return super.getInt(prefix + "graceful-stop-seconds", 60);
4968
}
5069

70+
/**
71+
* Whether dispatcher metrics should be published to the metrics system.
72+
* @return configured value, or default false
73+
*/
5174
public boolean getMetricsEnabled() {
5275
return super.getBoolean(prefix + "metrics-enabled", false);
5376
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,39 @@
3535
*/
3636
public class MultiDispatcherExecutor {
3737

38-
private final Logger LOG;
38+
private final Logger log;
3939
private final MultiDispatcherConfig config;
4040
private final MultiDispatcherExecutorThread[] threads;
4141
private final Clock clock = new MonotonicClock();
4242

4343
public MultiDispatcherExecutor(
44-
Logger LOG,
44+
Logger log,
4545
MultiDispatcherConfig config,
4646
String dispatcherName
4747
) {
48-
this.LOG = LOG;
48+
this.log = log;
4949
this.config = config;
5050
this.threads = new MultiDispatcherExecutorThread[config.getDefaultPoolSize()];
5151
ThreadGroup group = new ThreadGroup(dispatcherName);
52-
for (int i=0; i < threads.length; ++i) {
52+
for (int i = 0; i < threads.length; ++i) {
5353
threads[i] = new MultiDispatcherExecutorThread(group, i, config.getQueueSize());
54-
threads[i].start();
54+
}
55+
}
56+
57+
public void start() {
58+
for(Thread t : threads) {
59+
t.start();
5560
}
5661
}
5762

5863
public void execute(Event event, Runnable runnable) {
5964
String lockKey = event.getLockKey();
60-
int threadIndex = lockKey == null ? 0 : Math.abs(lockKey.hashCode()) % threads.length;
65+
// abs of Integer.MIN_VALUE is Integer.MIN_VALUE
66+
int threadIndex = lockKey == null || lockKey.hashCode() == Integer.MIN_VALUE ?
67+
0 : Math.abs(lockKey.hashCode() % threads.length);
6168
MultiDispatcherExecutorThread thread = threads[threadIndex];
6269
thread.add(runnable);
63-
LOG.debug("The {} with lock key {} will be handled by {}",
70+
log.trace("The {} with lock key {} will be handled by {}",
6471
event.getType(), lockKey, thread.getName());
6572
}
6673

@@ -70,7 +77,7 @@ public void stop() throws InterruptedException {
7077
if (Arrays.stream(threads).anyMatch(t -> 0 < t.queueSize())
7178
// and not timeout yet
7279
&& clock.getTime() < timeOut) {
73-
LOG.debug("Not all event queue is empty, waiting to drain ...");
80+
log.debug("Not all event queue is empty, waiting to drain ...");
7481
Thread.sleep(1_000);
7582
}
7683
for (MultiDispatcherExecutorThread thread : threads) {
@@ -108,7 +115,7 @@ public void run() {
108115
queue.take().run();
109116
}
110117
} catch (InterruptedException e) {
111-
LOG.warn("{} get interrupted", getName());
118+
log.warn("{} get interrupted", getName());
112119
}
113120
}
114121
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,34 @@
1919

2020
import org.apache.hadoop.metrics2.MetricsSource;
2121

22+
/**
23+
* Interface for {@link org.apache.hadoop.yarn.event.Dispatcher}
24+
* that can be used to publish {@link org.apache.hadoop.yarn.event.Event} related metrics.
25+
*/
2226
public interface DispatcherEventMetrics extends MetricsSource {
2327

28+
/**
29+
* Class of the event type that can be handled by the DispatcherEventMetrics.
30+
* @param typeClass the event type
31+
*/
2432
void init(Class<? extends Enum> typeClass);
2533

34+
/**
35+
* Call when an Event is added for dispatching.
36+
* @param type type of the event
37+
*/
2638
void addEvent(Object type);
2739

40+
/**
41+
* Call when an Event has been handled.
42+
* @param type type of the event
43+
*/
2844
void removeEvent(Object type);
2945

46+
/**
47+
* Call with how much time was required to handle the event
48+
* @param type type of the event
49+
* @param millisecond time interval
50+
*/
3051
void updateRate(Object type, long millisecond);
3152
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
3030
import org.apache.hadoop.metrics2.lib.MutableRate;
3131

32+
/**
33+
* Metric object for {@link org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcher}
34+
*/
3235
@InterfaceAudience.Private
3336
@Metrics(context="yarn")
3437
public class DispatcherEventMetricsImpl implements DispatcherEventMetrics {
@@ -53,11 +56,12 @@ public void getMetrics(MetricsCollector collector, boolean all) {
5356

5457
@Override
5558
public void init(Class<? extends Enum> typeClass) {
56-
for(Object c : typeClass.getEnumConstants()) {
57-
String key = createKey(c);
58-
currentEventCountMetrics.put(key, this.registry.newGauge(
59-
key + "_Current", key + "_Current", 0L));
60-
processingTimeMetrics.put(key, this.registry.newRate(key + "_", key+ "_"));
59+
for(Object constant : typeClass.getEnumConstants()) {
60+
String key = createKey(constant);
61+
currentEventCountMetrics.put(key,
62+
registry.newGauge(key + "_Current", key + "_Current", 0L));
63+
processingTimeMetrics.put(key,
64+
registry.newRate(key + "_", key + "_"));
6165
}
6266
}
6367

@@ -87,5 +91,4 @@ public String toString() {
8791
.add("processingTimeMetrics=" + processingTimeMetrics)
8892
.toString();
8993
}
90-
9194
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import org.apache.hadoop.metrics2.MetricsCollector;
2323

24+
/**
25+
* Used if metric publication should be disabled
26+
*/
2427
public class DispatcherEventMetricsNoOps implements DispatcherEventMetrics {
2528

2629
private final Logger log;
@@ -31,26 +34,26 @@ public DispatcherEventMetricsNoOps(Logger log) {
3134

3235
@Override
3336
public void getMetrics(MetricsCollector collector, boolean all) {
34-
log.debug("called getMetrics");
37+
log.trace("called getMetrics");
3538
}
3639

3740
@Override
3841
public void init(Class<? extends Enum> typeClass) {
39-
log.debug("called init");
42+
log.trace("called init");
4043
}
4144

4245
@Override
4346
public void addEvent(Object type) {
44-
log.debug("called addEvent");
47+
log.trace("called addEvent");
4548
}
4649

4750
@Override
4851
public void removeEvent(Object type) {
49-
log.debug("called removeEvent");
52+
log.trace("called removeEvent");
5053
}
5154

5255
@Override
5356
public void updateRate(Object type, long millisecond) {
54-
log.debug("called updateRate");
57+
log.trace("called updateRate");
5558
}
5659
}

0 commit comments

Comments
 (0)