Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,12 @@ public CompletionStage<BoltConnection> clear() {
@Override
public CompletionStage<Void> flush(ResponseHandler handler) {
return delegate.flush(new ResponseHandler() {
private Throwable error;
boolean notifyHandler = true;

@Override
public void onError(Throwable throwable) {
if (error == null) {
error = handledError(throwable);
handler.onError(error);
}
handler.onError(handledError(throwable, notifyHandler));
notifyHandler = false;
}

@Override
Expand Down Expand Up @@ -306,34 +304,36 @@ public boolean telemetrySupported() {
return delegate.telemetrySupported();
}

private Throwable handledError(Throwable receivedError) {
private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
var error = FutureUtil.completionExceptionCause(receivedError);

if (error instanceof ServiceUnavailableException) {
return handledServiceUnavailableException(((ServiceUnavailableException) error));
} else if (error instanceof ClientException) {
return handledClientException(((ClientException) error));
} else if (error instanceof TransientException) {
return handledTransientException(((TransientException) error));
if (error instanceof ServiceUnavailableException exception) {
return handledServiceUnavailableException(exception, notifyHandler);
} else if (error instanceof ClientException exception) {
return handledClientException(exception, notifyHandler);
} else if (error instanceof TransientException exception) {
return handledTransientException(exception, notifyHandler);
} else {
return error;
}
}

private Throwable handledServiceUnavailableException(ServiceUnavailableException e) {
routingTableHandler.onConnectionFailure(serverAddress());
private Throwable handledServiceUnavailableException(ServiceUnavailableException e, boolean notifyHandler) {
if (notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return new SessionExpiredException(format("Server at %s is no longer available", serverAddress()), e);
}

private Throwable handledTransientException(TransientException e) {
private Throwable handledTransientException(TransientException e, boolean notifyHandler) {
var errorCode = e.code();
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable")) {
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable") && notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return e;
}

private Throwable handledClientException(ClientException e) {
private Throwable handledClientException(ClientException e, boolean notifyHandler) {
if (isFailureToWrite(e)) {
// The server is unaware of the session mode, so we have to implement this logic in the driver.
// In the future, we might be able to move this logic to the server.
Expand All @@ -349,7 +349,9 @@ private Throwable handledClientException(ClientException e) {
null);
}
case WRITE -> {
routingTableHandler.onWriteFailure(serverAddress());
if (notifyHandler) {
routingTableHandler.onWriteFailure(serverAddress());
}
return new SessionExpiredException(
format("Server at %s no longer accepts writes", serverAddress()));
}
Expand Down