This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: cherry-pick some PRs (#1458) #1459

Closed
5 changes: 5 additions & 0 deletions pkg/backup/schema.go
@@ -86,6 +86,11 @@ func (ss *Schemas) BackupSchemas(
startAll := time.Now()
for _, s := range ss.schemas {
schema := s
// schema.dbInfo is a pointer shared by many tables, so add the temporary prefix to the
// database name here, outside the closure, to avoid concurrent modification by the workers.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
workerPool.ApplyOnErrorGroup(errg, func() error {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
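For context on the hunk above, here is a minimal, self-contained sketch (hypothetical names and prefix, not BR's actual types) of why the rename must happen before the closure is handed to the worker pool: several schemas can share the same dbInfo pointer, and renaming it from concurrently running workers would be a data race, whereas renaming it once on the submitting goroutine is safe.

package main

import (
	"fmt"
	"strings"
	"sync"
)

// dbInfo stands in for the shared *model.DBInfo; many schemas point at the same value.
type dbInfo struct{ Name string }

type schema struct{ db *dbInfo }

func main() {
	shared := &dbInfo{Name: "mysql"}
	schemas := []schema{{db: shared}, {db: shared}, {db: shared}}

	var wg sync.WaitGroup
	for _, s := range schemas {
		s := s
		// Rename here, on the submitting goroutine, before any worker starts.
		// The prefix check keeps the rename idempotent even though the pointer is shared.
		if !strings.HasPrefix(s.db.Name, "__tmp_") { // "__tmp_" is a made-up prefix for illustration
			s.db.Name = "__tmp_" + s.db.Name
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Workers only read the name; mutating it here instead would race
			// with the other goroutines sharing the same *dbInfo.
			fmt.Println("backing up db", s.db.Name)
		}()
	}
	wg.Wait()
}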
41 changes: 41 additions & 0 deletions pkg/backup/schema_test.go
@@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

. "github.com/pingcap/check"
@@ -15,6 +17,11 @@ import (

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

var _ = Suite(&testBackupSchemaSuite{})
@@ -184,3 +191,37 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *C) {
c.Assert(schemas2[0].Table, DeepEquals, schemas[0].Table)
c.Assert(schemas2[0].Db, DeepEquals, schemas[0].Db)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}
14 changes: 12 additions & 2 deletions pkg/lightning/mydump/region.go
@@ -31,7 +31,11 @@ import (
"github.com/pingcap/br/pkg/storage"
)

const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the large-CSV split threshold is raised above `max-region-size` by 1/largeCSVLowerThresholdRation (i.e. 10%)
largeCSVLowerThresholdRation = 10
)

type TableRegion struct {
EngineID int32
@@ -260,7 +264,10 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
// We raise the check threshold by 1/10 of `max-region-size` because files dumped by tools
// such as dumpling may slightly exceed `max-region-size` even when produced with an equal
// size limit; without this margin we would split them into many tiny chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
@@ -351,6 +358,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
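To make the new threshold concrete, here is a small stand-alone sketch (made-up sizes; only the inequality mirrors the check in makeSourceFileRegion shown above): with max-region-size = 256 MiB, a strict-format CSV is split only once it exceeds 256 MiB plus the 10% margin, roughly 281.6 MiB.

package main

import "fmt"

// the same 1/10 margin as largeCSVLowerThresholdRation in region.go
const largeCSVLowerThresholdRation = 10

// shouldSplit mirrors the condition in makeSourceFileRegion: only strict-format
// CSV files larger than max-region-size plus a 10% margin are split.
func shouldSplit(isCSVFile, strictFormat bool, dataFileSize, maxRegionSize int64) bool {
	threshold := maxRegionSize + maxRegionSize/largeCSVLowerThresholdRation
	return isCSVFile && strictFormat && dataFileSize > threshold
}

func main() {
	const maxRegionSize = 256 << 20                              // 256 MiB
	fmt.Println(shouldSplit(true, true, 260<<20, maxRegionSize)) // false: still inside the margin
	fmt.Println(shouldSplit(true, true, 300<<20, maxRegionSize)) // true: clearly over the threshold
}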
108 changes: 108 additions & 0 deletions pkg/lightning/mydump/region_test.go
@@ -292,3 +292,111 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileNoNewLine(c *C) {
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_with_custom_terminator",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: "|+|",
Terminator: "|+|\n",
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 1,
},
}

dir := c.MkDir()

fileName := "test2.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("5|+|abc\ndef\nghi|+|6|+|\n7|+|xyz|+|8|+|\n9|+||+|10")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(3)
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{0, 23}, {23, 38}, {38, 47}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
2 changes: 0 additions & 2 deletions pkg/pdutil/pd.go
@@ -123,9 +123,7 @@ var (
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"enable-location-replacement": "true",
"max-pending-peer-count": 16,
}
)

33 changes: 33 additions & 0 deletions pkg/storage/gcs.go
@@ -153,8 +153,41 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ExternalFileReader,
// function; the second argument is the size in byte of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
if opt == nil {
opt = &WalkOption{}
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
query.SetAttrSelection([]string{"Name", "Size"})
iter := s.bucket.Objects(ctx, query)
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Trace(err)
}
// When walking a specific directory, the returned object names include the storage Prefix,
// which cannot be reused directly by other APIs (Open/Read).
// Trim the Prefix so the name can be passed to subsequent Open/Read calls.
path := strings.TrimPrefix(attrs.Name, s.gcs.Prefix)
if err = fn(path, attrs.Size); err != nil {
return errors.Trace(err)
}
}
return nil
}

func (s *gcsStorage) URI() string {
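As a usage illustration of the WalkDir implementation above, the following sketch sums file sizes under a storage prefix. It assumes only the WalkDir/WalkOption signature shown in this diff plus the NewLocalStorage constructor already used by the tests in this PR, so treat it as an example rather than BR's own code; the directory path is hypothetical.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/pingcap/br/pkg/storage"
)

func main() {
	// Any ExternalStorage should behave the same; local storage keeps the example self-contained.
	store, err := storage.NewLocalStorage("/tmp/backup-example") // hypothetical directory
	if err != nil {
		log.Fatal(err)
	}

	var total int64
	err = store.WalkDir(context.Background(), &storage.WalkOption{}, func(name string, size int64) error {
		// name comes back relative to the storage prefix, so it can be passed
		// straight to Open/ReadFile afterwards.
		total += size
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("total bytes:", total)
}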
94 changes: 94 additions & 0 deletions pkg/storage/gcs_test.go
@@ -4,7 +4,12 @@ package storage

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/fsouza/fake-gcs-server/fakestorage"
@@ -59,6 +64,95 @@ func (r *testStorageSuite) TestGCS(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)

list := ""
var totalSize int64 = 0
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
list += name
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(list, Equals, "keykey1key2")
c.Assert(totalSize, Equals, int64(42))

// test 1003 files
totalSize = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
c.Assert(err, IsNil)
}
filesSet := make(map[string]struct{}, 1003)
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
filesSet[name] = struct{}{}
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(totalSize, Equals, int64(42+4000))
_, ok := filesSet["key"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key1"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key2"]
c.Assert(ok, IsTrue)
for i := 0; i < 1000; i += 1 {
_, ok = filesSet[fmt.Sprintf("f%d", i)]
c.Assert(ok, IsTrue)
}

efr, err := stg.Open(ctx, "key2")
c.Assert(err, IsNil)

p := make([]byte, 10)
n, err := efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 10)
c.Assert(string(p), Equals, "data222233")

p = make([]byte, 40)
n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 23)
c.Assert(string(p[:23]), Equals, "46757222222222289722222")

p = make([]byte, 5)
offs, err := efr.Seek(3, io.SeekStart)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(3))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "a2222")

p = make([]byte, 5)
offs, err = efr.Seek(3, io.SeekCurrent)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(11))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "67572")

/* fake-gcs-server does not support negative offsets yet.
p = make([]byte, 5)
offs, err = efr.Seek(int64(-7), io.SeekEnd)
c.Assert(err, IsNil)
c.Assert(offs, Equals, int64(-7))

n, err = efr.Read(p)
c.Assert(err, IsNil)
c.Assert(n, Equals, 5)
c.Assert(string(p), Equals, "97222")
*/

err = efr.Close()
c.Assert(err, IsNil)

c.Assert(stg.URI(), Equals, "gcs://testbucket/a/b/")
}
