Skip to content

Commit b2aa446

Browse files
authored
Merge branch 'apache:trunk' into YARN-11180
2 parents 515f422 + e994635 commit b2aa446

File tree

62 files changed

+2193
-499
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2193
-499
lines changed

LICENSE-binary

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ com.aliyun:aliyun-java-sdk-ecs:4.2.0
215215
com.aliyun:aliyun-java-sdk-ram:3.0.0
216216
com.aliyun:aliyun-java-sdk-sts:3.0.0
217217
com.aliyun.oss:aliyun-sdk-oss:3.13.2
218-
com.amazonaws:aws-java-sdk-bundle:1.11.901
218+
com.amazonaws:aws-java-sdk-bundle:1.12.262
219219
com.cedarsoftware:java-util:1.9.0
220220
com.cedarsoftware:json-io:2.5.1
221221
com.fasterxml.jackson.core:jackson-annotations:2.12.7

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,4 +475,18 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
475475
* default hadoop temp dir on local system: {@value}.
476476
*/
477477
public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
478+
479+
/**
480+
* Thread-level IOStats Support.
481+
* {@value}
482+
*/
483+
public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED =
484+
"fs.thread.level.iostatistics.enabled";
485+
486+
/**
487+
* Default value for Thread-level IOStats Support is true.
488+
*/
489+
public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT =
490+
true;
491+
478492
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
5858
import org.apache.hadoop.fs.permission.FsPermission;
5959
import org.apache.hadoop.fs.statistics.IOStatistics;
60+
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
61+
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
6062
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
6163
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
6264
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -156,11 +158,19 @@ class LocalFSFileInputStream extends FSInputStream implements
156158
/** Reference to the bytes read counter for slightly faster counting. */
157159
private final AtomicLong bytesRead;
158160

161+
/**
162+
* Thread level IOStatistics aggregator to update in close().
163+
*/
164+
private final IOStatisticsAggregator
165+
ioStatisticsAggregator;
166+
159167
public LocalFSFileInputStream(Path f) throws IOException {
160168
name = pathToFile(f);
161169
fis = new FileInputStream(name);
162170
bytesRead = ioStatistics.getCounterReference(
163171
STREAM_READ_BYTES);
172+
ioStatisticsAggregator =
173+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
164174
}
165175

166176
@Override
@@ -193,9 +203,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
193203

194204
@Override
195205
public void close() throws IOException {
196-
fis.close();
197-
if (asyncChannel != null) {
198-
asyncChannel.close();
206+
try {
207+
fis.close();
208+
if (asyncChannel != null) {
209+
asyncChannel.close();
210+
}
211+
} finally {
212+
ioStatisticsAggregator.aggregate(ioStatistics);
199213
}
200214
}
201215

@@ -278,6 +292,7 @@ public boolean hasCapability(String capability) {
278292
// new capabilities.
279293
switch (capability.toLowerCase(Locale.ENGLISH)) {
280294
case StreamCapabilities.IOSTATISTICS:
295+
case StreamCapabilities.IOSTATISTICS_CONTEXT:
281296
case StreamCapabilities.VECTOREDIO:
282297
return true;
283298
default:
@@ -407,9 +422,19 @@ final class LocalFSFileOutputStream extends OutputStream implements
407422
STREAM_WRITE_EXCEPTIONS)
408423
.build();
409424

425+
/**
426+
* Thread level IOStatistics aggregator to update in close().
427+
*/
428+
private final IOStatisticsAggregator
429+
ioStatisticsAggregator;
430+
410431
private LocalFSFileOutputStream(Path f, boolean append,
411432
FsPermission permission) throws IOException {
412433
File file = pathToFile(f);
434+
// store the aggregator before attempting any IO.
435+
ioStatisticsAggregator =
436+
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
437+
413438
if (!append && permission == null) {
414439
permission = FsPermission.getFileDefault();
415440
}
@@ -436,10 +461,17 @@ private LocalFSFileOutputStream(Path f, boolean append,
436461
}
437462

438463
/*
439-
* Just forward to the fos
464+
* Close the fos; update the IOStatisticsContext.
440465
*/
441466
@Override
442-
public void close() throws IOException { fos.close(); }
467+
public void close() throws IOException {
468+
try {
469+
fos.close();
470+
} finally {
471+
ioStatisticsAggregator.aggregate(ioStatistics);
472+
}
473+
}
474+
443475
@Override
444476
public void flush() throws IOException { fos.flush(); }
445477
@Override
@@ -485,6 +517,7 @@ public boolean hasCapability(String capability) {
485517
// new capabilities.
486518
switch (capability.toLowerCase(Locale.ENGLISH)) {
487519
case StreamCapabilities.IOSTATISTICS:
520+
case StreamCapabilities.IOSTATISTICS_CONTEXT:
488521
return true;
489522
default:
490523
return StoreImplementationUtils.isProbeForSyncable(capability);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public interface StreamCapabilities {
9393
*/
9494
String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;
9595

96+
/**
97+
* Streams that support IOStatistics context and capture thread-level
98+
* IOStatistics.
99+
*/
100+
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";
101+
96102
/**
97103
* Capabilities that a stream can support and be queried for.
98104
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.impl;
2020

21+
import java.lang.ref.WeakReference;
2122
import java.util.function.Consumer;
2223
import java.util.function.Function;
2324
import javax.annotation.Nullable;
@@ -48,7 +49,17 @@ public long currentThreadId() {
4849
}
4950

5051
public V setForCurrentThread(V newVal) {
51-
return put(currentThreadId(), newVal);
52+
long id = currentThreadId();
53+
54+
// if the same object is already in the map, just return it.
55+
WeakReference<V> ref = lookup(id);
56+
// Reference value could be set to null. Thus, ref.get() could return
57+
// null. Should be handled accordingly while using the returned value.
58+
if (ref != null && ref.get() == newVal) {
59+
return ref.get();
60+
}
61+
62+
return put(id, newVal);
5263
}
5364

5465
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.statistics;
20+
21+
import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;
22+
23+
/**
24+
* An interface defined to capture thread-level IOStatistics by using per
25+
* thread context.
26+
* <p>
27+
* The aggregator should be collected in their constructor by statistics-generating
28+
* classes to obtain the aggregator to update <i>across all threads</i>.
29+
* <p>
30+
* The {@link #snapshot()} call creates a snapshot of the statistics;
31+
* <p>
32+
* The {@link #reset()} call resets the statistics in the context so
33+
* that later snapshots will get the incremental data.
34+
*/
35+
public interface IOStatisticsContext extends IOStatisticsSource {
36+
37+
/**
38+
* Get the IOStatisticsAggregator for the context.
39+
*
40+
* @return return the aggregator for the context.
41+
*/
42+
IOStatisticsAggregator getAggregator();
43+
44+
/**
45+
* Capture the snapshot of the context's IOStatistics.
46+
*
47+
* @return IOStatisticsSnapshot for the context.
48+
*/
49+
IOStatisticsSnapshot snapshot();
50+
51+
/**
52+
* Get a unique ID for this context, for logging
53+
* purposes.
54+
*
55+
* @return an ID unique for all contexts in this process.
56+
*/
57+
long getID();
58+
59+
/**
60+
* Reset the context's IOStatistics.
61+
*/
62+
void reset();
63+
64+
/**
65+
* Get the context's IOStatisticsContext.
66+
*
67+
* @return instance of IOStatisticsContext for the context.
68+
*/
69+
static IOStatisticsContext getCurrentIOStatisticsContext() {
70+
return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
71+
}
72+
73+
/**
74+
* Set the IOStatisticsContext for the current thread.
75+
* @param statisticsContext IOStatistics context instance for the
76+
* current thread. If null, the context is reset.
77+
*/
78+
static void setThreadIOStatisticsContext(
79+
IOStatisticsContext statisticsContext) {
80+
IOStatisticsContextIntegration.setThreadIOStatisticsContext(
81+
statisticsContext);
82+
}
83+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class StreamStatisticNames {
4747
public static final String STREAM_READ_ABORTED = "stream_aborted";
4848

4949
/**
50-
* Bytes read from an input stream in read() calls.
50+
* Bytes read from an input stream in read()/readVectored() calls.
5151
* Does not include bytes read and then discarded in seek/close etc.
5252
* These are the bytes returned to the caller.
5353
* Value: {@value}.
@@ -110,6 +110,34 @@ public final class StreamStatisticNames {
110110
public static final String STREAM_READ_OPERATIONS =
111111
"stream_read_operations";
112112

113+
/**
114+
* Count of readVectored() operations in an input stream.
115+
* Value: {@value}.
116+
*/
117+
public static final String STREAM_READ_VECTORED_OPERATIONS =
118+
"stream_read_vectored_operations";
119+
120+
/**
121+
* Count of bytes discarded during readVectored() operation
122+
* in an input stream.
123+
* Value: {@value}.
124+
*/
125+
public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
126+
"stream_read_vectored_read_bytes_discarded";
127+
128+
/**
129+
* Count of incoming file ranges during readVectored() operation.
130+
* Value: {@value}
131+
*/
132+
public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
133+
"stream_read_vectored_incoming_ranges";
134+
/**
135+
* Count of combined file ranges during readVectored() operation.
136+
* Value: {@value}
137+
*/
138+
public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
139+
"stream_read_vectored_combined_ranges";
140+
113141
/**
114142
* Count of incomplete read() operations in an input stream,
115143
* that is, when the bytes returned were less than that requested.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.statistics.impl;
20+
21+
import org.apache.hadoop.fs.statistics.IOStatistics;
22+
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
23+
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
24+
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
25+
26+
/**
27+
* Empty IOStatistics context which serves no-op for all the operations and
28+
* returns an empty Snapshot if asked.
29+
*
30+
*/
31+
final class EmptyIOStatisticsContextImpl implements IOStatisticsContext {
32+
33+
private static final IOStatisticsContext EMPTY_CONTEXT = new EmptyIOStatisticsContextImpl();
34+
35+
private EmptyIOStatisticsContextImpl() {
36+
}
37+
38+
/**
39+
* Create a new empty snapshot.
40+
* A new one is always created for isolation.
41+
*
42+
* @return a statistics snapshot
43+
*/
44+
@Override
45+
public IOStatisticsSnapshot snapshot() {
46+
return new IOStatisticsSnapshot();
47+
}
48+
49+
@Override
50+
public IOStatisticsAggregator getAggregator() {
51+
return EmptyIOStatisticsStore.getInstance();
52+
}
53+
54+
@Override
55+
public IOStatistics getIOStatistics() {
56+
return EmptyIOStatistics.getInstance();
57+
}
58+
59+
@Override
60+
public void reset() {}
61+
62+
/**
63+
* The ID is always 0.
64+
* As the real context implementation counter starts at 1,
65+
* we are guaranteed to have unique IDs even between them and
66+
* the empty context.
67+
* @return 0
68+
*/
69+
@Override
70+
public long getID() {
71+
return 0;
72+
}
73+
74+
/**
75+
* Get the single instance.
76+
* @return an instance.
77+
*/
78+
static IOStatisticsContext getInstance() {
79+
return EMPTY_CONTEXT;
80+
}
81+
}

0 commit comments

Comments
 (0)