Skip to content

Commit 5d08ffa

Browse files
committed
YARN-11182. Refactor TestAggregatedLogDeletionService: 2nd phase. Contributed by Szilard Nemeth.
1 parent 36c4be8 commit 5d08ffa

File tree

10 files changed

+1082
-433
lines changed

10 files changed

+1082
-433
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class AggregatedLogDeletionService extends AbstractService {
5959
private long checkIntervalMsecs;
6060
private LogDeletionTask task;
6161

62-
static class LogDeletionTask extends TimerTask {
62+
public static class LogDeletionTask extends TimerTask {
6363
private Configuration conf;
6464
private long retentionMillis;
6565
private String suffix = null;
@@ -101,7 +101,7 @@ public void run() {
101101
}
102102
}
103103
} catch (Throwable t) {
104-
logException("Error reading root log dir this deletion " +
104+
logException("Error reading root log dir, this deletion " +
105105
"attempt is being aborted", t);
106106
}
107107
LOG.info("aggregated log deletion finished.");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.logaggregation;
20+
21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
24+
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
25+
26+
import java.util.List;
27+
28+
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
29+
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
30+
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
31+
32+
33+
public class LogAggregationTestUtils {
34+
public static final String REMOTE_LOG_ROOT = "target/app-logs/";
35+
36+
public static void enableFileControllers(Configuration conf,
37+
List<Class<? extends LogAggregationFileController>> fileControllers,
38+
List<String> fileControllerNames) {
39+
enableFcs(conf, REMOTE_LOG_ROOT, fileControllers, fileControllerNames);
40+
}
41+
42+
public static void enableFileControllers(Configuration conf,
43+
String remoteLogRoot,
44+
List<Class<? extends LogAggregationFileController>> fileControllers,
45+
List<String> fileControllerNames) {
46+
enableFcs(conf, remoteLogRoot, fileControllers, fileControllerNames);
47+
}
48+
49+
50+
private static void enableFcs(Configuration conf,
51+
String remoteLogRoot,
52+
List<Class<? extends LogAggregationFileController>> fileControllers,
53+
List<String> fileControllerNames) {
54+
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
55+
StringUtils.join(fileControllerNames, ","));
56+
for (int i = 0; i < fileControllers.size(); i++) {
57+
Class<? extends LogAggregationFileController> fileController = fileControllers.get(i);
58+
String controllerName = fileControllerNames.get(i);
59+
60+
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, controllerName),
61+
fileController, LogAggregationFileController.class);
62+
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, controllerName),
63+
remoteLogRoot + controllerName + "/");
64+
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, controllerName),
65+
controllerName);
66+
}
67+
}
68+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java

Lines changed: 135 additions & 379 deletions
Large diffs are not rendered by default.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java

Lines changed: 24 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,6 @@
1818

1919
package org.apache.hadoop.yarn.logaggregation.filecontroller;
2020

21-
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
22-
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
23-
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
24-
import static org.junit.Assert.assertEquals;
25-
import static org.junit.Assert.assertTrue;
26-
import static org.junit.Assert.fail;
27-
28-
import java.io.File;
29-
import java.io.FileWriter;
30-
import java.io.IOException;
31-
import java.io.OutputStream;
32-
import java.io.Writer;
33-
import java.util.Arrays;
34-
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.Map;
37-
38-
import org.apache.commons.lang3.StringUtils;
3921
import org.apache.hadoop.conf.Configuration;
4022
import org.apache.hadoop.conf.Configured;
4123
import org.apache.hadoop.fs.FileSystem;
@@ -56,14 +38,28 @@
5638
import org.slf4j.Logger;
5739
import org.slf4j.LoggerFactory;
5840

41+
import java.io.File;
42+
import java.io.FileWriter;
43+
import java.io.IOException;
44+
import java.io.OutputStream;
45+
import java.io.Writer;
46+
import java.util.Arrays;
47+
import java.util.Collections;
48+
import java.util.List;
49+
import java.util.Map;
50+
51+
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS;
52+
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.REMOTE_LOG_ROOT;
53+
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
54+
import static org.junit.Assert.*;
55+
5956
/**
6057
* Test LogAggregationFileControllerFactory.
6158
*/
6259
public class TestLogAggregationFileControllerFactory extends Configured {
6360
private static final Logger LOG = LoggerFactory.getLogger(
6461
TestLogAggregationFileControllerFactory.class);
6562

66-
private static final String REMOTE_LOG_ROOT = "target/app-logs/";
6763
private static final String REMOTE_DEFAULT_DIR = "default/";
6864
private static final String APP_OWNER = "test";
6965

@@ -87,8 +83,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
8783
public void setup() throws IOException {
8884
Configuration conf = new YarnConfiguration();
8985
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
90-
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
91-
REMOTE_DEFAULT_DIR);
86+
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT + REMOTE_DEFAULT_DIR);
9287
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
9388
setConf(conf);
9489
}
@@ -143,36 +138,15 @@ public void testDefaultLogAggregationFileControllerFactory()
143138
@Test(expected = Exception.class)
144139
public void testLogAggregationFileControllerFactoryClassNotSet() {
145140
Configuration conf = getConf();
146-
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
147-
"TestLogAggregationFileController");
141+
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TestLogAggregationFileController");
148142
new LogAggregationFileControllerFactory(conf);
149143
fail("TestLogAggregationFileController's class was not set, " +
150144
"but the factory creation did not fail.");
151145
}
152146

153-
private void enableFileControllers(
154-
List<Class<? extends LogAggregationFileController>> fileControllers,
155-
List<String> fileControllerNames) {
156-
Configuration conf = getConf();
157-
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
158-
StringUtils.join(fileControllerNames, ","));
159-
for (int i = 0; i < fileControllers.size(); i++) {
160-
Class<? extends LogAggregationFileController> fileController =
161-
fileControllers.get(i);
162-
String controllerName = fileControllerNames.get(i);
163-
164-
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
165-
controllerName), fileController, LogAggregationFileController.class);
166-
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
167-
controllerName), REMOTE_LOG_ROOT + controllerName + "/");
168-
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
169-
controllerName), controllerName);
170-
}
171-
}
172-
173147
@Test
174148
public void testLogAggregationFileControllerFactory() throws Exception {
175-
enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
149+
enableFileControllers(getConf(), ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
176150
LogAggregationFileControllerFactory factory =
177151
new LogAggregationFileControllerFactory(getConf());
178152
List<LogAggregationFileController> list =
@@ -199,8 +173,7 @@ public void testLogAggregationFileControllerFactory() throws Exception {
199173

200174
@Test
201175
public void testClassConfUsed() {
202-
enableFileControllers(Collections.singletonList(
203-
LogAggregationTFileController.class),
176+
enableFileControllers(getConf(), Collections.singletonList(LogAggregationTFileController.class),
204177
Collections.singletonList("TFile"));
205178
LogAggregationFileControllerFactory factory =
206179
new LogAggregationFileControllerFactory(getConf());
@@ -215,7 +188,7 @@ public void testClassConfUsed() {
215188
@Test
216189
public void testNodemanagerConfigurationIsUsed() {
217190
Configuration conf = getConf();
218-
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
191+
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
219192
LogAggregationFileControllerFactory factory =
220193
new LogAggregationFileControllerFactory(conf);
221194
LogAggregationFileController fc = factory.getFileControllerForWrite();
@@ -231,7 +204,7 @@ public void testDefaultConfUsed() {
231204
Configuration conf = getConf();
232205
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
233206
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
234-
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
207+
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
235208

236209
LogAggregationFileControllerFactory factory =
237210
new LogAggregationFileControllerFactory(getConf());
@@ -268,20 +241,19 @@ public void postWrite(LogAggregationFileControllerContext record)
268241
}
269242

270243
@Override
271-
public void initializeWriter(LogAggregationFileControllerContext context)
272-
throws IOException {
244+
public void initializeWriter(LogAggregationFileControllerContext context) {
273245
// Do Nothing
274246
}
275247

276248
@Override
277249
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
278-
OutputStream os) throws IOException {
250+
OutputStream os) {
279251
return false;
280252
}
281253

282254
@Override
283255
public List<ContainerLogMeta> readAggregatedLogsMeta(
284-
ContainerLogsRequest logRequest) throws IOException {
256+
ContainerLogsRequest logRequest) {
285257
return null;
286258
}
287259

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.logaggregation.testutils;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
23+
import org.apache.hadoop.yarn.api.records.ApplicationId;
24+
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
29+
import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
30+
31+
public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
32+
private final List<ApplicationId> finishedApplications;
33+
private final List<ApplicationId> runningApplications;
34+
private final Configuration conf;
35+
36+
public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
37+
List<ApplicationId> finishedApplications) {
38+
this(runningApplications, finishedApplications, null);
39+
}
40+
41+
public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
42+
List<ApplicationId> finishedApplications,
43+
Configuration conf) {
44+
this.runningApplications = runningApplications;
45+
this.finishedApplications = finishedApplications;
46+
this.conf = conf;
47+
}
48+
49+
@Override
50+
protected ApplicationClientProtocol createRMClient() throws IOException {
51+
try {
52+
return createMockRMClient(finishedApplications, runningApplications);
53+
} catch (Exception e) {
54+
throw new IOException(e);
55+
}
56+
}
57+
58+
@Override
59+
protected Configuration createConf() {
60+
return conf;
61+
}
62+
63+
@Override
64+
protected void stopRMClient() {
65+
// DO NOTHING
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.logaggregation.testutils;
20+
21+
import org.apache.hadoop.fs.FileStatus;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.yarn.api.records.ApplicationId;
24+
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
25+
26+
public class FileStatusUtils {
27+
public static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
28+
ApplicationId appId,
29+
String user, String suffix,
30+
long modificationTime) {
31+
Path path = LogAggregationUtils.getRemoteAppLogDir(
32+
remoteRootLogDir, appId, user, suffix);
33+
FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
34+
return new PathWithFileStatus(path, fileStatus);
35+
}
36+
37+
public static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
38+
return new FileStatus(0, true, 0, 0, modificationTime, path);
39+
}
40+
41+
public static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
42+
long modificationTime) {
43+
Path logPath = new Path(baseDir, childDir);
44+
FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
45+
return new PathWithFileStatus(logPath, fStatus);
46+
}
47+
48+
public static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
49+
long modificationTime) {
50+
Path logPath = new Path(baseDir, childDir);
51+
FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
52+
return new PathWithFileStatus(logPath, fStatus);
53+
}
54+
55+
public static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
56+
String user,
57+
String suffix,
58+
ApplicationId appId,
59+
long modificationTime) {
60+
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
61+
FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
62+
return new PathWithFileStatus(bucketDir, fStatus);
63+
}
64+
65+
public static FileStatus createFileStatusWithLengthForFile(long length,
66+
long modificationTime,
67+
Path logPath) {
68+
return new FileStatus(length, false, 1, 1, modificationTime, logPath);
69+
}
70+
71+
public static FileStatus createFileStatusWithLengthForDir(long length,
72+
long modificationTime,
73+
Path logPath) {
74+
return new FileStatus(length, true, 1, 1, modificationTime, logPath);
75+
}
76+
}

0 commit comments

Comments
 (0)