Skip to content

Commit

Permalink
Ensure BoltConnection does not send signals after summaries are finis…
Browse files Browse the repository at this point in the history
…hed (#1592)
  • Loading branch information
injectives authored Nov 25, 2024
1 parent f6cd08c commit eb0c130
Showing 1 changed file with 60 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -563,94 +563,122 @@ private ResponseHandleImpl(ResponseHandler delegate, int expectedSummaries) {
@Override
public void onError(Throwable throwable) {
if (!(throwable instanceof MessageIgnoredException)) {
runIgnoringError(() -> delegate.onError(throwable));
if (!(throwable instanceof Neo4jException)
|| throwable instanceof ServiceUnavailableException
|| throwable instanceof ProtocolException) {
// assume unrecoverable error, ensure onComplete
expectedSummaries = 1;
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onError(throwable));
if (!(throwable instanceof Neo4jException)
|| throwable instanceof ServiceUnavailableException
|| throwable instanceof ProtocolException) {
// assume unrecoverable error, ensure onComplete
expectedSummaries = 1;
}
handleSummary();
}
handleSummary();
} else {
onIgnored();
}
}

@Override
public void onBeginSummary(BeginSummary summary) {
runIgnoringError(() -> delegate.onBeginSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onBeginSummary(summary));
handleSummary();
}
}

@Override
public void onRunSummary(RunSummary summary) {
runIgnoringError(() -> delegate.onRunSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onRunSummary(summary));
handleSummary();
}
}

@Override
public void onRecord(Value[] fields) {
runIgnoringError(() -> delegate.onRecord(fields));
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onRecord(fields));
}
}

@Override
public void onPullSummary(PullSummary summary) {
runIgnoringError(() -> delegate.onPullSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onPullSummary(summary));
handleSummary();
}
}

@Override
public void onDiscardSummary(DiscardSummary summary) {
runIgnoringError(() -> delegate.onDiscardSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onDiscardSummary(summary));
handleSummary();
}
}

@Override
public void onCommitSummary(CommitSummary summary) {
runIgnoringError(() -> delegate.onCommitSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onCommitSummary(summary));
handleSummary();
}
}

@Override
public void onRollbackSummary(RollbackSummary summary) {
runIgnoringError(() -> delegate.onRollbackSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onRollbackSummary(summary));
handleSummary();
}
}

@Override
public void onResetSummary(ResetSummary summary) {
runIgnoringError(() -> delegate.onResetSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onResetSummary(summary));
handleSummary();
}
}

@Override
public void onRouteSummary(RouteSummary summary) {
runIgnoringError(() -> delegate.onRouteSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onRouteSummary(summary));
handleSummary();
}
}

@Override
public void onLogoffSummary(LogoffSummary summary) {
runIgnoringError(() -> delegate.onLogoffSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onLogoffSummary(summary));
handleSummary();
}
}

@Override
public void onLogonSummary(LogonSummary summary) {
runIgnoringError(() -> delegate.onLogonSummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onLogonSummary(summary));
handleSummary();
}
}

@Override
public void onTelemetrySummary(TelemetrySummary summary) {
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(() -> delegate.onTelemetrySummary(summary));
handleSummary();
}
}

@Override
public void onIgnored() {
runIgnoringError(delegate::onIgnored);
handleSummary();
if (!summariesFuture.isDone()) {
runIgnoringError(delegate::onIgnored);
handleSummary();
}
}

@Override
Expand Down

0 comments on commit eb0c130

Please sign in to comment.