Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-25218] - Hardening of FifoBuffer operation logic #132

71 changes: 61 additions & 10 deletions src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nonnegative;
import javax.annotation.concurrent.GuardedBy;

/**
* FIFO buffer for a reader thread and a writer thread to collaborate.
Expand Down Expand Up @@ -149,6 +151,7 @@ public int receive(ReadableByteChannel ch, int max) throws IOException {
/**
* Cap to the # of bytes that we can hold.
*/
@GuardedBy("lock")
private int limit;
private final int pageSize;

Expand All @@ -159,8 +162,12 @@ public int receive(ReadableByteChannel ch, int max) throws IOException {

/**
* Set to true when the writer closes the write end.
* Close operation requires flushing of buffers, hence this field is synchronized.
* In order to get runtime status, use {@link #closeRequested}
*/
@GuardedBy("lock")
private boolean closed;
private boolean closeRequested = false;

public FifoBuffer(int pageSize, int limit) {
this(null,pageSize,limit);
Expand Down Expand Up @@ -188,6 +195,8 @@ public FifoBuffer(Object lock, int pageSize, int limit) {
public void setLimit(int newLimit) {
synchronized (lock) {
limit = newLimit;
// We resized the buffer, hence read/write threads may be able to proceed
lock.notifyAll();
}
}

Expand All @@ -200,6 +209,7 @@ private Page newPage() {
*
* @return
* -1 if the buffer is closed and there's no more data to read.
* May return non-negative value if the buffer close is requested, but not performed
*/
public int readable() {
synchronized (lock) {
Expand All @@ -209,18 +219,25 @@ public int readable() {
}
}

//TODO: Value beyond the limit is actually a bug (JENKINS-37514)
/**
* Number of bytes writable
* Number of bytes writable.
* @return Number of bytes we can write to the buffer.
* If the buffer is closed, may return the value beyond the limit (JENKINS-37514)
*/
@Nonnegative
public int writable() {
return Math.max(0,limit-readable());
synchronized(lock) {
return Math.max(0,limit-readable());
}
}

/**
* Returns true if the write end of the buffer is already closed, and that
* no more new data will arrive.
*
* Note that there still might be a data left in the buffer to be read.
* The method may also return {@code true} when the close operation is actually requested.
*/
public boolean isClosed() {
return closed;
Expand All @@ -245,8 +262,8 @@ public int send(WritableByteChannel ch) throws IOException {

if (read>0) return read; // we've already read some

if (closed) {
releaseRing();
if (closeRequested) { // Somebody requested the close operation in parallel thread
handleCloseRequest();
return -1; // no more data
}
return 0; // no data to read
Expand All @@ -262,6 +279,8 @@ public int send(WritableByteChannel ch) throws IOException {
if (sent == 0) // channel filled up
return read;
} catch (ClosedChannelException e) {
// If the underlying channel is closed, we should close the buffer as well
close();
return -1; // propagate EOF
}
}
Expand Down Expand Up @@ -295,6 +314,8 @@ public int writeNonBlock(ByteBuffer buf) {
*
* @return
* number of bytes read, or -1 if the given channel has reached EOF and no further read is possible.
* @exception IOException
* receive error
*/
public int receive(ReadableByteChannel ch) throws IOException {
if (closed)
Expand All @@ -306,6 +327,13 @@ public int receive(ReadableByteChannel ch) throws IOException {
int chunk = writable();
if (chunk==0)
return written; // no more space to write

// If the buffer gets closed before we acquire lock, we are at risk of null "w" and NPE.
// So in such case we just interrupt the receive process
if (closed) {
throw new IOException("closed during the receive() operation");
}

try {
int received = w.receive(ch, chunk);
if (received==0)
Expand All @@ -318,6 +346,8 @@ public int receive(ReadableByteChannel ch) throws IOException {
sz += received;
written += received;
} catch (ClosedChannelException e) {
// If the underlying channel is closed, we should close the buffer as well
close();
if (written == 0) return -1; // propagate EOF
return written;
}
Expand All @@ -339,11 +369,14 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I
while (len>0) {
int chunk;

synchronized (lock) {
final boolean shouldWaitToCleanTheBuffer;
synchronized (lock) {
while ((chunk = Math.min(len,writable()))==0) {
if (closed)
throw new IOException("closed during write operation");

if (closeRequested) {
handleCloseRequest();
throw new IOException("closed during write() operation");
}
// The buffer is full, but we give other threads a chance to cleanup it
lock.wait(100);
}

Expand All @@ -355,6 +388,8 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I

lock.notifyAll();
}

//
}
}

Expand All @@ -365,6 +400,22 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I
* returning EOF signals.
*/
public void close() {
// Async modification of the field in order to notify other threads that we are about closing this buffer
closeRequested = true;

// Now perform close operation
handleCloseRequest();
}

/**
* This is a close operation, which is guarded by the instance lock.
* Actually this method may be invoked by multiple threads, not only by {@link #close()} when it requests it.
*/
private void handleCloseRequest() {
if (!closeRequested) {
// Do nothing when actually we have no close request
return;
}
synchronized (lock) {
if (!closed) {
closed = true;
Expand Down Expand Up @@ -463,8 +514,8 @@ public int readNonBlocking(byte[] buf, int start, int len) {

if (read>0) return read; // we've already read some

if (closed) {
releaseRing();
if (closeRequested) {
handleCloseRequest();
return -1; // no more data
}

Expand Down