Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in SubscriberInputStream. #540

Merged
merged 1 commit into from
Apr 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.reactive.Flow;

Expand All @@ -31,25 +30,41 @@
*/
class SubscriberInputStream extends InputStream implements Flow.Subscriber<ByteBuffer> {

private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile Flow.Subscription subscription;

// Operations that depend on both the future and the closed flag below must be handled under a lock to avoid race
// conditions where they may interleave, e.g.
//
// 1. read() thread sees closed == false
// 2. onComplete() thread changes closed to true
// 3. onComplete() calls processed.complete(null)
// 4. read() thread replaces processed with new instance
// 5. read() thread loops and blocks forever on processed.get() since the new instance in step 4 will never complete

private volatile CompletableFuture<ByteBuffer> processed = new CompletableFuture<>();
private boolean closed = false;

@Override
public int read() throws IOException {
try {
while (true) {
ByteBuffer currentBuffer = processed.get(); // block until a processing data are available

if (currentBuffer != null && currentBuffer.remaining() > 0) {
// if there is anything to read, then read one byte...
return currentBuffer.get();
} else if (!closed.get()) {
// reinitialize the processed buffer future and request more data
processed = new CompletableFuture<>();
subscription.request(1);
final ByteBuffer currentBuffer = processed.get(); // block until we have a buffer
if (currentBuffer != null) {
if (currentBuffer.remaining() > 0) {
// There's at least one more byte, return it
return currentBuffer.get();
} else {
// We've finished with the current buffer; if we're still open, reinitialize the future
// and request more data
if (reinitializeFuture()) {
subscription.request(1);
} else {
// We're closed
return -1;
}
}
} else {
// else we have read all the data already and the data inflow has completed
// We're closed
return -1;
}
}
Expand All @@ -69,23 +84,32 @@ public void onSubscribe(Flow.Subscription subscription) {

@Override
public void onNext(ByteBuffer item) {
processed.complete(item);
if (item != null) {
processed.complete(item);
}
}

@Override
public void onError(Throwable throwable) {
closed.set(true);
public synchronized void onError(Throwable throwable) {
closed = true;
if (!processed.completeExceptionally(throwable)) { // best effort exception propagation
CompletableFuture<ByteBuffer> cf = new CompletableFuture<>();
final CompletableFuture<ByteBuffer> cf = new CompletableFuture<>();
cf.completeExceptionally(throwable);
processed = cf;
}
}

@Override
public void onComplete() {
closed.set(true);
processed.complete(null); // if not already completed, then complete
public synchronized void onComplete() {
closed = true;
processed.complete(null); // complete if not already done
}

private synchronized boolean reinitializeFuture() {
final boolean open = !closed;
if (open) {
processed = new CompletableFuture<>();
}
return open;
}
}