diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a459eac5..e5d3bd508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Changes bulk error/reason field and some cat response fields to pointer as they can be nil ([#510](https://github.com/opensearch-project/opensearch-go/pull/510)) - Adjust workflows to work with security plugin ([#507](https://github.com/opensearch-project/opensearch-go/pull/507)) - Updates USER_GUIDE.md and /_samples/ ([#518](https://github.com/opensearch-project/opensearch-go/pull/518)) +- Updates opensearchtransport.Client to use pooled gzip writer and buffer ([#521](https://github.com/opensearch-project/opensearch-go/pull/521)) ### Deprecated diff --git a/opensearchtransport/gzip.go b/opensearchtransport/gzip.go new file mode 100644 index 000000000..d74cd07ec --- /dev/null +++ b/opensearchtransport/gzip.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package opensearchtransport + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "sync" +) + +type gzipCompressor struct { + gzipWriterPool *sync.Pool + bufferPool *sync.Pool +} + +// newGzipCompressor returns a new gzipCompressor that uses a sync.Pool to reuse gzip.Writers. +func newGzipCompressor() *gzipCompressor { + gzipWriterPool := sync.Pool{ + New: func() any { + return gzip.NewWriter(io.Discard) + }, + } + + bufferPool := sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, + } + + return &gzipCompressor{ + gzipWriterPool: &gzipWriterPool, + bufferPool: &bufferPool, + } +} + +func (pg *gzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) { + writer := pg.gzipWriterPool.Get().(*gzip.Writer) + defer pg.gzipWriterPool.Put(writer) + + buf := pg.bufferPool.Get().(*bytes.Buffer) + buf.Reset() + writer.Reset(buf) + + if _, err := io.Copy(writer, rc); err != nil { + return nil, fmt.Errorf("failed to compress request body: %w", err) + } + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to compress request body (during close): %w", err) + } + return buf, nil +} + +func (pg *gzipCompressor) collectBuffer(buf *bytes.Buffer) { + pg.bufferPool.Put(buf) +} diff --git a/opensearchtransport/gzip_internal_test.go b/opensearchtransport/gzip_internal_test.go new file mode 100644 index 000000000..4c809e038 --- /dev/null +++ b/opensearchtransport/gzip_internal_test.go @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +//go:build !integration + +package opensearchtransport + +import ( + "compress/gzip" + "io" + "math/rand" + "strings" + "testing" +) + +func TestCompress(t *testing.T) { + t.Run("initialize & compress", func(t *testing.T) { + gzipCompressor := newGzipCompressor() + body := generateRandomString() + rc := io.NopCloser(strings.NewReader(body)) + + buf, err := gzipCompressor.compress(rc) + defer gzipCompressor.collectBuffer(buf) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // unzip + r, _ := gzip.NewReader(buf) + s, _ := io.ReadAll(r) + if string(s) != body { + t.Fatalf("expected body to be the same after compressing and decompressing: expected %s, got %s", body, string(s)) + } + }) + + t.Run("gzip multiple times", func(t *testing.T) { + gzipCompressor := newGzipCompressor() + for i := 0; i < 5; i++ { + body := generateRandomString() + rc := io.NopCloser(strings.NewReader(body)) + + buf, err := gzipCompressor.compress(rc) + defer gzipCompressor.collectBuffer(buf) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // unzip + r, _ := gzip.NewReader(buf) + s, _ := io.ReadAll(r) + if string(s) != body { + t.Fatal("expected body to be the same after compressing and decompressing") + } + } + }) + + t.Run("ensure gzipped data is smaller and different from original", func(t *testing.T) { + gzipCompressor := newGzipCompressor() + body := generateRandomString() + rc := io.NopCloser(strings.NewReader(body)) + + buf, err := gzipCompressor.compress(rc) + defer gzipCompressor.collectBuffer(buf) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(buf.Bytes()) <= len(body) { + t.Fatalf("expected compressed data to be smaller than original: expected %d, got %d", len(body), len(buf.Bytes())) + } + + if body == buf.String() { + t.Fatalf("expected compressed data to be different from original") + } + }) + + t.Run("compressing data twice", func(t *testing.T) { + gzipCompressor := newGzipCompressor() + body := generateRandomString() + rc := io.NopCloser(strings.NewReader(body)) + + buf, err := gzipCompressor.compress(rc) + defer gzipCompressor.collectBuffer(buf) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + rc = io.NopCloser(buf) + buf2, err := gzipCompressor.compress(rc) + defer gzipCompressor.collectBuffer(buf2) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // unzip + r, _ := gzip.NewReader(buf2) + r, _ = gzip.NewReader(r) + s, _ := io.ReadAll(r) + if string(s) != body { + t.Fatal("expected body to be the same after compressing and decompressing twice") + } + }) +} + +func generateRandomString() string { + length := rand.Intn(100) + 1 + + // Define the characters that can be used in the random string + charset := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + // Create a byte slice with the specified length + randomBytes := make([]byte, length) + + // Generate a random character from the charset for each byte in the slice + for i := 0; i < length; i++ { + randomBytes[i] = charset[rand.Intn(len(charset))] + } + + // Convert the byte slice to a string and return it + return string(randomBytes) +} diff --git a/opensearchtransport/opensearchtransport.go b/opensearchtransport/opensearchtransport.go index e1cfc29c6..504cfc6a9 100644 --- a/opensearchtransport/opensearchtransport.go +++ b/opensearchtransport/opensearchtransport.go @@ -28,7 +28,6 @@ package opensearchtransport import ( "bytes" - "compress/gzip" "crypto/x509" "errors" "fmt" @@ -111,7 +110,8 @@ type Client struct { discoverNodesInterval time.Duration discoverNodesTimer *time.Timer - compressRequestBody bool + compressRequestBody bool + pooledGzipCompressor *gzipCompressor metrics *metrics @@ -211,6 +211,10 @@ func New(cfg Config) (*Client, error) { }) } + if cfg.CompressRequestBody { + client.pooledGzipCompressor = newGzipCompressor() + } + return &client, nil } @@ -234,18 +238,14 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { if req.Body != nil && req.Body != http.NoBody { if c.compressRequestBody { - var buf bytes.Buffer - zw := gzip.NewWriter(&buf) - if _, err := io.Copy(zw, req.Body); err != nil { + buf, err := c.pooledGzipCompressor.compress(req.Body) + defer c.pooledGzipCompressor.collectBuffer(buf) + if err != nil { return nil, fmt.Errorf("failed to compress request body: %w", err) } - if err := zw.Close(); err != nil { - return nil, fmt.Errorf("failed to compress request body (during close): %w", err) - } req.GetBody = func() (io.ReadCloser, error) { - r := buf - return io.NopCloser(&r), nil + return io.NopCloser(buf), nil } //nolint:errcheck // error is always nil req.Body, _ = req.GetBody()