Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
restore: try to create tables in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
hidehalo committed Dec 3, 2020
1 parent 8a276c0 commit d7ce306
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 26 deletions.
122 changes: 122 additions & 0 deletions lightning/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"errors"
"sync"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -47,6 +48,127 @@ type SQLExecutor interface {
Close()
}

// dbConnector pairs a DB configuration with a lazily-established
// connection: conn stays nil until connect() succeeds, and err records
// the outcome of the most recent dial attempt.
type dbConnector struct {
	dbConf config.DBStore
	// lazyConn dials a new *sql.DB from the given configuration.
	lazyConn func(dbConf config.DBStore) (*sql.DB, error)
	conn *sql.DB
	err error
}

// connect establishes the underlying connection on first use and caches the
// result. Once a connection exists, later calls return the cached error
// (nil) without re-dialing; a failed dial leaves conn nil, so the next call
// retries.
func (connector *dbConnector) connect() error {
	if connector.conn != nil {
		return connector.err
	}
	connector.conn, connector.err = connector.lazyConn(connector.dbConf)
	return connector.err
}

// close shuts down the underlying connection, if one was ever established.
// Fix: the original condition was inverted (`conn == nil`), which called
// Close on a nil *sql.DB (panic) and never closed live connections (leak).
func (connector *dbConnector) close() error {
	if connector.conn != nil {
		return connector.conn.Close()
	}
	return nil
}

// getError returns the error recorded by the last connect() attempt
// (nil when no attempt has been made or the last one succeeded).
func (connector *dbConnector) getError() error {
	return connector.err
}

// getDB returns the established connection, or nil if connect() has not
// succeeded yet.
func (connector *dbConnector) getDB() *sql.DB {
	return connector.conn
}

// DBPool is multi-connection implementation of `SQLExecutor` interface
// It will pick one of any idle connections to handle DB events
// and put it back when events done.
//
// NOTE(review): the embedded SQLExecutor is never assigned anywhere in this
// file; it appears to exist only to inherit the interface's method set.
// Calling any SQLExecutor method that DBPool does not override would panic
// on the nil embedded interface — TODO confirm this is intended.
type DBPool struct {
	SQLExecutor
	// singleton is the one eagerly-dialed connector, used as a fallback
	// whenever the pool is empty.
	singleton *dbConnector
	pool sync.Pool
}

// getConn hands out an idle connector from the pool. When the pool is empty
// (or sync.Pool dropped its entries during a GC cycle) it falls back to the
// always-present singleton connector.
func (dbPool *DBPool) getConn() *dbConnector {
	if item := dbPool.pool.Get(); item != nil {
		return item.(*dbConnector)
	}
	return dbPool.singleton
}

// putConn returns a connector to the pool; it returns the pool itself so
// calls can be chained.
func (dbPool *DBPool) putConn(conn *dbConnector) *DBPool {
	dbPool.pool.Put(conn)
	return dbPool
}

// NewDBPool is constructor of DBPool.
// It eagerly dials one connection (the singleton) so the pool is guaranteed
// to hold at least one working connection; the remaining size-1 connectors
// are created lazily and only dial on first use.
func NewDBPool(size int, dbConf config.DBStore, lazyConn func(dbConf config.DBStore) (*sql.DB, error)) (*DBPool, error) {
	// Fix the `size` validation TODO: a non-positive size would otherwise
	// leave the sync.Pool empty while callers still expect a usable pool.
	if size < 1 {
		size = 1
	}
	// NOTE: make sure we have one db connection at least.
	singleton, err := lazyConn(dbConf)
	if err != nil {
		return nil, err
	}
	dbPool := &DBPool{}
	dbPool.singleton = &dbConnector{
		dbConf:   dbConf,
		lazyConn: lazyConn,
		conn:     singleton,
		err:      nil,
	}
	dbPool.putConn(dbPool.singleton)
	// The remaining connectors dial lazily via connect().
	for i := 0; i < size-1; i++ {
		dbPool.putConn(&dbConnector{
			dbConf:   dbConf,
			lazyConn: lazyConn,
			conn:     nil,
			err:      nil,
		})
	}
	return dbPool, nil
}

// ExecuteWithLog implement `SQLExecutor` interface.
// It borrows an idle connector, lazily connects it, executes the query with
// retry, and returns the connector to the pool when done.
func (dbPool *DBPool) ExecuteWithLog(ctx context.Context, query string, purpose string, logger log.Logger) error {
	conn := dbPool.getConn()
	defer dbPool.putConn(conn)

	if err := conn.connect(); err != nil {
		return err
	}
	// Named `retry` so the local does not shadow the database/sql package.
	retry := common.SQLWithRetry{
		DB:     conn.getDB(),
		Logger: logger,
	}
	return retry.Exec(ctx, purpose, query)
}

// ObtainStringWithLog implement `SQLExecutor` interface.
// It runs a single-row query with retry and scans the first column into a
// string; on a connect failure the zero string is returned with the error.
func (dbPool *DBPool) ObtainStringWithLog(ctx context.Context, query string, purpose string, logger log.Logger) (string, error) {
	conn := dbPool.getConn()
	defer dbPool.putConn(conn)

	if err := conn.connect(); err != nil {
		return "", err
	}
	var result string
	retry := common.SQLWithRetry{
		DB:     conn.getDB(),
		Logger: logger,
	}
	err := retry.QueryRow(ctx, purpose, query, &result)
	return result, err
}

// Close implement `SQLExecutor` interface.
// It drains the pool and closes every established connection, then closes
// the singleton exactly once.
//
// Fix: the previous loop used getConn(), which falls back to the singleton
// whenever the pool is empty and therefore never returns nil — the loop
// never terminated. Draining pool.Get() directly terminates once the pool
// is exhausted. Connectors that never connected (or whose dial failed) have
// a nil conn and nothing to close.
func (dbPool *DBPool) Close() {
	for {
		item := dbPool.pool.Get()
		if item == nil {
			break
		}
		conn := item.(*dbConnector)
		if conn == dbPool.singleton {
			// The singleton was also Put into the pool at construction;
			// close it exactly once, below.
			continue
		}
		if conn.getDB() != nil {
			// Close errors during shutdown are not actionable; ignore them,
			// matching the previous behavior.
			_ = conn.close()
		}
	}
	if dbPool.singleton.getDB() != nil {
		_ = dbPool.singleton.close()
	}
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
Expand Down
28 changes: 25 additions & 3 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package restore

import (
"context"
"database/sql"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -306,15 +307,36 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
defer db.Close()
db.ExecContext(ctx, "SET SQL_MODE = ?", rc.cfg.TiDB.StrSQLMode)
}

// CONFUSED: how we control concurrency via configuration?
// OR do not implement glue.SQLExecutor inside ExternalTiDBGlue
// Maybe we can aggregate an instance of glue.SQLExecutor?
concurrency := 16
var sqlExec glue.SQLExecutor
var err error
if concurrency > 1 {
dbFactory := func(conf config.DBStore) (*sql.DB, error) {
db, err := DBFromConfig(conf)
if err == nil {
db.ExecContext(ctx, "SET SQL_MODE = ?", conf.StrSQLMode)
}
return db, err
}
sqlExec, err = glue.NewDBPool(concurrency, rc.cfg.TiDB, dbFactory)
defer sqlExec.Close()
if err != nil {
return errors.Trace(err)
}
} else {
sqlExec = rc.tidbGlue.GetSQLExecutor()
}
for _, dbMeta := range rc.dbMetas {
task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore table schema")

tablesSchema := make(map[string]string)
for _, tblMeta := range dbMeta.Tables {
tablesSchema[tblMeta.Name] = tblMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, tablesSchema)
err = InitSchema(ctx, concurrency, rc.tidbGlue.GetParser(), sqlExec, dbMeta.Name, tablesSchema)

task.End(zap.ErrorLevel, err)
if err != nil {
Expand All @@ -330,7 +352,7 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
for _, viewMeta := range dbMeta.Views {
viewsSchema[viewMeta.Name] = viewMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, viewsSchema)
err := InitSchema(ctx, concurrency, rc.tidbGlue.GetParser(), sqlExec, dbMeta.Name, viewsSchema)

task.End(zap.ErrorLevel, err)
if err != nil {
Expand Down
73 changes: 53 additions & 20 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

tmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/mydump"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type TiDBManager struct {
Expand Down Expand Up @@ -124,43 +126,74 @@ func (timgr *TiDBManager) Close() {
timgr.db.Close()
}

func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema map[string]string) error {
logger := log.With(zap.String("db", database))
sqlExecutor := g.GetSQLExecutor()
// schemaJob is one CREATE TABLE statement queued for a worker goroutine,
// together with the zap field identifying the target table for logging.
type schemaJob struct {
	sql string
	log zapcore.Field
}

func InitSchema(ctx context.Context, concurrency int, parser *parser.Parser, exec glue.SQLExecutor, database string, tablesSchema map[string]string) error {
logger := log.With(zap.String("db", database))
var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, database)
err := sqlExecutor.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
err := exec.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var wg sync.WaitGroup
fnCreateTable := func(ctx context.Context, exec glue.SQLExecutor, jobCh chan *schemaJob, logger log.Logger, errCh chan error) {
for {
select {
case <-ctx.Done():
return
case job := <-jobCh:
//TODO: maybe we should put these createStems into a transaction
err = exec.ExecuteWithLog(ctx, job.sql, "create table", logger.With(job.log))
wg.Done()
if err != nil {
errCh <- err
}
}
}
}
fnHandleError := func(ctx context.Context, err error, errCh chan error, quit context.CancelFunc) {
for {
select {
case err = <-errCh:
quit()
return
case <-ctx.Done():
return
}
}
}
var sqlCreateStmts []string
errCh := make(chan error)
queue := make(chan *schemaJob, concurrency)
task := logger.Begin(zap.InfoLevel, "create tables")
childCtx, cancel := context.WithCancel(ctx)
for i := 0; i < concurrency; i++ {
go fnCreateTable(childCtx, exec, queue, logger, errCh)
}
go fnHandleError(childCtx, err, errCh, cancel)
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createTableIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
sqlCreateStmts, err = createTableIfNotExistsStmt(parser, sqlCreateTable, database, tbl)
if err != nil {
errCh <- err
break
}

//TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break
log := zap.String("table", common.UniqueTable(database, tbl))
for _, stmt := range sqlCreateStmts {
wg.Add(1)
queue <- &schemaJob{
sql: stmt,
log: log,
}
}
}
wg.Wait()
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

Expand Down
6 changes: 3 additions & 3 deletions lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *tidbSuite) TestInitSchema(c *C) {
ExpectClose()

s.mockDB.MatchExpectationsInOrder(false) // maps are unordered.
err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table t1 (a int primary key, b varchar(200));",
"t2": "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;CREATE TABLE `db`.`t2` (xx TEXT) AUTO_INCREMENT=11203;",
})
Expand All @@ -218,7 +218,7 @@ func (s *tidbSuite) TestInitSchemaSyntaxError(c *C) {
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table `t1` with invalid syntax;",
})
c.Assert(err, NotNil)
Expand All @@ -239,7 +239,7 @@ func (s *tidbSuite) TestInitSchemaUnsupportedSchemaError(c *C) {
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table `t1` (a VARCHAR(999999999));",
})
c.Assert(err, ErrorMatches, ".*Column length too big.*")
Expand Down

0 comments on commit d7ce306

Please sign in to comment.