Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: Compress Reader/Writer supports reading/writing Snappy/Zstd type compressed files #38603

Merged
merged 11 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
}

// compressReader pairs a stream reader with a separately held closer.
// NOTE(review): this span comes from a rendered diff, so it lists both the
// pre-change field (io.ReadCloser) and the post-change fields (io.Reader,
// io.Closer) — confirm against the merged file.
type compressReader struct {
io.ReadCloser
// Reader supplies the bytes served to callers; presumably the decompressing
// reader built for the configured compress type — confirm against newInterceptReader.
io.Reader
// Closer is what Close closes; newInterceptReader sets it to the underlying file reader.
io.Closer
}

// nolint:interfacer
Expand All @@ -94,14 +95,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
return nil, errors.Trace(err)
}
return &compressReader{
ReadCloser: r,
Reader: r,
Closer: fileReader,
}, nil
}

// Seek is not supported on a compressReader. It ignores both arguments and
// always reports offset 0 together with an error annotated on
// berrors.ErrStorageInvalidConfig.
func (*compressReader) Seek(_ int64, _ int) (int64, error) {
	unsupported := errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
	return 0, unsupported
}

// Close releases the decompressing Reader, when it exposes a Close method,
// and then closes the underlying Closer.
//
// The embedded Reader may be a decompressor with its own Close (either
// `Close() error` or a bare `Close()`, as some gzip/zstd readers have).
// Closing only the underlying Closer would leave such a decompressor's
// resources alive when the stream is abandoned before EOF. A Reader without a
// Close method (or a nil Reader) is simply skipped.
//
// The decompressor is closed first because it may still be reading from the
// underlying stream. The error from the underlying Closer takes precedence;
// the decompressor's close error is returned only when the Closer succeeds.
func (c *compressReader) Close() error {
	var readerErr error
	switch r := c.Reader.(type) {
	case io.Closer:
		readerErr = r.Close()
	case interface{ Close() }:
		r.Close()
	}
	if err := c.Closer.Close(); err != nil {
		return err
	}
	return readerErr
}

type flushStorageWriter struct {
writer io.Writer
flusher flusher
Expand Down
35 changes: 34 additions & 1 deletion br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
"context"
"io"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

// CompressType represents the type of compression.
Expand All @@ -17,6 +21,10 @@ const (
NoCompression CompressType = iota
// Gzip will compress given bytes in gzip format.
Gzip
// Snappy will compress given bytes in snappy format.
Snappy
// Zstd will compress given bytes in zstd format.
Zstd
)

type flusher interface {
Expand All @@ -39,6 +47,19 @@ type interceptBuffer interface {
Compressed() bool
}

// createSuffixString maps a compression type to the file-name suffix used for
// files of that type: ".txt.gz" for Gzip, ".txt.snappy" for Snappy and
// ".txt.zst" for Zstd. Any other value yields the empty string.
func createSuffixString(compressType CompressType) string {
	switch compressType {
	case Gzip:
		return ".txt.gz"
	case Snappy:
		return ".txt.snappy"
	case Zstd:
		return ".txt.zst"
	default:
		return ""
	}
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
if compressType == NoCompression {
return newNoCompressionBuffer(chunkSize)
Expand All @@ -50,15 +71,27 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri
switch compressType {
case Gzip:
return gzip.NewWriter(w)
case Snappy:
return snappy.NewBufferedWriter(w)
case Zstd:
newWriter, err := zstd.NewWriter(w)
if err != nil {
log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err))
}
return newWriter
default:
return nil
}
}

func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) {
func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) {
switch compressType {
case Gzip:
return gzip.NewReader(r)
case Snappy:
return snappy.NewReader(r), nil
case Zstd:
return zstd.NewReader(r)
default:
return nil, nil
}
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func TestCompressReaderWriter(t *testing.T) {
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage = WithCompression(storage, Gzip)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz"
storage = WithCompression(storage, test.compressType)
suffix := createSuffixString(test.compressType)
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
require.NoError(t, err)
for _, str := range test.content {
Expand All @@ -124,7 +125,6 @@ func TestCompressReaderWriter(t *testing.T) {
_, err = bf.ReadFrom(r)
require.NoError(t, err)
require.Equal(t, strings.Join(test.content, ""), bf.String())
require.Nil(t, r.Close())

// test withCompression Open
r, err = storage.Open(ctx, fileName)
Expand All @@ -135,7 +135,8 @@ func TestCompressReaderWriter(t *testing.T) {

require.Nil(t, file.Close())
}
compressTypeArr := []CompressType{Gzip}
compressTypeArr := []CompressType{Gzip, Snappy, Zstd}

tests := []testcase{
{
name: "long text medium chunks",
Expand Down