Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning: cherry-pick some PRs (#1458) (#1460)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 8, 2022
1 parent db72d76 commit 0ef10e6
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 7 deletions.
14 changes: 12 additions & 2 deletions pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ import (
"github.com/pingcap/br/pkg/storage"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
Expand Down Expand Up @@ -260,7 +264,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
// avoid split a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
Expand Down Expand Up @@ -351,6 +358,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
Expand Down
56 changes: 56 additions & 0 deletions pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileNoNewLine(c *C) {
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
2 changes: 0 additions & 2 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ var (
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"enable-location-replacement": "true",
"max-pending-peer-count": 16,
}
)

Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
)

const (
Expand Down Expand Up @@ -655,6 +656,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {

if realOffset == r.pos {
return realOffset, nil
} else if realOffset >= r.rangeInfo.Size {
// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// because s3's GetObject interface doesn't allow get a range that matches zero length data,
// so if the position is out of range, we need to always return io.EOF after the seek operation.

// close current read and open a new one which target offset
if err := r.reader.Close(); err != nil {
log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
}

r.reader = io.NopCloser(bytes.NewReader(nil))
r.pos = r.rangeInfo.Size
return r.pos, nil
}

// if seek ahead no more than 64k, we discard these data
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, 100)
c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])

// test seek to the file end or bigger positions
for _, p := range []int64{1000000, 1000001, 2000000} {
offset, err = reader.Seek(p, io.SeekStart)
c.Assert(offset, Equals, int64(1000000))
c.Assert(err, IsNil)
_, err := reader.Read(slice)
c.Assert(err, Equals, io.EOF)
}
}

type limitedBytesReader struct {
Expand Down
3 changes: 3 additions & 0 deletions tests/br_log_restore/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# skip log restore test because ticdc doesn't support this feature anymore.
exit 0

set -eux
DB="$TEST_NAME"
TABLE="usertable"
Expand Down
4 changes: 1 addition & 3 deletions tests/br_other/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ default_pd_values='{
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"max-pending-peer-count": 16
"region-schedule-limit": 2048
}'

for key in $(echo $default_pd_values | jq 'keys[]'); do
Expand Down

0 comments on commit 0ef10e6

Please sign in to comment.