diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go index 96258221d9b62..1d5300cfa8d55 100644 --- a/br/pkg/storage/compress.go +++ b/br/pkg/storage/compress.go @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er } type compressReader struct { - io.ReadCloser + io.Reader + io.Closer } // nolint:interfacer @@ -94,7 +95,8 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType return nil, errors.Trace(err) } return &compressReader{ - ReadCloser: r, + Reader: r, + Closer: fileReader, }, nil } @@ -102,6 +104,11 @@ func (*compressReader) Seek(_ int64, _ int) (int64, error) { return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now") } +func (c *compressReader) Close() error { + err := c.Closer.Close() + return err +} + type flushStorageWriter struct { writer io.Writer flusher flusher diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 455cc9c3c3411..72d0e6dc61f4f 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -6,7 +6,11 @@ import ( "context" "io" + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/pingcap/errors" + "github.com/pingcap/log" + "go.uber.org/zap" ) // CompressType represents the type of compression. @@ -17,6 +21,10 @@ const ( NoCompression CompressType = iota // Gzip will compress given bytes in gzip format. Gzip + // Snappy will compress given bytes in snappy format. + Snappy + // Zstd will compress given bytes in zstd format. + Zstd ) type flusher interface { @@ -39,6 +47,19 @@ type interceptBuffer interface { Compressed() bool } +func createSuffixString(compressType CompressType) string { + if compressType == Gzip { + return ".txt.gz" + } + if compressType == Snappy { + return ".txt.snappy" + } + if compressType == Zstd { + return ".txt.zst" + } + return "" +} + func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer { if compressType == NoCompression { return newNoCompressionBuffer(chunkSize) @@ -50,15 +71,27 @@ func newCompressWriter(compressType CompressType, w io.Writer) simpleCompressWri switch compressType { case Gzip: return gzip.NewWriter(w) + case Snappy: + return snappy.NewBufferedWriter(w) + case Zstd: + newWriter, err := zstd.NewWriter(w) + if err != nil { + log.Warn("Met error when creating new writer for Zstd type file", zap.Error(err)) + } + return newWriter default: return nil } } -func newCompressReader(compressType CompressType, r io.Reader) (io.ReadCloser, error) { +func newCompressReader(compressType CompressType, r io.Reader) (io.Reader, error) { switch compressType { case Gzip: return gzip.NewReader(r) + case Snappy: + return snappy.NewReader(r), nil + case Zstd: + return zstd.NewReader(r) default: return nil, nil } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index c3d4080123f4f..22fa87d34de47 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -102,8 +102,9 @@ func TestCompressReaderWriter(t *testing.T) { ctx := context.Background() storage, err := Create(ctx, backend, true) require.NoError(t, err) - storage = WithCompression(storage, Gzip) - fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt.gz" + storage = WithCompression(storage, test.compressType) + suffix := createSuffixString(test.compressType) + fileName := strings.ReplaceAll(test.name, " ", "-") + suffix writer, err := storage.Create(ctx, fileName) require.NoError(t, err) for _, str := range test.content { @@ -124,7 +125,6 @@ func TestCompressReaderWriter(t *testing.T) { _, err = bf.ReadFrom(r) require.NoError(t, err) require.Equal(t, strings.Join(test.content, ""), bf.String()) - require.Nil(t, r.Close()) // test withCompression Open r, err = storage.Open(ctx, fileName) @@ -135,7 +135,8 @@ func TestCompressReaderWriter(t *testing.T) { require.Nil(t, file.Close()) } - compressTypeArr := []CompressType{Gzip} + compressTypeArr := []CompressType{Gzip, Snappy, Zstd} + tests := []testcase{ { name: "long text medium chunks",