Skip to content

Commit

Permalink
[ISSUE-520] Pool gzip compressor and buffer used in gzip writer (#521)
Browse files Browse the repository at this point in the history
* add gzip.go and replace with pooled gzip compressor & buffer

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

* add test case for gzip.go

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

* update CHANGELOG.md

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

* fix lint

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

* update test package name

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

* rename test file

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>

---------

Signed-off-by: Kazuma (Pakio) Arimura <k.arimura96@gmail.com>
  • Loading branch information
pakio authored Apr 11, 2024
1 parent c2d2a5b commit 7ea16f7
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Changes bulk error/reason field and some cat response fields to pointers as they can be nil ([#510](https://github.com/opensearch-project/opensearch-go/pull/510))
- Adjust workflows to work with security plugin ([#507](https://github.com/opensearch-project/opensearch-go/pull/507))
- Updates USER_GUIDE.md and /_samples/ ([#518](https://github.com/opensearch-project/opensearch-go/pull/518))
- Updates opensearchtransport.Client to use pooled gzip writer and buffer ([#521](https://github.com/opensearch-project/opensearch-go/pull/521))

### Deprecated

Expand Down
61 changes: 61 additions & 0 deletions opensearchtransport/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.

package opensearchtransport

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"sync"
)

type gzipCompressor struct {
gzipWriterPool *sync.Pool
bufferPool *sync.Pool
}

// newGzipCompressor returns a new gzipCompressor that uses a sync.Pool to reuse gzip.Writers.
func newGzipCompressor() *gzipCompressor {
gzipWriterPool := sync.Pool{
New: func() any {
return gzip.NewWriter(io.Discard)
},
}

bufferPool := sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}

return &gzipCompressor{
gzipWriterPool: &gzipWriterPool,
bufferPool: &bufferPool,
}
}

func (pg *gzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) {
writer := pg.gzipWriterPool.Get().(*gzip.Writer)
defer pg.gzipWriterPool.Put(writer)

buf := pg.bufferPool.Get().(*bytes.Buffer)
buf.Reset()
writer.Reset(buf)

if _, err := io.Copy(writer, rc); err != nil {
return nil, fmt.Errorf("failed to compress request body: %w", err)
}
if err := writer.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request body (during close): %w", err)
}
return buf, nil
}

func (pg *gzipCompressor) collectBuffer(buf *bytes.Buffer) {
pg.bufferPool.Put(buf)
}
124 changes: 124 additions & 0 deletions opensearchtransport/gzip_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: Apache-2.0
//
// The OpenSearch Contributors require contributions made to
// this file be licensed under the Apache-2.0 license or a
// compatible open source license.

//go:build !integration

package opensearchtransport

import (
"compress/gzip"
"io"
"math/rand"
"strings"
"testing"
)

// TestCompress verifies round-trip behavior of the pooled gzip compressor:
// single and repeated compression, output characteristics, and double
// compression.
func TestCompress(t *testing.T) {
	// decompress gunzips r and fails the test on any error instead of
	// silently ignoring it.
	decompress := func(t *testing.T, r io.Reader) string {
		t.Helper()
		zr, err := gzip.NewReader(r)
		if err != nil {
			t.Fatalf("failed to create gzip reader: %v", err)
		}
		s, err := io.ReadAll(zr)
		if err != nil {
			t.Fatalf("failed to decompress: %v", err)
		}
		return string(s)
	}

	t.Run("initialize & compress", func(t *testing.T) {
		gzipCompressor := newGzipCompressor()
		body := generateRandomString()
		rc := io.NopCloser(strings.NewReader(body))

		buf, err := gzipCompressor.compress(rc)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		// Collect only after the error check so a nil buffer is never pooled.
		defer gzipCompressor.collectBuffer(buf)

		if got := decompress(t, buf); got != body {
			t.Fatalf("expected body to be the same after compressing and decompressing: expected %s, got %s", body, got)
		}
	})

	t.Run("gzip multiple times", func(t *testing.T) {
		gzipCompressor := newGzipCompressor()
		for i := 0; i < 5; i++ {
			body := generateRandomString()
			rc := io.NopCloser(strings.NewReader(body))

			buf, err := gzipCompressor.compress(rc)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			if got := decompress(t, buf); got != body {
				t.Fatal("expected body to be the same after compressing and decompressing")
			}
			// Return the buffer at the end of each iteration (not via defer,
			// which would hold all five buffers until the subtest returns).
			gzipCompressor.collectBuffer(buf)
		}
	})

	t.Run("ensure gzipped data differs from original", func(t *testing.T) {
		gzipCompressor := newGzipCompressor()
		body := generateRandomString()
		rc := io.NopCloser(strings.NewReader(body))

		buf, err := gzipCompressor.compress(rc)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		defer gzipCompressor.collectBuffer(buf)

		// Random strings of at most 100 bytes are effectively incompressible,
		// so the gzip header/trailer overhead makes the output strictly
		// larger than the input.
		if buf.Len() <= len(body) {
			t.Fatalf("expected compressed output of a tiny random input to exceed the original due to gzip overhead: original %d bytes, got %d", len(body), buf.Len())
		}

		if body == buf.String() {
			t.Fatalf("expected compressed data to be different from original")
		}
	})

	t.Run("compressing data twice", func(t *testing.T) {
		gzipCompressor := newGzipCompressor()
		body := generateRandomString()
		rc := io.NopCloser(strings.NewReader(body))

		buf, err := gzipCompressor.compress(rc)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		defer gzipCompressor.collectBuffer(buf)

		rc = io.NopCloser(buf)
		buf2, err := gzipCompressor.compress(rc)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		defer gzipCompressor.collectBuffer(buf2)

		// Unwrap both layers of gzip and compare with the original body.
		outer, err := gzip.NewReader(buf2)
		if err != nil {
			t.Fatalf("failed to create outer gzip reader: %v", err)
		}
		if got := decompress(t, outer); got != body {
			t.Fatal("expected body to be the same after compressing and decompressing twice")
		}
	})
}

func generateRandomString() string {
length := rand.Intn(100) + 1

// Define the characters that can be used in the random string
charset := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// Create a byte slice with the specified length
randomBytes := make([]byte, length)

// Generate a random character from the charset for each byte in the slice
for i := 0; i < length; i++ {
randomBytes[i] = charset[rand.Intn(len(charset))]
}

// Convert the byte slice to a string and return it
return string(randomBytes)
}
20 changes: 10 additions & 10 deletions opensearchtransport/opensearchtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ package opensearchtransport

import (
"bytes"
"compress/gzip"
"crypto/x509"
"errors"
"fmt"
Expand Down Expand Up @@ -111,7 +110,8 @@ type Client struct {
discoverNodesInterval time.Duration
discoverNodesTimer *time.Timer

compressRequestBody bool
compressRequestBody bool
pooledGzipCompressor *gzipCompressor

metrics *metrics

Expand Down Expand Up @@ -211,6 +211,10 @@ func New(cfg Config) (*Client, error) {
})
}

if cfg.CompressRequestBody {
client.pooledGzipCompressor = newGzipCompressor()
}

return &client, nil
}

Expand All @@ -234,18 +238,14 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {

if req.Body != nil && req.Body != http.NoBody {
if c.compressRequestBody {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
if _, err := io.Copy(zw, req.Body); err != nil {
buf, err := c.pooledGzipCompressor.compress(req.Body)
defer c.pooledGzipCompressor.collectBuffer(buf)
if err != nil {
return nil, fmt.Errorf("failed to compress request body: %w", err)
}
if err := zw.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request body (during close): %w", err)
}

req.GetBody = func() (io.ReadCloser, error) {
r := buf
return io.NopCloser(&r), nil
return io.NopCloser(buf), nil
}
//nolint:errcheck // error is always nil
req.Body, _ = req.GetBody()
Expand Down

0 comments on commit 7ea16f7

Please sign in to comment.