Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
lightning: restore W.I.P
Browse files Browse the repository at this point in the history
  • Loading branch information
hidehalo committed Dec 1, 2020
1 parent 8a276c0 commit 54d0bbb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
38 changes: 24 additions & 14 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,42 @@ func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var sqlCreateStmts []string
stmtQueue := make(chan string, len(g.tableWorkers.GetLimit())))
createTableContext, finishCreateTable := context.WithCancel(ctx)
createTable := func (ctx ctx context.Context, database, table, stmt string, logger log.Logger) error {
log := zap.String("table", common.UniqueTable(database, tbl))
err = sqlExecutor.ExecuteWithLog(ctx, stmt, "create table", logger.With(log))
if err != nil {
return error
}
return nil
)
go func (ctx context.Context, database, table, string logger log.Logger) {
for {
switch {
case stmt := <-stmtQueue:
go createTable(ctx, stmt, database, table, logger)
case <-ctx.Done():
return
}
}
}(createTableContext, database, tbl, logger)
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createTableIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}

//TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break
}
for _, stmt := range sqlCreateStmts {
stmtQueue <- stmt
}
}
finishCreateTable()
close(stmtQueue)
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

Expand Down
4 changes: 4 additions & 0 deletions lightning/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func (pool *Pool) Recycle(worker *Worker) {
func (pool *Pool) HasWorker() bool {
return len(pool.workers) > 0
}

func (pool *Pool) GetLimit() int {
return pool.limit
}

0 comments on commit 54d0bbb

Please sign in to comment.