diff --git a/bson/stream.go b/bson/stream.go
new file mode 100644
index 000000000..466528457
--- /dev/null
+++ b/bson/stream.go
@@ -0,0 +1,90 @@
+package bson
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+)
+
+const (
+	// MinDocumentSize is the size of the smallest possible valid BSON document:
+	// an int32 size header + 0x00 (end of document).
+	MinDocumentSize = 5
+
+	// MaxDocumentSize is the largest possible size for a BSON document allowed by MongoDB,
+	// that is, 16 MiB (see https://docs.mongodb.com/manual/reference/limits/).
+	MaxDocumentSize = 16777216
+)
+
+// ErrInvalidDocumentSize is an error returned when a BSON document's header
+// contains a size smaller than MinDocumentSize or greater than MaxDocumentSize.
+type ErrInvalidDocumentSize struct {
+	DocumentSize int32
+}
+
+func (e ErrInvalidDocumentSize) Error() string {
+	return fmt.Sprintf("invalid document size %d", e.DocumentSize)
+}
+
+// A Decoder reads and decodes BSON values from an input stream.
+type Decoder struct {
+	source io.Reader
+}
+
+// NewDecoder returns a new Decoder that reads from source.
+// It does not add any extra buffering, and may not read data from source beyond the BSON values requested.
+func NewDecoder(source io.Reader) *Decoder {
+	return &Decoder{source: source}
+}
+
+// Decode reads the next BSON-encoded value from its input and stores it in the value pointed to by v.
+// See the documentation for Unmarshal for details about the conversion of BSON into a Go value.
+func (dec *Decoder) Decode(v interface{}) (err error) {
+	// BSON documents start with their size as a *signed* int32.
+	var docSize int32
+	if err = binary.Read(dec.source, binary.LittleEndian, &docSize); err != nil {
+		return
+	}
+
+	// Validate the size before allocating, so a corrupt or hostile stream
+	// cannot make us reserve an arbitrarily large buffer.
+	if docSize < MinDocumentSize || docSize > MaxDocumentSize {
+		return ErrInvalidDocumentSize{DocumentSize: docSize}
+	}
+
+	docBuffer := bytes.NewBuffer(make([]byte, 0, docSize))
+	if err = binary.Write(docBuffer, binary.LittleEndian, docSize); err != nil {
+		return
+	}
+
+	// docSize is the *full* document's size (including the 4-byte size header,
+	// which has already been read).
+	if _, err = io.CopyN(docBuffer, dec.source, int64(docSize-4)); err != nil {
+		return
+	}
+
+	// Let Unmarshal handle the rest.
+	defer handleErr(&err)
+	return Unmarshal(docBuffer.Bytes(), v)
+}
+
+// An Encoder encodes and writes BSON values to an output stream.
+type Encoder struct {
+	target io.Writer
+}
+
+// NewEncoder returns a new Encoder that writes to target.
+func NewEncoder(target io.Writer) *Encoder {
+	return &Encoder{target: target}
+}
+
+// Encode encodes v to BSON, and if successful writes it to the Encoder's output stream.
+// See the documentation for Marshal for details about the conversion of Go values to BSON.
+func (enc *Encoder) Encode(v interface{}) error {
+	data, err := Marshal(v)
+	if err != nil {
+		return err
+	}
+
+	_, err = enc.target.Write(data)
+	return err
+}
diff --git a/bson/stream_test.go b/bson/stream_test.go
new file mode 100644
index 000000000..14acbe3c5
--- /dev/null
+++ b/bson/stream_test.go
@@ -0,0 +1,77 @@
+package bson_test
+
+import (
+	"bytes"
+
+	"github.com/globalsign/mgo/bson"
+	. "gopkg.in/check.v1"
+)
+
+var invalidSizeDocuments = [][]byte{
+	// Empty document
+	{},
+	// Incomplete header
+	{0x04},
+	// Negative size
+	{0xff, 0xff, 0xff, 0xff},
+	// Full, valid size header but too small (less than 5 bytes)
+	{0x04, 0x00, 0x00, 0x00},
+	// Valid header, valid size but incomplete document
+	{0xff, 0x00, 0x00, 0x00, 0x00},
+	// Too big
+	{0xff, 0xff, 0xff, 0x7f},
+}
+
+// Reusing sampleItems from bson_test
+
+func (s *S) TestEncodeSampleItems(c *C) {
+	for i, item := range sampleItems {
+		buf := bytes.NewBuffer(nil)
+		enc := bson.NewEncoder(buf)
+
+		err := enc.Encode(item.obj)
+		c.Assert(err, IsNil)
+		c.Assert(buf.String(), Equals, item.data, Commentf("Failed on item %d", i))
+	}
+}
+
+func (s *S) TestDecodeSampleItems(c *C) {
+	for i, item := range sampleItems {
+		buf := bytes.NewBuffer([]byte(item.data))
+		dec := bson.NewDecoder(buf)
+
+		value := bson.M{}
+		err := dec.Decode(&value)
+		c.Assert(err, IsNil)
+		c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
+	}
+}
+
+func (s *S) TestStreamRoundTrip(c *C) {
+	buf := bytes.NewBuffer(nil)
+	enc := bson.NewEncoder(buf)
+
+	for _, item := range sampleItems {
+		err := enc.Encode(item.obj)
+		c.Assert(err, IsNil)
+	}
+
+	// Ensure that everything that was encoded is decodable in the same order.
+	dec := bson.NewDecoder(buf)
+	for i, item := range sampleItems {
+		value := bson.M{}
+		err := dec.Decode(&value)
+		c.Assert(err, IsNil)
+		c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
+	}
+}
+
+func (s *S) TestDecodeDocumentTooSmall(c *C) {
+	for i, item := range invalidSizeDocuments {
+		buf := bytes.NewBuffer(item)
+		dec := bson.NewDecoder(buf)
+		value := bson.M{}
+		err := dec.Decode(&value)
+		c.Assert(err, NotNil, Commentf("Failed on invalid size item %d", i))
+	}
+}