From ad935ddc515feb7a8712e2cca52f3682504c5c33 Mon Sep 17 00:00:00 2001 From: Timothy Smyth Date: Thu, 8 Jul 2021 19:01:31 -0400 Subject: [PATCH] thrift: Implement client "StreamClient" to support thriftrw streaming protocol for outbound requests --- encoding/thrift/outbound.go | 2 +- encoding/thrift/stream_outbound.go | 221 ++++++++++++++++++++++++++++- 2 files changed, 217 insertions(+), 6 deletions(-) diff --git a/encoding/thrift/outbound.go b/encoding/thrift/outbound.go index 6ec427120a..c6c9399c39 100644 --- a/encoding/thrift/outbound.go +++ b/encoding/thrift/outbound.go @@ -157,7 +157,7 @@ func (c thriftClient) Call(ctx context.Context, reqBody envelope.Enveloper, opts // optimization for avoiding additional buffer copy as tchannel outbound // already decodes the body into io.ReaderAt compatible type - // thrift deserialiser reads sets, maps, and lists lazilly which makes + // thrift deserializer reads sets, maps, and lists lazily which makes // buffer pool unusable as response handling is out of scope of this method if body, ok := tres.Body.(io.ReaderAt); ok { bodyReader = body diff --git a/encoding/thrift/stream_outbound.go b/encoding/thrift/stream_outbound.go index d5462af98a..023e8313f0 100644 --- a/encoding/thrift/stream_outbound.go +++ b/encoding/thrift/stream_outbound.go @@ -1,20 +1,231 @@ package thrift import ( + "bytes" "context" + "io" - "go.uber.org/thriftrw/envelope/stream" - pstream "go.uber.org/thriftrw/protocol/stream" + envelope "go.uber.org/thriftrw/envelope/stream" + "go.uber.org/thriftrw/protocol/binary" + "go.uber.org/thriftrw/protocol/stream" + "go.uber.org/thriftrw/wire" "go.uber.org/yarpc" + encodingapi "go.uber.org/yarpc/api/encoding" "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/encoding/thrift/internal" + "go.uber.org/yarpc/pkg/encoding" + "go.uber.org/yarpc/pkg/errors" + "go.uber.org/yarpc/pkg/procedure" ) -// StreamClient is a generic Thrift client. It speaks in raw Thrift payloads. +// StreamClient is a generic Thrift client for stream encoding/decoding. +// It speaks in raw Thrift payloads. // // Users should use the client generated by the code generator rather than // using this directly. type StreamClient interface { // Call the given Thrift method. - Call(ctx context.Context, reqBody stream.Enveloper, opts ...yarpc.CallOption) (pstream.Reader, error) - CallOneway(ctx context.Context, reqBody stream.Enveloper, opts ...yarpc.CallOption) (transport.Ack, error) + Call(ctx context.Context, reqBody envelope.Enveloper, opts ...yarpc.CallOption) (stream.Reader, error) + CallOneway(ctx context.Context, reqBody envelope.Enveloper, opts ...yarpc.CallOption) (transport.Ack, error) +} + +// NewStreamClient creates a new Thrift client. +func NewStreamClient(c Config, opts ...ClientOption) StreamClient { + // Code generated for Thrift client instantiation will probably be something + // like this: + // + // func New(cc transport.ClientConfig, opts ...ClientOption) *MyServiceClient { + // c := thrift.NewStreamClient(thrift.Config{ + // Service: "MyService", + // ClientConfig: cc, + // Protocol: binary.Default, + // }, opts...) + // return &MyServiceClient{client: c} + // } + // + // So Config is really the internal config as far as consumers of the + // generated client are concerned. + + var cc clientConfig + for _, opt := range opts { + opt.applyClientOption(&cc) + } + + p := stream.Protocol(binary.Default) + if cc.Protocol != nil { + val, ok := cc.Protocol.(stream.Protocol) + if !ok { + panic("yarpc.NewStreamClient expects a Protocol of type stream.Protocol") + } + p = val + } + + svc := c.Service + if cc.ServiceName != "" { + svc = cc.ServiceName + } + + return streamThriftClient{ + p: p, + cc: c.ClientConfig, + thriftService: svc, + Enveloping: cc.Enveloping, + Multiplexed: cc.Multiplexed, + } +} + +type streamThriftClient struct { + cc transport.ClientConfig + p stream.Protocol + + // name of the Thrift service + thriftService string + Enveloping bool + Multiplexed bool +} + +func (c streamThriftClient) Call(ctx context.Context, reqBody envelope.Enveloper, opts ...yarpc.CallOption) (stream.Reader, error) { + // Code generated for Thrift client calls will probably be something like + // this: + // + // func (c *MyServiceClient) someMethod(ctx context.Context, arg1 Arg1Type, arg2 arg2Type, opts ...yarpc.CallOption) (returnValue, error) { + // args := myservice.SomeMethodHelper.Args(arg1, arg2) + // resBody, err := c.client.Call(ctx, args, opts...) + // var result myservice.SomeMethodResult + // if err = result.Decode(resBody); err != nil { + // return nil, err + // } + // success, err := myservice.SomeMethodHelper.UnwrapResponse(&result) + // return success, err + // } + + out := c.cc.GetUnaryOutbound() + + treq, proto, err := c.buildTransportRequest(reqBody) + if err != nil { + return nil, err + } + + call := encodingapi.NewOutboundCall(encoding.FromOptions(opts)...) + ctx, err = call.WriteToRequest(ctx, treq) + if err != nil { + return nil, err + } + + tres, err := out.Call(ctx, treq) + if err != nil { + return nil, err + } + defer tres.Body.Close() + + if _, err = call.ReadFromResponse(ctx, tres); err != nil { + return nil, err + } + + var r io.Reader + // optimization for avoiding additional buffer copy as tchannel outbound + // already decodes the body into io.Reader compatible type + // thrift deserializer reads sets, maps, and lists lazily which makes + // buffer pool unusable as response handling is out of scope of this method + if body, ok := tres.Body.(io.Reader); ok { + r = body + } else { + buf := bytes.NewBuffer(make([]byte, 0, _defaultBufferSize)) + if _, err = buf.ReadFrom(tres.Body); err != nil { + return nil, err + } + r = bytes.NewReader(buf.Bytes()) + } + + sr := proto.Reader(r) + + env, err := sr.ReadEnvelopeBegin() + if err != nil { + return nil, errors.ResponseBodyDecodeError(treq, err) + } + + switch env.Type { + case wire.Reply: + return sr, nil + case wire.Exception: + defer sr.Close() + var exc internal.TApplicationException + if err = exc.Decode(sr); err != nil { + return nil, errors.ResponseBodyDecodeError(treq, err) + } + return nil, thriftException{ + Service: treq.Service, + Procedure: treq.Procedure, + Reason: &exc, + } + default: + sr.Close() + return nil, errors.ResponseBodyDecodeError( + treq, errUnexpectedEnvelopeType(env.Type)) + } + +} + +func (c streamThriftClient) CallOneway(ctx context.Context, reqBody envelope.Enveloper, opts ...yarpc.CallOption) (transport.Ack, error) { + out := c.cc.GetOnewayOutbound() + + treq, _, err := c.buildTransportRequest(reqBody) + if err != nil { + return nil, err + } + + call := encodingapi.NewOutboundCall(encoding.FromOptions(opts)...) + ctx, err = call.WriteToRequest(ctx, treq) + if err != nil { + return nil, err + } + + return out.CallOneway(ctx, treq) +} + +func (c streamThriftClient) buildTransportRequest(reqBody envelope.Enveloper) (*transport.Request, stream.Protocol, error) { + proto := c.p + treq := transport.Request{ + Caller: c.cc.Caller(), + Service: c.cc.Service(), + Encoding: Encoding, + Procedure: procedure.ToName(c.thriftService, reqBody.MethodName()), + } + + envType := reqBody.EnvelopeType() + if envType != wire.Call && envType != wire.OneWay { + return nil, nil, errors.RequestBodyEncodeError( + &treq, errUnexpectedEnvelopeType(envType), + ) + } + + var buffer bytes.Buffer + sw := proto.Writer(&buffer) + defer sw.Close() + + if c.Enveloping { + if err := sw.WriteEnvelopeBegin(stream.EnvelopeHeader{ + Name: reqBody.MethodName(), + Type: reqBody.EnvelopeType(), + SeqID: 1, // don't care + }); err != nil { + return nil, nil, errors.RequestBodyEncodeError(&treq, err) + } + + if err := reqBody.Encode(sw); err != nil { + return nil, nil, errors.RequestBodyEncodeError(&treq, err) + } + + if err := sw.WriteEnvelopeEnd(); err != nil { + return nil, nil, errors.RequestBodyEncodeError(&treq, err) + } + } else { + if err := reqBody.Encode(sw); err != nil { + return nil, nil, errors.RequestBodyEncodeError(&treq, err) + } + } + + treq.Body = &buffer + treq.BodySize = buffer.Len() + return &treq, proto, nil }