diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index 4f0702ccf..12308be90 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -9,6 +9,8 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import javax.annotation.Nonnegative; +import javax.annotation.concurrent.GuardedBy; /** * FIFO buffer for a reader thread and a writer thread to collaborate. @@ -149,6 +151,7 @@ public int receive(ReadableByteChannel ch, int max) throws IOException { /** * Cap to the # of bytes that we can hold. */ + @GuardedBy("lock") private int limit; private final int pageSize; @@ -159,8 +162,12 @@ public int receive(ReadableByteChannel ch, int max) throws IOException { /** * Set to true when the writer closes the write end. + * Close operation requires flushing of buffers, hence this field is synchronized. + * In order to get runtime status, use {@link #closeRequested} */ + @GuardedBy("lock") private boolean closed; + private boolean closeRequested = false; public FifoBuffer(int pageSize, int limit) { this(null,pageSize,limit); @@ -188,6 +195,8 @@ public FifoBuffer(Object lock, int pageSize, int limit) { public void setLimit(int newLimit) { synchronized (lock) { limit = newLimit; + // We resized the buffer, hence read/write threads may be able to proceed + lock.notifyAll(); } } @@ -200,6 +209,7 @@ private Page newPage() { * * @return * -1 if the buffer is closed and there's no more data to read. + * May return non-negative value if the buffer close is requested, but not performed */ public int readable() { synchronized (lock) { @@ -209,11 +219,17 @@ public int readable() { } } + //TODO: Value beyond the limit is actually a bug (JENKINS-37514) /** - * Number of bytes writable + * Number of bytes writable. + * @return Number of bytes we can write to the buffer. + * If the buffer is closed, may return the value beyond the limit (JENKINS-37514) */ + @Nonnegative public int writable() { - return Math.max(0,limit-readable()); + synchronized(lock) { + return Math.max(0,limit-readable()); + } } /** @@ -221,6 +237,7 @@ public int writable() { * no more new data will arrive. * * Note that there still might be a data left in the buffer to be read. + * The method may also return {@code true} when the close operation is actually requested. */ public boolean isClosed() { return closed; @@ -245,8 +262,8 @@ public int send(WritableByteChannel ch) throws IOException { if (read>0) return read; // we've already read some - if (closed) { - releaseRing(); + if (closeRequested) { // Somebody requested the close operation in parallel thread + handleCloseRequest(); return -1; // no more data } return 0; // no data to read @@ -262,6 +279,8 @@ public int send(WritableByteChannel ch) throws IOException { if (sent == 0) // channel filled up return read; } catch (ClosedChannelException e) { + // If the underlying channel is closed, we should close the buffer as well + close(); return -1; // propagate EOF } } @@ -295,6 +314,8 @@ public int writeNonBlock(ByteBuffer buf) { * * @return * number of bytes read, or -1 if the given channel has reached EOF and no further read is possible. + * @exception IOException + * receive error */ public int receive(ReadableByteChannel ch) throws IOException { if (closed) @@ -306,6 +327,13 @@ public int receive(ReadableByteChannel ch) throws IOException { int chunk = writable(); if (chunk==0) return written; // no more space to write + + // If the buffer gets closed before we acquire lock, we are at risk of null "w" and NPE. + // So in such case we just interrupt the receive process + if (closed) { + throw new IOException("closed during the receive() operation"); + } + try { int received = w.receive(ch, chunk); if (received==0) @@ -318,6 +346,8 @@ public int receive(ReadableByteChannel ch) throws IOException { sz += received; written += received; } catch (ClosedChannelException e) { + // If the underlying channel is closed, we should close the buffer as well + close(); if (written == 0) return -1; // propagate EOF return written; } @@ -339,11 +369,14 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I while (len>0) { int chunk; - synchronized (lock) { + final boolean shouldWaitToCleanTheBuffer; + synchronized (lock) { while ((chunk = Math.min(len,writable()))==0) { - if (closed) - throw new IOException("closed during write operation"); - + if (closeRequested) { + handleCloseRequest(); + throw new IOException("closed during write() operation"); + } + // The buffer is full, but we give other threads a chance to cleanup it lock.wait(100); } @@ -355,6 +388,8 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I lock.notifyAll(); } + + // } } @@ -365,6 +400,22 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I * returning EOF signals. */ public void close() { + // Async modification of the field in order to notify other threads that we are about closing this buffer + closeRequested = true; + + // Now perform close operation + handleCloseRequest(); + } + + /** + * This is a close operation, which is guarded by the instance lock. + * Actually this method may be invoked by multiple threads, not only by {@link #close()} when it requests it. + */ + private void handleCloseRequest() { + if (!closeRequested) { + // Do nothing when actually we have no close request + return; + } synchronized (lock) { if (!closed) { closed = true; @@ -463,8 +514,8 @@ public int readNonBlocking(byte[] buf, int start, int len) { if (read>0) return read; // we've already read some - if (closed) { - releaseRing(); + if (closeRequested) { + handleCloseRequest(); return -1; // no more data }