55import java .io .InputStream ;
66import java .net .InetAddress ;
77import java .util .concurrent .TimeUnit ;
8- import java .util .concurrent .locks .Lock ;
9- import java .util .concurrent .locks .ReentrantLock ;
8+ import java .util .concurrent .atomic .AtomicBoolean ;
109
1110import com .google .common .io .ByteStreams ;
1211import io .grpc .CallOptions ;
@@ -71,21 +70,19 @@ public void close() {
7170 }
7271
7372 private static class CallProxy <ReqT , RespT > {
74- final RequestProxy serverCallListener ;
75- final ResponseProxy clientCallListener ;
73+ final ClientProxy clientProxy ;
74+ final ServerProxy serverProxy ;
7675
7776 CallProxy (ServerCall <ReqT , RespT > serverCall , ClientCall <ReqT , RespT > clientCall ) {
78- serverCallListener = new RequestProxy (clientCall );
79- clientCallListener = new ResponseProxy (serverCall );
77+ clientProxy = new ClientProxy (clientCall );
78+ serverProxy = new ServerProxy (serverCall );
8079 }
8180
82- private class RequestProxy extends ServerCall .Listener <ReqT > {
83- private final Lock clientCallLock = new ReentrantLock ();
81+ private class ClientProxy extends ServerCall .Listener <ReqT > {
8482 private final ClientCall <ReqT , ?> clientCall ;
85- // Hold 'this' lock when accessing
86- private boolean needToRequest ;
83+ private final AtomicBoolean needToRequest = new AtomicBoolean (false );
8784
88- RequestProxy (ClientCall <ReqT , ?> clientCall ) {
85+ ClientProxy (ClientCall <ReqT , ?> clientCall ) {
8986 this .clientCall = clientCall ;
9087 }
9188
@@ -102,47 +99,34 @@ public void onHalfClose() {
10299 @ Override
103100 public void onMessage (ReqT message ) {
104101 clientCall .sendMessage (message );
105- clientCallLock .lock ();
106- try {
107- if (clientCall .isReady ()) {
108- clientCallListener .serverCall .request (1 );
109- } else {
110- // The outgoing call is not ready for more requests. Stop requesting additional data and
111- // wait for it to catch up.
112- needToRequest = true ;
113- }
114- } finally {
115- clientCallLock .unlock ();
102+ if (clientCall .isReady ()) {
103+ serverProxy .serverCall .request (1 );
104+ } else {
105+ // The outgoing call is not ready for more requests. Stop requesting additional data and
106+ // wait for it to catch up.
107+ needToRequest .set (true );
116108 }
117109 }
118110
119111 @ Override
120112 public void onReady () {
121- clientCallListener .onServerReady ();
113+ serverProxy .onServerReady ();
122114 }
123115
124116 // Called from ResponseProxy, which is a different thread than the ServerCall.Listener
125117 // callbacks.
126118 void onClientReady () {
127- clientCallLock .lock ();
128- try {
129- if (needToRequest ) {
130- clientCallListener .serverCall .request (1 );
131- needToRequest = false ;
132- }
133- } finally {
134- clientCallLock .unlock ();
119+ if (needToRequest .compareAndSet (true , false )) {
120+ serverProxy .serverCall .request (1 );
135121 }
136122 }
137123 }
138124
139- private class ResponseProxy extends ClientCall .Listener <RespT > {
140- private final Lock serverCallLock = new ReentrantLock ();
125+ private class ServerProxy extends ClientCall .Listener <RespT > {
141126 private final ServerCall <?, RespT > serverCall ;
142- // Hold 'this' lock when accessing
143- private boolean needToRequest ;
127+ private final AtomicBoolean needToRequest = new AtomicBoolean (false );
144128
145- ResponseProxy (ServerCall <?, RespT > serverCall ) {
129+ ServerProxy (ServerCall <?, RespT > serverCall ) {
146130 this .serverCall = serverCall ;
147131 }
148132
@@ -159,36 +143,25 @@ public void onHeaders(Metadata headers) {
159143 @ Override
160144 public void onMessage (RespT message ) {
161145 serverCall .sendMessage (message );
162- serverCallLock .lock ();
163- try {
164- if (serverCall .isReady ()) {
165- serverCallListener .clientCall .request (1 );
166- } else {
167- // The incoming call is not ready for more responses. Stop requesting additional data
168- // and wait for it to catch up.
169- needToRequest = true ;
170- }
171- } finally {
172- serverCallLock .unlock ();
146+ if (serverCall .isReady ()) {
147+ clientProxy .clientCall .request (1 );
148+ } else {
149+ // The incoming call is not ready for more responses. Stop requesting additional data
150+ // and wait for it to catch up.
151+ needToRequest .set (true );
173152 }
174153 }
175154
176155 @ Override
177156 public void onReady () {
178- serverCallListener .onClientReady ();
157+ clientProxy .onClientReady ();
179158 }
180159
181160 // Called from RequestProxy, which is a different thread than the ClientCall.Listener
182161 // callbacks.
183162 void onServerReady () {
184- serverCallLock .lock ();
185- try {
186- if (needToRequest ) {
187- serverCallListener .clientCall .request (1 );
188- needToRequest = false ;
189- }
190- } finally {
191- serverCallLock .unlock ();
163+ if (needToRequest .compareAndSet (true , false )) {
164+ clientProxy .clientCall .request (1 );
192165 }
193166 }
194167 }
@@ -199,10 +172,10 @@ private class ProxyHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT
199172 public ServerCall .Listener <ReqT > startCall (ServerCall <ReqT , RespT > serverCall , Metadata metadata ) {
200173 ClientCall <ReqT , RespT > clientCall = target .newCall (serverCall .getMethodDescriptor (), CallOptions .DEFAULT );
201174 CallProxy <ReqT , RespT > proxy = new CallProxy <>(serverCall , clientCall );
202- clientCall .start (proxy .clientCallListener , metadata );
203- serverCall .request (1 );
175+ clientCall .start (proxy .serverProxy , metadata );
204176 clientCall .request (1 );
205- return proxy .serverCallListener ;
177+ serverCall .request (1 );
178+ return proxy .clientProxy ;
206179 }
207180 }
208181
0 commit comments