Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix lightning split large csv file error and adjust s3 seek result #27769

Merged
merged 36 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
050e33c
fix lightning populate chunks
glorv Sep 2, 2021
c506864
fix comment
glorv Sep 2, 2021
6338289
Merge branch 'master' into fix-chunk
glorv Sep 2, 2021
3a7e535
Merge branch 'master' into fix-chunk
glorv Sep 3, 2021
d87cdb8
resolve comments
glorv Sep 6, 2021
30f32d1
fix seek
glorv Sep 6, 2021
b79e04e
add check for the seek position
glorv Sep 6, 2021
2dc464d
Merge branch 'master' into fix-chunk
glorv Sep 6, 2021
368ced5
Merge branch 'master' into fix-chunk
glorv Sep 6, 2021
429d2ac
Merge branch 'master' into fix-chunk
glorv Sep 6, 2021
3e76255
resolve comments
glorv Sep 6, 2021
d631f2b
Merge branch 'fix-chunk' of ssh://github.com/glorv/tidb into fix-chunk
glorv Sep 6, 2021
77855eb
Merge branch 'master' into fix-chunk
glorv Sep 6, 2021
8441774
fmt
glorv Sep 6, 2021
9878f8f
Merge branch 'master' into fix-chunk
glorv Sep 6, 2021
01c4e53
fix typo
glorv Sep 6, 2021
f78fba4
Merge branch 'master' into fix-chunk
kennytm Sep 6, 2021
22afcc9
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 6, 2021
519ebec
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 6, 2021
7d23699
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 6, 2021
7943a3c
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
88d493e
Merge branch 'master' into fix-chunk
glorv Sep 7, 2021
67b0813
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
32ad3b6
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
cfb9255
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
9db8405
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
00f0fa3
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
5d2392a
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
acb6d0d
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
4e50b52
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
25a157e
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
b247989
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 7, 2021
a1fbc70
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 8, 2021
3360a5e
Merge branch 'master' into fix-chunk
ti-chi-bot Sep 8, 2021
689f411
fix unit test
glorv Sep 8, 2021
69a37a3
Merge branch 'fix-chunk' of ssh://github.com/glorv/tidb into fix-chunk
glorv Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import (
"go.uber.org/zap"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
Expand Down Expand Up @@ -268,7 +272,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
// avoid split a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
Expand Down Expand Up @@ -362,6 +369,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
Expand Down
56 changes: 56 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
13 changes: 13 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {

if realOffset == r.pos {
return realOffset, nil
} else if realOffset >= r.rangeInfo.Size {
// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// because s3's GetObject interface doesn't allow get a range that matches zero length data,
// so if the position is out of range, we need to always return io.EOF after the seek operation.

// close current read and open a new one which target offset
if err := r.reader.Close(); err != nil {
log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
}

r.reader = io.NopCloser(bytes.NewReader(nil))
r.pos = r.rangeInfo.Size
return r.pos, nil
}

// if seek ahead no more than 64k, we discard these data
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, 100)
c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])

// test seek to the file end or bigger positions
for _, p := range []int64{1000000, 1000001, 2000000} {
offset, err = reader.Seek(p, io.SeekStart)
c.Assert(offset, Equals, int64(1000000))
c.Assert(err, IsNil)
_, err := reader.Read(slice)
c.Assert(err, Equals, io.EOF)
}
}

type limitedBytesReader struct {
Expand Down