Skip to content

Commit 205dddb

Browse files
hotcodemachaaajisaka
authored andcommitted
YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org> (cherry picked from commit 00e2405) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
1 parent 9ee5265 commit 205dddb

File tree

4 files changed

+261
-27
lines changed

4 files changed

+261
-27
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,20 @@ public static boolean isAclEnabled(Configuration conf) {
618618
public static final int
619619
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
620620

621+
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
622+
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
623+
public static final int
624+
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
625+
1000;
626+
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
627+
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
628+
public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
629+
60;
630+
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
631+
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
632+
public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
633+
false;
634+
621635
//RM delegation token related keys
622636
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
623637
RM_PREFIX + "delegation.key.update-interval";

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,33 @@
861861
<value>10</value>
862862
</property>
863863

864+
<property>
865+
<description>
866+
This setting enables/disables timeline server v1 publisher to publish timeline events in batch.
867+
</description>
868+
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch</name>
869+
<value>false</value>
870+
</property>
871+
872+
<property>
873+
<description>
874+
The size of timeline server v1 publisher sending events in one request.
875+
</description>
876+
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size</name>
877+
<value>1000</value>
878+
</property>
879+
880+
<property>
881+
<description>
882+
When enable batch publishing in timeline server v1, we must avoid that the
883+
publisher waits for a batch to be filled up and hold events in buffer for long
884+
time. So we add another thread which send event's in the buffer periodically.
885+
This config sets the interval of the cyclical sending thread.
886+
</description>
887+
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds</name>
888+
<value>60</value>
889+
</property>
890+
864891
<property>
865892
<description>Number of diagnostics/failure messages can be saved in RM for
866893
log aggregation. It also defines the number of diagnostics/failure

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java

Lines changed: 185 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
2020

21+
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.Map;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.concurrent.TimeUnit;
2328

24-
import org.apache.commons.logging.Log;
25-
import org.apache.commons.logging.LogFactory;
2629
import org.apache.hadoop.conf.Configuration;
2730
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
2831
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,6 +35,7 @@
3235
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
3336
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
3437
import org.apache.hadoop.yarn.client.api.TimelineClient;
38+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3539
import org.apache.hadoop.yarn.event.EventHandler;
3640
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
3741
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
@@ -44,31 +48,109 @@
4448
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
4549
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
4650
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
4753

4854
/**
4955
* This class is responsible for posting application, appattempt &amp; Container
5056
* lifecycle related events to timeline service v1.
5157
*/
5258
public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
5359

54-
private static final Log LOG =
55-
LogFactory.getLog(TimelineServiceV1Publisher.class);
60+
private static final Logger LOG =
61+
LoggerFactory.getLogger(TimelineServiceV1Publisher.class);
5662

5763
public TimelineServiceV1Publisher() {
5864
super("TimelineserviceV1Publisher");
5965
}
6066

6167
private TimelineClient client;
68+
private LinkedBlockingQueue<TimelineEntity> entityQueue;
69+
private ExecutorService sendEventThreadPool;
70+
private int dispatcherPoolSize;
71+
private int dispatcherBatchSize;
72+
private int putEventInterval;
73+
private boolean isTimeLineServerBatchEnabled;
74+
private volatile boolean stopped = false;
75+
private PutEventThread putEventThread;
76+
private Object sendEntityLock;
6277

6378
@Override
6479
protected void serviceInit(Configuration conf) throws Exception {
80+
isTimeLineServerBatchEnabled =
81+
conf.getBoolean(
82+
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
83+
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
84+
if (isTimeLineServerBatchEnabled) {
85+
putEventInterval =
86+
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
87+
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
88+
* 1000;
89+
if (putEventInterval <= 0) {
90+
throw new IllegalArgumentException(
91+
"RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
92+
}
93+
dispatcherPoolSize = conf.getInt(
94+
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
95+
YarnConfiguration.
96+
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
97+
if (dispatcherPoolSize <= 0) {
98+
throw new IllegalArgumentException(
99+
"RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
100+
}
101+
dispatcherBatchSize = conf.getInt(
102+
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
103+
YarnConfiguration.
104+
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
105+
if (dispatcherBatchSize <= 1) {
106+
throw new IllegalArgumentException(
107+
"RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
108+
}
109+
putEventThread = new PutEventThread();
110+
sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
111+
entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
112+
sendEntityLock = new Object();
113+
LOG.info("Timeline service v1 batch publishing enabled");
114+
} else {
115+
LOG.info("Timeline service v1 batch publishing disabled");
116+
}
65117
client = TimelineClient.createTimelineClient();
66118
addIfService(client);
67119
super.serviceInit(conf);
68120
getDispatcher().register(SystemMetricsEventType.class,
69121
new TimelineV1EventHandler());
70122
}
71123

124+
protected void serviceStart() throws Exception {
125+
if (isTimeLineServerBatchEnabled) {
126+
stopped = false;
127+
putEventThread.start();
128+
}
129+
super.serviceStart();
130+
}
131+
132+
protected void serviceStop() throws Exception {
133+
super.serviceStop();
134+
if (isTimeLineServerBatchEnabled) {
135+
stopped = true;
136+
putEventThread.interrupt();
137+
try {
138+
putEventThread.join();
139+
SendEntity task = new SendEntity();
140+
if (!task.buffer.isEmpty()) {
141+
LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
142+
task.buffer.size());
143+
sendEventThreadPool.submit(task);
144+
}
145+
} finally {
146+
sendEventThreadPool.shutdown();
147+
if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
148+
sendEventThreadPool.shutdownNow();
149+
}
150+
}
151+
}
152+
}
153+
72154
@SuppressWarnings("unchecked")
73155
@Override
74156
public void appCreated(RMApp app, long createdTime) {
@@ -244,7 +326,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
244326
@SuppressWarnings("unchecked")
245327
@Override
246328
public void appAttemptFinished(RMAppAttempt appAttempt,
247-
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
329+
RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
248330
TimelineEntity entity =
249331
createAppAttemptEntity(appAttempt.getAppAttemptId());
250332

@@ -261,7 +343,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
261343
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
262344
app.getFinalApplicationStatus().toString());
263345
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
264-
.createApplicationAttemptState(appAttemtpState).toString());
346+
.createApplicationAttemptState(appAttemptState).toString());
265347
if (appAttempt.getMasterContainer() != null) {
266348
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
267349
appAttempt.getMasterContainer().getId().toString());
@@ -361,23 +443,68 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) {
361443
}
362444

363445
private void putEntity(TimelineEntity entity) {
364-
try {
446+
if (isTimeLineServerBatchEnabled) {
447+
try {
448+
entityQueue.put(entity);
449+
if (entityQueue.size() > dispatcherBatchSize) {
450+
SendEntity task = null;
451+
synchronized (sendEntityLock) {
452+
if (entityQueue.size() > dispatcherBatchSize) {
453+
task = new SendEntity();
454+
}
455+
}
456+
if (task != null) {
457+
sendEventThreadPool.submit(task);
458+
}
459+
}
460+
} catch (Exception e) {
461+
LOG.error("Error when publishing entity batch [ " + entity.getEntityType() + ","
462+
+ entity.getEntityId() + " ] ", e);
463+
}
464+
} else {
465+
try {
466+
if (LOG.isDebugEnabled()) {
467+
LOG.debug("Publishing the entity " + entity.getEntityId()
468+
+ ", JSON-style content: "
469+
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
470+
}
471+
client.putEntities(entity);
472+
} catch (Exception e) {
473+
LOG.error("Error when publishing entity [ " + entity.getEntityType() + ","
474+
+ entity.getEntityId() + " ] ", e);
475+
}
476+
}
477+
}
478+
479+
private class SendEntity implements Runnable {
480+
481+
private ArrayList<TimelineEntity> buffer;
482+
483+
SendEntity() {
484+
buffer = new ArrayList();
485+
entityQueue.drainTo(buffer);
486+
}
487+
488+
@Override
489+
public void run() {
365490
if (LOG.isDebugEnabled()) {
366-
LOG.debug("Publishing the entity " + entity.getEntityId()
367-
+ ", JSON-style content: "
368-
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
491+
LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
492+
}
493+
if (buffer.isEmpty()) {
494+
return;
495+
}
496+
try {
497+
client.putEntities(buffer.toArray(new TimelineEntity[0]));
498+
} catch (Exception e) {
499+
LOG.error("Error when publishing entity: ", e);
369500
}
370-
client.putEntities(entity);
371-
} catch (Exception e) {
372-
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
373-
+ entity.getEntityId() + "]", e);
374501
}
375502
}
376503

377504
private class TimelineV1PublishEvent extends TimelinePublishEvent {
378505
private TimelineEntity entity;
379506

380-
public TimelineV1PublishEvent(SystemMetricsEventType type,
507+
TimelineV1PublishEvent(SystemMetricsEventType type,
381508
TimelineEntity entity, ApplicationId appId) {
382509
super(type, appId);
383510
this.entity = entity;
@@ -395,4 +522,46 @@ public void handle(TimelineV1PublishEvent event) {
395522
putEntity(event.getEntity());
396523
}
397524
}
398-
}
525+
526+
private class PutEventThread extends Thread {
527+
PutEventThread() {
528+
super("PutEventThread");
529+
}
530+
531+
@Override
532+
public void run() {
533+
LOG.info("System metrics publisher will put events every " +
534+
String.valueOf(putEventInterval) + " milliseconds");
535+
while (!stopped && !Thread.currentThread().isInterrupted()) {
536+
if (System.currentTimeMillis() % putEventInterval >= 1000) {
537+
try {
538+
Thread.sleep(500);
539+
} catch (InterruptedException e) {
540+
LOG.warn(SystemMetricsPublisher.class.getName()
541+
+ " is interrupted. Exiting.");
542+
break;
543+
}
544+
continue;
545+
}
546+
SendEntity task = null;
547+
synchronized (sendEntityLock) {
548+
if (LOG.isDebugEnabled()) {
549+
LOG.debug("Creating SendEntity task in PutEventThread");
550+
}
551+
task = new SendEntity();
552+
}
553+
if (task != null) {
554+
sendEventThreadPool.submit(task);
555+
}
556+
try {
557+
// sleep added to avoid multiple SendEntity task within a single interval.
558+
Thread.sleep(1000);
559+
} catch (InterruptedException e) {
560+
LOG.warn(SystemMetricsPublisher.class.getName()
561+
+ " is interrupted. Exiting.");
562+
break;
563+
}
564+
}
565+
}
566+
}
567+
}

0 commit comments

Comments
 (0)