diff --git a/conn/pool.go b/conn/pool.go index c7cfb410a79..6cdeff7b47c 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -156,7 +156,8 @@ func newPool(addr string) (*Pool, error) { grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(x.GrpcMaxSize), - grpc.MaxCallSendMsgSize(x.GrpcMaxSize)), + grpc.MaxCallSendMsgSize(x.GrpcMaxSize), + grpc.UseCompressor((snappyCompressor{}).Name())), grpc.WithBackoffMaxDelay(time.Second), grpc.WithInsecure()) if err != nil { diff --git a/conn/snappy.go b/conn/snappy.go new file mode 100644 index 00000000000..ff42e62730f --- /dev/null +++ b/conn/snappy.go @@ -0,0 +1,81 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package conn + +import ( + "io" + "sync" + + "github.com/golang/snappy" + "google.golang.org/grpc/encoding" +) + +// NB: The encoding.Compressor implementation needs to be goroutine +// safe as multiple goroutines may be using the same compressor for +// different streams on the same connection. +var snappyWriterPool sync.Pool +var snappyReaderPool sync.Pool + +type snappyWriter struct { + *snappy.Writer +} + +func (w *snappyWriter) Close() error { + defer snappyWriterPool.Put(w) + return w.Writer.Close() +} + +type snappyReader struct { + *snappy.Reader +} + +func (r *snappyReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + if err == io.EOF { + snappyReaderPool.Put(r) + } + return n, err +} + +type snappyCompressor struct { +} + +func (snappyCompressor) Name() string { + return "snappy" +} + +func (snappyCompressor) Compress(w io.Writer) (io.WriteCloser, error) { + sw, ok := snappyWriterPool.Get().(*snappyWriter) + if !ok { + sw = &snappyWriter{snappy.NewBufferedWriter(w)} + } else { + sw.Reset(w) + } + return sw, nil +} + +func (snappyCompressor) Decompress(r io.Reader) (io.Reader, error) { + sr, ok := snappyReaderPool.Get().(*snappyReader) + if !ok { + sr = &snappyReader{snappy.NewReader(r)} + } else { + sr.Reset(r) + } + return sr, nil +} + +func init() { + encoding.RegisterCompressor(snappyCompressor{}) +} diff --git a/vendor/github.com/golang/snappy/AUTHORS b/vendor/github.com/golang/snappy/AUTHORS index bcfa19520af..f10b49bb3b3 100644 --- a/vendor/github.com/golang/snappy/AUTHORS +++ b/vendor/github.com/golang/snappy/AUTHORS @@ -11,5 +11,6 @@ Damian Gryski Google Inc. Jan Mercl <0xjnml@gmail.com> +Klaus Post Rodolfo Carvalho Sebastien Binet diff --git a/vendor/github.com/golang/snappy/CONTRIBUTORS b/vendor/github.com/golang/snappy/CONTRIBUTORS index 931ae31606f..3bd40cf5555 100644 --- a/vendor/github.com/golang/snappy/CONTRIBUTORS +++ b/vendor/github.com/golang/snappy/CONTRIBUTORS @@ -29,6 +29,7 @@ Damian Gryski Jan Mercl <0xjnml@gmail.com> Kai Backman +Klaus Post Marc-Antoine Ruel Nigel Tao Rob Pike diff --git a/vendor/github.com/golang/snappy/decode.go b/vendor/github.com/golang/snappy/decode.go index 72efb0353dd..f1e04b172c5 100644 --- a/vendor/github.com/golang/snappy/decode.go +++ b/vendor/github.com/golang/snappy/decode.go @@ -52,6 +52,8 @@ const ( // Otherwise, a newly allocated slice will be returned. // // The dst and src must not overlap. It is valid to pass a nil dst. +// +// Decode handles the Snappy block format, not the Snappy stream format. func Decode(dst, src []byte) ([]byte, error) { dLen, s, err := decodedLen(src) if err != nil { @@ -83,6 +85,8 @@ func NewReader(r io.Reader) *Reader { } // Reader is an io.Reader that can read Snappy-compressed bytes. +// +// Reader handles the Snappy stream format, not the Snappy block format. type Reader struct { r io.Reader err error diff --git a/vendor/github.com/golang/snappy/decode_other.go b/vendor/github.com/golang/snappy/decode_other.go index 8c9f2049bc7..b88318ecdbe 100644 --- a/vendor/github.com/golang/snappy/decode_other.go +++ b/vendor/github.com/golang/snappy/decode_other.go @@ -85,14 +85,28 @@ func decode(dst, src []byte) int { if offset <= 0 || d < offset || length > len(dst)-d { return decodeErrCodeCorrupt } - // Copy from an earlier sub-slice of dst to a later sub-slice. Unlike - // the built-in copy function, this byte-by-byte copy always runs + // Copy from an earlier sub-slice of dst to a later sub-slice. + // If no overlap, use the built-in copy: + if offset >= length { + copy(dst[d:d+length], dst[d-offset:]) + d += length + continue + } + + // Unlike the built-in copy function, this byte-by-byte copy always runs // forwards, even if the slices overlap. Conceptually, this is: // // d += forwardCopy(dst[d:d+length], dst[d-offset:]) - for end := d + length; d != end; d++ { - dst[d] = dst[d-offset] + // + // We align the slices into a and b and show the compiler they are the same size. + // This allows the loop to run without bounds checks. + a := dst[d : d+length] + b := dst[d-offset:] + b = b[:len(a)] + for i := range a { + a[i] = b[i] } + d += length } if d != len(dst) { return decodeErrCodeCorrupt diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go index 8d393e904bb..7f23657076c 100644 --- a/vendor/github.com/golang/snappy/encode.go +++ b/vendor/github.com/golang/snappy/encode.go @@ -15,6 +15,8 @@ import ( // Otherwise, a newly allocated slice will be returned. // // The dst and src must not overlap. It is valid to pass a nil dst. +// +// Encode handles the Snappy block format, not the Snappy stream format. func Encode(dst, src []byte) []byte { if n := MaxEncodedLen(len(src)); n < 0 { panic(ErrTooLarge) @@ -139,6 +141,8 @@ func NewBufferedWriter(w io.Writer) *Writer { } // Writer is an io.Writer that can write Snappy-compressed bytes. +// +// Writer handles the Snappy stream format, not the Snappy block format. type Writer struct { w io.Writer err error diff --git a/vendor/vendor.json b/vendor/vendor.json index eded9cb3017..a3107bef338 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -716,6 +716,12 @@ "revision": "b285ee9cfc6c881bb20c0d8dc73370ea9b9ec90f", "revisionTime": "2019-05-17T06:12:10Z" }, + { + "checksumSHA1": "4gKrTOaoNvbAgDCVfSLd6J77oPw=", + "path": "github.com/golang/snappy", + "revision": "ff6b7dc882cf4cfba7ee0b9f7dcc1ac096c554aa", + "revisionTime": "2019-09-04T06:34:54Z" + }, { "checksumSHA1": "L3HoHVqp2EaBSOqBxB7l0PTyu7g=", "path": "github.com/golang/snappy",