From 76f9103c5cffe893eec5630a8fed75b2a0958d06 Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Fri, 21 Oct 2022 22:31:43 +0800 Subject: [PATCH 1/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/storage/compress.go | 11 ++++++-- br/pkg/storage/writer.go | 11 +++++++- br/pkg/storage/writer_test.go | 47 ++++++++++++++++++++++++++++++++++- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go index 96258221d9b62..1d5300cfa8d55 100644 --- a/br/pkg/storage/compress.go +++ b/br/pkg/storage/compress.go @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er } type compressReader struct { - io.ReadCloser + io.Reader + io.Closer } // nolint:interfacer @@ -94,7 +95,8 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType return nil, errors.Trace(err) } return &compressReader{ - ReadCloser: r, + Reader: r, + Closer: fileReader, }, nil } @@ -102,6 +104,11 @@ func (*compressReader) Seek(_ int64, _ int) (int64, error) { return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now") } +func (c *compressReader) Close() error { + err := c.Closer.Close() + return err +} + type flushStorageWriter struct { writer io.Writer flusher flusher diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 455cc9c3c3411..6c74c4109ff5a 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -6,6 +6,7 @@ import ( "context" "io" + "github.com/golang/snappy" "github.com/pingcap/errors" ) @@ -17,6 +18,10 @@ const ( NoCompression CompressType = iota // Gzip will compress given bytes in gzip format. Gzip + // LZO will compress given bytes in lzo format. + LZO + // Snappy will compress given bytes in snappy format. + Snappy ) type flusher interface { @@ -50,15 +55,19 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri switch compressType { case Gzip: return gzip.NewWriter(w) + case Snappy: + return snappy.NewBufferedWriter(w) default: return nil } } -func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) { +func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) { switch compressType { case Gzip: return gzip.NewReader(r) + case Snappy: + return snappy.NewReader(r), nil default: return nil, nil } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index c3d4080123f4f..84100de094a97 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -124,7 +124,6 @@ func TestCompressReaderWriter(t *testing.T) { _, err = bf.ReadFrom(r) require.NoError(t, err) require.Equal(t, strings.Join(test.content, ""), bf.String()) - require.Nil(t, r.Close()) // test withCompression Open r, err = storage.Open(ctx, fileName) @@ -136,6 +135,48 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } compressTypeArr := []CompressType{Gzip} + + testFnSanppy := func(test *testcase, t *testing.T) { + t.Log(test.name) + backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) + require.NoError(t, err) + ctx := context.Background() + storage, err := Create(ctx, backend, true) + require.NoError(t, err) + storage = WithCompression(storage, Snappy) + fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.snappy" + writer, err := storage.Create(ctx, fileName) + require.NoError(t, err) + for _, str := range test.content { + p := []byte(str) + written, err2 := writer.Write(ctx, p) + require.Nil(t, err2) + require.Len(t, p, written) + } + err = writer.Close(ctx) + require.NoError(t, err) + + // make sure compressed file is written correctly + file, err := os.Open(filepath.Join(dir, fileName)) + require.NoError(t, err) + r, err := newCompressReader(test.compressType, file) + require.NoError(t, err) + var bf bytes.Buffer + _, err = bf.ReadFrom(r) + require.NoError(t, err) + require.Equal(t, strings.Join(test.content, ""), bf.String()) + + // test withCompression Open + r, err = storage.Open(ctx, fileName) + require.NoError(t, err) + content, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, strings.Join(test.content, ""), string(content)) + + require.Nil(t, file.Close()) + } + compressTypeSnappyArr := []CompressType{Snappy} + tests := []testcase{ { name: "long text medium chunks", @@ -165,5 +206,9 @@ func TestCompressReaderWriter(t *testing.T) { tests[i].compressType = compressType testFn(&tests[i], t) } + for _, compressType := range compressTypeSnappyArr { + tests[i].compressType = compressType + testFnSanppy(&tests[i], t) + } } } From fc6791ebcbf3b00d3039f2303f0dfb1993295fe5 Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Tue, 25 Oct 2022 11:44:28 +0800 Subject: [PATCH 2/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/lightning/restore/restore.go | 1 + br/pkg/storage/writer_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index ba8faac2996a3..ecedf90e3c10b 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2140,6 +2140,7 @@ func newChunkRestore( if chunk.FileMeta.Type == mydump.SourceTypeParquet { reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize) } else { + storage.WithCompression(store, storage.LZO).Open(ctx, chunk.FileMeta.Path) reader, err = store.Open(ctx, chunk.FileMeta.Path) } if err != nil { diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 84100de094a97..237bdb8cbd2f5 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -95,7 +95,7 @@ func TestCompressReaderWriter(t *testing.T) { content []string compressType CompressType } - testFn := func(test *testcase, t *testing.T) { + testFnGzip := func(test *testcase, t *testing.T) { t.Log(test.name) backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) require.NoError(t, err) @@ -134,9 +134,9 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } - compressTypeArr := []CompressType{Gzip} + compressTypeGzipArr := []CompressType{Gzip} - testFnSanppy := func(test *testcase, t *testing.T) { + testFnSnappy := func(test *testcase, t *testing.T) { t.Log(test.name) backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) require.NoError(t, err) @@ -202,13 +202,13 @@ func TestCompressReaderWriter(t *testing.T) { }, } for i := range tests { - for _, compressType := range compressTypeArr { + for _, compressType := range compressTypeGzipArr { tests[i].compressType = compressType - testFn(&tests[i], t) + testFnGzip(&tests[i], t) } for _, compressType := range compressTypeSnappyArr { tests[i].compressType = compressType - testFnSanppy(&tests[i], t) + testFnSnappy(&tests[i], t) } } } From 362bbb82e44ada3e6485310760fe2dd8166bf657 Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Wed, 26 Oct 2022 16:37:02 +0800 Subject: [PATCH 3/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/storage/writer_test.go | 62 +++++++---------------------------- 1 file changed, 12 insertions(+), 50 deletions(-) diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 237bdb8cbd2f5..2418b53367c77 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -95,56 +95,22 @@ func TestCompressReaderWriter(t *testing.T) { content []string compressType CompressType } - testFnGzip := func(test *testcase, t *testing.T) { + testFn := func(test *testcase, t *testing.T) { t.Log(test.name) backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) require.NoError(t, err) ctx := context.Background() storage, err := Create(ctx, backend, true) require.NoError(t, err) - storage = WithCompression(storage, Gzip) - fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz" - writer, err := storage.Create(ctx, fileName) - require.NoError(t, err) - for _, str := range test.content { - p := []byte(str) - written, err2 := writer.Write(ctx, p) - require.Nil(t, err2) - require.Len(t, p, written) + storage = WithCompression(storage, test.compressType) + var suffix string + if test.compressType == Gzip { + suffix = ".txt.gz" } - err = writer.Close(ctx) - require.NoError(t, err) - - // make sure compressed file is written correctly - file, err := os.Open(filepath.Join(dir, fileName)) - require.NoError(t, err) - r, err := newCompressReader(test.compressType, file) - require.NoError(t, err) - var bf bytes.Buffer - _, err = bf.ReadFrom(r) - require.NoError(t, err) - require.Equal(t, strings.Join(test.content, ""), bf.String()) - - // test withCompression Open - r, err = storage.Open(ctx, fileName) - require.NoError(t, err) - content, err := io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, strings.Join(test.content, ""), string(content)) - - require.Nil(t, file.Close()) - } - compressTypeGzipArr := []CompressType{Gzip} - - testFnSnappy := func(test *testcase, t *testing.T) { - t.Log(test.name) - backend, err := ParseBackend("local://"+filepath.ToSlash(dir), nil) - require.NoError(t, err) - ctx := context.Background() - storage, err := Create(ctx, backend, true) - require.NoError(t, err) - storage = WithCompression(storage, Snappy) - fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.snappy" + if test.compressType == Snappy { + suffix = ".txt.snappy" + } + fileName := strings.ReplaceAll(test.name, " ", "-") + suffix writer, err := storage.Create(ctx, fileName) require.NoError(t, err) for _, str := range test.content { @@ -175,7 +141,7 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } - compressTypeSnappyArr := []CompressType{Snappy} + compressTypeArr := []CompressType{Gzip, Snappy} tests := []testcase{ { @@ -202,13 +168,9 @@ func TestCompressReaderWriter(t *testing.T) { }, } for i := range tests { - for _, compressType := range compressTypeGzipArr { - tests[i].compressType = compressType - testFnGzip(&tests[i], t) - } - for _, compressType := range compressTypeSnappyArr { + for _, compressType := range compressTypeArr { tests[i].compressType = compressType - testFnSnappy(&tests[i], t) + testFn(&tests[i], t) } } } From 41d226569ccb2ada77c98d9bccc9218f930b04ae Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Wed, 26 Oct 2022 17:18:55 +0800 Subject: [PATCH 4/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/lightning/restore/restore.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index ecedf90e3c10b..ba8faac2996a3 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2140,7 +2140,6 @@ func newChunkRestore( if chunk.FileMeta.Type == mydump.SourceTypeParquet { reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize) } else { - storage.WithCompression(store, storage.LZO).Open(ctx, chunk.FileMeta.Path) reader, err = store.Open(ctx, chunk.FileMeta.Path) } if err != nil { From ce536b879dfcf0c82c5d9efca9329c56d91911fc Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Wed, 26 Oct 2022 18:26:00 +0800 Subject: [PATCH 5/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/storage/writer_test.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 2418b53367c77..fddb37c46df16 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -14,6 +14,16 @@ import ( "github.com/stretchr/testify/require" ) +func createSuffixString(compressType CompressType) string { + if compressType == Gzip { + return ".txt.gz" + } + if compressType == Snappy { + return ".txt.snappy" + } + return "" +} + func TestExternalFileWriter(t *testing.T) { dir := t.TempDir() @@ -103,13 +113,7 @@ func TestCompressReaderWriter(t *testing.T) { storage, err := Create(ctx, backend, true) require.NoError(t, err) storage = WithCompression(storage, test.compressType) - var suffix string - if test.compressType == Gzip { - suffix = ".txt.gz" - } - if test.compressType == Snappy { - suffix = ".txt.snappy" - } + suffix := createSuffixString(test.compressType) fileName := strings.ReplaceAll(test.name, " ", "-") + suffix writer, err := storage.Create(ctx, fileName) require.NoError(t, err) From bb7fdae58310d94157b62f7a5316ec3854afc32c Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Wed, 26 Oct 2022 19:13:54 +0800 Subject: [PATCH 6/9] compress reader/writer support reading/writing snappy type compressed files --- br/pkg/storage/writer.go | 10 ++++++++++ br/pkg/storage/writer_test.go | 10 ---------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 6c74c4109ff5a..98a970a170026 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -44,6 +44,16 @@ type interceptBuffer interface { Compressed() bool } +func createSuffixString(compressType CompressType) string { + if compressType == Gzip { + return ".txt.gz" + } + if compressType == Snappy { + return ".txt.snappy" + } + return "" +} + func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer { if compressType == NoCompression { return newNoCompressionBuffer(chunkSize) diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index fddb37c46df16..ab3320f4dc8f2 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -14,16 +14,6 @@ import ( "github.com/stretchr/testify/require" ) -func createSuffixString(compressType CompressType) string { - if compressType == Gzip { - return ".txt.gz" - } - if compressType == Snappy { - return ".txt.snappy" - } - return "" -} - func TestExternalFileWriter(t *testing.T) { dir := t.TempDir() From f5ef657f15a72f500540f2ae32ce5a748a1625df Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Fri, 28 Oct 2022 11:56:26 +0800 Subject: [PATCH 7/9] compress reader/writer support reading/writing zstd type compressed files --- br/pkg/storage/writer.go | 16 ++++++++++++++++ br/pkg/storage/writer_test.go | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 98a970a170026..600c59ff214cb 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -7,7 +7,10 @@ import ( "io" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/pingcap/errors" + "github.com/pingcap/log" + "go.uber.org/zap" ) // CompressType represents the type of compression. @@ -22,6 +25,8 @@ const ( LZO // Snappy will compress given bytes in snappy format. Snappy + // Zstd will compress given bytes in zstd format. + Zstd ) type flusher interface { @@ -51,6 +56,9 @@ func createSuffixString(compressType CompressType) string { if compressType == Snappy { return ".txt.snappy" } + if compressType == Zstd { + return ".txt.zst" + } return "" } @@ -67,6 +75,12 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri return gzip.NewWriter(w) case Snappy: return snappy.NewBufferedWriter(w) + case Zstd: + newWriter, err := zstd.NewWriter(w) + if err != nil { + log.Warn("Met error when creating new writer for Zstd file", zap.Error(err)) + } + return newWriter default: return nil } @@ -78,6 +92,8 @@ func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error return gzip.NewReader(r) case Snappy: return snappy.NewReader(r), nil + case Zstd: + return zstd.NewReader(r) default: return nil, nil } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index ab3320f4dc8f2..22fa87d34de47 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -135,7 +135,7 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } - compressTypeArr := []CompressType{Gzip, Snappy} + compressTypeArr := []CompressType{Gzip, Snappy, Zstd} tests := []testcase{ { From b60513c5d52c48a656182acce6c297c281c2138f Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Fri, 28 Oct 2022 11:56:56 +0800 Subject: [PATCH 8/9] compress reader/writer support reading/writing zstd type compressed files --- br/pkg/storage/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 600c59ff214cb..f28515f3c37e2 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -78,7 +78,7 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri case Zstd: newWriter, err := zstd.NewWriter(w) if err != nil { - log.Warn("Met error when creating new writer for Zstd file", zap.Error(err)) + log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err)) } return newWriter default: From 215831a325b1262c81aac473a385e32cf10eccd6 Mon Sep 17 00:00:00 2001 From: lyzx2001 Date: Fri, 28 Oct 2022 18:46:45 +0800 Subject: [PATCH 9/9] compress reader/writer support reading/writing snappy/zstd type compressed files --- br/pkg/storage/writer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index f28515f3c37e2..72d0e6dc61f4f 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -21,8 +21,6 @@ const ( NoCompression CompressType = iota // Gzip will compress given bytes in gzip format. Gzip - // LZO will compress given bytes in lzo format. - LZO // Snappy will compress given bytes in snappy format. Snappy // Zstd will compress given bytes in zstd format.