From 0ef10e6d370a84b82a34d205428f0283290cdf36 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Tue, 8 Feb 2022 14:01:36 +0800
Subject: [PATCH] lightning: cherry-pick some PRs (#1458) (#1460)

---
 pkg/lightning/mydump/region.go      | 14 ++++++--
 pkg/lightning/mydump/region_test.go | 56 +++++++++++++++++++++++++++++
 pkg/pdutil/pd.go                    |  2 --
 pkg/storage/s3.go                   | 14 ++++++++
 pkg/storage/s3_test.go              |  9 +++++
 tests/br_log_restore/run.sh         |  3 ++
 tests/br_other/run.sh               |  4 +--
 7 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/pkg/lightning/mydump/region.go b/pkg/lightning/mydump/region.go
index a4402bf8a..a004159f9 100644
--- a/pkg/lightning/mydump/region.go
+++ b/pkg/lightning/mydump/region.go
@@ -31,7 +31,11 @@ import (
 	"github.com/pingcap/br/pkg/storage"
 )
 
-const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+const (
+	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+	// ratio for the extra margin added to the large CSV file size threshold (1/10 of `region-split-size`)
+	largeCSVLowerThresholdRation = 10
+)
 
 type TableRegion struct {
 	EngineID int32
@@ -260,7 +264,10 @@ func makeSourceFileRegion(
 	}
 	// If a csv file is overlarge, we need to split it into multiple regions.
 	// Note: We can only split a csv file whose format is strict.
-	if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
+	// We increase the check threshold by 1/10 of `max-region-size` because the source file size dumped by tools
+	// like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we can
+	// avoid splitting the file into a lot of small chunks.
+	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}
@@ -351,6 +358,9 @@ func SplitLargeFile(
 		columns = parser.Columns()
 		startOffset, _ = parser.Pos()
 		endOffset = startOffset + maxRegionSize
+		if endOffset > dataFile.FileMeta.FileSize {
+			endOffset = dataFile.FileMeta.FileSize
+		}
 	}
 	for {
 		curRowsCnt := (endOffset - startOffset) / divisor
diff --git a/pkg/lightning/mydump/region_test.go b/pkg/lightning/mydump/region_test.go
index 7cbb25840..64e2dd01d 100644
--- a/pkg/lightning/mydump/region_test.go
+++ b/pkg/lightning/mydump/region_test.go
@@ -292,3 +292,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileNoNewLine(c *C) {
 		c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
 	}
 }
+
+func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_file",
+	}
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			CSV: config.CSVConfig{
+				Separator:       ",",
+				Delimiter:       "",
+				Header:          true,
+				TrimLastSep:     false,
+				NotNull:         false,
+				Null:            "NULL",
+				BackslashEscape: true,
+			},
+			StrictFormat:  true,
+			Filter:        []string{"*.*"},
+			MaxRegionSize: 15,
+		},
+	}
+
+	dir := c.MkDir()
+
+	fileName := "test.csv"
+	filePath := filepath.Join(dir, fileName)
+
+	content := []byte("field1,field2\r\n123,456\r\n")
+	err := os.WriteFile(filePath, content, 0o644)
+	c.Assert(err, IsNil)
+
+	dataFileInfo, err := os.Stat(filePath)
+	c.Assert(err, IsNil)
+	fileSize := dataFileInfo.Size()
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
+	colCnt := int64(2)
+	columns := []string{"field1", "field2"}
+	prevRowIdxMax := int64(0)
+	ioWorker := worker.NewPool(context.Background(), 4, "io")
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	offsets := [][]int64{{14, 24}}
+
+	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	c.Assert(err, IsNil)
+	c.Assert(regions, HasLen, len(offsets))
+	for i := range offsets {
+		c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
+		c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
+		c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
+	}
+}
diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go
index 63276804b..0d548b0ac 100644
--- a/pkg/pdutil/pd.go
+++ b/pkg/pdutil/pd.go
@@ -123,9 +123,7 @@ var (
 		"max-merge-region-size":       20,
 		"leader-schedule-limit":       4,
 		"region-schedule-limit":       2048,
-		"max-snapshot-count":          3,
 		"enable-location-replacement": "true",
-		"max-pending-peer-count":      16,
 	}
 )
 
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 1d82f352b..f4d516157 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -30,6 +30,7 @@ import (
 	"go.uber.org/zap"
 
 	berrors "github.com/pingcap/br/pkg/errors"
+	"github.com/pingcap/br/pkg/logutil"
 )
 
 const (
@@ -655,6 +656,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 
 	if realOffset == r.pos {
 		return realOffset, nil
+	} else if realOffset >= r.rangeInfo.Size {
+		// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
+		// Because S3's GetObject interface doesn't allow requesting a range that matches zero-length data,
+		// if the position is out of range we always need to return io.EOF after the seek operation.
+
+		// close the current reader and replace it with an empty reader positioned at the end of the object
+		if err := r.reader.Close(); err != nil {
+			log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
+		}
+
+		r.reader = io.NopCloser(bytes.NewReader(nil))
+		r.pos = r.rangeInfo.Size
+		return r.pos, nil
 	}
 
 	// if seek ahead no more than 64k, we discard these data
diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go
index b953f972e..5b8064ab1 100644
--- a/pkg/storage/s3_test.go
+++ b/pkg/storage/s3_test.go
@@ -695,6 +695,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(n, Equals, 100)
 	c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])
+
+	// test seeking to the end of the file or beyond it
+	for _, p := range []int64{1000000, 1000001, 2000000} {
+		offset, err = reader.Seek(p, io.SeekStart)
+		c.Assert(offset, Equals, int64(1000000))
+		c.Assert(err, IsNil)
+		_, err := reader.Read(slice)
+		c.Assert(err, Equals, io.EOF)
+	}
 }
 
 type limitedBytesReader struct {
diff --git a/tests/br_log_restore/run.sh b/tests/br_log_restore/run.sh
index e6734ef64..b3b9da581 100755
--- a/tests/br_log_restore/run.sh
+++ b/tests/br_log_restore/run.sh
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Skip the log restore test because TiCDC no longer supports this feature.
+exit 0
+
 set -eux
 DB="$TEST_NAME"
 TABLE="usertable"
diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh
index c4d75eaa5..a25077131 100644
--- a/tests/br_other/run.sh
+++ b/tests/br_other/run.sh
@@ -134,9 +134,7 @@ default_pd_values='{
   "max-merge-region-keys": 200000,
   "max-merge-region-size": 20,
   "leader-schedule-limit": 4,
-  "region-schedule-limit": 2048,
-  "max-snapshot-count": 3,
-  "max-pending-peer-count": 16
+  "region-schedule-limit": 2048
 }'
 
 for key in $(echo $default_pd_values | jq 'keys[]'); do
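
For reference, a minimal standalone Go sketch of the relaxed split check added to makeSourceFileRegion above: a strict-format CSV source file is split only when its size exceeds max-region-size plus one tenth of max-region-size, so a dump that only marginally exceeds max-region-size stays in a single chunk. Only the constant name and the comparison come from the patch; the helper function and the sample sizes below are assumptions for illustration.

package main

import "fmt"

// Same ratio constant as in the patch: the extra margin is max-region-size / 10.
const largeCSVLowerThresholdRation = 10

// needSplit mirrors the size comparison in the patched condition (hypothetical helper).
func needSplit(dataFileSize, maxRegionSize int64) bool {
	return dataFileSize > maxRegionSize+maxRegionSize/largeCSVLowerThresholdRation
}

func main() {
	const maxRegionSize int64 = 256 << 20 // assumed 256 MiB max-region-size

	fmt.Println(needSplit(maxRegionSize, maxRegionSize))        // false: exactly at max-region-size
	fmt.Println(needSplit(maxRegionSize+10<<20, maxRegionSize)) // false: within the ~25.6 MiB slack
	fmt.Println(needSplit(300<<20, maxRegionSize))              // true: above max-region-size plus the 1/10 margin
}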