-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FlightRPC][JAVA][Draft] Bearer token refresh interceptor #5
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
package org.apache.arrow.flight; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add Apache header. |
||
|
||
import io.grpc.CallOptions; | ||
import io.grpc.Channel; | ||
import io.grpc.ClientCall; | ||
import io.grpc.ClientInterceptor; | ||
import io.grpc.ForwardingClientCall; | ||
import io.grpc.ForwardingClientCallListener; | ||
import io.grpc.Metadata; | ||
import io.grpc.MethodDescriptor; | ||
import io.grpc.Status; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
public class RefreshClientInterceptor implements ClientInterceptor { | ||
@Override | ||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, | ||
io.grpc.CallOptions callOptions, Channel next) { | ||
System.out.println("Intercept the call"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove. |
||
return new RetryClientCall<>(callOptions, next, method); | ||
} | ||
|
||
static class RetryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> { | ||
|
||
Listener listener; | ||
Metadata metadata; | ||
CallOptions callOptions; | ||
Channel next; | ||
int req; | ||
ReqT msg; | ||
ClientCall call; | ||
MethodDescriptor method; | ||
|
||
public RetryClientCall(CallOptions callOptions, Channel next, MethodDescriptor method) { | ||
this.callOptions = callOptions; | ||
this.next = next; | ||
this.method = method; | ||
} | ||
|
||
@Override | ||
public void start(Listener<RespT> responseListener, Metadata headers) { | ||
this.listener = responseListener; | ||
this.metadata = headers; | ||
|
||
startCall(new CheckingListener()); | ||
|
||
|
||
System.out.println("Run start method from interceptor"); | ||
} | ||
|
||
|
||
|
||
@Override | ||
public void request(int numMessages) { | ||
System.out.println("Run request method from interceptor"); | ||
req += numMessages; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this tracked but never used? |
||
call.request(numMessages); | ||
|
||
} | ||
|
||
@Override | ||
public void cancel(@Nullable String message, @Nullable Throwable cause) { | ||
System.out.println("Run cancel method from interceptor"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this not delegate? |
||
} | ||
|
||
@Override | ||
public void halfClose() { | ||
System.out.println("Run halfClose method from interceptor"); | ||
call.halfClose(); | ||
} | ||
|
||
private void startCall(Listener listener) { | ||
System.out.println("Run startCall method from interceptor"); | ||
|
||
call = next.newCall(method, callOptions); | ||
Metadata headers = new Metadata(); | ||
headers.merge(metadata); | ||
call.start(listener, headers); | ||
} | ||
|
||
@Override | ||
public void sendMessage(ReqT message) { | ||
assert this.msg == null; | ||
this.msg = message; | ||
call.sendMessage(msg); | ||
} | ||
|
||
class CheckingListener extends ForwardingClientCallListener { | ||
Listener<RespT> delegate; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like this shouldn't be mutable state, but it is. Why does it change? Why is null sometimes a legal value but not all times? |
||
|
||
@Override | ||
protected Listener delegate() { | ||
if (delegate == null) { | ||
throw new IllegalStateException(); | ||
} | ||
return delegate; | ||
} | ||
|
||
@Override | ||
public void onReady() { | ||
listener.onReady(); | ||
} | ||
|
||
@Override | ||
public void onHeaders(Metadata headers) { | ||
delegate = listener; | ||
super.onHeaders(headers); | ||
} | ||
|
||
@Override | ||
public void onClose(Status status, Metadata trailers) { | ||
System.out.println("Run close method from listener interceptor"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is difficult to understand what's going on here. |
||
if (delegate != null) { | ||
super.onClose(status, trailers); | ||
return; | ||
} | ||
if (!needToRetry(status, trailers)) { // YOUR CODE HERE | ||
delegate = listener; | ||
super.onClose(status, trailers); | ||
return; | ||
} | ||
start(listener, trailers); // to allow multiple retries | ||
} | ||
} | ||
|
||
private boolean needToRetry(Status status, Metadata trailers) { | ||
return status.getCode().toStatus() == Status.UNAUTHENTICATED; | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Newline at EOF |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interceptor should be optional based on FlightClient.Builder. How do we propagate how to do the refresh?