Commit

This is an automated cherry-pick of pingcap#1458
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
glorv authored and ti-chi-bot committed Jan 25, 2022
1 parent 0ef8d4c commit 3e281c7
Showing 12 changed files with 328 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/backup/schema.go
@@ -95,7 +95,10 @@ func (ss *Schemas) BackupSchemas(
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
<<<<<<< HEAD

=======
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
workerPool.ApplyOnErrorGroup(errg, func() error {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
14 changes: 12 additions & 2 deletions pkg/lightning/mydump/region.go
@@ -31,7 +31,11 @@ import (
"github.com/pingcap/br/pkg/storage"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of the large CSV file size threshold, relative to `region-split-size`
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
@@ -260,7 +264,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We increase the check threshold by 1/10 of the `max-region-size` because the source files dumped by tools
// like dumpling might slightly exceed the threshold when their size equals `max-region-size`, so we can
// avoid splitting them into a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
@@ -351,6 +358,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
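For context, the new check splits a strict-format CSV only when its size exceeds `max-region-size` plus one tenth of `max-region-size`, so files dumped at exactly `max-region-size` are left whole. A minimal standalone Go sketch of that arithmetic (the 256 MiB `max-region-size` here is an assumed example, not a value taken from this diff):

package main

import "fmt"

// Mirrors the constant added above: the split threshold is raised by
// max-region-size/10 so a CSV dumped at exactly max-region-size is not
// split into many tiny chunks.
const largeCSVLowerThresholdRation = 10

func main() {
	maxRegionSize := int64(256 << 20) // assumed 256 MiB, for illustration only
	threshold := maxRegionSize + maxRegionSize/largeCSVLowerThresholdRation

	for _, fileSize := range []int64{256 << 20, 280 << 20, 400 << 20} {
		fmt.Printf("file %d MiB (threshold ~%d MiB) -> split: %v\n",
			fileSize>>20, threshold>>20, fileSize > threshold)
	}
}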
108 changes: 108 additions & 0 deletions pkg/lightning/mydump/region_test.go
@@ -292,3 +292,111 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileNoNewLine(c *C) {
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
<<<<<<< HEAD
=======

func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_with_custom_terminator",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: "|+|",
Terminator: "|+|\n",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 1,
},
}

dir := c.MkDir()

fileName := "test2.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("5|+|abc\ndef\nghi|+|6|+|\n7|+|xyz|+|8|+|\n9|+||+|10")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(3)
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
2 changes: 0 additions & 2 deletions pkg/pdutil/pd.go
@@ -123,9 +123,7 @@ var (
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"enable-location-replacement": "true",
"max-pending-peer-count": 16,
}
)

33 changes: 33 additions & 0 deletions pkg/storage/gcs.go
@@ -155,8 +155,41 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ExternalFileReader,
// function; the second argument is the size in bytes of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
<<<<<<< HEAD
// TODO, implement this if needed
panic("Unsupported Operation")
=======
if opt == nil {
opt = &WalkOption{}
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
query.SetAttrSelection([]string{"Name", "Size"})
iter := s.bucket.Objects(ctx, query)
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Trace(err)
}
// When walking a specified directory, the returned names include storage.Prefix,
// which cannot be reused directly in other APIs (Open/Read),
// so we use TrimPrefix to strip the Prefix before the next Open/Read.
path := strings.TrimPrefix(attrs.Name, s.gcs.Prefix)
if err = fn(path, attrs.Size); err != nil {
return errors.Trace(err)
}
}
return nil
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
}

func (s *gcsStorage) URI() string {
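As the comment in WalkDir explains, the object names returned by the bucket listing still carry the storage prefix, and only the trimmed, prefix-relative names are handed to the callback so they can be fed back to Open/Read. A small self-contained sketch of that trimming step (the prefix and object names are made up for illustration):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Assume the gcsStorage was created with this prefix (illustrative value).
	storagePrefix := "a/b/"

	// Names as the bucket iterator would return them: they still include the prefix.
	objects := []string{"a/b/key", "a/b/sub/key1"}

	for _, name := range objects {
		// Same TrimPrefix step as in WalkDir above.
		rel := strings.TrimPrefix(name, storagePrefix)
		fmt.Println(rel) // prints "key", then "sub/key1"
	}
}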
94 changes: 94 additions & 0 deletions pkg/storage/gcs_test.go
@@ -4,7 +4,12 @@ package storage

import (
"context"
<<<<<<< HEAD
"io/ioutil"
=======
"fmt"
"io"
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
"os"

"github.com/fsouza/fake-gcs-server/fakestorage"
@@ -59,6 +64,95 @@ func (r *testStorageSuite) TestGCS(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)

<<<<<<< HEAD
=======
list := ""
var totalSize int64 = 0
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
list += name
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(list, Equals, "keykey1key2")
c.Assert(totalSize, Equals, int64(42))

// test 1003 files
totalSize = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
c.Assert(err, IsNil)
}
filesSet := make(map[string]struct{}, 1003)
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
filesSet[name] = struct{}{}
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(totalSize, Equals, int64(42+4000))
_, ok := filesSet["key"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key1"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key2"]
c.Assert(ok, IsTrue)
for i := 0; i < 1000; i += 1 {
_, ok = filesSet[fmt.Sprintf("f%d", i)]
c.Assert(ok, IsTrue)
}

efr, err := stg.Open(ctx, "key2")
c.Assert(err, IsNil)

p := make([]byte, 10)
n, err := efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 10)
c.Assert(string(p), Equals, "data222233")

p = make([]byte, 40)
n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 23)
c.Assert(string(p[:23]), Equals, "46757222222222289722222")

p = make([]byte, 5)
offs, err := efr.Seek(3, io.SeekStart)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(3))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "a2222")

p = make([]byte, 5)
offs, err = efr.Seek(3, io.SeekCurrent)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(11))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "67572")

/* Since fake_gcs_server doesn't support negative offsets yet.
p = make([]byte, 5)
offs, err = efr.Seek(int64(-7), io.SeekEnd)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(-7))
n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "97222")
*/

err = efr.Close()
c.Assert(err, IsNil)

>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
c.Assert(stg.URI(), Equals, "gcs://testbucket/a/b/")
}

14 changes: 14 additions & 0 deletions pkg/storage/s3.go
@@ -30,6 +30,7 @@ import (
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
)

const (
@@ -655,6 +656,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {

if realOffset == r.pos {
return realOffset, nil
} else if realOffset >= r.rangeInfo.Size {
// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// Because s3's GetObject interface doesn't allow getting a range that matches zero-length data,
// if the position is out of range we need to always return io.EOF after the seek operation.

// close the current reader and replace it with an empty one so that subsequent reads return io.EOF
if err := r.reader.Close(); err != nil {
log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
}

r.reader = io.NopCloser(bytes.NewReader(nil))
r.pos = r.rangeInfo.Size
return r.pos, nil
}

// if seek ahead no more than 64k, we discard these data
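A toy model of the new out-of-range seek behavior (this is not the real s3ObjectReader, just a sketch of the clamping rule): seeking at or beyond the object size pins the position to the size and swaps in an empty reader, so the next Read reports io.EOF instead of issuing a zero-length ranged GetObject.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// toyRangeReader loosely models the rule added to s3ObjectReader.Seek above.
type toyRangeReader struct {
	data []byte
	pos  int64
	r    io.Reader
}

func (t *toyRangeReader) Seek(offset int64, whence int) (int64, error) {
	if whence != io.SeekStart {
		return 0, fmt.Errorf("only io.SeekStart is handled in this sketch")
	}
	size := int64(len(t.data))
	if offset >= size {
		// Out of range: pin the position at the end; reads now return io.EOF.
		t.pos = size
		t.r = bytes.NewReader(nil)
		return t.pos, nil
	}
	t.pos = offset
	t.r = bytes.NewReader(t.data[offset:])
	return t.pos, nil
}

func (t *toyRangeReader) Read(p []byte) (int, error) { return t.r.Read(p) }

func main() {
	tr := &toyRangeReader{data: []byte("0123456789")}
	for _, off := range []int64{3, 10, 20} {
		pos, _ := tr.Seek(off, io.SeekStart)
		n, err := tr.Read(make([]byte, 4))
		fmt.Printf("seek %d -> pos %d, read n=%d err=%v\n", off, pos, n, err)
	}
}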
9 changes: 9 additions & 0 deletions pkg/storage/s3_test.go
@@ -695,6 +695,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, 100)
c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])

// test seeking to the file end and beyond
for _, p := range []int64{1000000, 1000001, 2000000} {
offset, err = reader.Seek(p, io.SeekStart)
c.Assert(offset, Equals, int64(1000000))
c.Assert(err, IsNil)
_, err := reader.Read(slice)
c.Assert(err, Equals, io.EOF)
}
}

type limitedBytesReader struct {
3 changes: 3 additions & 0 deletions tests/br_log_restore/run.sh
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Skip the log restore test because TiCDC doesn't support this feature anymore.
exit 0

set -eux
DB="$TEST_NAME"
TABLE="usertable"
4 changes: 1 addition & 3 deletions tests/br_other/run.sh
@@ -134,9 +134,7 @@ default_pd_values='{
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"max-pending-peer-count": 16
"region-schedule-limit": 2048
}'

for key in $(echo $default_pd_values | jq 'keys[]'); do