From 986ca5166d339923d8e3cee11ecae7c8e4c2c925 Mon Sep 17 00:00:00 2001 From: kennytm Date: Thu, 6 Dec 2018 16:53:35 +0800 Subject: [PATCH] *: added support for storing checkpoint at local filesystem (#90) * *: added support for storing checkpoint at local filesystem * config, tests: change default checkpoint driver to "file" * mydump: regenerate the parser to avoid golang/go#27350 during coverage * checkpoints: addressed comments --- Makefile | 3 +- go.mod | 1 + lightning/config/config.go | 11 +- lightning/mydump/parser_generated.go | 152 +- lightning/restore/checkpoints.go | 217 ++- lightning/restore/file_checkpoints.pb.go | 1456 +++++++++++++++++ lightning/restore/file_checkpoints.proto | 30 + lightning/restore/restore.go | 28 +- tests/checkpoint/config.toml | 1 + tests/checkpoint_chunks/config.toml | 2 +- tests/checkpoint_chunks/file.toml | 30 + tests/checkpoint_chunks/run.sh | 19 + tests/checkpoint_error_destroy/bad.toml | 1 + tests/checkpoint_error_destroy/bad_file.toml | 27 + tests/checkpoint_error_destroy/good.toml | 1 + tests/checkpoint_error_destroy/good_file.toml | 27 + tests/checkpoint_error_destroy/run.sh | 19 +- tidb-lightning.toml | 13 +- vendor/modules.txt | 2 +- 19 files changed, 1945 insertions(+), 95 deletions(-) create mode 100644 lightning/restore/file_checkpoints.pb.go create mode 100644 lightning/restore/file_checkpoints.proto create mode 100644 tests/checkpoint_chunks/file.toml create mode 100644 tests/checkpoint_error_destroy/bad_file.toml create mode 100644 tests/checkpoint_error_destroy/good_file.toml diff --git a/Makefile b/Makefile index 4a97bc9d45408..ff548d5234c58 100644 --- a/Makefile +++ b/Makefile @@ -48,8 +48,9 @@ checksuccess: data_parsers: ragel -Z -G2 -o tmp_parser.go lightning/mydump/parser.rl - @echo '// Code generated by ragel DO NOT EDIT.' | cat - tmp_parser.go > lightning/mydump/parser_generated.go + @echo '// Code generated by ragel DO NOT EDIT.' | cat - tmp_parser.go | sed 's|//line |//.... 
|g' > lightning/mydump/parser_generated.go @rm tmp_parser.go + PATH="$(GOPATH)/bin":$(PATH) protoc -I. -I"$(GOPATH)/src" lightning/restore/file_checkpoints.proto --gogofaster_out=. lightning: $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS)' -o $(LIGHTNING_BIN) cmd/main.go diff --git a/go.mod b/go.mod index a8fac729a43b1..51d904cc7d10b 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/cznic/strutil v0.0.0-20150430124730-1eb03e3cc9d3 github.com/cznic/y v0.0.0-20160420101755-9fdf92d4aac0 github.com/go-sql-driver/mysql v1.4.0 + github.com/gogo/protobuf v1.1.1 github.com/joho/sqltocsv v0.0.0-20180904231936-b24deec2b806 github.com/pingcap/check v0.0.0-20171206051426-1c287c953996 github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26 diff --git a/lightning/config/config.go b/lightning/config/config.go index c28df4a5a96e9..a88c24e3af324 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -97,6 +97,7 @@ type Checkpoint struct { Enable bool `toml:"enable" json:"enable"` Schema string `toml:"schema" json:"schema"` DSN string `toml:"dsn" json:"-"` // DSN may contain password, don't expose this to JSON. 
+ Driver string `toml:"driver" json:"driver"` KeepAfterSuccess bool `toml:"keep-after-success" json:"keep-after-success"` } @@ -175,8 +176,16 @@ func (cfg *Config) Load() error { if len(cfg.Checkpoint.Schema) == 0 { cfg.Checkpoint.Schema = "tidb_lightning_checkpoint" } + if len(cfg.Checkpoint.Driver) == 0 { + cfg.Checkpoint.Driver = "file" + } if len(cfg.Checkpoint.DSN) == 0 { - cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw) + switch cfg.Checkpoint.Driver { + case "mysql": + cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw) + case "file": + cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb" + } } return nil diff --git a/lightning/mydump/parser_generated.go b/lightning/mydump/parser_generated.go index 0fcbbc226383e..c4965ad8bc25d 100644 --- a/lightning/mydump/parser_generated.go +++ b/lightning/mydump/parser_generated.go @@ -1,6 +1,6 @@ // Code generated by ragel DO NOT EDIT. -//line lightning/mydump/parser.rl:1 +//.... lightning/mydump/parser.rl:1 // Please edit `parser.rl` if you want to modify this file. To generate // `parser_generated.go`, please execute // @@ -18,11 +18,11 @@ import ( ) -//line lightning/mydump/parser.rl:54 +//.... lightning/mydump/parser.rl:76 -//line tmp_parser.go:25 +//.... tmp_parser.go:25 const chunk_parser_start int = 21 const chunk_parser_first_final int = 21 const chunk_parser_error int = 0 @@ -30,12 +30,12 @@ const chunk_parser_error int = 0 const chunk_parser_en_main int = 21 -//line lightning/mydump/parser.rl:57 +//.... lightning/mydump/parser.rl:79 func (parser *ChunkParser) lex() (token, []byte, error) { var cs, ts, te, act, p int -//line tmp_parser.go:38 +//.... tmp_parser.go:38 { cs = chunk_parser_start ts = 0 @@ -43,7 +43,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { act = 0 } -//line lightning/mydump/parser.rl:61 +//.... 
lightning/mydump/parser.rl:83 for { data := parser.buf @@ -55,7 +55,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } -//line tmp_parser.go:58 +//.... tmp_parser.go:58 { if p == pe { goto _test_eof @@ -146,7 +146,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } goto st_out tr0: -//line NONE:1 +//.... NONE:1 switch act { case 0: {{goto st0 }} @@ -169,7 +169,7 @@ tr0: goto st21 tr8: -//line lightning/mydump/parser.rl:42 +//.... lightning/mydump/parser.rl:64 te = p+1 { consumedToken = tokRow @@ -177,7 +177,7 @@ te = p+1 } goto st21 tr12: -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 p = (te) - 1 { consumedToken = tokName @@ -185,12 +185,12 @@ p = (te) - 1 } goto st21 tr14: -//line lightning/mydump/parser.rl:35 +//.... lightning/mydump/parser.rl:57 te = p+1 goto st21 tr34: -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 te = p p-- { @@ -199,26 +199,26 @@ p-- } goto st21 tr35: -//line lightning/mydump/parser.rl:35 +//.... lightning/mydump/parser.rl:57 te = p p-- goto st21 st21: -//line NONE:1 +//.... NONE:1 ts = 0 -//line NONE:1 +//.... NONE:1 act = 0 if p++; p == pe { goto _test_eof21 } st_case_21: -//line NONE:1 +//.... NONE:1 ts = p -//line tmp_parser.go:221 +//.... tmp_parser.go:221 switch data[p] { case 32: goto tr14 @@ -255,24 +255,24 @@ ts = p } goto tr2 tr2: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st22 tr37: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:35 +//.... lightning/mydump/parser.rl:57 act = 1; goto st22 tr47: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:37 +//.... lightning/mydump/parser.rl:59 act = 2; goto st22 st22: @@ -280,7 +280,7 @@ act = 2; goto _test_eof22 } st_case_22: -//line tmp_parser.go:283 +//.... 
tmp_parser.go:283 switch data[p] { case 32: goto tr0 @@ -395,10 +395,10 @@ st_case_0: } goto st9 tr30: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st23 st23: @@ -406,7 +406,7 @@ act = 4; goto _test_eof23 } st_case_23: -//line tmp_parser.go:409 +//.... tmp_parser.go:409 switch data[p] { case 32: goto tr34 @@ -431,7 +431,7 @@ act = 4; } goto tr2 tr17: -//line NONE:1 +//.... NONE:1 te = p+1 goto st24 @@ -440,7 +440,7 @@ te = p+1 goto _test_eof24 } st_case_24: -//line tmp_parser.go:443 +//.... tmp_parser.go:443 switch data[p] { case 10: goto tr14 @@ -488,10 +488,10 @@ te = p+1 } goto st11 tr16: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:35 +//.... lightning/mydump/parser.rl:57 act = 1; goto st25 st25: @@ -499,7 +499,7 @@ act = 1; goto _test_eof25 } st_case_25: -//line tmp_parser.go:502 +//.... tmp_parser.go:502 switch data[p] { case 34: goto tr2 @@ -529,10 +529,10 @@ act = 1; } goto st13 tr20: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:35 +//.... lightning/mydump/parser.rl:57 act = 1; goto st26 st26: @@ -540,16 +540,16 @@ act = 1; goto _test_eof26 } st_case_26: -//line tmp_parser.go:543 +//.... tmp_parser.go:543 if data[p] == 96 { goto tr2 } goto st3 tr31: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st27 st27: @@ -557,7 +557,7 @@ act = 4; goto _test_eof27 } st_case_27: -//line tmp_parser.go:560 +//.... tmp_parser.go:560 switch data[p] { case 32: goto tr34 @@ -582,7 +582,7 @@ act = 4; } goto tr2 tr24: -//line NONE:1 +//.... NONE:1 te = p+1 goto st28 @@ -591,7 +591,7 @@ te = p+1 goto _test_eof28 } st_case_28: -//line tmp_parser.go:594 +//.... tmp_parser.go:594 switch data[p] { case 32: goto st14 @@ -676,7 +676,7 @@ te = p+1 } goto st16 tr36: -//line NONE:1 +//.... 
NONE:1 te = p+1 goto st29 @@ -685,7 +685,7 @@ te = p+1 goto _test_eof29 } st_case_29: -//line tmp_parser.go:688 +//.... tmp_parser.go:688 switch data[p] { case 32: goto st14 @@ -738,10 +738,10 @@ te = p+1 } goto st19 tr32: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st30 st30: @@ -749,7 +749,7 @@ act = 4; goto _test_eof30 } st_case_30: -//line tmp_parser.go:752 +//.... tmp_parser.go:752 switch data[p] { case 32: goto tr34 @@ -776,10 +776,10 @@ act = 4; } goto tr2 tr38: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st31 st31: @@ -787,7 +787,7 @@ act = 4; goto _test_eof31 } st_case_31: -//line tmp_parser.go:790 +//.... tmp_parser.go:790 switch data[p] { case 32: goto tr34 @@ -818,10 +818,10 @@ act = 4; } goto tr2 tr39: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st32 st32: @@ -829,7 +829,7 @@ act = 4; goto _test_eof32 } st_case_32: -//line tmp_parser.go:832 +//.... tmp_parser.go:832 switch data[p] { case 32: goto tr34 @@ -856,10 +856,10 @@ act = 4; } goto tr2 tr41: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st33 st33: @@ -867,7 +867,7 @@ act = 4; goto _test_eof33 } st_case_33: -//line tmp_parser.go:870 +//.... tmp_parser.go:870 switch data[p] { case 32: goto tr34 @@ -894,10 +894,10 @@ act = 4; } goto tr2 tr42: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st34 st34: @@ -905,7 +905,7 @@ act = 4; goto _test_eof34 } st_case_34: -//line tmp_parser.go:908 +//.... tmp_parser.go:908 switch data[p] { case 32: goto tr34 @@ -932,10 +932,10 @@ act = 4; } goto tr2 tr40: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... 
lightning/mydump/parser.rl:69 act = 4; goto st35 st35: @@ -943,7 +943,7 @@ act = 4; goto _test_eof35 } st_case_35: -//line tmp_parser.go:946 +//.... tmp_parser.go:946 switch data[p] { case 32: goto tr34 @@ -970,10 +970,10 @@ act = 4; } goto tr2 tr33: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st36 st36: @@ -981,7 +981,7 @@ act = 4; goto _test_eof36 } st_case_36: -//line tmp_parser.go:984 +//.... tmp_parser.go:984 switch data[p] { case 32: goto tr34 @@ -1008,10 +1008,10 @@ act = 4; } goto tr2 tr43: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st37 st37: @@ -1019,7 +1019,7 @@ act = 4; goto _test_eof37 } st_case_37: -//line tmp_parser.go:1022 +//.... tmp_parser.go:1022 switch data[p] { case 32: goto tr34 @@ -1046,10 +1046,10 @@ act = 4; } goto tr2 tr44: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st38 st38: @@ -1057,7 +1057,7 @@ act = 4; goto _test_eof38 } st_case_38: -//line tmp_parser.go:1060 +//.... tmp_parser.go:1060 switch data[p] { case 32: goto tr34 @@ -1084,10 +1084,10 @@ act = 4; } goto tr2 tr45: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st39 st39: @@ -1095,7 +1095,7 @@ act = 4; goto _test_eof39 } st_case_39: -//line tmp_parser.go:1098 +//.... tmp_parser.go:1098 switch data[p] { case 32: goto tr34 @@ -1122,10 +1122,10 @@ act = 4; } goto tr2 tr46: -//line NONE:1 +//.... NONE:1 te = p+1 -//line lightning/mydump/parser.rl:47 +//.... lightning/mydump/parser.rl:69 act = 4; goto st40 st40: @@ -1133,7 +1133,7 @@ act = 4; goto _test_eof40 } st_case_40: -//line tmp_parser.go:1136 +//.... tmp_parser.go:1136 switch data[p] { case 32: goto tr34 @@ -1276,7 +1276,7 @@ act = 4; _out: {} } -//line lightning/mydump/parser.rl:72 +//.... 
lightning/mydump/parser.rl:94 if cs == 0 { common.AppLogger.Errorf("Syntax error near byte %d, content is «%s»", parser.pos, string(data)) diff --git a/lightning/restore/checkpoints.go b/lightning/restore/checkpoints.go index 1975066e90842..2556b744c1e9f 100644 --- a/lightning/restore/checkpoints.go +++ b/lightning/restore/checkpoints.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" "io" + "io/ioutil" + "sort" "strings" "time" @@ -74,6 +76,17 @@ func (key *ChunkCheckpointKey) String() string { return fmt.Sprintf("%s:%d", key.Path, key.Offset) } +func (key *ChunkCheckpointKey) less(other *ChunkCheckpointKey) bool { + switch { + case key.Path < other.Path: + return true + case key.Path > other.Path: + return false + default: + return key.Offset < other.Offset + } +} + type ChunkCheckpoint struct { Key ChunkCheckpointKey Columns []byte @@ -488,9 +501,155 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi } } +type FileCheckpointsDB struct { + checkpoints CheckpointsModel + path string +} + +func NewFileCheckpointsDB(path string) *FileCheckpointsDB { + cpdb := &FileCheckpointsDB{path: path} + // ignore all errors -- file maybe not created yet (and it is fine). 
+ content, err := ioutil.ReadFile(path) + if err == nil { + cpdb.checkpoints.Unmarshal(content) + } else { + common.AppLogger.Warnf("failed to open checkpoint file %s, going to create a new one: %v", path, err) + } + return cpdb +} + +func (cpdb *FileCheckpointsDB) save() error { + serialized, err := cpdb.checkpoints.Marshal() + if err != nil { + return errors.Trace(err) + } + if err := ioutil.WriteFile(cpdb.path, serialized, 0644); err != nil { + return errors.Trace(err) + } + return nil +} + +func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, dbInfo map[string]*TidbDBInfo) error { + if cpdb.checkpoints.Checkpoints == nil { + cpdb.checkpoints.Checkpoints = make(map[string]*TableCheckpointModel) + } + + for _, db := range dbInfo { + for _, table := range db.Tables { + tableName := common.UniqueTable(db.Name, table.Name) + if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok { + cpdb.checkpoints.Checkpoints[tableName] = &TableCheckpointModel{ + Status: uint32(CheckpointStatusLoaded), + Engine: uuid.NewV4().Bytes(), + } + } + // TODO check if hash matches + } + } + + return errors.Trace(cpdb.save()) +} + +func (cpdb *FileCheckpointsDB) Close() error { + return errors.Trace(cpdb.save()) +} + +func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error) { + tableModel := cpdb.checkpoints.Checkpoints[tableName] + + engine, err := uuid.FromBytes(tableModel.Engine) + if err != nil { + return nil, errors.Trace(err) + } + + cp := &TableCheckpoint{ + Status: CheckpointStatus(tableModel.Status), + Engine: engine, + AllocBase: tableModel.AllocBase, + Chunks: make([]*ChunkCheckpoint, 0, len(tableModel.Chunks)), + } + + for _, chunkModel := range tableModel.Chunks { + cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{ + Key: ChunkCheckpointKey{ + Path: chunkModel.Path, + Offset: chunkModel.Offset, + }, + Columns: chunkModel.Columns, + ShouldIncludeRowID: chunkModel.ShouldIncludeRowId, + Chunk: mydump.Chunk{ + Offset: 
chunkModel.Pos, + EndOffset: chunkModel.EndOffset, + PrevRowIDMax: chunkModel.PrevRowidMax, + RowIDMax: chunkModel.RowidMax, + }, + Checksum: verify.MakeKVChecksum(chunkModel.KvcBytes, chunkModel.KvcKvs, chunkModel.KvcChecksum), + }) + } + sort.Slice(cp.Chunks, func(i, j int) bool { + return cp.Chunks[i].Key.less(&cp.Chunks[j].Key) + }) + + return cp, nil +} + +func (cpdb *FileCheckpointsDB) InsertChunkCheckpoints(_ context.Context, tableName string, checkpoints []*ChunkCheckpoint) error { + tableModel := cpdb.checkpoints.Checkpoints[tableName] + if tableModel.Chunks == nil { + tableModel.Chunks = make(map[string]*ChunkCheckpointModel) + } + + for _, value := range checkpoints { + key := value.Key.String() + chunk, ok := tableModel.Chunks[key] + if !ok { + chunk = &ChunkCheckpointModel{ + Path: value.Key.Path, + Offset: value.Key.Offset, + Columns: value.Columns, + ShouldIncludeRowId: value.ShouldIncludeRowID, + } + tableModel.Chunks[key] = chunk + } + chunk.Pos = value.Chunk.Offset + chunk.EndOffset = value.Chunk.EndOffset + chunk.PrevRowidMax = value.Chunk.PrevRowIDMax + chunk.RowidMax = value.Chunk.RowIDMax + chunk.KvcBytes = value.Checksum.SumSize() + chunk.KvcKvs = value.Checksum.SumKVS() + chunk.KvcChecksum = value.Checksum.Sum() + } + + return errors.Trace(cpdb.save()) +} + +func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) { + for tableName, cpd := range checkpointDiffs { + tableModel := cpdb.checkpoints.Checkpoints[tableName] + if cpd.hasStatus { + tableModel.Status = uint32(cpd.status) + } + if cpd.hasRebase { + tableModel.AllocBase = cpd.allocBase + } + for key, diff := range cpd.chunks { + chunkModel := tableModel.Chunks[key.String()] + chunkModel.Pos = diff.pos + chunkModel.PrevRowidMax = diff.rowID + chunkModel.KvcBytes = diff.checksum.SumSize() + chunkModel.KvcKvs = diff.checksum.SumKVS() + chunkModel.KvcChecksum = diff.checksum.Sum() + } + } + + if err := cpdb.save(); err != nil { + 
common.AppLogger.Errorf("failed to save checkpoint: %v", err) + } +} + // Management functions ---------------------------------------------------------------------------- -var cannotManageNullDB = errors.NotSupportedf("checkpoints is disabled") +var cannotManageNullDB = errors.New("cannot perform this function while checkpoints is disabled") func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error { return errors.Trace(cannotManageNullDB) @@ -671,3 +830,59 @@ func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer return errors.Trace(sqltocsv.Write(writer, rows)) } + +func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error { + if tableName == "all" { + cpdb.checkpoints.Reset() + } else { + delete(cpdb.checkpoints.Checkpoints, tableName) + } + return errors.Trace(cpdb.save()) +} + +func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error { + for tableName, tableModel := range cpdb.checkpoints.Checkpoints { + if !(targetTableName == "all" || targetTableName == tableName) { + continue + } + if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) { + tableModel.Status = uint32(CheckpointStatusLoaded) + } + } + return errors.Trace(cpdb.save()) +} + +func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error) { + var targetTables []DestroyedTableCheckpoint + + for tableName, tableModel := range cpdb.checkpoints.Checkpoints { + // Obtain the list of tables + if !(targetTableName == "all" || targetTableName == tableName) { + continue + } + if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) { + targetTables = append(targetTables, DestroyedTableCheckpoint{ + TableName: tableName, + Engine: uuid.FromBytesOrNil(tableModel.Engine), + }) + } + } + + // Delete the checkpoints + for _, dtcp := range targetTables { + delete(cpdb.checkpoints.Checkpoints, dtcp.TableName) + } + if 
err := cpdb.save(); err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return targetTables, nil
+}
+
+func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error {
+	return errors.Errorf("dumping file checkpoint into CSV not supported, you may copy %s instead", cpdb.path)
+}
+
+func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
+	return errors.Errorf("dumping file checkpoint into CSV not supported, you may copy %s instead", cpdb.path)
+}
diff --git a/lightning/restore/file_checkpoints.pb.go b/lightning/restore/file_checkpoints.pb.go
new file mode 100644
index 0000000000000..579f41148b9d7
--- /dev/null
+++ b/lightning/restore/file_checkpoints.pb.go
@@ -0,0 +1,1456 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: lightning/restore/file_checkpoints.proto
+
+package restore
+
+import proto "github.com/gogo/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import encoding_binary "encoding/binary"
+
+import io "io"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type CheckpointsModel struct { + // key is table_name + Checkpoints map[string]*TableCheckpointModel `protobuf:"bytes,1,rep,name=checkpoints" json:"checkpoints,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CheckpointsModel) Reset() { *m = CheckpointsModel{} } +func (m *CheckpointsModel) String() string { return proto.CompactTextString(m) } +func (*CheckpointsModel) ProtoMessage() {} +func (*CheckpointsModel) Descriptor() ([]byte, []int) { + return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{0} +} +func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CheckpointsModel.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *CheckpointsModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_CheckpointsModel.Merge(dst, src) +} +func (m *CheckpointsModel) XXX_Size() int { + return m.Size() +} +func (m *CheckpointsModel) XXX_DiscardUnknown() { + xxx_messageInfo_CheckpointsModel.DiscardUnknown(m) +} + +var xxx_messageInfo_CheckpointsModel proto.InternalMessageInfo + +func (m *CheckpointsModel) GetCheckpoints() map[string]*TableCheckpointModel { + if m != nil { + return m.Checkpoints + } + return nil +} + +type TableCheckpointModel struct { + Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` + Engine []byte `protobuf:"bytes,2,opt,name=engine,proto3" json:"engine,omitempty"` + Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` + AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" 
json:"alloc_base,omitempty"` + // key is "$path:$offset" + Chunks map[string]*ChunkCheckpointModel `protobuf:"bytes,5,rep,name=chunks" json:"chunks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TableCheckpointModel) Reset() { *m = TableCheckpointModel{} } +func (m *TableCheckpointModel) String() string { return proto.CompactTextString(m) } +func (*TableCheckpointModel) ProtoMessage() {} +func (*TableCheckpointModel) Descriptor() ([]byte, []int) { + return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{1} +} +func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TableCheckpointModel.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *TableCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_TableCheckpointModel.Merge(dst, src) +} +func (m *TableCheckpointModel) XXX_Size() int { + return m.Size() +} +func (m *TableCheckpointModel) XXX_DiscardUnknown() { + xxx_messageInfo_TableCheckpointModel.DiscardUnknown(m) +} + +var xxx_messageInfo_TableCheckpointModel proto.InternalMessageInfo + +func (m *TableCheckpointModel) GetHash() []byte { + if m != nil { + return m.Hash + } + return nil +} + +func (m *TableCheckpointModel) GetEngine() []byte { + if m != nil { + return m.Engine + } + return nil +} + +func (m *TableCheckpointModel) GetStatus() uint32 { + if m != nil { + return m.Status + } + return 0 +} + +func (m *TableCheckpointModel) GetAllocBase() int64 { + if m != nil { + return m.AllocBase + } + return 0 +} + +func (m *TableCheckpointModel) GetChunks() map[string]*ChunkCheckpointModel { + if m != nil { + return m.Chunks + } + return 
nil +} + +type ChunkCheckpointModel struct { + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` + Columns []byte `protobuf:"bytes,3,opt,name=columns,proto3" json:"columns,omitempty"` + ShouldIncludeRowId bool `protobuf:"varint,4,opt,name=should_include_row_id,json=shouldIncludeRowId,proto3" json:"should_include_row_id,omitempty"` + EndOffset int64 `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"` + Pos int64 `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"` + PrevRowidMax int64 `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"` + RowidMax int64 `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"` + KvcBytes uint64 `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"` + KvcKvs uint64 `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"` + KvcChecksum uint64 `protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ChunkCheckpointModel) Reset() { *m = ChunkCheckpointModel{} } +func (m *ChunkCheckpointModel) String() string { return proto.CompactTextString(m) } +func (*ChunkCheckpointModel) ProtoMessage() {} +func (*ChunkCheckpointModel) Descriptor() ([]byte, []int) { + return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{2} +} +func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ChunkCheckpointModel.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil 
+ } +} +func (dst *ChunkCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_ChunkCheckpointModel.Merge(dst, src) +} +func (m *ChunkCheckpointModel) XXX_Size() int { + return m.Size() +} +func (m *ChunkCheckpointModel) XXX_DiscardUnknown() { + xxx_messageInfo_ChunkCheckpointModel.DiscardUnknown(m) +} + +var xxx_messageInfo_ChunkCheckpointModel proto.InternalMessageInfo + +func (m *ChunkCheckpointModel) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +func (m *ChunkCheckpointModel) GetOffset() int64 { + if m != nil { + return m.Offset + } + return 0 +} + +func (m *ChunkCheckpointModel) GetColumns() []byte { + if m != nil { + return m.Columns + } + return nil +} + +func (m *ChunkCheckpointModel) GetShouldIncludeRowId() bool { + if m != nil { + return m.ShouldIncludeRowId + } + return false +} + +func (m *ChunkCheckpointModel) GetEndOffset() int64 { + if m != nil { + return m.EndOffset + } + return 0 +} + +func (m *ChunkCheckpointModel) GetPos() int64 { + if m != nil { + return m.Pos + } + return 0 +} + +func (m *ChunkCheckpointModel) GetPrevRowidMax() int64 { + if m != nil { + return m.PrevRowidMax + } + return 0 +} + +func (m *ChunkCheckpointModel) GetRowidMax() int64 { + if m != nil { + return m.RowidMax + } + return 0 +} + +func (m *ChunkCheckpointModel) GetKvcBytes() uint64 { + if m != nil { + return m.KvcBytes + } + return 0 +} + +func (m *ChunkCheckpointModel) GetKvcKvs() uint64 { + if m != nil { + return m.KvcKvs + } + return 0 +} + +func (m *ChunkCheckpointModel) GetKvcChecksum() uint64 { + if m != nil { + return m.KvcChecksum + } + return 0 +} + +func init() { + proto.RegisterType((*CheckpointsModel)(nil), "CheckpointsModel") + proto.RegisterMapType((map[string]*TableCheckpointModel)(nil), "CheckpointsModel.CheckpointsEntry") + proto.RegisterType((*TableCheckpointModel)(nil), "TableCheckpointModel") + proto.RegisterMapType((map[string]*ChunkCheckpointModel)(nil), "TableCheckpointModel.ChunksEntry") + 
proto.RegisterType((*ChunkCheckpointModel)(nil), "ChunkCheckpointModel") +} +func (m *CheckpointsModel) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CheckpointsModel) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Checkpoints) > 0 { + for k, _ := range m.Checkpoints { + dAtA[i] = 0xa + i++ + v := m.Checkpoints[k] + msgSize := 0 + if v != nil { + msgSize = v.Size() + msgSize += 1 + sovFileCheckpoints(uint64(msgSize)) + } + mapSize := 1 + len(k) + sovFileCheckpoints(uint64(len(k))) + msgSize + i = encodeVarintFileCheckpoints(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + if v != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(v.Size())) + n1, err := v.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + } + } + return i, nil +} + +func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Hash) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Hash))) + i += copy(dAtA[i:], m.Hash) + } + if len(m.Engine) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Engine))) + i += copy(dAtA[i:], m.Engine) + } + if m.Status != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.Status)) + } + if m.AllocBase != 0 { + dAtA[i] = 0x20 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.AllocBase)) + } + if len(m.Chunks) > 0 { + for k, _ := range m.Chunks { + dAtA[i] = 0x2a + i++ + v := 
m.Chunks[k] + msgSize := 0 + if v != nil { + msgSize = v.Size() + msgSize += 1 + sovFileCheckpoints(uint64(msgSize)) + } + mapSize := 1 + len(k) + sovFileCheckpoints(uint64(len(k))) + msgSize + i = encodeVarintFileCheckpoints(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + if v != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(v.Size())) + n2, err := v.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + } + } + return i, nil +} + +func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Path) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Path))) + i += copy(dAtA[i:], m.Path) + } + if m.Offset != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.Offset)) + } + if len(m.Columns) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Columns))) + i += copy(dAtA[i:], m.Columns) + } + if m.ShouldIncludeRowId { + dAtA[i] = 0x20 + i++ + if m.ShouldIncludeRowId { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + if m.EndOffset != 0 { + dAtA[i] = 0x28 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.EndOffset)) + } + if m.Pos != 0 { + dAtA[i] = 0x30 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.Pos)) + } + if m.PrevRowidMax != 0 { + dAtA[i] = 0x38 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.PrevRowidMax)) + } + if m.RowidMax != 0 { + dAtA[i] = 0x40 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.RowidMax)) + } + if m.KvcBytes != 0 { + dAtA[i] = 0x48 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.KvcBytes)) + } 
+ if m.KvcKvs != 0 { + dAtA[i] = 0x50 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.KvcKvs)) + } + if m.KvcChecksum != 0 { + dAtA[i] = 0x59 + i++ + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(m.KvcChecksum)) + i += 8 + } + return i, nil +} + +func encodeVarintFileCheckpoints(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *CheckpointsModel) Size() (n int) { + var l int + _ = l + if len(m.Checkpoints) > 0 { + for k, v := range m.Checkpoints { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovFileCheckpoints(uint64(l)) + } + mapEntrySize := 1 + len(k) + sovFileCheckpoints(uint64(len(k))) + l + n += mapEntrySize + 1 + sovFileCheckpoints(uint64(mapEntrySize)) + } + } + return n +} + +func (m *TableCheckpointModel) Size() (n int) { + var l int + _ = l + l = len(m.Hash) + if l > 0 { + n += 1 + l + sovFileCheckpoints(uint64(l)) + } + l = len(m.Engine) + if l > 0 { + n += 1 + l + sovFileCheckpoints(uint64(l)) + } + if m.Status != 0 { + n += 1 + sovFileCheckpoints(uint64(m.Status)) + } + if m.AllocBase != 0 { + n += 1 + sovFileCheckpoints(uint64(m.AllocBase)) + } + if len(m.Chunks) > 0 { + for k, v := range m.Chunks { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovFileCheckpoints(uint64(l)) + } + mapEntrySize := 1 + len(k) + sovFileCheckpoints(uint64(len(k))) + l + n += mapEntrySize + 1 + sovFileCheckpoints(uint64(mapEntrySize)) + } + } + return n +} + +func (m *ChunkCheckpointModel) Size() (n int) { + var l int + _ = l + l = len(m.Path) + if l > 0 { + n += 1 + l + sovFileCheckpoints(uint64(l)) + } + if m.Offset != 0 { + n += 1 + sovFileCheckpoints(uint64(m.Offset)) + } + l = len(m.Columns) + if l > 0 { + n += 1 + l + sovFileCheckpoints(uint64(l)) + } + if m.ShouldIncludeRowId { + n += 2 + } + if m.EndOffset != 0 { + n += 1 + sovFileCheckpoints(uint64(m.EndOffset)) + } + if m.Pos 
!= 0 { + n += 1 + sovFileCheckpoints(uint64(m.Pos)) + } + if m.PrevRowidMax != 0 { + n += 1 + sovFileCheckpoints(uint64(m.PrevRowidMax)) + } + if m.RowidMax != 0 { + n += 1 + sovFileCheckpoints(uint64(m.RowidMax)) + } + if m.KvcBytes != 0 { + n += 1 + sovFileCheckpoints(uint64(m.KvcBytes)) + } + if m.KvcKvs != 0 { + n += 1 + sovFileCheckpoints(uint64(m.KvcKvs)) + } + if m.KvcChecksum != 0 { + n += 9 + } + return n +} + +func sovFileCheckpoints(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozFileCheckpoints(x uint64) (n int) { + return sovFileCheckpoints(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CheckpointsModel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CheckpointsModel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Checkpoints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Checkpoints == nil { + m.Checkpoints = make(map[string]*TableCheckpointModel) + } + var mapkey string 
+ var mapvalue *TableCheckpointModel + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthFileCheckpoints + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postmsgIndex := iNdEx + mapmsglen + if mapmsglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &TableCheckpointModel{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + 
m.Checkpoints[mapkey] = mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TableCheckpointModel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TableCheckpointModel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Hash = append(m.Hash[:0], dAtA[iNdEx:postIndex]...) 
+ if m.Hash == nil { + m.Hash = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Engine", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Engine = append(m.Engine[:0], dAtA[iNdEx:postIndex]...) + if m.Engine == nil { + m.Engine = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + m.Status = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Status |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AllocBase", wireType) + } + m.AllocBase = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AllocBase |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + msglen + if postIndex > l { + return 
io.ErrUnexpectedEOF + } + if m.Chunks == nil { + m.Chunks = make(map[string]*ChunkCheckpointModel) + } + var mapkey string + var mapvalue *ChunkCheckpointModel + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthFileCheckpoints + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postmsgIndex := iNdEx + mapmsglen + if mapmsglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &ChunkCheckpointModel{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return 
ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Chunks[mapkey] = mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ChunkCheckpointModel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ChunkCheckpointModel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Path = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + m.Offset = 0 + 
for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Offset |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Columns", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Columns = append(m.Columns[:0], dAtA[iNdEx:postIndex]...) + if m.Columns == nil { + m.Columns = []byte{} + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ShouldIncludeRowId", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ShouldIncludeRowId = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndOffset", wireType) + } + m.EndOffset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndOffset |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Pos", wireType) + } + m.Pos = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + 
b := dAtA[iNdEx] + iNdEx++ + m.Pos |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PrevRowidMax", wireType) + } + m.PrevRowidMax = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PrevRowidMax |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RowidMax", wireType) + } + m.RowidMax = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RowidMax |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field KvcBytes", wireType) + } + m.KvcBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.KvcBytes |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field KvcKvs", wireType) + } + m.KvcKvs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.KvcKvs |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 11: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field KvcChecksum", wireType) + } + m.KvcChecksum = 0 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + m.KvcChecksum = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + default: + iNdEx = preIndex + skippy, err := 
skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipFileCheckpoints(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthFileCheckpoints + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipFileCheckpoints(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, 
nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthFileCheckpoints = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowFileCheckpoints = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("lightning/restore/file_checkpoints.proto", fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8) +} + +var fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8 = []byte{ + // 499 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x8e, 0xd3, 0x3c, + 0x14, 0xfd, 0xdc, 0xb4, 0x69, 0x7b, 0xd3, 0x0f, 0x55, 0xd6, 0x0c, 0x98, 0x41, 0x54, 0x99, 0x8a, + 0x45, 0x24, 0xa4, 0x8e, 0x18, 0x36, 0xc0, 0xb2, 0x85, 0xc5, 0x08, 0x8d, 0x40, 0x16, 0x6c, 0xd8, + 0x44, 0x69, 0xe2, 0x36, 0x51, 0x5c, 0xbb, 0x8a, 0x9d, 0xcc, 0xf4, 0x05, 0x58, 0xf3, 0x26, 0xbc, + 0x06, 0x4b, 0x1e, 0x80, 0x05, 0x2a, 0x2f, 0x82, 0xec, 0x04, 0x35, 0xa0, 0x0a, 0xb1, 0xbb, 0xe7, + 0xe7, 0x9e, 0xdb, 0x1e, 0xc5, 0x10, 0xf0, 0x6c, 0x9d, 0x6a, 0x91, 0x89, 0xf5, 0x45, 0xc1, 0x94, + 0x96, 0x05, 0xbb, 0x58, 0x65, 0x9c, 0x85, 0x71, 0xca, 0xe2, 0x7c, 0x2b, 0x33, 0xa1, 0xd5, 0x6c, + 0x5b, 0x48, 0x2d, 0xa7, 0x9f, 0x11, 0x8c, 0x17, 0x07, 0xf6, 0x5a, 0x26, 0x8c, 0xe3, 0x97, 0xe0, + 0xb5, 0x9c, 0x04, 0xf9, 0x4e, 0xe0, 0x5d, 0x4e, 0x67, 0x7f, 0xfa, 0xda, 0xc4, 0x2b, 0xa1, 0x8b, + 0x1d, 0x6d, 0xaf, 0x9d, 0xbd, 0xff, 0x2d, 0xd9, 0x1a, 0xf0, 0x18, 0x9c, 0x9c, 0xed, 0x08, 0xf2, + 0x51, 0x30, 0xa4, 0x66, 0xc4, 0x8f, 0xa1, 0x57, 0x45, 0xbc, 0x64, 0xa4, 0xe3, 0xa3, 0xc0, 0xbb, + 0x3c, 0x9d, 0xbd, 0x8b, 0x96, 0x9c, 0x1d, 0x16, 0xed, 0x25, 0x5a, 0x7b, 0x5e, 0x74, 0x9e, 0xa1, + 0xe9, 0xc7, 0x0e, 0x9c, 0x1c, 0xf3, 0x60, 0x0c, 0xdd, 0x34, 0x52, 0xa9, 0x0d, 0x1f, 0x51, 0x3b, + 0xe3, 0xbb, 0xe0, 0x32, 0xb1, 0xce, 0x44, 0x1d, 0x3f, 0xa2, 0x0d, 0x32, 0xbc, 0xd2, 0x91, 0x2e, + 0x15, 0x71, 0x7c, 0x14, 0xfc, 0x4f, 0x1b, 0x84, 0x1f, 0x02, 0x44, 0x9c, 0xcb, 
0x38, 0x5c, 0x46, + 0x8a, 0x91, 0xae, 0x8f, 0x02, 0x87, 0x0e, 0x2d, 0x33, 0x8f, 0x14, 0xc3, 0xcf, 0xc1, 0x8d, 0xd3, + 0x52, 0xe4, 0x8a, 0xf4, 0x6c, 0x27, 0xe7, 0x47, 0x7f, 0xed, 0x6c, 0x61, 0x3d, 0x75, 0x25, 0xcd, + 0xc2, 0xd9, 0x5b, 0xf0, 0x5a, 0xf4, 0xbf, 0x14, 0x61, 0xed, 0x7f, 0x29, 0xe2, 0x5b, 0x07, 0x4e, + 0x8e, 0x79, 0x4c, 0x11, 0xdb, 0x48, 0xa7, 0x4d, 0xb8, 0x9d, 0xcd, 0x1f, 0x96, 0xab, 0x95, 0x62, + 0xda, 0xc6, 0x3b, 0xb4, 0x41, 0x98, 0x40, 0x3f, 0x96, 0xbc, 0xdc, 0x88, 0xba, 0x89, 0x11, 0xfd, + 0x05, 0xf1, 0x13, 0x38, 0x55, 0xa9, 0x2c, 0x79, 0x12, 0x66, 0x22, 0xe6, 0x65, 0xc2, 0xc2, 0x42, + 0xde, 0x84, 0x59, 0x62, 0x5b, 0x19, 0x50, 0x5c, 0x8b, 0x57, 0xb5, 0x46, 0xe5, 0xcd, 0x55, 0x62, + 0xda, 0x63, 0x22, 0x09, 0x9b, 0x43, 0xbd, 0xba, 0x3d, 0x26, 0x92, 0x37, 0xf5, 0xad, 0x31, 0x38, + 0x5b, 0xa9, 0x88, 0x6b, 0x79, 0x33, 0xe2, 0x47, 0x70, 0x67, 0x5b, 0xb0, 0xca, 0x24, 0x67, 0x49, + 0xb8, 0x89, 0x6e, 0x49, 0xdf, 0x8a, 0x23, 0xc3, 0x52, 0x43, 0x5e, 0x47, 0xb7, 0xf8, 0x01, 0x0c, + 0x0f, 0x86, 0x81, 0x35, 0x0c, 0x8a, 0x96, 0x98, 0x57, 0x71, 0xb8, 0xdc, 0x69, 0xa6, 0xc8, 0xd0, + 0x47, 0x41, 0x97, 0x0e, 0xf2, 0x2a, 0x9e, 0x1b, 0x8c, 0xef, 0x41, 0xdf, 0x88, 0x79, 0xa5, 0x08, + 0x58, 0xc9, 0xcd, 0xab, 0xf8, 0x75, 0xa5, 0xf0, 0x39, 0x8c, 0x8c, 0x60, 0x3f, 0x57, 0x55, 0x6e, + 0x88, 0xe7, 0xa3, 0xc0, 0xa5, 0x5e, 0x5e, 0xc5, 0x8b, 0x86, 0x9a, 0xdf, 0xff, 0xb2, 0x9f, 0xa0, + 0xaf, 0xfb, 0x09, 0xfa, 0xbe, 0x9f, 0xa0, 0x4f, 0x3f, 0x26, 0xff, 0x7d, 0xe8, 0x37, 0x6f, 0x69, + 0xe9, 0xda, 0xb7, 0xf3, 0xf4, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x11, 0x26, 0x76, 0xf9, 0x67, + 0x03, 0x00, 0x00, +} diff --git a/lightning/restore/file_checkpoints.proto b/lightning/restore/file_checkpoints.proto new file mode 100644 index 0000000000000..9d1236080419f --- /dev/null +++ b/lightning/restore/file_checkpoints.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; +option go_package = "restore"; + +message CheckpointsModel { + // key is table_name + map checkpoints = 1; +} + +message 
TableCheckpointModel { + bytes hash = 1; + bytes engine = 2; + uint32 status = 3; + int64 alloc_base = 4; + // key is "$path:$offset" + map chunks = 5; +} + +message ChunkCheckpointModel { + string path = 1; + int64 offset = 2; + bytes columns = 3; + bool should_include_row_id = 4; + int64 end_offset = 5; + int64 pos = 6; + int64 prev_rowid_max = 7; + int64 rowid_max = 8; + uint64 kvc_bytes = 9; + uint64 kvc_kvs = 10; + fixed64 kvc_checksum = 11; +} diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 5d21ea3fa649f..162f8663859dd 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -142,16 +142,26 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, if !cfg.Checkpoint.Enable { return NewNullCheckpointsDB(), nil } - db, err := sql.Open("mysql", cfg.Checkpoint.DSN) - if err != nil { - return nil, errors.Trace(err) - } - cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema) - if err != nil { - db.Close() - return nil, errors.Trace(err) + + switch cfg.Checkpoint.Driver { + case "mysql": + db, err := sql.Open("mysql", cfg.Checkpoint.DSN) + if err != nil { + return nil, errors.Trace(err) + } + cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema) + if err != nil { + db.Close() + return nil, errors.Trace(err) + } + return cpdb, nil + + case "file": + return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil + + default: + return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver) } - return cpdb, nil } func (rc *RestoreController) Wait() { diff --git a/tests/checkpoint/config.toml b/tests/checkpoint/config.toml index 9c03363bad977..0512adf4c3f42 100644 --- a/tests/checkpoint/config.toml +++ b/tests/checkpoint/config.toml @@ -8,6 +8,7 @@ level = "error" [checkpoint] enable = true schema = "tidb_lightning_checkpoint_test_cppk" +driver = "mysql" keep-after-success = true [tikv-importer] diff --git a/tests/checkpoint_chunks/config.toml 
b/tests/checkpoint_chunks/config.toml index 90a1f0c6d4e09..aff3da205ea2b 100644 --- a/tests/checkpoint_chunks/config.toml +++ b/tests/checkpoint_chunks/config.toml @@ -1,5 +1,4 @@ [lightning] -# pprof-port = 12683 region-concurrency = 1 check-requirements = false file = "/tmp/lightning_test_result/lightning.log" @@ -8,6 +7,7 @@ level = "error" [checkpoint] enable = true schema = "tidb_lightning_checkpoint_test_cpch" +driver = "mysql" keep-after-success = true [tikv-importer] diff --git a/tests/checkpoint_chunks/file.toml b/tests/checkpoint_chunks/file.toml new file mode 100644 index 0000000000000..0cd4b8fc4c1a2 --- /dev/null +++ b/tests/checkpoint_chunks/file.toml @@ -0,0 +1,30 @@ +[lightning] +region-concurrency = 1 +check-requirements = false +file = "/tmp/lightning_test_result/lightning.log" +level = "error" + +[checkpoint] +enable = true +schema = "tidb_lightning_checkpoint_test_cpch" +driver = "file" +dsn = "/tmp/lightning_test_result/cpch.pb" + +[tikv-importer] +addr = "127.0.0.1:19557" + +[mydumper] +data-source-dir = "/tmp/lightning_test_result/cpch.mydump" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_chunks/run.sh b/tests/checkpoint_chunks/run.sh index 9b748eace24a7..8d0a442d6122c 100755 --- a/tests/checkpoint_chunks/run.sh +++ b/tests/checkpoint_chunks/run.sh @@ -56,3 +56,22 @@ run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE check_contains "count(*): 1" run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.chunk_v3 WHERE pos = end_offset" check_contains "count(*): $CHUNK_COUNT" + +# Repeat, but using the file checkpoint +run_sql 'DROP DATABASE IF EXISTS cpch_tsr' +run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cpch' +rm -f "$TEST_DIR/cpch.pb" + +set +e +for i in $(seq "$CHUNK_COUNT"); do + echo "******** Importing 
Chunk using File checkpoint Now (step $i/$CHUNK_COUNT) ********" + PIDFILE="$PIDFILE" run_lightning file +done +set -e + +echo "******** Verify File checkpoint no-op ********" +PIDFILE="$PIDFILE" run_lightning file +run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;' +check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))" +check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))" +[ -f "$TEST_DIR/cpch.pb" ] diff --git a/tests/checkpoint_error_destroy/bad.toml b/tests/checkpoint_error_destroy/bad.toml index b711777e9d3f9..e1e2175be3ba7 100644 --- a/tests/checkpoint_error_destroy/bad.toml +++ b/tests/checkpoint_error_destroy/bad.toml @@ -5,6 +5,7 @@ level = "info" [checkpoint] enable = true +driver = "mysql" [tikv-importer] addr = "127.0.0.1:8808" diff --git a/tests/checkpoint_error_destroy/bad_file.toml b/tests/checkpoint_error_destroy/bad_file.toml new file mode 100644 index 0000000000000..7d3e2f5f369cc --- /dev/null +++ b/tests/checkpoint_error_destroy/bad_file.toml @@ -0,0 +1,27 @@ +[lightning] +check-requirements = false +file = "/tmp/lightning_test_result/lightning.log" +level = "info" + +[checkpoint] +enable = true +driver = "file" + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_error_destroy/bad-data" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_error_destroy/good.toml b/tests/checkpoint_error_destroy/good.toml index 364ea95c01c1b..ceedb2ea53b37 100644 --- a/tests/checkpoint_error_destroy/good.toml +++ b/tests/checkpoint_error_destroy/good.toml @@ -5,6 +5,7 @@ level = "info" [checkpoint] enable = true +driver = "mysql" [tikv-importer] addr = "127.0.0.1:8808" diff --git a/tests/checkpoint_error_destroy/good_file.toml b/tests/checkpoint_error_destroy/good_file.toml new file mode 100644 index 
0000000000000..6cf718ebfeac2 --- /dev/null +++ b/tests/checkpoint_error_destroy/good_file.toml @@ -0,0 +1,27 @@ +[lightning] +check-requirements = false +file = "/tmp/lightning_test_result/lightning.log" +level = "info" + +[checkpoint] +enable = true +driver = "file" + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_error_destroy/good-data" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_error_destroy/run.sh b/tests/checkpoint_error_destroy/run.sh index c4f44b13e1da7..103e93106883b 100755 --- a/tests/checkpoint_error_destroy/run.sh +++ b/tests/checkpoint_error_destroy/run.sh @@ -8,9 +8,26 @@ for i in $(seq 8); do set +e run_lightning bad set -e - bin/tidb-lightning-ctl -checkpoint-error-destroy=all + bin/tidb-lightning-ctl -config=tests/$TEST_NAME/bad.toml -checkpoint-error-destroy=all done run_lightning good run_sql 'SELECT * FROM cped.t' check_contains 'x: 1999-09-09 09:09:09' + +# Try again with the file checkpoints + +run_sql 'DROP DATABASE cped' + +for i in $(seq 8); do + set +e + run_lightning bad_file + set -e + ls -la /tmp/lightning_test_result/importer/.temp/ + bin/tidb-lightning-ctl -config=tests/$TEST_NAME/bad_file.toml -checkpoint-error-destroy=all + ls -la /tmp/lightning_test_result/importer/.temp/ +done + +run_lightning good_file +run_sql 'SELECT * FROM cped.t' +check_contains 'x: 1999-09-09 09:09:09' diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 069dcb8f9c8a6..83b74e4593da2 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -28,10 +28,15 @@ max-backups = 14 enable = true # The schema name (database name) to store the checkpoints schema = "tidb_lightning_checkpoint" -# The data source name (DSN) in the form "USER:PASS@tcp(HOST:PORT)/". 
-# If not specified, the TiDB server from the [tidb] section will be used to store the checkpoints. You could also -# specify a different MySQL-compatible database server if you like. -#dsn = "root@tcp(127.0.0.1:4000)/" +# Where to store the checkpoints. +# Set to "file" to store as a local file. +# Set to "mysql" to store into a remote MySQL-compatible database +driver = "file" +# The data source name (DSN) indicating the location of the checkpoint storage. +# For "file" driver, the DSN is a path. If not specified, Lightning would default to "/tmp/CHKPTSCHEMA.pb". +# For "mysql" driver, the DSN is a URL in the form "USER:PASS@tcp(HOST:PORT)/". +# If not specified, the TiDB server from the [tidb] section will be used to store the checkpoints. +#dsn = "/tmp/tidb_lightning_checkpoint.pb" # Whether to keep the checkpoints after all data are imported. If false, the checkpoints will be deleted. The schema # needs to be dropped manually, however. #keep-after-success = false diff --git a/vendor/modules.txt b/vendor/modules.txt index 35b4227b13aac..da49f8c3d8d1d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -29,8 +29,8 @@ github.com/cznic/y # github.com/go-sql-driver/mysql v1.4.0 github.com/go-sql-driver/mysql # github.com/gogo/protobuf v1.1.1 -github.com/gogo/protobuf/gogoproto github.com/gogo/protobuf/proto +github.com/gogo/protobuf/gogoproto github.com/gogo/protobuf/protoc-gen-gogo/descriptor # github.com/golang/protobuf v1.2.0 github.com/golang/protobuf/proto