Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlightRPC][JAVA][Draft] Bearer token refresh interceptor #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public class FlightClient implements AutoCloseable {
this.middleware = middleware;

final ClientInterceptor[] interceptors;
interceptors = new ClientInterceptor[]{authInterceptor, new ClientInterceptorAdapter(middleware)};
interceptors = new ClientInterceptor[]{authInterceptor, new RefreshClientInterceptor(), new ClientInterceptorAdapter(middleware)};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interceptor should be optional based on FlightClient.Builder. How do we propagate how to do the refresh?


// interceptors = new ClientInterceptor[]{authInterceptor, new ClientInterceptorAdapter(middleware)};

// Create a channel with interceptors pre-applied for DoGet and DoPut
this.interceptedChannel = ClientInterceptors.intercept(channel, interceptors);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package org.apache.arrow.flight;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Apache header.


import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

import javax.annotation.Nullable;

public class RefreshClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
io.grpc.CallOptions callOptions, Channel next) {
System.out.println("Intercept the call");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

return new RetryClientCall<>(callOptions, next, method);
}

static class RetryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {

Listener listener;
Metadata metadata;
CallOptions callOptions;
Channel next;
int req;
ReqT msg;
ClientCall call;
MethodDescriptor method;

public RetryClientCall(CallOptions callOptions, Channel next, MethodDescriptor method) {
this.callOptions = callOptions;
this.next = next;
this.method = method;
}

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
this.listener = responseListener;
this.metadata = headers;

startCall(new CheckingListener());


System.out.println("Run start method from interceptor");
}



@Override
public void request(int numMessages) {
System.out.println("Run request method from interceptor");
req += numMessages;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this tracked but never used?

call.request(numMessages);

}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
System.out.println("Run cancel method from interceptor");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this not delegate?

}

@Override
public void halfClose() {
System.out.println("Run halfClose method from interceptor");
call.halfClose();
}

private void startCall(Listener listener) {
System.out.println("Run startCall method from interceptor");

call = next.newCall(method, callOptions);
Metadata headers = new Metadata();
headers.merge(metadata);
call.start(listener, headers);
}

@Override
public void sendMessage(ReqT message) {
assert this.msg == null;
this.msg = message;
call.sendMessage(msg);
}

class CheckingListener extends ForwardingClientCallListener {
Listener<RespT> delegate;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this shouldn't be mutable state, but it is. Why does it change? Why is null sometimes a legal value but not all times?


@Override
protected Listener delegate() {
if (delegate == null) {
throw new IllegalStateException();
}
return delegate;
}

@Override
public void onReady() {
listener.onReady();
}

@Override
public void onHeaders(Metadata headers) {
delegate = listener;
super.onHeaders(headers);
}

@Override
public void onClose(Status status, Metadata trailers) {
System.out.println("Run close method from listener interceptor");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is difficult to understand what's going on here.
It's especially hard to reason because it's using both recursion and asynchronous execution. This needs alot more explanation.

if (delegate != null) {
super.onClose(status, trailers);
return;
}
if (!needToRetry(status, trailers)) { // YOUR CODE HERE
delegate = listener;
super.onClose(status, trailers);
return;
}
start(listener, trailers); // to allow multiple retries
}
}

private boolean needToRetry(Status status, Metadata trailers) {
return status.getCode().toStatus() == Status.UNAUTHENTICATED;
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newline at EOF