Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define enveloping interfaces for Thrift streaming protocol #511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions protocol/binary/stream_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package binary

import (
"errors"

"go.uber.org/thriftrw/protocol/stream"
)

// WriteEnvelopeBegin writes the start of a strict envelope (contains an envelope version).
func (sw *StreamWriter) WriteEnvelopeBegin(eh stream.EnvelopeHeader) error {
return errors.New("not implemented")
}

// WriteEnvelopeEnd writes the "end" of an envelope. Since there is no ending
// to an envelope, this is a no-op.
func (sw *StreamWriter) WriteEnvelopeEnd() error {
return errors.New("not implemented")
}

// ReadEnvelopeBegin reads the start of an Apache Thrift envelope. Thrift supports
// two kinds of envelopes: strict, and non-strict. See ReadEnveloped method
// for more information on enveloping.
func (sw *StreamReader) ReadEnvelopeBegin() (stream.EnvelopeHeader, error) {
return stream.EnvelopeHeader{}, errors.New("not implemented")
}

// ReadEnvelopeEnd reads the "end" of an envelope. Since there is no real
// envelope end, this is a no-op.
func (sw *StreamReader) ReadEnvelopeEnd() error {
return errors.New("not implemented")
}
49 changes: 49 additions & 0 deletions protocol/stream/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package stream

import (
"io"

"go.uber.org/thriftrw/wire"
)

// RequestReader captures how to read from a request in a streaming fashion.
type RequestReader interface {
// ReadRequest reads off the request envelope (if present) from a Reader
// and returns a stream.Reader to read the remaining un-enveloped request struct.
// This allows a Thrift request handler to transparently read requests
// regardless of whether the caller is configured to submit envelopes.
// The caller specifies the expected EnvelopeType, either OneWay or Unary,
// on which the read asserts the specified envelope is present.
ReadRequest(wire.EnvelopeType, io.Reader) (body Reader, res ResponseWriter, err error)
}

// ResponseWriter captures how to respond to a request in a streaming fashion.
type ResponseWriter interface {
// WriteResponse writes a response to the Writer with the envelope
// style of the corresponding request, and returns a stream.Writer to write
// remaining un-enveloped response bytes. Once writing of the response is complete,
// whether successful or not (error), users must call Close() on the stream.Writer.
//
// The EnvelopeType should be either wire.Reply or wire.Exception.
WriteResponse(wire.EnvelopeType, io.Writer) (body Writer, err error)
}
16 changes: 16 additions & 0 deletions protocol/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ type Protocol interface {
Reader(r io.Reader) Reader
}

// EnvelopeHeader represents the envelope of a response or a request which includes
// metadata about the method, the type of data in the envelope, and the value.
// It is equivalent of `wire.Envelope`, but for streaming purposes.
type EnvelopeHeader struct {
Name string
Type wire.EnvelopeType
SeqID int32
}

// FieldHeader defines the metadata needed to define the beginning of a field
// in a Thrift value.
type FieldHeader struct {
Expand Down Expand Up @@ -95,6 +104,10 @@ type Writer interface {
WriteSetEnd() error
WriteListBegin(l ListHeader) error
WriteListEnd() error

WriteEnvelopeBegin(eh EnvelopeHeader) error
usmyth marked this conversation as resolved.
Show resolved Hide resolved
WriteEnvelopeEnd() error

Close() error
}

Expand Down Expand Up @@ -122,6 +135,9 @@ type Reader interface {
ReadMapBegin() (MapHeader, error)
ReadMapEnd() error

ReadEnvelopeBegin() (EnvelopeHeader, error)
ReadEnvelopeEnd() error

// Skip skips over the bytes of the wire type and any applicable headers.
Skip(w wire.Type) error
}