Skip to content

Commit

Permalink
ByteStream consumer can write to interface{}
Browse files Browse the repository at this point in the history
* fix(ByteStreamConsumer): may now write into an interface which
  underlying type is []byte or string.

* feat(ByteStreamConsumer): added support to io.ReaderFrom, preferred
  over io.Writer if available
* feat(ByteStreamProducer): added support to io.WriterTo, preferred
  over io.Reader if available

* refact(ByteStreamProducer): removed redundant case "string" and preferred
  the more general reflected case (supports aliased strings)

* test: refactored ByteStream tests
* test: added benchmark for bytestream.Consume

* fixes #167

Signed-off-by: Frederic BIDON <fredbi@yahoo.com>
  • Loading branch information
fredbi committed Dec 12, 2023
1 parent 248b38c commit e9d312a
Show file tree
Hide file tree
Showing 2 changed files with 477 additions and 186 deletions.
131 changes: 91 additions & 40 deletions bytestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ type byteStreamOpts struct {
Close bool
}

// ByteStreamConsumer creates a consumer for byte streams,
// takes a Writer/BinaryUnmarshaler interface or binary slice by reference,
// and reads from the provided reader
// ByteStreamConsumer creates a consumer for byte streams.
//
// The consumer consumes from a provided reader into the data passed by reference.
//
// Supported output underlying types and interfaces, prioritized in this order:
// - io.ReaderFrom (for maximum control)
// - io.Writer (performs io.Copy)
// - encoding.BinaryUnmarshaler
// - *string
// - *[]byte
func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
var vals byteStreamOpts
for _, opt := range opts {
Expand All @@ -51,45 +58,70 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
if reader == nil {
return errors.New("ByteStreamConsumer requires a reader") // early exit
}
if data == nil {
return errors.New("nil destination for ByteStreamConsumer")
}

closer := defaultCloser
if vals.Close {
if cl, ok := reader.(io.Closer); ok {
if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
closer = cl.Close
}
}
defer func() {
_ = closer()
}()

if wrtr, ok := data.(io.Writer); ok {
_, err := io.Copy(wrtr, reader)
if readerFrom, isReaderFrom := data.(io.ReaderFrom); isReaderFrom {
_, err := readerFrom.ReadFrom(reader)
return err
}

buf := new(bytes.Buffer)
if writer, isDataWriter := data.(io.Writer); isDataWriter {
_, err := io.Copy(writer, reader)
return err
}

// buffers input before writing to data
var buf bytes.Buffer
_, err := buf.ReadFrom(reader)
if err != nil {
return err
}
b := buf.Bytes()

if bu, ok := data.(encoding.BinaryUnmarshaler); ok {
return bu.UnmarshalBinary(b)
}
switch destinationPointer := data.(type) {
case encoding.BinaryUnmarshaler:
return destinationPointer.UnmarshalBinary(b)
case *any:
switch (*destinationPointer).(type) {
case string:
*destinationPointer = string(b)

return nil

case []byte:
*destinationPointer = b

if data != nil {
if str, ok := data.(*string); ok {
*str = string(b)
return nil
}
}
default:
// check for the underlying type to be pointer to []byte or string,
if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
return errors.New("destination must be a pointer")
}

if t := reflect.TypeOf(data); data != nil && t.Kind() == reflect.Ptr {
v := reflect.Indirect(reflect.ValueOf(data))
if t = v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
t := v.Type()

switch {
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
v.SetBytes(b)
return nil

case t.Kind() == reflect.String:
v.SetString(string(b))
return nil
}
}

Expand All @@ -98,68 +130,87 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
})
}

// ByteStreamProducer creates a producer for byte streams,
// takes a Reader/BinaryMarshaler interface or binary slice,
// and writes to a writer (essentially a pipe)
// ByteStreamProducer creates a producer for byte streams.
//
// The producer takes input data then writes to an output writer (essentially as a pipe).
//
// Supported input underlying types and interfaces, prioritized in this order:
// - io.WriterTo (for maximum control)
// - io.Reader (performs io.Copy). A ReadCloser is closed before exiting.
// - encoding.BinaryMarshaler
// - error (writes as a string)
// - []byte
// - string
// - struct, other slices: writes as JSON
func ByteStreamProducer(opts ...byteStreamOpt) Producer {
var vals byteStreamOpts
for _, opt := range opts {
opt(&vals)
}

return ProducerFunc(func(writer io.Writer, data interface{}) error {
if writer == nil {
return errors.New("ByteStreamProducer requires a writer") // early exit
}
if data == nil {
return errors.New("nil destination for ByteStreamProducer")
}

closer := defaultCloser
if vals.Close {
if cl, ok := writer.(io.Closer); ok {
if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
closer = cl.Close
}
}
defer func() {
_ = closer()
}()

if rc, ok := data.(io.ReadCloser); ok {
if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
defer rc.Close()
}

if rdr, ok := data.(io.Reader); ok {
_, err := io.Copy(writer, rdr)
switch origin := data.(type) {
case io.WriterTo:
_, err := origin.WriteTo(writer)
return err

case io.Reader:
_, err := io.Copy(writer, origin)
return err
}

if bm, ok := data.(encoding.BinaryMarshaler); ok {
bytes, err := bm.MarshalBinary()
case encoding.BinaryMarshaler:
bytes, err := origin.MarshalBinary()
if err != nil {
return err
}

_, err = writer.Write(bytes)
return err
}

if data != nil {
if str, ok := data.(string); ok {
_, err := writer.Write([]byte(str))
return err
}

if e, ok := data.(error); ok {
_, err := writer.Write([]byte(e.Error()))
return err
}
case error:
_, err := writer.Write([]byte(origin.Error()))
return err

default:
v := reflect.Indirect(reflect.ValueOf(data))
if t := v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
t := v.Type()

switch {
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
_, err := writer.Write(v.Bytes())
return err
}
if t := v.Type(); t.Kind() == reflect.Struct || t.Kind() == reflect.Slice {

case t.Kind() == reflect.String:
_, err := writer.Write([]byte(v.String()))
return err

case t.Kind() == reflect.Struct || t.Kind() == reflect.Slice:
b, err := swag.WriteJSON(data)
if err != nil {
return err
}

_, err = writer.Write(b)
return err
}
Expand Down
Loading

0 comments on commit e9d312a

Please sign in to comment.