diff --git a/br/pkg/lightning/mydump/BUILD.bazel b/br/pkg/lightning/mydump/BUILD.bazel
index 349e4acc02b64..0e37e8c7525ea 100644
--- a/br/pkg/lightning/mydump/BUILD.bazel
+++ b/br/pkg/lightning/mydump/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
         "//util/table-filter",
         "//util/zeropool",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_spkg_bom//:bom",
         "@com_github_xitongsys_parquet_go//parquet",
         "@com_github_xitongsys_parquet_go//reader",
diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go
index eb6172e0aa8b8..8c4232d828f58 100644
--- a/br/pkg/lightning/mydump/loader.go
+++ b/br/pkg/lightning/mydump/loader.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
@@ -731,6 +732,14 @@ func calculateFileBytes(ctx context.Context,
 
 // SampleFileCompressRatio samples the compress ratio of the compressed file.
 func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
+	failpoint.Inject("SampleFileCompressPercentage", func(val failpoint.Value) {
+		switch v := val.(type) {
+		case string:
+			failpoint.Return(1.0, errors.New(v))
+		case int:
+			failpoint.Return(float64(v)/100, nil)
+		}
+	})
 	if fileMeta.Compression == CompressionNone {
 		return 1, nil
 	}
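The failpoint injected above dispatches on the payload type: an int payload makes SampleFileCompressRatio report float64(v)/100 as the compress ratio, and a string payload makes it return errors.New(v). A minimal sketch of driving it from a test, using the same failpoint path and payloads as the TestGetFileRealSize case added later in this patch:

	// Pretend every compressed file expands to 2.5x its stored size.
	_ = failpoint.Enable(
		"github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage",
		"return(250)") // SampleFileCompressRatio now returns 2.5
	defer func() {
		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage")
	}()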
diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go
index 16cd0ed3c849d..eabf3db1117b7 100644
--- a/br/pkg/storage/writer_test.go
+++ b/br/pkg/storage/writer_test.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"testing"
 
@@ -181,17 +182,23 @@ func TestNewCompressReader(t *testing.T) {
 	require.NoError(t, w.Close())
 	compressedData := buf.Bytes()
 
-	// default cfg
+	// default cfg(decode asynchronously)
+	prevRoutineCnt := runtime.NumGoroutine()
 	r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData))
+	currRoutineCnt := runtime.NumGoroutine()
 	require.NoError(t, err)
+	require.Greater(t, currRoutineCnt, prevRoutineCnt)
 	allData, err := io.ReadAll(r)
 	require.NoError(t, err)
 	require.Equal(t, "data", string(allData))
 
 	// sync decode
+	prevRoutineCnt = runtime.NumGoroutine()
 	config := DecompressConfig{ZStdDecodeConcurrency: 1}
 	r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData))
 	require.NoError(t, err)
+	currRoutineCnt = runtime.NumGoroutine()
+	require.Equal(t, prevRoutineCnt, currRoutineCnt)
 	allData, err = io.ReadAll(r)
 	require.NoError(t, err)
 	require.Equal(t, "data", string(allData))
diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel
index 7953ac80caecf..60bd753569e87 100644
--- a/executor/importer/BUILD.bazel
+++ b/executor/importer/BUILD.bazel
@@ -84,10 +84,11 @@ go_test(
    embed = [":importer"],
    flaky = True,
    race = "on",
-    shard_count = 13,
+    shard_count = 14,
    deps = [
        "//br/pkg/errors",
        "//br/pkg/lightning/config",
+        "//br/pkg/lightning/mydump",
        "//br/pkg/streamhelper",
        "//br/pkg/utils",
        "//config",
@@ -106,6 +107,7 @@ go_test(
        "//util/sqlexec",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_log//:log",
        "@com_github_stretchr_testify//require",
        "@com_github_tikv_client_go_v2//util",
        "@io_etcd_go_etcd_client_v3//:client",
diff --git a/executor/importer/import.go b/executor/importer/import.go
index 4d0ba9f43eb92..0e4e1ba348f76 100644
--- a/executor/importer/import.go
+++ b/executor/importer/import.go
@@ -920,15 +920,14 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
 			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek")
 		}
 		compressTp := mydump.ParseCompressionOnFileExtension(fileNameKey)
-		dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+		fileMeta := mydump.SourceFileMeta{
 			Path:        fileNameKey,
 			FileSize:    size,
 			Compression: compressTp,
 			Type:        sourceType,
-			// todo: if we support compression for physical mode, should set it to size * compressRatio to better split
-			// engines
-			RealSize: size,
-		})
+		}
+		fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
+		dataFiles = append(dataFiles, &fileMeta)
 		totalSize = size
 	} else {
 		var commonPrefix string
@@ -950,13 +949,14 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
 				return nil
 			}
 			compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
-			dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+			fileMeta := mydump.SourceFileMeta{
 				Path:        remotePath,
 				FileSize:    size,
 				Compression: compressTp,
 				Type:        sourceType,
-				RealSize: size,
-			})
+			}
+			fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
+			dataFiles = append(dataFiles, &fileMeta)
 			totalSize += size
 			return nil
 		})
@@ -971,6 +971,19 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
 	return nil
 }
 
+func (e *LoadDataController) getFileRealSize(ctx context.Context,
+	fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) int64 {
+	if fileMeta.Compression == mydump.CompressionNone {
+		return fileMeta.FileSize
+	}
+	compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
+	if err != nil {
+		e.logger.Warn("failed to get compress ratio", zap.String("file", fileMeta.Path), zap.Error(err))
+		return fileMeta.FileSize
+	}
+	return int64(compressRatio * float64(fileMeta.FileSize))
+}
+
 func (e *LoadDataController) getSourceType() mydump.SourceType {
 	switch e.Format {
 	case DataFormatParquet:
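A worked example of the new sizing logic, with made-up numbers (only the formula RealSize = ratio * FileSize and the error fallback come from this patch): a 100-byte gzip object whose sampled compress ratio is 2.5 is recorded with RealSize 250, so downstream engine splitting sees an estimate of the uncompressed size, which is what the removed todo comment asked for. The TestGetFileRealSize case below verifies exactly these values.

	// Hypothetical file: 100 bytes on disk, gzip-compressed.
	fileMeta := mydump.SourceFileMeta{
		Path:        "compress.001.csv.gz", // made-up object name
		FileSize:    100,
		Compression: mydump.CompressionGZ,
	}
	// With a sampled ratio of 2.5 this sets RealSize to 250; if sampling
	// fails, a warning is logged and RealSize stays at FileSize (100).
	fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)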
diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go
index 7bc1a581b684c..62d3fd07f6eec 100644
--- a/executor/importer/import_test.go
+++ b/executor/importer/import_test.go
@@ -15,14 +15,17 @@
 package importer
 
 import (
+	"context"
 	"fmt"
 	"runtime"
 	"testing"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
@@ -157,3 +160,19 @@ func TestASTArgsFromStmt(t *testing.T) {
 	require.Equal(t, astArgs.ColumnAssignments, importIntoStmt.ColumnAssignments)
 	require.Equal(t, astArgs.ColumnsAndUserVars, importIntoStmt.ColumnsAndUserVars)
 }
+
+func TestGetFileRealSize(t *testing.T) {
+	err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage", "return(250)")
+	require.NoError(t, err)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage")
+	}()
+	fileMeta := mydump.SourceFileMeta{Compression: mydump.CompressionNone, FileSize: 100}
+	c := &LoadDataController{logger: log.L()}
+	require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
+	fileMeta.Compression = mydump.CompressionGZ
+	require.Equal(t, int64(250), c.getFileRealSize(context.Background(), fileMeta, nil))
+	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage", `return("test err")`)
+	require.NoError(t, err)
+	require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
+}
diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go
index d486a7482f94f..e393d471a234d 100644
--- a/executor/importer/table_import.go
+++ b/executor/importer/table_import.go
@@ -255,7 +255,9 @@ type TableImporter struct {
 func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
 	info := LoadDataReaderInfo{
 		Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{})
+			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{
+				ZStdDecodeConcurrency: 1,
+			})
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
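The ZStdDecodeConcurrency: 1 passed here ties back to the writer_test.go change above: with the default config the zstd reader decodes asynchronously on extra goroutines, while a concurrency of 1 decodes synchronously and spawns none. A sketch of the two call shapes, reusing the newCompressReader signature from that test; the motivation in the second comment is our reading, not stated in the patch:

	// Default config: asynchronous zstd decode, extra goroutines started.
	asyncReader, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData))

	// Concurrency 1: synchronous decode, no extra goroutines, presumably
	// preferable when a reader is opened per chunk as in getParser above.
	syncReader, err := newCompressReader(Zstd, DecompressConfig{ZStdDecodeConcurrency: 1}, bytes.NewReader(compressedData))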
race = "on", deps = [ + "//br/pkg/lightning/mydump", "//config", "//kv", "//testkit", "//tests/realtikvtest", "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_golang_snappy//:snappy", + "@com_github_klauspost_compress//zstd", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", diff --git a/tests/realtikvtest/importintotest3/dummy_test.go b/tests/realtikvtest/importintotest3/dummy_test.go deleted file mode 100644 index de58a6c9a8323..0000000000000 --- a/tests/realtikvtest/importintotest3/dummy_test.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importintotest - -func (s *mockGCSSuite) TestDummy() { - s.True(true, gcsEndpoint) -} diff --git a/tests/realtikvtest/importintotest3/file_compression_test.go b/tests/realtikvtest/importintotest3/file_compression_test.go new file mode 100644 index 0000000000000..9a64b3b40cfdb --- /dev/null +++ b/tests/realtikvtest/importintotest3/file_compression_test.go @@ -0,0 +1,124 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
diff --git a/tests/realtikvtest/importintotest3/file_compression_test.go b/tests/realtikvtest/importintotest3/file_compression_test.go
new file mode 100644
index 0000000000000..9a64b3b40cfdb
--- /dev/null
+++ b/tests/realtikvtest/importintotest3/file_compression_test.go
@@ -0,0 +1,124 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importintotest
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"io"
+
+	"github.com/fsouza/fake-gcs-server/fakestorage"
+	"github.com/golang/snappy"
+	"github.com/klauspost/compress/zstd"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
+	"github.com/pingcap/tidb/testkit"
+)
+
+func (s *mockGCSSuite) getCompressedData(compression mydump.Compression, data []byte) []byte {
+	var buf bytes.Buffer
+	var w io.WriteCloser
+	switch compression {
+	case mydump.CompressionGZ:
+		w = gzip.NewWriter(&buf)
+	case mydump.CompressionZStd:
+		var err error
+		w, err = zstd.NewWriter(&buf)
+		s.NoError(err)
+	case mydump.CompressionSnappy:
+		w = snappy.NewBufferedWriter(&buf)
+	default:
+		panic(fmt.Sprintf("unknown compression type: %d", compression))
+	}
+	_, err := w.Write(data)
+	s.NoError(err)
+	s.NoError(w.Close())
+	compressedData := buf.Bytes()
+	s.NotEqual(data, compressedData)
+	return compressedData
+}
+
+func (s *mockGCSSuite) TestGzipAndMixedCompression() {
+	s.prepareAndUseDB("gzip")
+	s.tk.MustExec("CREATE TABLE gzip.t (i INT PRIMARY KEY, s varchar(32));")
+
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.001.csv.gz"},
+		Content:     s.getCompressedData(mydump.CompressionGZ, []byte("1,test1\n2,test2")),
+	})
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.001.csv.gzip"},
+		Content:     s.getCompressedData(mydump.CompressionGZ, []byte("3,test3\n4,test4")),
+	})
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.002.csv"},
+		Content:     []byte("5,test5\n6,test6\n7,test7\n8,test8\n9,test9"),
+	})
+
+	sql := fmt.Sprintf(`IMPORT INTO gzip.t FROM 'gs://gzip/compress.*?endpoint=%s'
+		WITH thread=1;`, gcsEndpoint)
+	s.tk.MustQuery(sql)
+	s.tk.MustQuery("SELECT * FROM gzip.t;").Check(testkit.Rows(
+		"1 test1", "2 test2", "3 test3", "4 test4",
+		"5 test5", "6 test6", "7 test7", "8 test8", "9 test9",
+	))
+
+	// with ignore N rows
+	s.tk.MustExec("truncate table gzip.t")
+	sql = fmt.Sprintf(`IMPORT INTO gzip.t FROM 'gs://gzip/compress.*?endpoint=%s'
+		WITH skip_rows=1, thread=1;`, gcsEndpoint)
+	s.tk.MustQuery(sql)
+	s.tk.MustQuery("SELECT * FROM gzip.t;").Check(testkit.Rows(
+		"2 test2", "4 test4", "6 test6", "7 test7", "8 test8", "9 test9",
+	))
+}
+
+func (s *mockGCSSuite) TestZStd() {
+	s.prepareAndUseDB("zstd")
+	s.tk.MustExec("CREATE TABLE zstd.t (i INT PRIMARY KEY, s varchar(32));")
+
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "zstd", Name: "t.01.csv.zst"},
+		Content:     s.getCompressedData(mydump.CompressionZStd, []byte("1,test1\n2,test2")),
+	})
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "zstd", Name: "t.02.csv.zstd"},
+		Content:     s.getCompressedData(mydump.CompressionZStd, []byte("3,test3\n4,test4")),
+	})
+
+	sql := fmt.Sprintf(`IMPORT INTO zstd.t FROM 'gs://zstd/t.*?endpoint=%s'
+		WITH thread=1;`, gcsEndpoint)
+	s.tk.MustQuery(sql)
+	s.tk.MustQuery("SELECT * FROM zstd.t;").Check(testkit.Rows(
+		"1 test1", "2 test2", "3 test3", "4 test4",
+	))
+}
+
+func (s *mockGCSSuite) TestSnappy() {
+	s.prepareAndUseDB("snappy")
+	s.tk.MustExec("CREATE TABLE snappy.t (i INT PRIMARY KEY, s varchar(32));")
+
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "snappy", Name: "t.01.csv.snappy"},
+		Content:     s.getCompressedData(mydump.CompressionSnappy, []byte("1,test1\n2,test2")),
+	})
+
+	sql := fmt.Sprintf(`IMPORT INTO snappy.t FROM 'gs://snappy/t.*?endpoint=%s'
+		WITH thread=1;`, gcsEndpoint)
+	s.tk.MustQuery(sql)
+	s.tk.MustQuery("SELECT * FROM snappy.t;").Check(testkit.Rows(
+		"1 test1", "2 test2",
+	))
+}
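A note on the snappy case in getCompressedData above: snappy.NewBufferedWriter emits the framed stream format, which snappy.NewReader can decode incrementally, whereas snappy.Encode produces the raw block format, a different encoding that cannot be consumed as a stream. This is standard github.com/golang/snappy behavior rather than something introduced by this patch (buf and data as in the helper above):

	// Framed/stream format: what the test writes, and what a streaming
	// reader on the import path can consume.
	w := snappy.NewBufferedWriter(&buf)

	// Raw block format: a different encoding, not readable via snappy.NewReader.
	block := snappy.Encode(nil, data)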
[]byte("1,test1\n2,test2")), + }) + + sql := fmt.Sprintf(`IMPORT INTO snappy.t FROM 'gs://snappy/t.*?endpoint=%s' + WITH thread=1;`, gcsEndpoint) + s.tk.MustQuery(sql) + s.tk.MustQuery("SELECT * FROM snappy.t;").Check(testkit.Rows( + "1 test1", "2 test2", + )) +} diff --git a/tests/realtikvtest/importintotest3/main_test.go b/tests/realtikvtest/importintotest3/main_test.go index 07c466b298680..c5557eff4f19d 100644 --- a/tests/realtikvtest/importintotest3/main_test.go +++ b/tests/realtikvtest/importintotest3/main_test.go @@ -81,6 +81,12 @@ func (s *mockGCSSuite) cleanupSysTables() { s.tk.MustExec("delete from mysql.tidb_background_subtask") } +func (s *mockGCSSuite) prepareAndUseDB(db string) { + s.tk.MustExec("drop database if exists " + db) + s.tk.MustExec("create database " + db) + s.tk.MustExec("use " + db) +} + func init() { // need a real PD config.UpdateGlobal(func(conf *config.Config) {