Skip to content

Commit

Permalink
Forward metadata in proxy (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jun 24, 2024
1 parent 0773268 commit 180af4b
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 69 deletions.
7 changes: 7 additions & 0 deletions cmd/proxygenerator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ import (
// Temporal Frontend.
type WorkflowServiceProxyOptions struct {
Client workflowservice.WorkflowServiceClient
DisableHeaderForwarding bool
}
type workflowServiceProxyServer struct {
workflowservice.UnimplementedWorkflowServiceServer
client workflowservice.WorkflowServiceClient
disableHeaderForwarding bool
}
// NewWorkflowServiceProxyServer creates a WorkflowServiceServer suitable for registering with a gRPC Server. Requests will
Expand All @@ -68,6 +70,7 @@ type workflowServiceProxyServer struct {
func NewWorkflowServiceProxyServer(options WorkflowServiceProxyOptions) (workflowservice.WorkflowServiceServer, error) {
return &workflowServiceProxyServer{
client: options.Client,
disableHeaderForwarding: options.DisableHeaderForwarding,
}, nil
}
`
Expand Down Expand Up @@ -119,6 +122,10 @@ func generateService(cfg config) error {
counter += 1
}
paramDecl[i] = fmt.Sprintf("%s %s", params[i], types.TypeString(typ, qual))
// Wrap ctx parameter in reqCtx
if params[i] == "ctx" {
params[i] = "s.reqCtx(ctx)"
}
}
fmt.Fprintf(buf, "\nfunc (s *workflowServiceProxyServer) %s(%s) %s {\n", name, strings.Join(paramDecl, ", "), types.TypeString(sig.Results(), qual))
fmt.Fprintf(buf, "\treturn s.client.%s(%s)\n", name, strings.Join(params, ", "))
Expand Down
14 changes: 11 additions & 3 deletions proxy/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -190,16 +191,18 @@ func TestClientInterceptor(t *testing.T) {
type testGRPCServer struct {
workflowservice.UnimplementedWorkflowServiceServer
*grpc.Server
addr string
startWorkflowExecutionRequest *workflowservice.StartWorkflowExecutionRequest
listener net.Listener
addr string
startWorkflowExecutionRequest *workflowservice.StartWorkflowExecutionRequest
startWorkflowExecutionMetadata metadata.MD
}

func startTestGRPCServer() (*testGRPCServer, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
}
t := &testGRPCServer{Server: grpc.NewServer(), addr: l.Addr().String()}
t := &testGRPCServer{Server: grpc.NewServer(), listener: l, addr: l.Addr().String()}
workflowservice.RegisterWorkflowServiceServer(t.Server, t)
go func() {
if err := t.Serve(l); err != nil {
Expand Down Expand Up @@ -235,6 +238,10 @@ func (t *testGRPCServer) waitUntilServing() error {
return fmt.Errorf("failed waiting, last error: %w", lastErr)
}

func (t *testGRPCServer) Stop() {
t.Server.Stop()
}

func (t *testGRPCServer) GetClusterInfo(
context.Context,
*workflowservice.GetClusterInfoRequest,
Expand All @@ -247,6 +254,7 @@ func (t *testGRPCServer) StartWorkflowExecution(
req *workflowservice.StartWorkflowExecutionRequest,
) (*workflowservice.StartWorkflowExecutionResponse, error) {
t.startWorkflowExecutionRequest = req
t.startWorkflowExecutionMetadata, _ = metadata.FromIncomingContext(ctx)
return &workflowservice.StartWorkflowExecutionResponse{}, nil
}

Expand Down
Loading

0 comments on commit 180af4b

Please sign in to comment.