Skip to content

Commit 9aca734

Browse files
HADOOP-19280. [ABFS] Initialize client timer only if metric collection is enabled (#7061)
Contributed by Manish Bhatt
1 parent a9b7913 commit 9aca734

File tree

2 files changed

+153
-6
lines changed

2 files changed

+153
-6
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@
131131
public abstract class AbfsClient implements Closeable {
132132
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
133133
public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON;
134+
public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client";
134135

135136
private final URL baseUrl;
136137
private final SharedKeyCredentials sharedKeyCredentials;
@@ -149,7 +150,7 @@ public abstract class AbfsClient implements Closeable {
149150
private AccessTokenProvider tokenProvider;
150151
private SASTokenProvider sasTokenProvider;
151152
private final AbfsCounters abfsCounters;
152-
private final Timer timer;
153+
private Timer timer;
153154
private final String abfsMetricUrl;
154155
private boolean isMetricCollectionEnabled = false;
155156
private final MetricFormat metricFormat;
@@ -258,9 +259,9 @@ private AbfsClient(final URL baseUrl,
258259
throw new IOException("Exception while initializing metric credentials " + e);
259260
}
260261
}
261-
this.timer = new Timer(
262-
"abfs-timer-client", true);
263262
if (isMetricCollectionEnabled) {
263+
this.timer = new Timer(
264+
ABFS_CLIENT_TIMER_THREAD_NAME, true);
264265
timer.schedule(new TimerTaskImpl(),
265266
metricIdlePeriod,
266267
metricIdlePeriod);
@@ -292,9 +293,9 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
292293

293294
@Override
294295
public void close() throws IOException {
295-
if (runningTimerTask != null) {
296+
if (isMetricCollectionEnabled && runningTimerTask != null) {
296297
runningTimerTask.cancel();
297-
timer.purge();
298+
timer.cancel();
298299
}
299300
if (keepAliveCache != null) {
300301
keepAliveCache.close();
@@ -1418,7 +1419,7 @@ private TracingContext getMetricTracingContext() {
14181419
boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
14191420
switch (timerFunctionality) {
14201421
case RESUME:
1421-
if (isMetricCollectionStopped.get()) {
1422+
if (isMetricCollectionEnabled && isMetricCollectionStopped.get()) {
14221423
synchronized (this) {
14231424
if (isMetricCollectionStopped.get()) {
14241425
resumeTimer();
@@ -1597,6 +1598,11 @@ KeepAliveCache getKeepAliveCache() {
15971598
return keepAliveCache;
15981599
}
15991600

1601+
@VisibleForTesting
1602+
protected Timer getTimer() {
1603+
return timer;
1604+
}
1605+
16001606
protected String getUserAgent() {
16011607
return userAgent;
16021608
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import java.net.URI;
22+
import java.net.URL;
23+
import java.util.Map;
24+
25+
import org.assertj.core.api.Assertions;
26+
import org.junit.Test;
27+
import org.mockito.Mockito;
28+
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
31+
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
32+
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
33+
import org.apache.hadoop.fs.azurebfs.utils.Base64;
34+
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
35+
36+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY;
37+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
38+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
39+
import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME;
40+
41+
/**
42+
* Unit test cases for the AbfsClient class.
43+
*/
44+
public class TestAbfsClient {
45+
private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
46+
private static final String ACCOUNT_KEY = "testKey";
47+
private static final long SLEEP_DURATION_MS = 500;
48+
49+
/**
50+
* Test the initialization of the AbfsClient timer when metric collection is disabled.
51+
* In case of metric collection being disabled, the timer should not be initialized.
52+
* Asserting that the timer is null and the abfs-timer-client thread is not running.
53+
*/
54+
@Test
55+
public void testTimerInitializationWithoutMetricCollection() throws Exception {
56+
final Configuration configuration = new Configuration();
57+
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
58+
59+
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
60+
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
61+
62+
// Get an instance of AbfsClient.
63+
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"),
64+
null,
65+
abfsConfiguration,
66+
(AccessTokenProvider) null,
67+
null,
68+
abfsClientContext);
69+
70+
Assertions.assertThat(client.getTimer())
71+
.describedAs("Timer should not be initialized")
72+
.isNull();
73+
74+
// Check if a thread with the name "abfs-timer-client" exists
75+
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
76+
.describedAs("Expected thread 'abfs-timer-client' not found")
77+
.isEqualTo(false);
78+
client.close();
79+
}
80+
81+
/**
82+
* Test the initialization of the AbfsClient timer when metric collection is enabled.
83+
* In case of metric collection being enabled, the timer should be initialized.
84+
* Asserting that the timer is not null and the abfs-timer-client thread is running.
85+
* Also, asserting that the thread is removed after closing the client.
86+
*/
87+
@Test
88+
public void testTimerInitializationWithMetricCollection() throws Exception {
89+
final Configuration configuration = new Configuration();
90+
configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT));
91+
configuration.set(FS_AZURE_METRIC_ACCOUNT_NAME, ACCOUNT_NAME);
92+
configuration.set(FS_AZURE_METRIC_ACCOUNT_KEY, Base64.encode(ACCOUNT_KEY.getBytes()));
93+
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
94+
95+
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
96+
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
97+
98+
// Get an instance of AbfsClient.
99+
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"),
100+
null,
101+
abfsConfiguration,
102+
(AccessTokenProvider) null,
103+
null,
104+
abfsClientContext);
105+
106+
Assertions.assertThat(client.getTimer())
107+
.describedAs("Timer should be initialized")
108+
.isNotNull();
109+
110+
// Check if a thread with the name "abfs-timer-client" exists
111+
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
112+
.describedAs("Expected thread 'abfs-timer-client' not found")
113+
.isEqualTo(true);
114+
client.close();
115+
116+
// Check if the thread is removed after closing the client
117+
Thread.sleep(SLEEP_DURATION_MS);
118+
Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME))
119+
.describedAs("Unexpected thread 'abfs-timer-client' found")
120+
.isEqualTo(false);
121+
}
122+
123+
/**
124+
* Check if a thread with the specified name is running.
125+
*
126+
* @param threadName Name of the thread to check
127+
* @return true if the thread is running, false otherwise
128+
*/
129+
private boolean isThreadRunning(String threadName) {
130+
// Get all threads and their stack traces
131+
Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
132+
133+
// Check if any thread has the specified name
134+
for (Thread thread : allThreads.keySet()) {
135+
if (thread.getName().equals(threadName)) {
136+
return true;
137+
}
138+
}
139+
return false;
140+
}
141+
}

0 commit comments

Comments
 (0)