Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(compress): using compression pool #209

Merged
merged 3 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions lib/pool/compression_cache_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pool

import (
"bytes"
"compress/gzip"
"errors"
"runtime"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

var (
gzipReaderPool = NewCachePool[*gzip.Reader](func() *gzip.Reader {
return new(gzip.Reader)
}, 2*runtime.NumCPU())

snappyReaderPool = NewCachePool[*snappy.Reader](func() *snappy.Reader {
return snappy.NewReader(nil)
}, 2*runtime.NumCPU())

zstdDecoderPool = NewCachePool[*zstd.Decoder](func() *zstd.Decoder {
decoder, error := zstd.NewReader(nil)
if error != nil {
return nil
}
return decoder
}, 2*runtime.NumCPU())
)

func GetGzipReader(body []byte) (*gzip.Reader, error) {
xkx9431 marked this conversation as resolved.
Show resolved Hide resolved
gzipReader := gzipReaderPool.Get()
if gzipReader == nil {
return nil, errors.New("failed to get gzip reader")
}
err := gzipReader.Reset(bytes.NewReader(body))
if err != nil {
gzipReaderPool.Put(gzipReader) // Return the reader to the pool if reset fails
return nil, err
}
return gzipReader, nil
}

func PutGzipReader(reader *gzip.Reader) {
gzipReaderPool.Put(reader)
}

func GetSnappyReader(body []byte) (*snappy.Reader, error) {
snappyReader := snappyReaderPool.Get()
if snappyReader == nil {
return nil, errors.New("failed to get snappy reader")
}

snappyReader.Reset(bytes.NewReader(body))
return snappyReader, nil
}

func PutSnappyReader(reader *snappy.Reader) {
reader.Reset(nil)
snappyReaderPool.Put(reader)
}

func GetZstdDecoder(body []byte) (*zstd.Decoder, error) {
decoder := zstdDecoderPool.Get()
if decoder == nil {
return nil, errors.New("failed to get zstd decoder")
}

err := decoder.Reset(bytes.NewReader(body))
if err != nil {
zstdDecoderPool.Put(decoder) // Return the decoder to the pool if reset fails
return nil, err
}
return decoder, nil
}

func PutZstdDecoder(decoder *zstd.Decoder) {
err := decoder.Reset(nil)
if err != nil {
return
}
zstdDecoderPool.Put(decoder)
}
109 changes: 109 additions & 0 deletions lib/pool/compression_cache_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pool

import (
"bytes"
"compress/gzip"
"io"
"testing"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

func TestGzipReaderPool(t *testing.T) {
data := []byte("test data")
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(data)
if err != nil {
t.Fatalf("failed to write gzip data: %v", err)
}
writer.Close()

compressedData := buf.Bytes()

reader, err := GetGzipReader(compressedData)
if err != nil {
t.Fatalf("failed to get gzip reader: %v", err)
}

decompressedData, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("failed to read gzip data: %v", err)
}

if !bytes.Equal(decompressedData, data) {
t.Errorf("expected %v, got %v", data, decompressedData)
}

PutGzipReader(reader)
}

func TestSnappyReaderPool(t *testing.T) {
data := []byte("test data")
var buf bytes.Buffer

// Write data to buffer
writer := snappy.NewBufferedWriter(&buf)
_, err := writer.Write(data)
if err != nil {
t.Fatalf("failed to write snappy data: %v", err)
}
writer.Close()

compressedData := buf.Bytes()

reader, err := GetSnappyReader(compressedData)
if err != nil {
t.Fatalf("failed to get snappy reader: %v", err)
}

decompressedData, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("failed to read snappy data: %v", err)
}

if !bytes.Equal(decompressedData, data) {
t.Errorf("expected %v, got %v", data, decompressedData)
}

PutSnappyReader(reader)

}

func TestZstdDecoderPool(t *testing.T) {
data := []byte("test data")
encoder, _ := zstd.NewWriter(nil)
compressedData := encoder.EncodeAll(data, nil)
encoder.Close()

decoder, err := GetZstdDecoder(compressedData)
if err != nil {
t.Fatalf("failed to get zstd decoder: %v", err)
}

decompressedData, err := decoder.DecodeAll(compressedData, nil)
if err != nil {
t.Fatalf("failed to read zstd data: %v", err)
}

if !bytes.Equal(decompressedData, data) {
t.Errorf("expected %v, got %v", data, decompressedData)
}

PutZstdDecoder(decoder)
}
57 changes: 57 additions & 0 deletions lib/pool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pool

import (
"sync"
)

type CachePool[T any] struct {
pool sync.Pool
capacityChan chan struct{}
newFunc func() T
}

func NewCachePool[T any](newFunc func() T, maxSize int) *CachePool[T] {
return &CachePool[T]{
pool: sync.Pool{
New: func() interface{} {
return newFunc()
},
},
capacityChan: make(chan struct{}, maxSize),
xkx9431 marked this conversation as resolved.
Show resolved Hide resolved
newFunc: newFunc,
}
}

func (c *CachePool[T]) Get() T {
select {
case <-c.capacityChan:
item := c.pool.Get()

return item.(T)
default:
return c.newFunc()
}
}

func (c *CachePool[T]) Put(x T) {
select {
case c.capacityChan <- struct{}{}:
c.pool.Put(x)
default:
// Pool is full, discard the item
}
}
77 changes: 77 additions & 0 deletions lib/pool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pool

import (
"testing"
)

func TestCachePool(t *testing.T) {
// Create a new CachePool with a max size of 2
pool := NewCachePool(func() interface{} {
return new(struct{})
}, 2)

// Get an item from the pool
item1 := pool.Get().(*struct{})
if item1 == nil {
t.Errorf("expected non-nil item, got nil")
}

// Put the item back into the pool
pool.Put(item1)

// Get another item from the pool
item2 := pool.Get().(*struct{})
if item2 == nil {
t.Errorf("expected non-nil item, got nil")
}

// Ensure the item is the same as the first one
if item1 != item2 {
t.Errorf("expected the same item, got different items")
}

}

func TestPoolDiscardWhenFull(t *testing.T) {
// Create a pool with a capacity of 1
pool := NewCachePool(func() interface{} {
return 1
}, 1)

// Get an item from the pool
item1 := pool.Get().(int)

// Put the item back into the pool
pool.Put(item1)

// Try to put another item into the pool, which should be discarded
item2 := 2
pool.Put(item2)

// Get an item from the pool
item3 := pool.Get().(int)

// Ensure the item is the same as the first one, meaning the second item was discarded
if item1 != item3 {
t.Errorf("expected the same item, got different items")
}

// Ensure the discarded item is not the same as the one in the pool
if item2 == item3 {
t.Errorf("expected different items, got the same item")
}
}
Loading
Loading