4343import java .util .concurrent .ConcurrentHashMap ;
4444import java .util .concurrent .ConcurrentMap ;
4545import java .util .concurrent .ThreadLocalRandom ;
46+ import java .util .concurrent .atomic .AtomicInteger ;
4647import javax .security .sasl .SaslException ;
4748import org .apache .hadoop .conf .Configuration ;
4849import org .apache .hadoop .hbase .CellScanner ;
@@ -96,6 +97,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
9697 justification = "We are always under lock actually" )
9798 private Thread thread ;
9899
100+ // Used for ensuring two reader threads don't run over each other. Should only be used
101+ // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
102+ private final Object readerThreadLock = new Object ();
103+
104+ // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
105+ private final AtomicInteger attempts = new AtomicInteger ();
106+
99107 // connected socket. protected for writing UT.
100108 protected Socket socket = null ;
101109 private DataInputStream in ;
@@ -323,6 +331,17 @@ private synchronized boolean waitForWork() {
323331 if (thread == null ) {
324332 return false ;
325333 }
334+
335+ // If closeConn is called while we are in the readResponse method, it's possible that a new
336+ // call to setupIOStreams comes in and creates a new value for "thread" before readResponse
337+ // finishes. Once readResponse finishes, it will come in here and thread will be non-null
338+ // above, but pointing at a new thread. In that case, we should end to avoid a situation
339+ // where two threads are forever competing for the same socket.
340+ if (!isCurrentThreadExpected ()) {
341+ LOG .debug ("Thread replaced by new connection thread. Ending waitForWork loop." );
342+ return false ;
343+ }
344+
326345 if (!calls .isEmpty ()) {
327346 return true ;
328347 }
@@ -336,20 +355,48 @@ private synchronized boolean waitForWork() {
336355 } catch (InterruptedException e ) {
337356 // Restore interrupt status
338357 Thread .currentThread ().interrupt ();
358+
359+ String msg = "Interrupted while waiting for work" ;
360+
361+ // If we were interrupted by closeConn, it would have set thread to null.
362+ // We are synchronized here and if we somehow got interrupted without setting thread to
363+ // null, we want to make sure the connection is closed since the read thread would be dead.
364+ // Rather than do a null check here, we check if the current thread is the expected thread.
365+ // This guards against the case where a call to setupIOStreams got the synchronized lock
366+ // first after closeConn, thus changing the thread to a new thread.
367+ if (isCurrentThreadExpected ()) {
368+ LOG .debug (msg + ", closing connection" );
369+ closeConn (new InterruptedIOException (msg ));
370+ } else {
371+ LOG .debug (msg );
372+ }
373+
374+ return false ;
339375 }
340376 }
341377 }
342378
343379 @ Override
344380 public void run () {
345381 if (LOG .isTraceEnabled ()) {
346- LOG .trace (threadName + ": starting" );
382+ LOG .trace (" starting" );
347383 }
348- while (waitForWork ()) {
349- readResponse ();
384+
385+ // We have a synchronization here because it's possible in error scenarios for a new
386+ // thread to be started while readResponse is still reading on the socket. We don't want
387+ // two threads to be reading from the same socket/inputstream.
388+ // The below calls can synchronize on "BlockingRpcConnection.this".
389+ // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
390+ synchronized (readerThreadLock ) {
391+ if (LOG .isTraceEnabled ()) {
392+ LOG .trace ("started" );
393+ }
394+ while (waitForWork ()) {
395+ readResponse ();
396+ }
350397 }
351398 if (LOG .isTraceEnabled ()) {
352- LOG .trace (threadName + ": stopped" );
399+ LOG .trace (" stopped" );
353400 }
354401 }
355402
@@ -522,7 +569,7 @@ public Boolean run() throws IOException {
522569 }
523570
524571 // start the receiver thread after the socket connection has been set up
525- thread = new Thread (this , threadName );
572+ thread = new Thread (this , threadName + " (attempt: " + attempts . incrementAndGet () + ")" );
526573 thread .setDaemon (true );
527574 thread .start ();
528575 }
@@ -629,7 +676,7 @@ private void writeRequest(Call call) throws IOException {
629676 call .callStats .setRequestSizeBytes (write (this .out , requestHeader , call .param , cellBlock ));
630677 } catch (Throwable t ) {
631678 if (LOG .isTraceEnabled ()) {
632- LOG .trace ("Error while writing {}" , call .toShortString ());
679+ LOG .trace ("Error while writing {}" , call .toShortString (), t );
633680 }
634681 IOException e = IPCUtil .toIOE (t );
635682 closeConn (e );
@@ -716,16 +763,33 @@ private void readResponse() {
716763 // since we expect certain responses to not make it by the specified
717764 // {@link ConnectionId#rpcTimeout}.
718765 if (LOG .isTraceEnabled ()) {
719- LOG .trace ("ignored" , e );
766+ LOG .trace ("ignored ex for call {}" , call , e );
720767 }
721768 } else {
722769 synchronized (this ) {
723- closeConn (e );
770+ // The exception we received may have been caused by another thread closing
771+ // this connection. It's possible that before getting to this point, a new connection was
772+ // created. In that case, it doesn't help and can actually hurt to close again here.
773+ if (isCurrentThreadExpected ()) {
774+ LOG .debug ("Closing connection after error in call {}" , call , e );
775+ closeConn (e );
776+ }
724777 }
725778 }
726779 }
727780 }
728781
782+ /**
783+ * For use in the reader thread, tests if the current reader thread is the one expected to be
784+ * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
785+ * creates a new thread and updates the thread pointer. At that point, the new thread should be
786+ * the only one running. We use this method to guard against cases where the old thread may be
787+ * erroneously running or closing the connection in error states.
788+ */
789+ private boolean isCurrentThreadExpected () {
790+ return thread == Thread .currentThread ();
791+ }
792+
729793 @ Override
730794 protected synchronized void callTimeout (Call call ) {
731795 // call sender
0 commit comments