Skip to content

Commit 47f67ae

Browse files
committed
YARN-4851. Metric improvements for ATS v1.5 storage components. Li Lu via junping_du.
(cherry picked from commit 06413da)
1 parent a50dc05 commit 47f67ae

File tree

5 files changed

+222
-11
lines changed

5 files changed

+222
-11
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
6161
* Set the application logs to this cache item. The entity group should be
6262
* associated with this application.
6363
*
64-
* @param incomingAppLogs
64+
* @param incomingAppLogs Application logs this cache item mapped to
6565
*/
6666
public synchronized void setAppLogs(
6767
EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
@@ -80,18 +80,21 @@ public synchronized TimelineStore getStore() {
8080
* rescan and then load new data. The refresh process is synchronized with
8181
* other operations on the same cache item.
8282
*
83-
* @param groupId
84-
* @param aclManager
85-
* @param jsonFactory
86-
* @param objMapper
83+
* @param groupId Group id of the cache
84+
* @param aclManager ACL manager for the timeline storage
85+
* @param jsonFactory JSON factory for the storage
86+
* @param objMapper Object mapper for the storage
87+
* @param metrics Metrics to trace the status of the entity group store
8788
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
8889
* object filled with all entities in the group.
8990
* @throws IOException
9091
*/
9192
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
9293
TimelineACLsManager aclManager, JsonFactory jsonFactory,
93-
ObjectMapper objMapper) throws IOException {
94+
ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
95+
throws IOException {
9496
if (needRefresh()) {
97+
long startTime = Time.monotonicNow();
9598
// If an application is not finished, we only update summary logs (and put
9699
// new entities into summary storage).
97100
// Otherwise, since the application is done, we can update detail logs.
@@ -106,9 +109,12 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
106109
"LeveldbCache." + groupId);
107110
store.init(config);
108111
store.start();
112+
} else {
113+
// Store is not null, the refresh is triggered by stale storage.
114+
metrics.incrCacheStaleRefreshes();
109115
}
110116
List<LogInfo> removeList = new ArrayList<>();
111-
try(TimelineDataManager tdm =
117+
try (TimelineDataManager tdm =
112118
new TimelineDataManager(store, aclManager)) {
113119
tdm.init(config);
114120
tdm.start();
@@ -133,16 +139,18 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
133139
appLogs.getDetailLogs().removeAll(removeList);
134140
}
135141
updateRefreshTimeToNow();
142+
metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
136143
} else {
137144
LOG.debug("Cache new enough, skip refreshing");
145+
metrics.incrNoRefreshCacheRead();
138146
}
139147
return store;
140148
}
141149

142150
/**
143151
* Release the cache item for the given group id.
144152
*
145-
* @param groupId
153+
* @param groupId the group id that the cache should release
146154
*/
147155
public synchronized void releaseCache(TimelineEntityGroupId groupId) {
148156
try {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,17 @@ public class EntityGroupFSTimelineStore extends CompositeService
128128
private List<TimelineEntityGroupPlugin> cacheIdPlugins;
129129
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
130130

131+
@VisibleForTesting
132+
@InterfaceAudience.Private
133+
EntityGroupFSTimelineStoreMetrics metrics;
134+
131135
public EntityGroupFSTimelineStore() {
132136
super(EntityGroupFSTimelineStore.class.getSimpleName());
133137
}
134138

135139
@Override
136140
protected void serviceInit(Configuration conf) throws Exception {
141+
metrics = EntityGroupFSTimelineStoreMetrics.create();
137142
summaryStore = createSummaryStore();
138143
addService(summaryStore);
139144

@@ -171,6 +176,7 @@ protected boolean removeEldestEntry(
171176
if (cacheItem.getAppLogs().isDone()) {
172177
appIdLogMap.remove(groupId.getApplicationId());
173178
}
179+
metrics.incrCacheEvicts();
174180
return true;
175181
}
176182
return false;
@@ -316,6 +322,7 @@ protected void serviceStop() throws Exception {
316322
@InterfaceAudience.Private
317323
@VisibleForTesting
318324
int scanActiveLogs() throws IOException {
325+
long startTime = Time.monotonicNow();
319326
RemoteIterator<FileStatus> iter = list(activeRootPath);
320327
int logsToScanCount = 0;
321328
while (iter.hasNext()) {
@@ -331,6 +338,7 @@ int scanActiveLogs() throws IOException {
331338
LOG.debug("Unable to parse entry {}", name);
332339
}
333340
}
341+
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
334342
return logsToScanCount;
335343
}
336344

@@ -423,6 +431,7 @@ void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
423431
if (!fs.delete(dirpath, true)) {
424432
LOG.error("Unable to remove " + dirpath);
425433
}
434+
metrics.incrLogsDirsCleaned();
426435
} catch (IOException e) {
427436
LOG.error("Unable to remove " + dirpath, e);
428437
}
@@ -588,6 +597,7 @@ public synchronized void parseSummaryLogs() throws IOException {
588597
@VisibleForTesting
589598
synchronized void parseSummaryLogs(TimelineDataManager tdm)
590599
throws IOException {
600+
long startTime = Time.monotonicNow();
591601
if (!isDone()) {
592602
LOG.debug("Try to parse summary log for log {} in {}",
593603
appId, appDirPath);
@@ -605,8 +615,10 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
605615
List<LogInfo> removeList = new ArrayList<LogInfo>();
606616
for (LogInfo log : summaryLogs) {
607617
if (fs.exists(log.getPath(appDirPath))) {
608-
log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
618+
long summaryEntityParsed
619+
= log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
609620
objMapper, fs);
621+
metrics.incrEntitiesReadToSummary(summaryEntityParsed);
610622
} else {
611623
// The log may have been removed, remove the log
612624
removeList.add(log);
@@ -615,6 +627,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
615627
}
616628
}
617629
summaryLogs.removeAll(removeList);
630+
metrics.addSummaryLogReadTime(Time.monotonicNow() - startTime);
618631
}
619632

620633
// scans for new logs and returns the modification timestamp of the
@@ -787,6 +800,7 @@ private class EntityLogCleaner implements Runnable {
787800
@Override
788801
public void run() {
789802
LOG.debug("Cleaner starting");
803+
long startTime = Time.monotonicNow();
790804
try {
791805
cleanLogs(doneRootPath, fs, logRetainMillis);
792806
} catch (Exception e) {
@@ -796,6 +810,8 @@ public void run() {
796810
} else {
797811
LOG.error("Error cleaning files", e);
798812
}
813+
} finally {
814+
metrics.addLogCleanTime(Time.monotonicNow() - startTime);
799815
}
800816
LOG.debug("Cleaner finished");
801817
}
@@ -824,11 +840,13 @@ private List<TimelineStore> getTimelineStoresFromCacheIds(
824840
if (storeForId != null) {
825841
LOG.debug("Adding {} as a store for the query", storeForId.getName());
826842
stores.add(storeForId);
843+
metrics.incrGetEntityToDetailOps();
827844
}
828845
}
829846
if (stores.size() == 0) {
830847
LOG.debug("Using summary store for {}", entityType);
831848
stores.add(this.summaryStore);
849+
metrics.incrGetEntityToSummaryOps();
832850
}
833851
return stores;
834852
}
@@ -898,7 +916,7 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
898916
AppLogs appLogs = cacheItem.getAppLogs();
899917
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
900918
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
901-
objMapper);
919+
objMapper, metrics);
902920
} else {
903921
LOG.warn("AppLogs for group id {} is null", groupId);
904922
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.timeline;
19+
20+
import org.apache.hadoop.metrics2.MetricsSystem;
21+
import org.apache.hadoop.metrics2.annotation.Metric;
22+
import org.apache.hadoop.metrics2.annotation.Metrics;
23+
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
24+
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
25+
import org.apache.hadoop.metrics2.lib.MutableStat;
26+
27+
/**
28+
* This class tracks metrics for the EntityGroupFSTimelineStore. It tracks
29+
* the read and write metrics for timeline server v1.5. It serves as a
30+
* complement to {@link TimelineDataManagerMetrics}.
31+
*/
32+
@Metrics(about="Metrics for EntityGroupFSTimelineStore", context="yarn")
33+
public class EntityGroupFSTimelineStoreMetrics {
34+
private static final String DEFAULT_VALUE_WITH_SCALE = "TimeMs";
35+
36+
// General read related metrics
37+
@Metric("getEntity calls to summary storage")
38+
private MutableCounterLong getEntityToSummaryOps;
39+
40+
@Metric("getEntity calls to detail storage")
41+
private MutableCounterLong getEntityToDetailOps;
42+
43+
// Summary data related metrics
44+
@Metric(value = "summary log read ops and time",
45+
valueName = DEFAULT_VALUE_WITH_SCALE)
46+
private MutableStat summaryLogRead;
47+
48+
@Metric("entities read into the summary storage")
49+
private MutableCounterLong entitiesReadToSummary;
50+
51+
// Detail data cache related metrics
52+
@Metric("cache storage read that does not require a refresh")
53+
private MutableCounterLong noRefreshCacheRead;
54+
55+
@Metric("cache storage refresh due to the cached storage is stale")
56+
private MutableCounterLong cacheStaleRefreshes;
57+
58+
@Metric("cache storage evicts")
59+
private MutableCounterLong cacheEvicts;
60+
61+
@Metric(value = "cache storage refresh ops and time",
62+
valueName = DEFAULT_VALUE_WITH_SCALE)
63+
private MutableStat cacheRefresh;
64+
65+
// Log scanner and cleaner related metrics
66+
@Metric(value = "active log scan ops and time",
67+
valueName = DEFAULT_VALUE_WITH_SCALE)
68+
private MutableStat activeLogDirScan;
69+
70+
@Metric(value = "log cleaner purging ops and time",
71+
valueName = DEFAULT_VALUE_WITH_SCALE)
72+
private MutableStat logClean;
73+
74+
@Metric("log cleaner dirs purged")
75+
private MutableCounterLong logsDirsCleaned;
76+
77+
private static EntityGroupFSTimelineStoreMetrics instance = null;
78+
79+
EntityGroupFSTimelineStoreMetrics() {
80+
}
81+
82+
public static synchronized EntityGroupFSTimelineStoreMetrics create() {
83+
if (instance == null) {
84+
MetricsSystem ms = DefaultMetricsSystem.instance();
85+
instance = ms.register(new EntityGroupFSTimelineStoreMetrics());
86+
}
87+
return instance;
88+
}
89+
90+
// Setters
91+
// General read related
92+
public void incrGetEntityToSummaryOps() {
93+
getEntityToSummaryOps.incr();
94+
}
95+
96+
public void incrGetEntityToDetailOps() {
97+
getEntityToDetailOps.incr();
98+
}
99+
100+
// Summary data related
101+
public void addSummaryLogReadTime(long msec) {
102+
summaryLogRead.add(msec);
103+
}
104+
105+
public void incrEntitiesReadToSummary(long delta) {
106+
entitiesReadToSummary.incr(delta);
107+
}
108+
109+
// Cache related
110+
public void incrNoRefreshCacheRead() {
111+
noRefreshCacheRead.incr();
112+
}
113+
114+
public void incrCacheStaleRefreshes() {
115+
cacheStaleRefreshes.incr();
116+
}
117+
118+
public void incrCacheEvicts() {
119+
cacheEvicts.incr();
120+
}
121+
122+
public void addCacheRefreshTime(long msec) {
123+
cacheRefresh.add(msec);
124+
}
125+
126+
// Log scanner and cleaner related
127+
public void addActiveLogDirScanTime(long msec) {
128+
activeLogDirScan.add(msec);
129+
}
130+
131+
public void addLogCleanTime(long msec) {
132+
logClean.add(msec);
133+
}
134+
135+
public void incrLogsDirsCleaned() {
136+
logsDirsCleaned.incr();
137+
}
138+
139+
// Getters
140+
MutableCounterLong getEntitiesReadToSummary() {
141+
return entitiesReadToSummary;
142+
}
143+
144+
MutableCounterLong getLogsDirsCleaned() {
145+
return logsDirsCleaned;
146+
}
147+
148+
MutableCounterLong getGetEntityToSummaryOps() {
149+
return getEntityToSummaryOps;
150+
}
151+
152+
MutableCounterLong getGetEntityToDetailOps() {
153+
return getEntityToDetailOps;
154+
}
155+
156+
MutableStat getCacheRefresh() {
157+
return cacheRefresh;
158+
}
159+
}
160+

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,14 @@ boolean matchesGroupId(String groupId){
9898
));
9999
}
100100

101-
public void parseForStore(TimelineDataManager tdm, Path appDirPath,
101+
public long parseForStore(TimelineDataManager tdm, Path appDirPath,
102102
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
103103
FileSystem fs) throws IOException {
104104
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
105105
attemptDirName);
106106
Path logPath = getPath(appDirPath);
107107
FileStatus status = fs.getFileStatus(logPath);
108+
long numParsed = 0;
108109
if (status != null) {
109110
long startTime = Time.monotonicNow();
110111
try {
@@ -113,6 +114,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
113114
objMapper, fs);
114115
LOG.info("Parsed {} entities from {} in {} msec",
115116
count, logPath, Time.monotonicNow() - startTime);
117+
numParsed += count;
116118
} catch (RuntimeException e) {
117119
// If AppLogs cannot parse this log, it may be corrupted or just empty
118120
if (e.getCause() instanceof JsonParseException &&
@@ -125,6 +127,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
125127
} else {
126128
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
127129
}
130+
return numParsed;
128131
}
129132

130133
private long parsePath(TimelineDataManager tdm, Path logPath,

0 commit comments

Comments
 (0)