From 13e6ad2bc1b1d85b81f8b88b09ba279f917743b2 Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 14:39:12 +0200 Subject: [PATCH 1/7] [JENKINS-25218] - nio.FifoBuffer#writable() should be synchronized, since we use non-atomic int --- .../java/org/jenkinsci/remoting/nio/FifoBuffer.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index 4f0702ccf..2cfedcc4b 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -9,6 +9,8 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import javax.annotation.Nonnegative; +import javax.annotation.concurrent.GuardedBy; /** * FIFO buffer for a reader thread and a writer thread to collaborate. @@ -149,6 +151,7 @@ public int receive(ReadableByteChannel ch, int max) throws IOException { /** * Cap to the # of bytes that we can hold. */ + @GuardedBy("lock") private int limit; private final int pageSize; @@ -209,11 +212,17 @@ public int readable() { } } + //TODO: Value beyond the limit is actually a bug (JENKINS-37514) /** - * Number of bytes writable + * Number of bytes writable. + * @return Number of bytes we can write to the buffer. + * If the buffer is closed, may return the value beyond the limit (JENKINS-37514) */ + @Nonnegative public int writable() { - return Math.max(0,limit-readable()); + synchronized(lock) { + return Math.max(0,limit-readable()); + } } /** From 072341c7766ff42bf9a181d5c3fd09e8a82b5e75 Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 15:10:55 +0200 Subject: [PATCH 2/7] [JENKINS-25218] - Implement asynchronous processing of close requests in FifoBuffer --- .../jenkinsci/remoting/nio/FifoBuffer.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index 2cfedcc4b..3a82e7440 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -162,8 +162,12 @@ public int receive(ReadableByteChannel ch, int max) throws IOException { /** * Set to true when the writer closes the write end. + * Close operation requires flushing of buffers, hence this field is synchronized. + * In order to get runtime status, use {@link #closeRequested} */ + @GuardedBy("lock") private boolean closed; + private boolean closeRequested = false; public FifoBuffer(int pageSize, int limit) { this(null,pageSize,limit); @@ -203,6 +207,7 @@ private Page newPage() { * * @return * -1 if the buffer is closed and there's no more data to read. + * May return non-negative value if the buffer close is requested, but not performed */ public int readable() { synchronized (lock) { @@ -230,6 +235,7 @@ public int writable() { * no more new data will arrive. * * Note that there still might be a data left in the buffer to be read. + * The method may also return {@code true} when the close operation is actually requested. */ public boolean isClosed() { return closed; @@ -254,8 +260,8 @@ public int send(WritableByteChannel ch) throws IOException { if (read>0) return read; // we've already read some - if (closed) { - releaseRing(); + if (closeRequested) { // Somebody requested the close operation in parallel thread + handleCloseRequest(); return -1; // no more data } return 0; // no data to read @@ -350,9 +356,10 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I synchronized (lock) { while ((chunk = Math.min(len,writable()))==0) { - if (closed) + if (closeRequested) { + handleCloseRequest(); throw new IOException("closed during write operation"); - + } lock.wait(100); } @@ -374,6 +381,22 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I * returning EOF signals. */ public void close() { + // Async modification of the field in order to notify other threads that we are about closing this buffer + closeRequested = true; + + // Now perform close operation + handleCloseRequest(); + } + + /** + * This is a close operation, which is guarded by the instance lock. + * Actually this method may be invoked by multiple threads, not only by {@link #close()} when it requests it. + */ + private void handleCloseRequest() { + if (!closeRequested) { + // Do nothing when actually we have no close request + return; + } synchronized (lock) { if (!closed) { closed = true; @@ -472,8 +495,8 @@ public int readNonBlocking(byte[] buf, int start, int len) { if (read>0) return read; // we've already read some - if (closed) { - releaseRing(); + if (closeRequested) { + handleCloseRequest(); return -1; // no more data } From b6e71fa34bbb5e533b6b2991246fa77649bcc01b Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 15:16:14 +0200 Subject: [PATCH 3/7] [JENKINS-25218] - Prevent NPE when we close the channel in parallel with the FifoBuffer#receive() loop --- src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index 3a82e7440..bee6780ef 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -310,6 +310,8 @@ public int writeNonBlock(ByteBuffer buf) { * * @return * number of bytes read, or -1 if the given channel has reached EOF and no further read is possible. + * @exception IOException + * receive error */ public int receive(ReadableByteChannel ch) throws IOException { if (closed) @@ -321,6 +323,13 @@ public int receive(ReadableByteChannel ch) throws IOException { int chunk = writable(); if (chunk==0) return written; // no more space to write + + // If the buffer gets closed before we acquire lock, we are at risk of null "w" and NPE. + // So in such case we just interrupt the receive process + if (closed) { + throw new IOException("closed during the receive() operation"); + } + try { int received = w.receive(ch, chunk); if (received==0) From 8f13f0c9c9bc8c41e5b215ee900a7b3a18daa1bf Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 15:27:32 +0200 Subject: [PATCH 4/7] [JENKINS-25218] - FifoBuffer resizing should notify read/write threads waiting on the lock --- .../org/jenkinsci/remoting/nio/FifoBuffer.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index bee6780ef..e5c4527dd 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -195,6 +195,8 @@ public FifoBuffer(Object lock, int pageSize, int limit) { public void setLimit(int newLimit) { synchronized (lock) { limit = newLimit; + // We resized the buffer, hence read/write threads may be able to proceed + lock.notifyAll(); } } @@ -363,12 +365,15 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I while (len>0) { int chunk; + final boolean shouldWaitToCleanTheBuffer; synchronized (lock) { + if (closeRequested) { + handleCloseRequest(); + throw new IOException("closed during write operation"); + } + while ((chunk = Math.min(len,writable()))==0) { - if (closeRequested) { - handleCloseRequest(); - throw new IOException("closed during write operation"); - } + lock.wait(100); } @@ -380,6 +385,8 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I lock.notifyAll(); } + + // } } From c991f6c4d995b4dd5dbf1fa954e4afe820a399e1 Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 15:56:18 +0200 Subject: [PATCH 5/7] [JENKINS-25218] - Minor comments --- src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index e5c4527dd..40f685b45 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -369,11 +369,11 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I synchronized (lock) { if (closeRequested) { handleCloseRequest(); - throw new IOException("closed during write operation"); + throw new IOException("closed during write() operation"); } while ((chunk = Math.min(len,writable()))==0) { - + // The buffer is full, but we give other threads a chance to cleanup lock.wait(100); } From f1baed3a7db9f704fdcda5afd80f5d439bacf8d8 Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 15:58:53 +0200 Subject: [PATCH 6/7] [JENKINS-25218] - Automatically close FifoBuffer if the underlying channel is closed --- src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index 40f685b45..b43112d24 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -279,6 +279,8 @@ public int send(WritableByteChannel ch) throws IOException { if (sent == 0) // channel filled up return read; } catch (ClosedChannelException e) { + // If the underlying channel is closed, we should close the buffer as well + close(); return -1; // propagate EOF } } @@ -344,6 +346,8 @@ public int receive(ReadableByteChannel ch) throws IOException { sz += received; written += received; } catch (ClosedChannelException e) { + // If the underlying channel is closed, we should close the buffer as well + close(); if (written == 0) return -1; // propagate EOF return written; } @@ -373,7 +377,7 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I } while ((chunk = Math.min(len,writable()))==0) { - // The buffer is full, but we give other threads a chance to cleanup + // The buffer is full, but we give other threads a chance to cleanup it lock.wait(100); } From faf84066f8744bd8534e2bd4f8cd88e1789884ad Mon Sep 17 00:00:00 2001 From: Oleg Nenashev Date: Thu, 18 Aug 2016 16:21:27 +0200 Subject: [PATCH 7/7] [JENKINS-25218] - Revert errorneous code --- .../java/org/jenkinsci/remoting/nio/FifoBuffer.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java index b43112d24..12308be90 100644 --- a/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java +++ b/src/main/java/org/jenkinsci/remoting/nio/FifoBuffer.java @@ -370,13 +370,12 @@ public void write(byte[] buf, int start, int len) throws InterruptedException, I int chunk; final boolean shouldWaitToCleanTheBuffer; - synchronized (lock) { - if (closeRequested) { - handleCloseRequest(); - throw new IOException("closed during write() operation"); - } - + synchronized (lock) { while ((chunk = Math.min(len,writable()))==0) { + if (closeRequested) { + handleCloseRequest(); + throw new IOException("closed during write() operation"); + } // The buffer is full, but we give other threads a chance to cleanup it lock.wait(100); }