Skip to content

Commit

Permalink
stream: Implement Request and Response handling with Enveloping (#526)
Browse files Browse the repository at this point in the history
Based off of the implementation by @usmyth for enveloping, perform reading out a
request as well as providing an appropriate `stream.ReponseWriter`, based on the
enveloping that was included on the request itself.

In both cases of reading out the request as well as providing a
`stream.ResponseWriter`, the interfaces for these needed to change to support
doing so with enveloping.  Since the envelope's begin and end are meant to
encapsulate the request and response body, an object needs to be provided that
understands how to parse, or write out these bodies respectively.

For the request flow, this object implements a `stream.BodyReader` to parse out
the body and should be a thriftrw object that reflects the request's body.  For
the response flow, a `stream.Enveloper` that reflects the response's body and is
also a compatible thriftrw object, must be provided.

Co-authored-by: Timothy Smyth <smyth@uber.com>
Co-authored-by: Abhinav Gupta <abg@uber.com>
  • Loading branch information
3 people authored Aug 11, 2021
1 parent b46f028 commit 09679fc
Show file tree
Hide file tree
Showing 7 changed files with 632 additions and 81 deletions.
13 changes: 0 additions & 13 deletions envelope/stream/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,3 @@
// THE SOFTWARE.

package stream

import (
"go.uber.org/thriftrw/protocol/stream"
"go.uber.org/thriftrw/wire"
)

// Enveloper is the interface implemented by a type that can be written with
// an envelope via a stream.Writer.
type Enveloper interface {
MethodName() string
EnvelopeType() wire.EnvelopeType
Encode(stream.Writer) error
}
35 changes: 19 additions & 16 deletions protocol/binary/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package binary
import (
"fmt"

"go.uber.org/thriftrw/protocol/stream"
"go.uber.org/thriftrw/wire"
)

Expand All @@ -33,39 +34,41 @@ const (

// WriteEnveloped writes enveloped value using the strict envelope.
func (bw *Writer) WriteEnveloped(e wire.Envelope) error {
version := uint32(version1) | uint32(e.Type)

if err := bw.sw.WriteInt32(int32(version)); err != nil {
return err
}

if err := bw.sw.WriteString(e.Name); err != nil {
if err := bw.sw.WriteEnvelopeBegin(
stream.EnvelopeHeader{
Name: e.Name,
Type: e.Type,
SeqID: e.SeqID,
},
); err != nil {
return err
}

if err := bw.sw.WriteInt32(e.SeqID); err != nil {
if err := bw.WriteValue(e.Value); err != nil {
return err
}

return bw.WriteValue(e.Value)
return bw.sw.WriteEnvelopeEnd()
}

// WriteLegacyEnveloped writes enveloped value using the non-strict envelope
// (non-strict lacks an envelope version).
func (bw *Writer) WriteLegacyEnveloped(e wire.Envelope) error {
if err := bw.sw.WriteString(e.Name); err != nil {
return err
}

if err := bw.sw.writeByte(uint8(e.Type)); err != nil {
if err := bw.sw.WriteLegacyEnvelopeBegin(
stream.EnvelopeHeader{
Name: e.Name,
Type: e.Type,
SeqID: e.SeqID,
},
); err != nil {
return err
}

if err := bw.sw.WriteInt32(e.SeqID); err != nil {
if err := bw.WriteValue(e.Value); err != nil {
return err
}

return bw.WriteValue(e.Value)
return bw.sw.WriteLegacyEnvelopeEnd()
}

// ReadEnveloped reads an Apache Thrift envelope
Expand Down
154 changes: 113 additions & 41 deletions protocol/binary/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package binary

import (
"bytes"
"context"
"fmt"
"io"

Expand All @@ -38,6 +40,9 @@ type Protocol struct {
iface.Private
}

var _ stream.Protocol = (*Protocol)(nil)
var _ stream.RequestReader = (*Protocol)(nil)

// Encode the given Value and write the result to the given Writer.
func (*Protocol) Encode(v wire.Value, w io.Writer) error {
writer := BorrowWriter(w)
Expand Down Expand Up @@ -161,54 +166,121 @@ func (p *Protocol) DecodeRequest(et wire.EnvelopeType, r io.ReaderAt) (wire.Valu
return val, NoEnvelopeResponder, err
}

// noEnvelopeResponder responds to a request without an envelope.
type noEnvelopeResponder struct{}
// ReadRequest reads off the request envelope (if present) from an io.Reader,
// populating the provided BodyReader to read off the full request struct,
// asserting the EnvelopeType (either OneWay or Unary) if an envlope exists.
// A ResponseWriter that understands the enveloping used is returned.
//
// This allows a Thrift request handler to transparently read requests
// regardless of whether the caller is configured to submit envelopes.
//
// This is possible because we can distinguish an envelope from a bare request
// struct by looking at the first byte and the length of the message.
//
// 1. A message of length 1 containing only 0x00 can only be an empty struct.
// 0x00 is the type ID for STOP, indicating the end of the struct.
//
// 2. A message of length >1 starting with 0x00 can only be a non-strict
// envelope (not versioned), assuming the message name is less than 16MB long.
// In this case, the first four bytes indicate the length of the method name,
// which is unlikely to overflow into the high byte.
//
// 3. A message of length >1, where the first byte is <0 can only be a strict envelope.
// The MSB indicates that the message is versioned. Reading the first two bytes
// and masking out the MSB indicates the version number.
// At this time, there is only one version.
//
// 4. A message of length >1, where the first byte is >=0 can only be a bare
// struct starting with that field identifier. Valid field identifiers today
// are in the range 0x00-0x0f. There is some chance that a future version of
// the protocol will add more field types, but it is very unlikely that the
// field type will flow into the MSB (128 type identifiers, starting with the
// 15 valid types today).
func (p *Protocol) ReadRequest(
ctx context.Context,
et wire.EnvelopeType,
r io.Reader,
body stream.BodyReader,
) (stream.ResponseWriter, error) {
var buf [2]byte

func (noEnvelopeResponder) EncodeResponse(v wire.Value, t wire.EnvelopeType, w io.Writer) error {
return Default.Encode(v, w)
}
// If we fail to read two bytes, the only possible valid value is the
// empty struct.
if count, _ := r.Read(buf[0:2]); count < 2 {
sr := p.Reader(bytes.NewReader(buf[:count]))
defer sr.Close()
return NoEnvelopeResponder, body.Decode(sr)
}

// NoEnvelopeResponder responds to a request without an envelope.
var NoEnvelopeResponder = &noEnvelopeResponder{}
// Reset the Reader to allow for properly reading the envelope if it
// exists.
r = io.MultiReader(bytes.NewReader(buf[:]), r)
sr := p.Reader(r)
defer sr.Close()

// EnvelopeV0Responder responds to requests with a non-strict (unversioned) envelope.
type EnvelopeV0Responder struct {
Name string
SeqID int32
}
switch {
case buf[0] == 0x00:
// If length > 1, 0x00 is only a valid preamble for a
// non-strict enveloped request.
e, err := p.readEnvelopeHeader(sr, et)
if err != nil {
return NoEnvelopeResponder, err
}

// EncodeResponse writes the response to the writer using a non-strict
// envelope.
func (r EnvelopeV0Responder) EncodeResponse(v wire.Value, t wire.EnvelopeType, w io.Writer) error {
writer := BorrowWriter(w)
err := writer.WriteLegacyEnveloped(wire.Envelope{
Name: r.Name,
Type: t,
SeqID: r.SeqID,
Value: v,
})
ReturnWriter(writer)
return err
}
if err := body.Decode(sr); err != nil {
return NoEnvelopeResponder, err
}

if err := sr.ReadEnvelopeEnd(); err != nil {
return NoEnvelopeResponder, err
}

return &EnvelopeV0Responder{
Name: e.Name,
SeqID: e.SeqID,
}, nil

case buf[0]&0x80 > 0:
// Only strict (versioned) envelopes begin with the most
// significant bit set. This could only be confused for a type
// identifier greater than 127 (beyond the 15 Thrift has at
// time of writing), or a message name longer than 16MB.

e, err := p.readEnvelopeHeader(sr, et)
if err != nil {
return NoEnvelopeResponder, err
}

// EnvelopeV1Responder responds to requests with a strict, version 1 envelope.
type EnvelopeV1Responder struct {
Name string
SeqID int32
if err := body.Decode(sr); err != nil {
return NoEnvelopeResponder, err
}

if err := sr.ReadEnvelopeEnd(); err != nil {
return NoEnvelopeResponder, err
}

return &EnvelopeV1Responder{
Name: e.Name,
SeqID: e.SeqID,
}, nil

default:
// All other patterns are either bare structs or invalid. We
// delegate to the struct decoder to distinguish invalid type
// identifiers, outside the 0-15 range.
return NoEnvelopeResponder, body.Decode(sr)
}
}

// EncodeResponse writes the response to the writer using a strict, version 1
// envelope.
func (r EnvelopeV1Responder) EncodeResponse(v wire.Value, t wire.EnvelopeType, w io.Writer) error {
writer := BorrowWriter(w)
err := writer.WriteEnveloped(wire.Envelope{
Name: r.Name,
Type: t,
SeqID: r.SeqID,
Value: v,
})
ReturnWriter(writer)
return err
func (p *Protocol) readEnvelopeHeader(sr stream.Reader, et wire.EnvelopeType) (stream.EnvelopeHeader, error) {
eh, err := sr.ReadEnvelopeBegin()
if err != nil {
return eh, err
}
if eh.Type != et {
return eh, errUnexpectedEnvelopeType(eh.Type)
}
return eh, err
}

type errUnexpectedEnvelopeType wire.EnvelopeType
Expand Down
Loading

0 comments on commit 09679fc

Please sign in to comment.