This is an automated cherry-pick of pingcap#1458
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
glorv authored and ti-chi-bot committed Jan 25, 2022
1 parent a135dc5 commit 5a18f31
Showing 13 changed files with 371 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pkg/backup/schema.go
@@ -86,6 +86,11 @@ func (ss *Schemas) BackupSchemas(
startAll := time.Now()
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer shared by every table in the same database,
// add the temporary prefix to the database name here, outside the closure, to avoid concurrent modification of the shared value.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
workerPool.ApplyOnErrorGroup(errg, func() error {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
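
For context on the change above: schema.dbInfo is a pointer shared by every table in the same database, so renaming it inside the goroutines spawned by workerPool.ApplyOnErrorGroup would mean concurrent writes to shared state. A minimal Go sketch of the pattern, with made-up names and an illustrative prefix, not the actual BR code:

package main

import (
	"fmt"
	"sync"
)

type dbInfo struct{ Name string }

func main() {
	shared := &dbInfo{Name: "mysql"}
	tables := []*dbInfo{shared, shared, shared} // many tables share one dbInfo pointer

	// Rename once, before any worker starts, so the goroutines only read the value.
	shared.Name = "__TiDB_BR_Temporary_" + shared.Name // prefix is illustrative

	var wg sync.WaitGroup
	for _, db := range tables {
		db := db
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(db.Name) // concurrent reads are safe; renaming in here would race
		}()
	}
	wg.Wait()
}
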
41 changes: 41 additions & 0 deletions pkg/backup/schema_test.go
@@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

. "github.com/pingcap/check"
@@ -15,6 +17,11 @@ import (

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/mock"
<<<<<<< HEAD
=======
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
)

var _ = Suite(&testBackupSchemaSuite{})
@@ -184,3 +191,37 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *
c.Assert(schemas2[0].Table, DeepEquals, schemas[0].Table)
c.Assert(schemas2[0].Db, DeepEquals, schemas[0].Db)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}
14 changes: 12 additions & 2 deletions pkg/lightning/mydump/region.go
@@ -31,7 +31,11 @@ import (
"github.com/pingcap/br/pkg/storage"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the fraction of `max-region-size` added on top of the split threshold for large CSV files (1/10)
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
@@ -260,7 +264,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We raise the check threshold by 1/10 of `max-region-size` because files dumped by tools such as
// Dumpling may slightly exceed `max-region-size`; without this margin each of those files would be
// split, producing a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
@@ -351,6 +358,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
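
To make the new threshold concrete: assuming a 256 MiB `max-region-size` (a value chosen here for illustration, not taken from the diff), a split only triggers above roughly 281.6 MiB, so a Dumpling file that lands at or just over 256 MiB stays as a single chunk. A small sketch of the arithmetic:

package main

import "fmt"

// same 1/10 margin as largeCSVLowerThresholdRation in region.go
const largeCSVLowerThresholdRation = 10

func shouldSplit(fileSize, maxRegionSize int64) bool {
	return fileSize > maxRegionSize+maxRegionSize/largeCSVLowerThresholdRation
}

func main() {
	maxRegionSize := int64(256 << 20) // assume 256 MiB for illustration
	fmt.Println(shouldSplit(maxRegionSize, maxRegionSize))          // false: exactly at max-region-size
	fmt.Println(shouldSplit(maxRegionSize+(1<<20), maxRegionSize))  // false: within the ~25.6 MiB margin
	fmt.Println(shouldSplit(maxRegionSize+(30<<20), maxRegionSize)) // true: clearly above the threshold
}
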
108 changes: 108 additions & 0 deletions pkg/lightning/mydump/region_test.go
@@ -292,3 +292,111 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileNoNewLine(c *C) {
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
<<<<<<< HEAD
=======

func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_with_custom_terminator",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: "|+|",
Terminator: "|+|\n",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 1,
},
}

dir := c.MkDir()

fileName := "test2.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("5|+|abc\ndef\nghi|+|6|+|\n7|+|xyz|+|8|+|\n9|+||+|10")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(3)
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
2 changes: 0 additions & 2 deletions pkg/pdutil/pd.go
@@ -123,9 +123,7 @@ var (
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"enable-location-replacement": "true",
"max-pending-peer-count": 16,
}
)

33 changes: 33 additions & 0 deletions pkg/storage/gcs.go
@@ -153,8 +153,41 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ExternalFileReader,
// function; the second argument is the size in bytes of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
<<<<<<< HEAD
// TODO, implement this if needed
panic("Unsupported Operation")
=======
if opt == nil {
opt = &WalkOption{}
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
query.SetAttrSelection([]string{"Name", "Size"})
iter := s.bucket.Objects(ctx, query)
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Trace(err)
}
// When walking a specific directory, the returned object names include the storage prefix,
// which cannot be reused directly by the other APIs (Open/Read),
// so trim the prefix here to make each name usable by a subsequent Open/Read.
path := strings.TrimPrefix(attrs.Name, s.gcs.Prefix)
if err = fn(path, attrs.Size); err != nil {
return errors.Trace(err)
}
}
return nil
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
}

func (s *gcsStorage) URI() string {
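
The point of the TrimPrefix above is that WalkDir hands the callback a name that is already relative to the storage prefix, so the same name can be fed back into Open or ReadFile. A rough usage sketch against the interface shown in this diff (the helper name and sub-directory are invented):

package gcsexample // illustrative sketch, not part of the diff

import (
	"context"
	"log"

	"github.com/pingcap/br/pkg/storage"
)

// listAndRead walks every object under subDir and re-opens each file by the
// name WalkDir reports; the round-trip works because WalkDir trims the
// storage prefix before invoking the callback.
func listAndRead(ctx context.Context, stg storage.ExternalStorage, subDir string) error {
	return stg.WalkDir(ctx, &storage.WalkOption{SubDir: subDir}, func(name string, size int64) error {
		r, err := stg.Open(ctx, name) // `name` is already relative to the prefix
		if err != nil {
			return err
		}
		defer r.Close()
		log.Printf("%s: %d bytes", name, size)
		return nil
	})
}
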
94 changes: 94 additions & 0 deletions pkg/storage/gcs_test.go
@@ -4,7 +4,12 @@ package storage

import (
"context"
<<<<<<< HEAD
"io/ioutil"
=======
"fmt"
"io"
>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
"os"

"github.com/fsouza/fake-gcs-server/fakestorage"
@@ -59,6 +64,95 @@ func (r *testStorageSuite) TestGCS(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)

<<<<<<< HEAD
=======
list := ""
var totalSize int64 = 0
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
list += name
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(list, Equals, "keykey1key2")
c.Assert(totalSize, Equals, int64(42))

// test 1003 files
totalSize = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
c.Assert(err, IsNil)
}
filesSet := make(map[string]struct{}, 1003)
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
filesSet[name] = struct{}{}
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(totalSize, Equals, int64(42+4000))
_, ok := filesSet["key"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key1"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key2"]
c.Assert(ok, IsTrue)
for i := 0; i < 1000; i += 1 {
_, ok = filesSet[fmt.Sprintf("f%d", i)]
c.Assert(ok, IsTrue)
}

efr, err := stg.Open(ctx, "key2")
c.Assert(err, IsNil)

p := make([]byte, 10)
n, err := efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 10)
c.Assert(string(p), Equals, "data222233")

p = make([]byte, 40)
n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 23)
c.Assert(string(p[:23]), Equals, "46757222222222289722222")

p = make([]byte, 5)
offs, err := efr.Seek(3, io.SeekStart)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(3))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "a2222")

p = make([]byte, 5)
offs, err = efr.Seek(3, io.SeekCurrent)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(11))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "67572")

/* fake-gcs-server does not support negative offsets yet.
p = make([]byte, 5)
offs, err = efr.Seek(int64(-7), io.SeekEnd)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(-7))
n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "97222")
*/

err = efr.Close()
c.Assert(err, IsNil)

>>>>>>> ec59c7b6 (lightning: cherry-pick some PRs (#1458))
c.Assert(stg.URI(), Equals, "gcs://testbucket/a/b/")
}
