diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index 85007f07510c7..4a3ce247a43a9 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -30,7 +30,11 @@ import (
 	"go.uber.org/zap"
 )
 
-const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+const (
+	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
+	// the large CSV file split threshold is `max-region-size` plus `max-region-size` divided by this ratio
+	largeCSVLowerThresholdRation = 10
+)
 
 type TableRegion struct {
 	EngineID int32
@@ -268,7 +272,10 @@ func makeSourceFileRegion(
 	}
 	// If a csv file is overlarge, we need to split it into multiple regions.
 	// Note: We can only split a csv file whose format is strict.
-	if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
+	// We increase the check threshold by 1/10 of `max-region-size` because the size of a source file dumped by
+	// tools like dumpling may slightly exceed the threshold when it is roughly equal to `max-region-size`, so
+	// we can avoid splitting such a file into a lot of small chunks.
+	if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
 		_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
 		return regions, subFileSizes, err
 	}
@@ -362,6 +369,9 @@ func SplitLargeFile(
 		columns = parser.Columns()
 		startOffset, _ = parser.Pos()
 		endOffset = startOffset + maxRegionSize
+		if endOffset > dataFile.FileMeta.FileSize {
+			endOffset = dataFile.FileMeta.FileSize
+		}
 	}
 	for {
 		curRowsCnt := (endOffset - startOffset) / divisor
diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go
index 657781c92c996..6ee26692d4e8c 100644
--- a/br/pkg/lightning/mydump/region_test.go
+++ b/br/pkg/lightning/mydump/region_test.go
@@ -331,3 +331,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
 		c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
 	}
 }
+
+func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_file",
+	}
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			CSV: config.CSVConfig{
+				Separator:       ",",
+				Delimiter:       "",
+				Header:          true,
+				TrimLastSep:     false,
+				NotNull:         false,
+				Null:            "NULL",
+				BackslashEscape: true,
+			},
+			StrictFormat:  true,
+			Filter:        []string{"*.*"},
+			MaxRegionSize: 15,
+		},
+	}
+
+	dir := c.MkDir()
+
+	fileName := "test.csv"
+	filePath := filepath.Join(dir, fileName)
+
+	content := []byte("field1,field2\r\n123,456\r\n")
+	err := os.WriteFile(filePath, content, 0o644)
+	c.Assert(err, IsNil)
+
+	dataFileInfo, err := os.Stat(filePath)
+	c.Assert(err, IsNil)
+	fileSize := dataFileInfo.Size()
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
+	colCnt := int64(2)
+	columns := []string{"field1", "field2"}
+	prevRowIdxMax := int64(0)
+	ioWorker := worker.NewPool(context.Background(), 4, "io")
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	offsets := [][]int64{{14, 24}}
+
+	_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
+	c.Assert(err, IsNil)
+	c.Assert(regions, HasLen, len(offsets))
+	for i := range offsets {
+		c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
+		c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
+		c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
+	}
+}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 8fe1b6b3ae24c..2c07b5af2cad0 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -666,6 +666,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 
 	if realOffset == r.pos {
 		return realOffset, nil
+	} else if realOffset >= r.rangeInfo.Size {
+		// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
+		// Because S3's GetObject interface doesn't allow requesting a range that matches zero-length data,
+		// reads after a seek to a position at or beyond the end of the object must always return io.EOF.
+
+		// close the current reader and replace it with an empty one so that subsequent reads return io.EOF
+		if err := r.reader.Close(); err != nil {
+			log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
+		}
+
+		r.reader = io.NopCloser(bytes.NewReader(nil))
+		r.pos = r.rangeInfo.Size
+		return r.pos, nil
 	}
 
 	// if seek ahead no more than 64k, we discard these data
diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index f1a4e42221afa..413f5e8881da1 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -740,6 +740,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(n, Equals, 100)
 	c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])
+
+	// test seeking to the end of the file and to positions beyond it
+	for _, p := range []int64{1000000, 1000001, 2000000} {
+		offset, err = reader.Seek(p, io.SeekStart)
+		c.Assert(offset, Equals, int64(1000000))
+		c.Assert(err, IsNil)
+		_, err := reader.Read(slice)
+		c.Assert(err, Equals, io.EOF)
+	}
 }
 
 type limitedBytesReader struct {