Skip to content

Commit f151b88

Browse files
author
slfan1989
committed
Merge remote-tracking branch 'origin/YARN-11212' into YARN-11212
2 parents 9d32684 + 3dfb537 commit f151b88

File tree

68 files changed

+587
-255
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+587
-255
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
4646
public static final Logger LOG = LoggerFactory.getLogger(
4747
RetryInvocationHandler.class);
4848

49+
@VisibleForTesting
50+
public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
51+
ThreadLocal.withInitial(() -> true);
52+
4953
static class Call {
5054
private final Method method;
5155
private final Object[] args;
@@ -159,7 +163,7 @@ CallReturn invoke() throws Throwable {
159163
}
160164

161165
Object invokeMethod() throws Throwable {
162-
if (isRpc) {
166+
if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
163167
Client.setCallIdAndRetryCount(callId, counters.retries,
164168
retryInvocationHandler.asyncCallHandler);
165169
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public final class CallerContext {
4747
// field names
4848
public static final String CLIENT_IP_STR = "clientIp";
4949
public static final String CLIENT_PORT_STR = "clientPort";
50+
public static final String CLIENT_ID_STR = "clientId";
51+
public static final String CLIENT_CALL_ID_STR = "clientCallId";
5052

5153
/** The caller context.
5254
*

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ public static class CacheEntry implements LightWeightCache.Entry {
5555
/**
5656
* Processing state of the requests.
5757
*/
58-
private static byte INPROGRESS = 0;
59-
private static byte SUCCESS = 1;
60-
private static byte FAILED = 2;
58+
private static final byte INPROGRESS = 0;
59+
private static final byte SUCCESS = 1;
60+
private static final byte FAILED = 2;
6161

6262
private byte state = INPROGRESS;
6363

6464
// Store uuid as two long for better memory utilization
65-
private final long clientIdMsb; // Most signficant bytes
65+
private final long clientIdMsb; // Most significant bytes
6666
private final long clientIdLsb; // Least significant bytes
6767

6868
private final int callId;
@@ -140,8 +140,8 @@ public long getExpirationTime() {
140140

141141
@Override
142142
public String toString() {
143-
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
144-
+ this.callId + ":" + this.state;
143+
return String.format("%s:%s:%s", new UUID(this.clientIdMsb, this.clientIdLsb),
144+
this.callId, this.state);
145145
}
146146
}
147147

@@ -183,7 +183,7 @@ public Object getPayload() {
183183

184184
private final LightWeightGSet<CacheEntry, CacheEntry> set;
185185
private final long expirationTime;
186-
private String cacheName;
186+
private final String cacheName;
187187

188188
private final ReentrantLock lock = new ReentrantLock();
189189

@@ -195,19 +195,19 @@ public Object getPayload() {
195195
*/
196196
public RetryCache(String cacheName, double percentage, long expirationTime) {
197197
int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
198-
capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
198+
capacity = Math.max(capacity, MAX_CAPACITY);
199199
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
200200
expirationTime, 0);
201201
this.expirationTime = expirationTime;
202202
this.cacheName = cacheName;
203203
this.retryCacheMetrics = RetryCacheMetrics.create(this);
204204
}
205205

206-
private static boolean skipRetryCache() {
206+
private static boolean skipRetryCache(byte[] clientId, int callId) {
207207
// Do not track non RPC invocation or RPC requests with
208208
// invalid callId or clientId in retry cache
209-
return !Server.isRpcInvocation() || Server.getCallId() < 0
210-
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
209+
return !Server.isRpcInvocation() || callId < 0
210+
|| Arrays.equals(clientId, RpcConstants.DUMMY_CLIENT_ID);
211211
}
212212

213213
public void lock() {
@@ -332,43 +332,51 @@ public void addCacheEntryWithPayload(byte[] clientId, int callId,
332332
retryCacheMetrics.incrCacheUpdated();
333333
}
334334

335-
private static CacheEntry newEntry(long expirationTime) {
336-
return new CacheEntry(Server.getClientId(), Server.getCallId(),
335+
private static CacheEntry newEntry(long expirationTime,
336+
byte[] clientId, int callId) {
337+
return new CacheEntry(clientId, callId,
337338
System.nanoTime() + expirationTime);
338339
}
339340

340341
private static CacheEntryWithPayload newEntry(Object payload,
341-
long expirationTime) {
342-
return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
342+
long expirationTime, byte[] clientId, int callId) {
343+
return new CacheEntryWithPayload(clientId, callId,
343344
payload, System.nanoTime() + expirationTime);
344345
}
345346

346347
/**
347348
* Static method that provides null check for retryCache.
348349
* @param cache input Cache.
350+
* @param clientId client id of this request
351+
* @param callId client call id of this request
349352
* @return CacheEntry.
350353
*/
351-
public static CacheEntry waitForCompletion(RetryCache cache) {
352-
if (skipRetryCache()) {
354+
public static CacheEntry waitForCompletion(RetryCache cache,
355+
byte[] clientId, int callId) {
356+
if (skipRetryCache(clientId, callId)) {
353357
return null;
354358
}
355359
return cache != null ? cache
356-
.waitForCompletion(newEntry(cache.expirationTime)) : null;
360+
.waitForCompletion(newEntry(cache.expirationTime,
361+
clientId, callId)) : null;
357362
}
358363

359364
/**
360365
* Static method that provides null check for retryCache.
361366
* @param cache input cache.
362367
* @param payload input payload.
368+
* @param clientId client id of this request
369+
* @param callId client call id of this request
363370
* @return CacheEntryWithPayload.
364371
*/
365372
public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
366-
Object payload) {
367-
if (skipRetryCache()) {
373+
Object payload, byte[] clientId, int callId) {
374+
if (skipRetryCache(clientId, callId)) {
368375
return null;
369376
}
370377
return (CacheEntryWithPayload) (cache != null ? cache
371-
.waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
378+
.waitForCompletion(newEntry(payload, cache.expirationTime,
379+
clientId, callId)) : null);
372380
}
373381

374382
public static void setState(CacheEntry e, boolean success) {

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ public void setup() {
5050
static class TestServer {
5151
AtomicInteger retryCount = new AtomicInteger();
5252
AtomicInteger operationCount = new AtomicInteger();
53-
private RetryCache retryCache = new RetryCache("TestRetryCache", 1,
54-
100 * 1000 * 1000 * 1000L);
53+
private final RetryCache retryCache = new RetryCache(
54+
"TestRetryCache", 1, 100 * 1000 * 1000 * 1000L);
5555

5656
/**
5757
* A server method implemented using {@link RetryCache}.
5858
*
5959
* @param input is returned back in echo, if {@code success} is true.
60-
* @param failureOuput returned on failure, if {@code success} is false.
60+
* @param failureOutput returned on failure, if {@code success} is false.
6161
* @param methodTime time taken by the operation. By passing smaller/larger
6262
* value one can simulate an operation that takes short/long time.
6363
* @param success whether this operation completes successfully or not
@@ -67,7 +67,7 @@ static class TestServer {
6767
int echo(int input, int failureOutput, long methodTime, boolean success)
6868
throws InterruptedException {
6969
CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache,
70-
null);
70+
null, Server.getClientId(), Server.getCallId());
7171
if (entry != null && entry.isSuccess()) {
7272
System.out.println("retryCount incremented " + retryCount.get());
7373
retryCount.incrementAndGet();
@@ -173,16 +173,13 @@ public void testOperations(final int input, final int numberOfThreads,
173173
final int failureOutput = input + 1;
174174
ExecutorService executorService = Executors
175175
.newFixedThreadPool(numberOfThreads);
176-
List<Future<Integer>> list = new ArrayList<Future<Integer>>();
176+
List<Future<Integer>> list = new ArrayList<>();
177177
for (int i = 0; i < numberOfThreads; i++) {
178-
Callable<Integer> worker = new Callable<Integer>() {
179-
@Override
180-
public Integer call() throws Exception {
181-
Server.getCurCall().set(call);
182-
Assert.assertEquals(Server.getCurCall().get(), call);
183-
int randomPause = pause == 0 ? pause : r.nextInt(pause);
184-
return testServer.echo(input, failureOutput, randomPause, success);
185-
}
178+
Callable<Integer> worker = () -> {
179+
Server.getCurCall().set(call);
180+
Assert.assertEquals(Server.getCurCall().get(), call);
181+
int randomPause = pause == 0 ? pause : r.nextInt(pause);
182+
return testServer.echo(input, failureOutput, randomPause, success);
186183
};
187184
Future<Integer> submit = executorService.submit(worker);
188185
list.add(submit);

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,27 @@
1818

1919
#include "common/util_c.h"
2020
#include "expect.h"
21-
#include "hdfs/hdfs.h"
2221
#include "hdfspp/hdfs_ext.h"
2322
#include "native_mini_dfs.h"
2423
#include "os/thread.h"
2524
#include "x-platform/c-api/syscall.h"
25+
#include "hdfs/hdfs.h"
2626

2727
#include <errno.h>
2828
#include <inttypes.h>
29-
#include <pwd.h>
3029
#include <stdint.h>
3130
#include <stdio.h>
3231
#include <stdlib.h>
3332
#include <string.h>
34-
#include <sys/socket.h>
3533
#include <sys/types.h>
36-
#include <sys/wait.h>
3734
#include <unistd.h>
3835

36+
#ifndef WIN32
37+
#include <sys/socket.h>
38+
#include <sys/wait.h>
39+
#include <pwd.h>
40+
#endif
41+
3942
#define TO_STR_HELPER(X) #X
4043
#define TO_STR(X) TO_STR_HELPER(X)
4144

@@ -197,7 +200,7 @@ static int fileEventCallback1(const char * event, const char * cluster, const ch
197200
if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr);
198201
if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR;
199202
else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK;
200-
return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
203+
return rand() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
201204
}
202205

203206
static int fileEventCallback2(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
@@ -235,7 +238,7 @@ static int doTestHdfsMiniStress(struct tlhThreadInfo *ti, int randomErr)
235238
EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
236239
file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
237240
EXPECT_NONNULL(file);
238-
seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected);
241+
seekPos = (((double)rand()) / RAND_MAX) * (fileInfo->mSize - expected);
239242
seekPos = (seekPos / expected) * expected;
240243
ret = hdfsSeek(ti->hdfs, file, seekPos);
241244
if (ret < 0) {

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/os/windows/unistd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/* On Windows, unistd.h does not exist, so manually define what we need. */
2323

2424
#include <process.h> /* Declares getpid(). */
25-
#include <windows.h>
25+
#include <Windows.h>
2626

2727
/* Re-route sleep to Sleep, converting units from seconds to milliseconds. */
2828
#define sleep(seconds) Sleep((seconds) * 1000)

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
* limitations under the License.
1717
*/
1818

19+
#include "fs/filehandle.h"
20+
1921
#include "hdfspp/hdfspp.h"
2022
#include "hdfspp/hdfs_ext.h"
2123

2224
#include "common/hdfs_configuration.h"
2325
#include "common/configuration_loader.h"
2426
#include "common/logging.h"
2527
#include "fs/filesystem.h"
26-
#include "fs/filehandle.h"
2728
#include "x-platform/utils.h"
2829
#include "x-platform/syscall.h"
2930

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/fsinfo.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <algorithm>
2222
#include <sstream>
2323
#include <iomanip>
24+
#include <algorithm>
2425

2526
namespace hdfs {
2627

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
#ifndef LIB_COMMON_LOGGING_H_
2020
#define LIB_COMMON_LOGGING_H_
2121

22-
#include <boost/asio/ip/tcp.hpp>
23-
2422
#include "hdfspp/log.h"
2523

24+
#include <boost/asio/ip/tcp.hpp>
25+
2626
#include <sstream>
2727
#include <mutex>
2828
#include <memory>

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <utility>
2727
#include <future>
2828

29+
#include <boost/system/error_code.hpp>
2930

3031
namespace hdfs {
3132

0 commit comments

Comments
 (0)