lightning: Compress Reader/Writer supports reading/writing Snappy/Zstd type compressed files #38603

Merged (11 commits) on Oct 31, 2022
Changes from 2 commits
br/pkg/lightning/restore/restore.go (1 change: 1 addition, 0 deletions)

@@ -2140,6 +2140,7 @@ func newChunkRestore(
 	if chunk.FileMeta.Type == mydump.SourceTypeParquet {
 		reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
 	} else {
+		storage.WithCompression(store, storage.LZO).Open(ctx, chunk.FileMeta.Path)
 		reader, err = store.Open(ctx, chunk.FileMeta.Path)
 	}
 	if err != nil {
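At this intermediate commit the new call on the else branch discards both of its return values, so the decompressing reader is never assigned to reader, and the compression type is hard-coded to storage.LZO. Below is a minimal sketch of how a caller could route the open through the compression-aware wrapper instead; openChunkReader and its compressType parameter are illustrative and not part of the PR, only storage.WithCompression and Open come from this patch.

package restoresketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// openChunkReader opens a data file, decompressing transparently when the
// chunk is known to be compressed. This helper is hypothetical; it only uses
// the storage.WithCompression / Open API exercised in this PR.
func openChunkReader(
	ctx context.Context,
	store storage.ExternalStorage,
	path string,
	compressType storage.CompressType,
) (storage.ExternalFileReader, error) {
	if compressType == storage.NoCompression {
		return store.Open(ctx, path)
	}
	// WithCompression wraps the store so that Open returns a reader which
	// decompresses the file contents on the fly.
	return storage.WithCompression(store, compressType).Open(ctx, path)
}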
br/pkg/storage/compress.go (11 changes: 9 additions, 2 deletions)

@@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
 }

 type compressReader struct {
-	io.ReadCloser
+	io.Reader
+	io.Closer
 }

 // nolint:interfacer
@@ -94,14 +95,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
 		return nil, errors.Trace(err)
 	}
 	return &compressReader{
-		ReadCloser: r,
+		Reader: r,
+		Closer: fileReader,
 	}, nil
 }

 func (*compressReader) Seek(_ int64, _ int) (int64, error) {
 	return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
 }

+func (c *compressReader) Close() error {
+	err := c.Closer.Close()
+	return err
+}
+
 type flushStorageWriter struct {
 	writer  io.Writer
 	flusher flusher
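This struct change is the crux of the reader side: *gzip.Reader implements io.ReadCloser, but *snappy.Reader has no Close method, so compressReader can no longer embed a single io.ReadCloser. It now keeps the decompressing stream as an io.Reader and delegates Close to the underlying file. A standalone sketch of the same pattern using only golang/snappy and the standard library; the readCloser/openSnappy names and the input path are made up.

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/golang/snappy"
)

// readCloser mirrors the shape of compressReader after this change: the
// decompressed stream satisfies io.Reader, while Close is delegated to the
// underlying file handle.
type readCloser struct {
	io.Reader
	io.Closer
}

// openSnappy opens a snappy-framed file and returns a reader that
// decompresses on Read and releases the file on Close.
func openSnappy(path string) (io.ReadCloser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	return &readCloser{Reader: snappy.NewReader(f), Closer: f}, nil
}

func main() {
	rc, err := openSnappy("example.txt.snappy") // hypothetical input file
	if err != nil {
		fmt.Println(err)
		return
	}
	defer rc.Close() // closes the file; the snappy reader itself has nothing to close
	data, _ := io.ReadAll(rc)
	fmt.Printf("%d bytes decompressed\n", len(data))
}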
br/pkg/storage/writer.go (11 changes: 10 additions, 1 deletion)

@@ -6,6 +6,7 @@ import (
 	"context"
 	"io"

+	"github.com/golang/snappy"
 	"github.com/pingcap/errors"
 )

@@ -17,6 +18,10 @@ const (
 	NoCompression CompressType = iota
 	// Gzip will compress given bytes in gzip format.
 	Gzip
+	// LZO will compress given bytes in lzo format.
+	LZO
+	// Snappy will compress given bytes in snappy format.
+	Snappy
 )

 type flusher interface {
@@ -50,15 +55,19 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri
 	switch compressType {
 	case Gzip:
 		return gzip.NewWriter(w)
+	case Snappy:
+		return snappy.NewBufferedWriter(w)
 	default:
 		return nil
 	}
 }

-func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) {
+func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) {
 	switch compressType {
 	case Gzip:
 		return gzip.NewReader(r)
+	case Snappy:
+		return snappy.NewReader(r), nil
 	default:
 		return nil, nil
 	}
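The two snappy entry points wired in here behave slightly differently from their gzip counterparts: snappy.NewBufferedWriter buffers internally, so Close (or Flush) is required before the compressed stream is complete, and snappy.NewReader cannot fail on construction, which is why the new case returns snappy.NewReader(r), nil. A small round-trip sketch outside the BR wrappers; the buffer and the literal data are made up.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/golang/snappy"
)

func main() {
	var buf bytes.Buffer

	// Compress: the same constructor newCompressWriter uses for Snappy.
	// The writer buffers, so Close flushes the remaining framed blocks.
	w := snappy.NewBufferedWriter(&buf)
	if _, err := w.Write([]byte("hello snappy framed stream")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// Decompress: the same constructor newCompressReader uses for Snappy.
	r := snappy.NewReader(&buf)
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello snappy framed stream
}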
br/pkg/storage/writer_test.go (55 changes: 50 additions, 5 deletions)

@@ -95,7 +95,7 @@ func TestCompressReaderWriter(t *testing.T) {
 		content      []string
 		compressType CompressType
 	}
-	testFn := func(test *testcase, t *testing.T) {
+	testFnGzip := func(test *testcase, t *testing.T) {
 		t.Log(test.name)
 		backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil)
 		require.NoError(t, err)
@@ -124,7 +124,6 @@ func TestCompressReaderWriter(t *testing.T) {
 		_, err = bf.ReadFrom(r)
 		require.NoError(t, err)
 		require.Equal(t, strings.Join(test.content, ""), bf.String())
-		require.Nil(t, r.Close())

 		// test withCompression Open
 		r, err = storage.Open(ctx, fileName)
@@ -135,7 +134,49 @@

 		require.Nil(t, file.Close())
 	}
-	compressTypeArr := []CompressType{Gzip}
+	compressTypeGzipArr := []CompressType{Gzip}
+
+	testFnSnappy := func(test *testcase, t *testing.T) {
+		t.Log(test.name)
+		backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil)
+		require.NoError(t, err)
+		ctx := context.Background()
+		storage, err := Create(ctx, backend, true)
+		require.NoError(t, err)
+		storage = WithCompression(storage, Snappy)
+		fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.snappy"
+		writer, err := storage.Create(ctx, fileName)
+		require.NoError(t, err)
+		for _, str := range test.content {
+			p := []byte(str)
+			written, err2 := writer.Write(ctx, p)
+			require.Nil(t, err2)
+			require.Len(t, p, written)
+		}
+		err = writer.Close(ctx)
+		require.NoError(t, err)
+
+		// make sure compressed file is written correctly
+		file, err := os.Open(filepath.Join(dir, fileName))
+		require.NoError(t, err)
+		r, err := newCompressReader(test.compressType, file)
+		require.NoError(t, err)
+		var bf bytes.Buffer
+		_, err = bf.ReadFrom(r)
+		require.NoError(t, err)
+		require.Equal(t, strings.Join(test.content, ""), bf.String())
+
+		// test withCompression Open
+		r, err = storage.Open(ctx, fileName)
+		require.NoError(t, err)
+		content, err := io.ReadAll(r)
+		require.NoError(t, err)
+		require.Equal(t, strings.Join(test.content, ""), string(content))
+
+		require.Nil(t, file.Close())
+	}
+	compressTypeSnappyArr := []CompressType{Snappy}
Review comment (Contributor): Can we combine these two functions?

Reply (Author): Modified

 	tests := []testcase{
 		{
 			name:    "long text medium chunks",
@@ -161,9 +202,13 @@
 		},
 	}
 	for i := range tests {
-		for _, compressType := range compressTypeArr {
+		for _, compressType := range compressTypeGzipArr {
 			tests[i].compressType = compressType
+			testFnGzip(&tests[i], t)
+		}
+		for _, compressType := range compressTypeSnappyArr {
+			tests[i].compressType = compressType
-			testFn(&tests[i], t)
+			testFnSnappy(&tests[i], t)
 		}
 	}
 }
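
Following up on the review thread above ("Can we combine these two functions?"), one possible shape for a single helper parameterized by compression type and file suffix. This is only a sketch of the reviewer's suggestion, not the code that was finally merged; it assumes the surrounding TestCompressReaderWriter scope (dir, testcase, and the imports already used in this file).

	// testFn exercises one compression type end to end: write through the
	// compressing writer, then read back both via newCompressReader on the raw
	// file and via the withCompression Open path.
	testFn := func(test *testcase, compressType CompressType, suffix string, t *testing.T) {
		t.Log(test.name)
		backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil)
		require.NoError(t, err)
		ctx := context.Background()
		storage, err := Create(ctx, backend, true)
		require.NoError(t, err)
		storage = WithCompression(storage, compressType)
		fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
		writer, err := storage.Create(ctx, fileName)
		require.NoError(t, err)
		for _, str := range test.content {
			p := []byte(str)
			written, err2 := writer.Write(ctx, p)
			require.Nil(t, err2)
			require.Len(t, p, written)
		}
		require.NoError(t, writer.Close(ctx))

		// make sure the compressed file decompresses back to the original content
		file, err := os.Open(filepath.Join(dir, fileName))
		require.NoError(t, err)
		r, err := newCompressReader(compressType, file)
		require.NoError(t, err)
		var bf bytes.Buffer
		_, err = bf.ReadFrom(r)
		require.NoError(t, err)
		require.Equal(t, strings.Join(test.content, ""), bf.String())

		// test withCompression Open
		r, err = storage.Open(ctx, fileName)
		require.NoError(t, err)
		content, err := io.ReadAll(r)
		require.NoError(t, err)
		require.Equal(t, strings.Join(test.content, ""), string(content))
		require.NoError(t, file.Close())
	}

The driver loop would then shrink to a single nested loop over pairs such as {Gzip, ".txt.gz"} and {Snappy, ".txt.snappy"} instead of one loop per compression type.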