diff --git a/protocol/binary/stream_envelope.go b/protocol/binary/stream_envelope.go new file mode 100644 index 00000000..d274a6d2 --- /dev/null +++ b/protocol/binary/stream_envelope.go @@ -0,0 +1,51 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package binary + +import ( + "errors" + + "go.uber.org/thriftrw/protocol/stream" +) + +// WriteEnvelopeBegin writes the start of a strict envelope (contains an envelope version). +func (sw *StreamWriter) WriteEnvelopeBegin(eh stream.EnvelopeHeader) error { + return errors.New("not implemented") +} + +// WriteEnvelopeEnd writes the "end" of an envelope. Since there is no ending +// to an envelope, this is a no-op. +func (sw *StreamWriter) WriteEnvelopeEnd() error { + return errors.New("not implemented") +} + +// ReadEnvelopeBegin reads the start of an Apache Thrift envelope. Thrift supports +// two kinds of envelopes: strict, and non-strict. See ReadEnveloped method +// for more information on enveloping. +func (sw *StreamReader) ReadEnvelopeBegin() (stream.EnvelopeHeader, error) { + return stream.EnvelopeHeader{}, errors.New("not implemented") +} + +// ReadEnvelopeEnd reads the "end" of an envelope. Since there is no real +// envelope end, this is a no-op. +func (sw *StreamReader) ReadEnvelopeEnd() error { + return errors.New("not implemented") +} diff --git a/protocol/stream/envelope.go b/protocol/stream/envelope.go new file mode 100644 index 00000000..446a1365 --- /dev/null +++ b/protocol/stream/envelope.go @@ -0,0 +1,49 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package stream + +import ( + "io" + + "go.uber.org/thriftrw/wire" +) + +// RequestReader captures how to read from a request in a streaming fashion. +type RequestReader interface { + // ReadRequest reads off the request envelope (if present) from a Reader + // and returns a stream.Reader to read the remaining un-enveloped request struct. + // This allows a Thrift request handler to transparently read requests + // regardless of whether the caller is configured to submit envelopes. + // The caller specifies the expected EnvelopeType, either OneWay or Unary, + // on which the read asserts the specified envelope is present. + ReadRequest(wire.EnvelopeType, io.Reader) (body Reader, res ResponseWriter, err error) +} + +// ResponseWriter captures how to respond to a request in a streaming fashion. +type ResponseWriter interface { + // WriteResponse writes a response to the Writer with the envelope + // style of the corresponding request, and returns a stream.Writer to write + // remaining un-enveloped response bytes. Once writing of the response is complete, + // whether successful or not (error), users must call Close() on the stream.Writer. + // + // The EnvelopeType should be either wire.Reply or wire.Exception. + WriteResponse(wire.EnvelopeType, io.Writer) (body Writer, err error) +} diff --git a/protocol/stream/stream.go b/protocol/stream/stream.go index ed0b9e01..01d75528 100644 --- a/protocol/stream/stream.go +++ b/protocol/stream/stream.go @@ -43,6 +43,15 @@ type Protocol interface { Reader(r io.Reader) Reader } +// EnvelopeHeader represents the envelope of a response or a request which includes +// metadata about the method, the type of data in the envelope, and the value. +// It is equivalent of `wire.Envelope`, but for streaming purposes. +type EnvelopeHeader struct { + Name string + Type wire.EnvelopeType + SeqID int32 +} + // FieldHeader defines the metadata needed to define the beginning of a field // in a Thrift value. type FieldHeader struct { @@ -95,6 +104,10 @@ type Writer interface { WriteSetEnd() error WriteListBegin(l ListHeader) error WriteListEnd() error + + WriteEnvelopeBegin(eh EnvelopeHeader) error + WriteEnvelopeEnd() error + Close() error } @@ -122,6 +135,9 @@ type Reader interface { ReadMapBegin() (MapHeader, error) ReadMapEnd() error + ReadEnvelopeBegin() (EnvelopeHeader, error) + ReadEnvelopeEnd() error + // Skip skips over the bytes of the wire type and any applicable headers. Skip(w wire.Type) error }