diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java index 9d2de940..52947979 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java @@ -88,7 +88,7 @@ public CompletableFuture campaign(ByteSequence electionName, l execute( () -> stubWithLeader().campaign(request), CampaignResponse::new, - Errors::isRetryable)); + Errors::isSafeRetryMutableRPC)); } @Override @@ -111,7 +111,7 @@ public CompletableFuture proclaim(LeaderKey leaderKey, ByteSeq execute( () -> stubWithLeader().proclaim(request), ProclaimResponse::new, - Errors::isRetryable)); + Errors::isSafeRetryMutableRPC)); } @Override @@ -126,7 +126,7 @@ public CompletableFuture leader(ByteSequence electionName) { execute( () -> stubWithLeader().leader(request), response -> new LeaderResponse(response, namespace), - Errors::isRetryable)); + Errors::isSafeRetryImmutableRPC)); } @Override @@ -162,7 +162,7 @@ public CompletableFuture resign(LeaderKey leaderKey) { execute( () -> stubWithLeader().resign(request), ResignResponse::new, - Errors::isRetryable)); + Errors::isSafeRetryMutableRPC)); } private CompletableFuture wrapConvertException(CompletableFuture future) { diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java index 5ab23661..b5217ede 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java @@ -90,8 +90,8 @@ protected CompletableFuture completable( protected CompletableFuture execute( Supplier> supplier, Function resultConvert) { - - return execute(supplier, resultConvert, Errors::isRetryable); + // TODO: in go etcd client lease operations are 'repeatable' + return execute(supplier, resultConvert, Errors::isSafeRetryMutableRPC); } /** diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java index 4baf11e1..f7b4d655 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java @@ -65,7 +65,7 @@ public CompletableFuture put(ByteSequence key, ByteSequence value, return execute( () -> stub.put(Requests.mapPutRequest(key, value, option, namespace)), response -> new PutResponse(response, namespace), - Errors::isRetryable); + Errors::isSafeRetryMutableRPC); } @Override @@ -81,7 +81,7 @@ public CompletableFuture get(ByteSequence key, GetOption option) { return execute( () -> stub.range(Requests.mapRangeRequest(key, option, namespace)), response -> new GetResponse(response, namespace), - Errors::isRetryable); + Errors::isSafeRetryImmutableRPC); } @Override @@ -97,7 +97,7 @@ public CompletableFuture delete(ByteSequence key, DeleteOption o return execute( () -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)), response -> new DeleteResponse(response, namespace), - Errors::isRetryable); + Errors::isSafeRetryMutableRPC); } @Override @@ -116,7 +116,7 @@ public CompletableFuture compact(long rev, CompactOption option return execute( () -> stub.compact(request), CompactResponse::new, - Errors::isRetryable); + Errors::isSafeRetryMutableRPC); } @Override @@ -124,7 +124,7 @@ public Txn txn() { return TxnImpl.newTxn( request -> execute( () -> stub.txn(request), - response -> new TxnResponse(response, namespace), Errors::isRetryable), + response -> new TxnResponse(response, namespace), Errors::isSafeRetryMutableRPC), namespace); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java index 4518082f..d4f0d51f 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java @@ -69,7 +69,7 @@ public CompletableFuture lock(ByteSequence name, long leaseId) { return execute( () -> stubWithLeader().lock(request), response -> new LockResponse(response, namespace), - Errors::isRetryable); + Errors::isSafeRetryMutableRPC); } @Override @@ -83,6 +83,6 @@ public CompletableFuture unlock(ByteSequence lockKey) { return execute( () -> stubWithLeader().unlock(request), UnlockResponse::new, - Errors::isRetryable); + Errors::isSafeRetryMutableRPC); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/support/Errors.java b/jetcd-core/src/main/java/io/etcd/jetcd/support/Errors.java index 2b825aad..27e4a41c 100755 --- a/jetcd-core/src/main/java/io/etcd/jetcd/support/Errors.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/support/Errors.java @@ -26,11 +26,30 @@ public final class Errors { private Errors() { } - public static boolean isRetryable(Status status) { + public static boolean isSafeRetryImmutableRPC(Status status) { + // similar to go etcd client isSafeRetryImmutableRPC + return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status) || isAuthStoreExpired(status); } + public static boolean isSafeRetryMutableRPC(Status status) { + // similar to go etcd client isSafeRetryMutableRPC + + if (isInvalidTokenError(status) || isAuthStoreExpired(status)) { + return true; + } + if (!Status.UNAVAILABLE.getCode().equals(status.getCode())) { + return false; + } + String desc = status.getDescription(); + if (desc == null) { + return false; + } + + return desc.equals("there is no address available") || desc.equals("there is no connection available"); + } + public static boolean isInvalidTokenError(Throwable e) { Status status = Status.fromThrowable(e); return isInvalidTokenError(status); diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/UtilTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/UtilTest.java index 6ba039ae..f8dd5de4 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/UtilTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/UtilTest.java @@ -39,16 +39,29 @@ public void testAuthStoreExpired() { } @Test - public void testAuthErrorIsNotRetryable() { + public void testAuthErrorIsSafeRetryImmutableRPC() { Status authErrorStatus = Status.UNAUTHENTICATED .withDescription("etcdserver: invalid auth token"); Status status = Status.fromThrowable(new StatusException(authErrorStatus)); - assertThat(Errors.isRetryable(status)).isTrue(); + assertThat(Errors.isSafeRetryImmutableRPC(status)).isTrue(); } @Test - public void testUnavailableErrorIsRetryable() { + public void testUnavailableErrorIsSafeRetryImmutableRPC() { Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE)); - assertThat(Errors.isRetryable(status)).isTrue(); + assertThat(Errors.isSafeRetryImmutableRPC(status)).isTrue(); + } + + @Test + public void testUnavailableErrorIsSafeRetryMutableRPC() { + Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE)); + assertThat(Errors.isSafeRetryMutableRPC(status)).isFalse(); + } + + @Test + public void testNoAddressAvailableIsSafeRetryMutableRPC() { + Status status = Status.UNAVAILABLE + .withDescription("there is no address available"); + assertThat(Errors.isSafeRetryMutableRPC(status)).isTrue(); } }