Skip to content

Commit 4eb6852

Browse files
committed
HADOOP-19093. IORateLimiter API takes a path.
I intend to pull the hadoop-common side of things out to be a self contained patch Change-Id: I18e32907abccdfa30c28ba5e1f2e15ae58012f69
1 parent 70fe8c4 commit 4eb6852

File tree

9 files changed

+42
-29
lines changed

9 files changed

+42
-29
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORateLimiter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,17 @@ public interface IORateLimiter {
6868
* If there is not enough space, the permits will be acquired,
6969
* but the subsequent call will block until the capacity has been
7070
* refilled.
71+
* <p>
72+
* The path parameter is used to support stores where there may be different throttling
73+
* under different paths.
7174
* @param operation operation being performed. Must not be null, may be "",
72-
* should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
73-
* where there is a matching operation.
75+
* should be from {@link org.apache.hadoop.fs.statistics.StoreStatisticNames}
76+
* where there is a matching operation.
77+
* @param path path under which the operations will be initiated.
7478
* @param requestedCapacity capacity to acquire.
75-
* Must be greater than or equal to 0.
79+
* Must be greater than or equal to 0.
7680
* @return time spent waiting for output.
7781
*/
78-
Duration acquireIOCapacity(String operation, int requestedCapacity);
82+
Duration acquireIOCapacity(String operation, final Path path, int requestedCapacity);
7983

8084
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/IORateLimiterSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private IORateLimiterSupport() {
3737
* @return a rate limiter source which has no rate limiting.
3838
*/
3939
public static IORateLimiter unlimited() {
40-
return (operation, requestedCapacity) -> {
40+
return (operation, path, requestedCapacity) -> {
4141
requireNonNull(operation, "operation");
4242
return RateLimitingFactory.unlimitedRate().acquire(requestedCapacity);
4343
};
@@ -50,7 +50,7 @@ public static IORateLimiter unlimited() {
5050
*/
5151
public static IORateLimiter create(int capacityPerSecond) {
5252
final RateLimiting limiting = RateLimitingFactory.create(capacityPerSecond);
53-
return (operation, requestedCapacity) -> {
53+
return (operation, path, requestedCapacity) -> {
5454
requireNonNull(operation, "operation");
5555
return limiting.acquire(requestedCapacity);
5656
};

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ public void testLimitedCapacity() {
111111
@Test
112112
public void testUnlimitedRejectsNegativeCapacity() throws Exception {
113113
intercept(IllegalArgumentException.class, () ->
114-
IORateLimiterSupport.unlimited().acquireIOCapacity("", -1));
114+
IORateLimiterSupport.unlimited().acquireIOCapacity("", new Path("/"), -1));
115115
}
116116

117117
@Test
118118
public void testUnlimitedRejectsNullOperation() throws Exception {
119119
intercept(NullPointerException.class, () ->
120-
IORateLimiterSupport.unlimited().acquireIOCapacity(null, 0));
120+
IORateLimiterSupport.unlimited().acquireIOCapacity(null, new Path("/"), 0));
121121
}
122122

123123
/**
@@ -147,7 +147,7 @@ private static void assertDelayed(final RateLimiting limiter, final int capacity
147147
* @param capacity capacity
148148
*/
149149
private static void assertNotDelayed(IORateLimiter limiter, String op, int capacity) {
150-
assertZeroDuration(capacity, limiter.acquireIOCapacity(op, capacity));
150+
assertZeroDuration(capacity, limiter.acquireIOCapacity(op, new Path("/"), capacity));
151151
}
152152

153153
/**
@@ -158,7 +158,7 @@ private static void assertNotDelayed(IORateLimiter limiter, String op, int capac
158158
* @param capacity capacity
159159
*/
160160
private static void assertDelayed(IORateLimiter limiter, String op, int capacity) {
161-
assertNonZeroDuration(capacity, limiter.acquireIOCapacity(op, capacity));
161+
assertNonZeroDuration(capacity, limiter.acquireIOCapacity(op, new Path("/"), capacity));
162162
}
163163

164164
/**

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ public IORateLimiter rateLimiterSource() {
333333
* {@inheritDoc}
334334
*/
335335
@Override
336-
public Duration acquireIOCapacity(final String operation, final int requestedCapacity) {
337-
return rateLimiterSource().acquireIOCapacity(operation, requestedCapacity);
336+
public Duration acquireIOCapacity(final String operation, final Path path, final int requestedCapacity) {
337+
return rateLimiterSource().acquireIOCapacity(operation, new Path("/"), requestedCapacity);
338338
}
339339

340340
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void bindToFileSystem(FileSystem filesystem, Path path) throws IOExceptio
9696

9797
@Override
9898
public FileStatus getFileStatus(Path path) throws IOException {
99-
acquireIOCapacity(OP_GET_FILE_STATUS, GET_FILE_STATUS_CAPACITY);
99+
acquireIOCapacity(OP_GET_FILE_STATUS, new Path("/"), GET_FILE_STATUS_CAPACITY);
100100
return fileSystem.getFileStatus(path);
101101
}
102102

@@ -109,7 +109,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
109109
@SuppressWarnings("deprecation")
110110
@Override
111111
public boolean isFile(Path path) throws IOException {
112-
acquireIOCapacity(OP_IS_FILE, GET_FILE_STATUS_CAPACITY);
112+
acquireIOCapacity(OP_IS_FILE, new Path("/"), GET_FILE_STATUS_CAPACITY);
113113
return fileSystem.isFile(path);
114114
}
115115

@@ -122,42 +122,42 @@ public boolean isFile(Path path) throws IOException {
122122
public boolean delete(Path path, boolean recursive)
123123
throws IOException {
124124
acquireIOCapacity(OP_DELETE,
125-
recursive ? DELETE_FILE_CAPACITY : DELETE_DIR_CAPACITY);
125+
new Path("/"), recursive ? DELETE_FILE_CAPACITY : DELETE_DIR_CAPACITY);
126126
return fileSystem.delete(path, recursive);
127127
}
128128

129129
@Override
130130
public boolean rmdir(final Path path, final int capacity) throws IOException {
131-
acquireIOCapacity(OP_DELETE_DIR, capacity);
131+
acquireIOCapacity(OP_DELETE_DIR, new Path("/"), capacity);
132132
return fileSystem.delete(path, true);
133133
}
134134

135135
@Override
136136
public boolean mkdirs(Path path)
137137
throws IOException {
138-
acquireIOCapacity(OP_MKDIRS, MKDIRS_CAPACITY);
138+
acquireIOCapacity(OP_MKDIRS, new Path("/"), MKDIRS_CAPACITY);
139139
return fileSystem.mkdirs(path);
140140
}
141141

142142
@Override
143143
public boolean renameFile(Path source, Path dest)
144144
throws IOException {
145-
acquireIOCapacity(OP_RENAME, RENAME_CAPACITY);
145+
acquireIOCapacity(OP_RENAME, new Path("/"), RENAME_CAPACITY);
146146
return fileSystem.rename(source, dest);
147147
}
148148

149149
@Override
150150
public RemoteIterator<FileStatus> listStatusIterator(Path path)
151151
throws IOException {
152-
acquireIOCapacity(OP_LIST_STATUS, LIST_CAPACITY);
152+
acquireIOCapacity(OP_LIST_STATUS, new Path("/"), LIST_CAPACITY);
153153
return fileSystem.listStatusIterator(path);
154154
}
155155

156156
@Override
157157
public TaskManifest loadTaskManifest(
158158
JsonSerialization<TaskManifest> serializer,
159159
FileStatus st) throws IOException {
160-
acquireIOCapacity(OP_OPENFILE, 1);
160+
acquireIOCapacity(OP_OPENFILE, new Path("/"), 1);
161161
return TaskManifest.load(serializer, fileSystem, st.getPath(), st);
162162
}
163163

@@ -166,7 +166,7 @@ public <T extends AbstractManifestData<T>> void save(
166166
final T manifestData,
167167
final Path path,
168168
final boolean overwrite) throws IOException {
169-
acquireIOCapacity(OP_CREATE, 1);
169+
acquireIOCapacity(OP_CREATE, new Path("/"), 1);
170170
manifestData.save(fileSystem, path, overwrite);
171171
}
172172

@@ -204,7 +204,7 @@ public void msync(Path path) throws IOException {
204204
// qualify so we can be confident that the FS being synced
205205
// is the one we expect.
206206
fileSystem.makeQualified(path);
207-
acquireIOCapacity(OP_MSYNC, 1);
207+
acquireIOCapacity(OP_MSYNC, new Path("/"), 1);
208208
try {
209209
fileSystem.msync();
210210
} catch (UnsupportedOperationException ignored) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,8 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
554554
}
555555

556556
// acquire one IO permit
557-
final Duration waitTime = acquireIOCapacity(StoreStatisticNames.OP_RENAME, 1);
557+
final Duration waitTime = acquireIOCapacity(StoreStatisticNames.OP_RENAME,
558+
qualifiedSrcPath, 1);
558559

559560
try {
560561
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
@@ -1677,8 +1678,10 @@ public IOStatistics getIOStatistics() {
16771678
}
16781679

16791680
@Override
1680-
public Duration acquireIOCapacity(final String operation, final int requestedCapacity) {
1681-
return abfsStore.acquireIOCapacity(operation, requestedCapacity);
1681+
public Duration acquireIOCapacity(final String operation,
1682+
final Path path,
1683+
final int requestedCapacity) {
1684+
return abfsStore.acquireIOCapacity(operation, path, requestedCapacity);
16821685
}
16831686

16841687
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2232,7 +2232,7 @@ private void populateRenameRecoveryStatistics(
22322232
* {@inheritDoc}
22332233
*/
22342234
@Override
2235-
public Duration acquireIOCapacity(final String operation, final int requestedCapacity) {
2235+
public Duration acquireIOCapacity(final String operation, final Path path, final int requestedCapacity) {
22362236

22372237
double multiplier;
22382238
int lowCost = 1;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@
2020

2121
import java.io.IOException;
2222

23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
2326
import org.apache.hadoop.classification.InterfaceAudience;
2427
import org.apache.hadoop.classification.InterfaceStability;
2528
import org.apache.hadoop.conf.Configuration;
2629
import org.apache.hadoop.fs.Path;
30+
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
2731
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2832
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
2933
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
@@ -41,10 +45,12 @@
4145
@InterfaceStability.Evolving
4246
public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
4347

48+
private static final Logger LOG = LoggerFactory.getLogger(AzureManifestCommitterFactory.class);
49+
4450
/**
4551
* Classname, which can be declared in job configurations.
4652
*/
47-
public static final String NAME = ManifestCommitterFactory.class.getName();
53+
public static final String NAME = AzureManifestCommitterFactory.class.getName();
4854

4955
@Override
5056
public ManifestCommitter createOutputCommitter(final Path outputPath,

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,12 @@ public void testCapacityLimiting() throws Throwable {
198198
// the second assertion fails.
199199
final String operation = OP_LIST_FILES;
200200
// first one has no delay
201-
Assertions.assertThat(limiter.acquireIOCapacity(operation, capacity))
201+
Assertions.assertThat(limiter.acquireIOCapacity(operation, new Path("/"), capacity))
202202
.describedAs("Duration of acquiring %d capacity", capacity)
203203
.isEqualTo(Duration.ZERO);
204204

205205
// second one is delayed
206-
final Duration duration = limiter.acquireIOCapacity(operation, capacity);
206+
final Duration duration = limiter.acquireIOCapacity(operation, new Path("/"), capacity);
207207
describe("Duration of second capacity request of %d: %s", capacity, duration);
208208
Assertions.assertThat(duration)
209209
.describedAs("Duration of acquiring %d capacity", capacity)

0 commit comments

Comments
 (0)