Skip to content

Commit b429c79

Browse files
HADOOP-19712. S3A: Deadlock in EvaluatingStatisticsMap.entryset() (#8006)
EvaluatingStatisticsMap used parallelStream() to process values; this uses a fixed thread pool of the JRE and can sporadically deadlock if there's not enough capacity and the direct/indirect map operations take place in a worker thread of its own. * Reworked how entrySet() and values() work, using .forEach() iterators after reviewing what ConcurrentHashMap does internally; it does a (safe) traverse. * Added EvaluatingStatisticsMap.forEach() implementation which maps the passed in BiConsumer down to the evaluators.forEach, evaluating each value as it goes. * Use that in IOStatisticsBinding.snapshot() code. Contributed by Steve Loughran
1 parent 77d72b6 commit b429c79

File tree

4 files changed

+182
-19
lines changed

4 files changed

+182
-19
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
package org.apache.hadoop.fs.statistics.impl;
2020

2121
import java.io.Serializable;
22+
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.LinkedHashSet;
25+
import java.util.List;
2326
import java.util.Map;
27+
import java.util.Objects;
2428
import java.util.Set;
2529
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.function.BiConsumer;
2631
import java.util.function.Function;
27-
import java.util.stream.Collectors;
2832

2933
/**
3034
* A map of functions which can be invoked to dynamically
@@ -132,11 +136,10 @@ public Set<String> keySet() {
132136
*/
133137
@Override
134138
public Collection<E> values() {
135-
Set<Entry<String, Function<String, E>>> evalEntries =
136-
evaluators.entrySet();
137-
return evalEntries.parallelStream().map((e) ->
138-
e.getValue().apply(e.getKey()))
139-
.collect(Collectors.toList());
139+
List<E> result = new ArrayList<>(size());
140+
evaluators.forEach((k, f) ->
141+
result.add(f.apply(k)));
142+
return result;
140143
}
141144

142145
/**
@@ -149,22 +152,37 @@ public Map<String, E> snapshot() {
149152

150153
/**
151154
* Creating the entry set forces an evaluation of the functions.
152-
*
155+
* <p>
156+
* Not synchronized, though thread safe.
157+
* <p>
153158
* This is not a snapshot, so if the evaluators actually return
154159
* references to mutable objects (e.g. a MeanStatistic instance)
155160
* then that value may still change.
156161
*
157-
* The evaluation may be parallelized.
158162
* @return an evaluated set of values
159163
*/
160164
@Override
161-
public synchronized Set<Entry<String, E>> entrySet() {
162-
Set<Entry<String, Function<String, E>>> evalEntries =
163-
evaluators.entrySet();
164-
Set<Entry<String, E>> r = evalEntries.parallelStream().map((e) ->
165-
new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey())))
166-
.collect(Collectors.toSet());
167-
return r;
165+
public Set<Entry<String, E>> entrySet() {
166+
Set<Entry<String, E>> result = new LinkedHashSet<>(size());
167+
evaluators.forEach((key, evaluator) -> {
168+
final E current = evaluator.apply(key);
169+
result.add(new EntryImpl<>(key, current));
170+
});
171+
return result;
172+
}
173+
174+
175+
/**
176+
* Hand down to the foreach iterator of the evaluators, by evaluating as each
177+
* entry is processed and passing that in to the {@code action} consumer.
178+
* @param action consumer of entries.
179+
*/
180+
@Override
181+
public void forEach(final BiConsumer<? super String, ? super E> action) {
182+
BiConsumer<String, Function<String, E>> biConsumer = (key, value) -> {
183+
action.accept(key, value.apply(key));
184+
};
185+
evaluators.forEach(biConsumer);
168186
}
169187

170188
/**
@@ -173,7 +191,7 @@ public synchronized Set<Entry<String, E>> entrySet() {
173191
*/
174192
private static final class EntryImpl<E> implements Entry<String, E> {
175193

176-
private String key;
194+
private final String key;
177195

178196
private E value;
179197

@@ -197,6 +215,20 @@ public E setValue(final E val) {
197215
this.value = val;
198216
return val;
199217
}
218+
219+
@Override
220+
public boolean equals(final Object o) {
221+
if (!(o instanceof Entry)) {
222+
return false;
223+
}
224+
Entry<String, ?> entry = (Entry<String, ?>) o;
225+
return Objects.equals(key, entry.getKey()) && Objects.equals(value, entry.getValue());
226+
}
227+
228+
@Override
229+
public int hashCode() {
230+
return Objects.hashCode(key);
231+
}
200232
}
201233

202234
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,8 @@ private static <E> Map<String, E> copyMap(
170170
// we have to clone the values so that they aren't
171171
// bound to the original values
172172
dest.clear();
173-
source.entrySet()
174-
.forEach(entry ->
175-
dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
173+
source.forEach((key, current) ->
174+
dest.put(key, copyFn.apply(current)));
176175
return dest;
177176
}
178177

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package org.apache.hadoop.fs.statistics;
2020

21+
import java.util.Collection;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.concurrent.atomic.LongAdder;
25+
26+
import org.assertj.core.api.Assertions;
2127
import org.junit.jupiter.api.AfterEach;
2228
import org.junit.jupiter.api.BeforeEach;
2329
import org.junit.jupiter.api.Test;
@@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable {
174180
.isEqualTo(2);
175181
}
176182

183+
@Test
184+
public void testForeach() throws Throwable {
185+
186+
final IOStatisticsStore store = iostatisticsStore()
187+
.withCounters(COUNT, "c1", "c2")
188+
.withGauges(GAUGE)
189+
.withMinimums(MIN)
190+
.withMaximums(MAX)
191+
.withMeanStatistics(MEAN)
192+
.build();
193+
store.setCounter(COUNT, 10);
194+
store.setCounter("c1", 1);
195+
store.setCounter("c2", 2);
196+
197+
// get the counter map, which is evaluated on demand
198+
final Map<String, Long> counters = store.counters();
199+
LongAdder entryCount = new LongAdder();
200+
LongAdder sum = new LongAdder();
201+
202+
// apply the foreach iteration
203+
counters.forEach((k, v) -> {
204+
entryCount.increment();
205+
sum.add(v);
206+
});
207+
Assertions.assertThat(entryCount.longValue())
208+
.describedAs("entry count")
209+
.isEqualTo(3);
210+
Assertions.assertThat(sum.longValue())
211+
.describedAs("sum of values")
212+
.isEqualTo(13);
213+
214+
// keyset is as expected
215+
final Set<String> keys = counters.keySet();
216+
Assertions.assertThat(keys)
217+
.describedAs("keys")
218+
.hasSize(3)
219+
.contains("c1", "c2", COUNT);
220+
221+
// values are as expected
222+
final Collection<Long> values = counters.values();
223+
Assertions.assertThat(values)
224+
.describedAs("values")
225+
.hasSize(3)
226+
.contains(10L, 1L, 2L);
227+
228+
// entries will all be evaluated
229+
final Set<Map.Entry<String, Long>> entries = counters.entrySet();
230+
entryCount.reset();
231+
sum.reset();
232+
entries.forEach(e -> {
233+
entryCount.increment();
234+
sum.add(e.getValue());
235+
});
236+
}
237+
177238
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.statistics.impl;
20+
21+
import java.util.Map;
22+
23+
import org.assertj.core.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.apache.hadoop.test.AbstractHadoopTestBase;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
public class TestEvaluatingStatisticsMap extends AbstractHadoopTestBase {
31+
32+
33+
@Test
34+
public void testEvaluatingStatisticsMap() {
35+
EvaluatingStatisticsMap<String> map = new EvaluatingStatisticsMap<>();
36+
37+
Assertions.assertThat(map).isEmpty();
38+
Assertions.assertThat(map.keySet()).isEmpty();
39+
Assertions.assertThat(map.values()).isEmpty();
40+
Assertions.assertThat(map.entrySet()).isEmpty();
41+
42+
// fill the map with the environment vars
43+
final Map<String, String> env = System.getenv();
44+
env.forEach((k, v) -> map.addFunction(k, any -> v));
45+
46+
// verify the keys match
47+
assertThat(map.keySet())
48+
.describedAs("keys")
49+
.containsExactlyInAnyOrderElementsOf(env.keySet());
50+
51+
// and that the values do
52+
assertThat(map.values())
53+
.describedAs("Evaluated values")
54+
.containsExactlyInAnyOrderElementsOf(env.values());
55+
56+
// now assert that this holds for the entryset.
57+
env.forEach((k, v) ->
58+
assertThat(map.get(k))
59+
.describedAs("looked up key %s", k)
60+
.isNotNull()
61+
.isEqualTo(v));
62+
63+
map.forEach((k, v) ->
64+
assertThat(env.get(k))
65+
.describedAs("env var %s", k)
66+
.isNotNull()
67+
.isEqualTo(v));
68+
69+
70+
}
71+
}

0 commit comments

Comments
 (0)