Skip to content

Commit b75a80b

Browse files
committed
YARN-11656 RMStateStore event queue blocked
TODO
1 parent d1daf26 commit b75a80b

File tree

19 files changed

+598
-25
lines changed

19 files changed

+598
-25
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,14 @@ public interface Event<TYPE extends Enum<TYPE>> {
3232
TYPE getType();
3333
long getTimestamp();
3434
String toString();
35+
36+
/**
37+
* In case of parallel execution of events in the same dispatcher,
38+
* the result of this method will be used as semaphore.
39+
* If method returns null, then a default semaphore will be used.
40+
* @return the semaphore
41+
*/
42+
default String getLockKey() {
43+
return null;
44+
};
3545
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.event.multidispatcher;
20+
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.LinkedBlockingQueue;
23+
import java.util.concurrent.ScheduledThreadPoolExecutor;
24+
import java.util.concurrent.ThreadPoolExecutor;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
31+
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
33+
import org.apache.hadoop.service.AbstractService;
34+
import org.apache.hadoop.yarn.event.Dispatcher;
35+
import org.apache.hadoop.yarn.event.Event;
36+
import org.apache.hadoop.yarn.event.EventHandler;
37+
import org.apache.hadoop.yarn.metrics.DispatcherEventMetrics;
38+
import org.apache.hadoop.yarn.util.Clock;
39+
import org.apache.hadoop.yarn.util.MonotonicClock;
40+
41+
/**
42+
* Dispatches {@link Event}s in a parallel thread.
43+
* The {@link this#getEventHandler()} method can be used to post an event to the dispatcher.
44+
* The posted event will be added to the event queue what is polled by many thread,
45+
* based on the config values in the {@link MultiDispatcherConfig}.
46+
* The posted events can be parallel executed,
47+
* if the result of the {@link Event#getLockKey()} is different between 2 event.
48+
* If the method return with null, then a global semaphore will be used for these events.
49+
* The method usually returns with the applicationId based on the concept
50+
* parallel apps should not affect each others.
51+
* The locking logic is implemented in {@link MultiDispatcherLocks}.
52+
* The Dispatcher provides metric data using the {@link DispatcherEventMetrics}
53+
*/
54+
public class MultiDispatcher extends AbstractService implements Dispatcher {
55+
56+
private final Logger LOG;
57+
private final String dispatcherName;
58+
private final DispatcherEventMetrics metrics;
59+
private final MultiDispatcherLocks locks;
60+
private final MultiDispatcherLibrary library;
61+
private final Clock clock = new MonotonicClock();
62+
63+
private MultiDispatcherConfig config;
64+
private BlockingQueue<Runnable> eventQueue;
65+
private ThreadPoolExecutor threadPoolExecutor;
66+
private ScheduledThreadPoolExecutor monitorExecutor;
67+
68+
public MultiDispatcher(String dispatcherName) {
69+
super("Dispatcher");
70+
this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase();
71+
this.LOG = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." + this.dispatcherName);
72+
this.metrics = new DispatcherEventMetrics(this.dispatcherName);
73+
this.locks = new MultiDispatcherLocks(this.LOG);
74+
this.library = new MultiDispatcherLibrary();
75+
}
76+
77+
@Override
78+
protected void serviceInit(Configuration conf) throws Exception{
79+
super.serviceInit(conf);
80+
this.config = new MultiDispatcherConfig(getConfig(), this.dispatcherName);
81+
this.eventQueue = new LinkedBlockingQueue<>(this.config.getQueueSize());
82+
}
83+
84+
@Override
85+
protected void serviceStart() throws Exception {
86+
super.serviceStart();
87+
createWorkerPool();
88+
createMonitorThread();
89+
DefaultMetricsSystem.instance().register(
90+
"Event metrics for " + dispatcherName,
91+
"Event metrics for " + dispatcherName,
92+
metrics
93+
);
94+
}
95+
96+
@Override
97+
protected void serviceStop() throws Exception {
98+
if (monitorExecutor != null) {
99+
monitorExecutor.shutdownNow();
100+
}
101+
threadPoolExecutor.shutdown();
102+
threadPoolExecutor.awaitTermination(config.getGracefulStopSeconds(), TimeUnit.SECONDS);
103+
int terminatedSize = threadPoolExecutor.shutdownNow().size();
104+
if (0 < terminatedSize) {
105+
LOG.error("{} tasks not finished in time, so they were terminated", terminatedSize);
106+
}
107+
}
108+
109+
@Override
110+
public EventHandler getEventHandler() {
111+
return event -> {
112+
if (isInState(STATE.STOPPED)) {
113+
LOG.warn("Discard event {} because stopped state", event);
114+
} else {
115+
EventHandler handler = library.getEventHandler(event);
116+
threadPoolExecutor.execute(createRunnable(event, handler));
117+
metrics.addEvent(event.getType());
118+
}
119+
};
120+
}
121+
122+
@Override
123+
public void register(Class<? extends Enum> eventType, EventHandler handler) {
124+
library.register(eventType, handler);
125+
metrics.init(eventType);
126+
}
127+
128+
private Runnable createRunnable(Event event, EventHandler handler) {
129+
return () -> {
130+
LOG.debug("{} handle {}, queue size: {}",
131+
Thread.currentThread().getName(), event.getClass().getSimpleName(), eventQueue.size());
132+
locks.lock(event);
133+
long start = clock.getTime();
134+
try {
135+
handler.handle(event);
136+
} finally {
137+
metrics.updateRate(event.getType(), clock.getTime() - start);
138+
locks.unLock(event);
139+
metrics.removeEvent(event.getType());
140+
}
141+
};
142+
}
143+
144+
private void createWorkerPool() {
145+
this.threadPoolExecutor = new ThreadPoolExecutor(
146+
config.getDefaultPoolSize(),
147+
config.getMaxPoolSize(),
148+
config.getKeepAliveSeconds(),
149+
TimeUnit.SECONDS,
150+
eventQueue,
151+
new BasicThreadFactory.Builder()
152+
.namingPattern(this.dispatcherName + "-worker-%d")
153+
.build()
154+
);
155+
}
156+
157+
private void createMonitorThread() {
158+
int interval = config.getMonitorSeconds();
159+
if (interval < 1) {
160+
return;
161+
}
162+
this.monitorExecutor = new ScheduledThreadPoolExecutor(
163+
1,
164+
new BasicThreadFactory.Builder()
165+
.namingPattern(this.dispatcherName + "-monitor-%d")
166+
.build());
167+
monitorExecutor.scheduleAtFixedRate(() -> {
168+
int size = eventQueue.size();
169+
if (0 < size) {
170+
LOG.info("Event queue size is {}", size);
171+
LOG.debug("Metrics: {}", metrics);
172+
}
173+
},10, interval, TimeUnit.SECONDS);
174+
}
175+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.event.multidispatcher;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
23+
/**
24+
* All the config what can be use in the {@link MultiDispatcher}
25+
*/
26+
class MultiDispatcherConfig extends Configuration {
27+
28+
private final String prefix;
29+
30+
public MultiDispatcherConfig(Configuration configuration, String dispatcherName) {
31+
super(configuration);
32+
this.prefix = String.format("yarn.dispatcher.multi-thread.%s.", dispatcherName);
33+
}
34+
35+
public int getDefaultPoolSize() {
36+
return super.getInt(prefix + "default-pool-size", 4);
37+
}
38+
39+
public int getMaxPoolSize() {
40+
return super.getInt(prefix + "max-pool-size", 8);
41+
}
42+
43+
public int getKeepAliveSeconds() {
44+
return super.getInt(prefix + "keep-alive-seconds", 10);
45+
}
46+
47+
public int getQueueSize() {
48+
return super.getInt(prefix + "queue-size", 1_000_000);
49+
}
50+
51+
public int getMonitorSeconds() {
52+
return super.getInt(prefix + "monitor-seconds", 30);
53+
}
54+
55+
public int getGracefulStopSeconds() {
56+
return super.getInt(prefix + "graceful-stop-seconds", 60);
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.event.multidispatcher;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.apache.hadoop.yarn.event.Event;
25+
import org.apache.hadoop.yarn.event.EventHandler;
26+
27+
28+
class MultiDispatcherLibrary {
29+
30+
private final Map<String, EventHandler> LIB = new HashMap<>();
31+
32+
public EventHandler getEventHandler(Event e) {
33+
EventHandler handler = LIB.get(e.getType().getClass().getCanonicalName());
34+
if (handler == null) {
35+
throw new Error("EventHandler for " + e.getType() + ", was not found in " + LIB.keySet());
36+
}
37+
return handler;
38+
}
39+
40+
public void register(Class<? extends Enum> eventType, EventHandler handler) {
41+
LIB.put(eventType.getCanonicalName(), handler);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.event.multidispatcher;
20+
21+
import java.util.Set;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
25+
26+
import org.slf4j.Logger;
27+
28+
import org.apache.hadoop.yarn.event.Event;
29+
30+
/**
31+
* The locking logic of the {@link MultiDispatcher}
32+
*/
33+
class MultiDispatcherLocks {
34+
private final Logger LOG;
35+
private final Set<String> lockKeys= ConcurrentHashMap.newKeySet();
36+
private final Lock globalLock = new ReentrantLock();
37+
38+
public MultiDispatcherLocks(Logger LOG) {
39+
this.LOG = LOG;
40+
}
41+
42+
void lock(Event event) {
43+
String lockKey = event.getLockKey();
44+
debugLockLog(lockKey, "lock", event);
45+
if (lockKey == null) {
46+
globalLock.lock();
47+
} else {
48+
lockKeys.add(lockKey);
49+
}
50+
debugLockLog(lockKey, "locked", event);
51+
}
52+
53+
void unLock(Event event) {
54+
String lockKey = event.getLockKey();
55+
debugLockLog(lockKey, "unlock", event);
56+
if (lockKey == null) {
57+
globalLock.unlock();
58+
} else {
59+
lockKeys.remove(lockKey);
60+
}
61+
debugLockLog(lockKey, "unlocked", event);
62+
}
63+
64+
private void debugLockLog(String lockKey, String command, Event event) {
65+
LOG.debug("{} {} with thread {} for event {}",
66+
lockKey, command, Thread.currentThread().getName(), event.getClass().getSimpleName());
67+
}
68+
}

0 commit comments

Comments
 (0)