Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: Compress Reader/Writer supports reading/writing Snappy/Zstd type compressed files #38603

Merged
merged 11 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
}

type compressReader struct {
io.ReadCloser
io.Reader
io.Closer
}

// nolint:interfacer
Expand All @@ -94,14 +95,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
return nil, errors.Trace(err)
}
return &compressReader{
ReadCloser: r,
Reader: r,
Closer: fileReader,
}, nil
}

func (*compressReader) Seek(_ int64, _ int) (int64, error) {
return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
}

func (c *compressReader) Close() error {
err := c.Closer.Close()
return err
}

type flushStorageWriter struct {
writer io.Writer
flusher flusher
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"io"

"github.com/golang/snappy"
"github.com/pingcap/errors"
)

Expand All @@ -17,6 +18,10 @@ const (
NoCompression CompressType = iota
// Gzip will compress given bytes in gzip format.
Gzip
// LZO will compress given bytes in lzo format.
LZO
// Snappy will compress given bytes in snappy format.
Snappy
)

type flusher interface {
Expand Down Expand Up @@ -50,15 +55,19 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri
switch compressType {
case Gzip:
return gzip.NewWriter(w)
case Snappy:
return snappy.NewBufferedWriter(w)
default:
return nil
}
}

func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) {
func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) {
switch compressType {
case Gzip:
return gzip.NewReader(r)
case Snappy:
return snappy.NewReader(r), nil
default:
return nil, nil
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,15 @@ func TestCompressReaderWriter(t *testing.T) {
ctx := context.Background()
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
storage = WithCompression(storage, Gzip)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz"
storage = WithCompression(storage, test.compressType)
var suffix string
if test.compressType == Gzip {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about implementing a String() function for these compressType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

suffix = ".txt.gz"
}
if test.compressType == Snappy {
suffix = ".txt.snappy"
}
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
require.NoError(t, err)
for _, str := range test.content {
Expand All @@ -124,7 +131,6 @@ func TestCompressReaderWriter(t *testing.T) {
_, err = bf.ReadFrom(r)
require.NoError(t, err)
require.Equal(t, strings.Join(test.content, ""), bf.String())
require.Nil(t, r.Close())

// test withCompression Open
r, err = storage.Open(ctx, fileName)
Expand All @@ -135,7 +141,8 @@ func TestCompressReaderWriter(t *testing.T) {

require.Nil(t, file.Close())
}
compressTypeArr := []CompressType{Gzip}
compressTypeArr := []CompressType{Gzip, Snappy}

tests := []testcase{
{
name: "long text medium chunks",
Expand Down