Skip to content

Commit

Permalink
FAB-4850 Orderer reconnect on errors
Browse files Browse the repository at this point in the history
PS 3 Make timeout for orderer waits configurable.
PS 4 Strubs are not reused managedchannel maintains single connection
      no need to warn.
PS 5 Factor out timeout setting

Change-Id: Ice49db47580723bf983a411f253268a7e00c7991
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Jun 19, 2017
1 parent 7c87f20 commit eccc738
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 116 deletions.
12 changes: 2 additions & 10 deletions src/main/java/org/hyperledger/fabric/sdk/Orderer.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,8 @@ Ab.BroadcastResponse sendTransaction(Common.Envelope transaction) throws Excepti
}

try {
Ab.BroadcastResponse resp = localOrdererClient.sendTransaction(transaction);

return resp;
} catch (TransactionException e) { //For any error lets start with a fresh connection.
ordererClient = null;
throw e;
return localOrdererClient.sendTransaction(transaction);
} catch (Throwable t) {
ordererClient = null;
throw t;
Expand Down Expand Up @@ -165,12 +161,8 @@ DeliverResponse[] sendDeliver(Common.Envelope transaction) throws TransactionExc
}

try {
DeliverResponse[] response = localOrdererClient.sendDeliver(transaction);

return response;
} catch (TransactionException e) { //For any error lets start with a fresh connection.
ordererClient = null;
throw e;
return localOrdererClient.sendDeliver(transaction);
} catch (Throwable t) {
ordererClient = null;
throw t;
Expand Down
265 changes: 159 additions & 106 deletions src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,19 @@
*/
class OrdererClient {
private final String channelName;
boolean shutdown = false;
private final ManagedChannelBuilder channelBuilder;
private boolean shutdown = false;
private static final Log logger = LogFactory.getLog(OrdererClient.class);
private ManagedChannel managedChannel;
private ManagedChannel managedChannel = null;
private final String name;
private final String url;

/**
* Construct client for accessing Orderer server using the existing managedChannel.
*/
OrdererClient(Orderer orderer, ManagedChannelBuilder<?> channelBuilder) {
managedChannel = channelBuilder.build();

this.channelBuilder = channelBuilder;
name = orderer.getName();
url = orderer.getUrl();
channelName = orderer.getChannel().getName();
Expand Down Expand Up @@ -87,155 +89,206 @@ public void finalize() {
}

Ab.BroadcastResponse sendTransaction(Common.Envelope envelope) throws Exception {
StreamObserver<Common.Envelope> nso = null;

if (shutdown) {
throw new TransactionException("Orderer client is shutdown");
}

final CountDownLatch finishLatch = new CountDownLatch(1);
AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(managedChannel);
AtomicBroadcastGrpc.AtomicBroadcastBlockingStub bsc = AtomicBroadcastGrpc.newBlockingStub(managedChannel);
bsc.withDeadlineAfter(2, TimeUnit.MINUTES);
ManagedChannel lmanagedChannel = managedChannel;

final Ab.BroadcastResponse[] ret = new Ab.BroadcastResponse[1];
final Throwable[] throwable = new Throwable[] {null};
if (lmanagedChannel == null || lmanagedChannel.isTerminated() || lmanagedChannel.isShutdown()) {

StreamObserver<Ab.BroadcastResponse> so = new StreamObserver<Ab.BroadcastResponse>() {
@Override
public void onNext(Ab.BroadcastResponse resp) {
// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus());
ret[0] = resp;
finishLatch.countDown();
lmanagedChannel = channelBuilder.build();
managedChannel = lmanagedChannel;

}
}

try {
final CountDownLatch finishLatch = new CountDownLatch(1);
AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(lmanagedChannel);

final Ab.BroadcastResponse[] ret = new Ab.BroadcastResponse[1];
final Throwable[] throwable = new Throwable[] {null};

StreamObserver<Ab.BroadcastResponse> so = new StreamObserver<Ab.BroadcastResponse>() {
@Override
public void onNext(Ab.BroadcastResponse resp) {
// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus());
ret[0] = resp;
finishLatch.countDown();

@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
throwable[0] = t;
finishLatch.countDown();
}

@Override
public void onCompleted() {
logger.warn("onCompleted");
finishLatch.countDown();
}
};
@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
throwable[0] = t;
finishLatch.countDown();
}

StreamObserver<Common.Envelope> nso = broadcast.broadcast(so);
@Override
public void onCompleted() {
finishLatch.countDown();
}
};

nso.onNext(envelope);
//nso.onCompleted();
nso = broadcast.broadcast(so);

nso.onNext(envelope);

try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ste = new TransactionException("Send transactions failed. Reason: timeout");
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
}

if (throwable[0] != null) {
//get full stack trace
TransactionException ste = new TransactionException("Send transactions failed. Reason: " + throwable[0].getMessage(), throwable[0]);
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
}
logger.debug("Done waiting for reply! Got:" + ret[0]);

} catch (InterruptedException e) {
logger.error(e);

try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ste = new TransactionException("Send transactions failed. Reason: timeout");
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
}
if (throwable[0] != null) {
//get full stack trace
TransactionException ste = new TransactionException("Send transactions failed. Reason: " + throwable[0].getMessage(), throwable[0]);
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
}
logger.debug("Done waiting for reply! Got:" + ret[0]);

} catch (InterruptedException e) {
logger.error(e);
return ret[0];
} catch (Throwable t) {
managedChannel = null;
throw t;

}
} finally {

if (null != nso) {

try {
nso.onCompleted();
} catch (Exception e) { //Best effort only report on debug
logger.debug(format("Exception completing sendTransaction with channel %s, name %s, url %s %s",
channelName, name, url, e.getMessage()), e);
}
}

return ret[0];
}
}

public DeliverResponse[] sendDeliver(Common.Envelope envelope) throws TransactionException {
DeliverResponse[] sendDeliver(Common.Envelope envelope) throws TransactionException {

if (shutdown) {
throw new TransactionException("Orderer client is shutdown");
}

final CountDownLatch finishLatch = new CountDownLatch(1);
AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(managedChannel);
AtomicBroadcastGrpc.AtomicBroadcastBlockingStub bsc = AtomicBroadcastGrpc.newBlockingStub(managedChannel);
bsc.withDeadlineAfter(2, TimeUnit.MINUTES);
StreamObserver<Common.Envelope> nso = null;

// final DeliverResponse[] ret = new DeliverResponse[1];
final List<DeliverResponse> retList = new ArrayList<>();
final List<Throwable> throwableList = new ArrayList<>();
// ret[0] = null;
ManagedChannel lmanagedChannel = managedChannel;

StreamObserver<DeliverResponse> so = new StreamObserver<DeliverResponse>() {
boolean done = false;
if (lmanagedChannel == null || lmanagedChannel.isTerminated() || lmanagedChannel.isShutdown()) {

@Override
public void onNext(DeliverResponse resp) {
lmanagedChannel = channelBuilder.build();
managedChannel = lmanagedChannel;

// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus() + ", type case: " + resp.getTypeCase());
}

if (done) {
return;
}
try {

AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(lmanagedChannel);

// final DeliverResponse[] ret = new DeliverResponse[1];
final List<DeliverResponse> retList = new ArrayList<>();
final List<Throwable> throwableList = new ArrayList<>();
final CountDownLatch finishLatch = new CountDownLatch(1);

StreamObserver<DeliverResponse> so = new StreamObserver<DeliverResponse>() {
boolean done = false;

if (resp.getTypeCase() == STATUS) {
done = true;
retList.add(0, resp);
@Override
public void onNext(DeliverResponse resp) {

// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus() + ", type case: " + resp.getTypeCase());

if (done) {
return;
}

if (resp.getTypeCase() == STATUS) {
done = true;
retList.add(0, resp);

finishLatch.countDown();

} else {
retList.add(resp);
}

}

@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
throwableList.add(t);
finishLatch.countDown();
}

} else {
retList.add(resp);
@Override
public void onCompleted() {
logger.warn("onCompleted");
finishLatch.countDown();
}
};

}
nso = broadcast.deliver(so);
nso.onNext(envelope);
//nso.onCompleted();

@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ex = new TransactionException("sendDeliver time exceeded for orderer");
logger.error(ex.getMessage(), ex);
throw ex;
}
throwableList.add(t);
finishLatch.countDown();
logger.trace("Done waiting for reply!");

} catch (InterruptedException e) {
logger.error(e);
}

@Override
public void onCompleted() {
logger.warn("onCompleted");
finishLatch.countDown();
if (!throwableList.isEmpty()) {
Throwable throwable = throwableList.get(0);
TransactionException e = new TransactionException(throwable.getMessage(), throwable);
logger.error(e.getMessage(), e);
throw e;
}
};

StreamObserver<Common.Envelope> nso = broadcast.deliver(so);
nso.onNext(envelope);
//nso.onCompleted();
return retList.toArray(new DeliverResponse[retList.size()]);
} catch (Throwable t) {
managedChannel = null;
throw t;

try {
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ex = new TransactionException("sendDeliver time exceeded for orderer");
logger.error(ex.getMessage(), ex);
throw ex;
}
logger.trace("Done waiting for reply!");
} finally {
if (null != nso) {

} catch (InterruptedException e) {
logger.error(e);
}
try {
nso.onCompleted();
} catch (Exception e) { //Best effort only report on debug
logger.debug(format("Exception completing sendDeliver with channel %s, name %s, url %s %s",
channelName, name, url, e.getMessage()), e);
}

if (!throwableList.isEmpty()) {
Throwable throwable = throwableList.get(0);
TransactionException e = new TransactionException(throwable.getMessage(), throwable);
logger.error(e.getMessage(), e);
throw e;
}
}

return retList.toArray(new DeliverResponse[retList.size()]);
}

boolean isChannelActive() {
Expand Down

0 comments on commit eccc738

Please sign in to comment.