
Commit

W.I.P
hidehalo committed Dec 1, 2020
1 parent 54d0bbb commit 16634a3
Showing 3 changed files with 123 additions and 26 deletions.
83 changes: 83 additions & 0 deletions lightning/glue/glue.go
@@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"errors"
"sync"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
@@ -47,6 +48,88 @@ type SQLExecutor interface {
Close()
}

// dbConnector pairs a lazily opened *sql.DB with the config and opener
// needed to establish it on first use; the result (or error) is cached.
type dbConnector struct {
dbConf *config.Config
lazyConn func(dbConf *config.Config) (*sql.DB, error)
conn *sql.DB
err error
}

func (connector *dbConnector) connect() error {
if connector.conn == nil {
connector.conn, connector.err = connector.lazyConn(connector.dbConf)
}
return connector.err
}

// dbPool keeps reusable connectors in a sync.Pool and falls back to an
// eagerly opened singleton whenever the pool comes back empty.
type dbPool struct {
singleton *dbConnector
pool sync.Pool
}

func (self *dbPool) getConn() *dbConnector {
conn := self.pool.Get()
if conn != nil {
return conn.(*dbConnector)
}
return self.singleton
}

func (self *dbPool) putConn(conn *dbConnector) *dbPool {
self.pool.Put(conn)
return self
}

func newDBPool(size int, dbConf *config.Config, lazyConn func(dbConf *config.Config) (*sql.DB, error)) (*dbPool, error) {
singleton, err := lazyConn(dbConf)
if err != nil {
return nil, err
}
pool := &dbPool{}
pool.singleton = &dbConnector{
dbConf: dbConf,
lazyConn: lazyConn,
conn: singleton,
err: nil,
}
pool.putConn(pool.singleton)
for i := 0; i < size-1; i++ {
pool.putConn(&dbConnector{
dbConf: dbConf,
lazyConn: lazyConn,
conn: nil,
err: nil,
})
}
return pool, nil
}

func (pool *dbPool) ExecuteWithLog(ctx context.Context, query string, purpose string, logger log.Logger) error {
conn := pool.getConn()
if err := conn.connect(); err != nil {
return err
}
defer func() {
pool.putConn(conn)
}()
// TODO: impl
return errors.New("dbPool.ExecuteWithLog: not implemented")
}

func (pool *dbPool) ObtainStringWithLog(ctx context.Context, query string, purpose string, logger log.Logger) (string, error) {
conn := pool.getConn()
if err := conn.connect(); err != nil {
return "", err
}
defer func() {
pool.putConn(conn)
}()
// TODO: impl
return "", errors.New("dbPool.ObtainStringWithLog: not implemented")
}

func (pool *dbPool) Close() {
// TODO: impl
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
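For orientation only (not part of the commit): a hypothetical usage sketch of the new pool, assuming it sits in the same package as the glue.go code above and that a MySQL driver such as github.com/go-sql-driver/mysql is registered. The opener and the hard-coded DSN are illustrative stand-ins, not the real Lightning configuration plumbing.

// exampleUseDBPool is a hypothetical caller of the new dbPool (not in this commit).
func exampleUseDBPool(ctx context.Context, cfg *config.Config) error {
	openDB := func(dbConf *config.Config) (*sql.DB, error) {
		// A real opener would derive the DSN from dbConf; this stand-in only
		// prepares the handle — sql.Open does not dial the server yet.
		return sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	}
	pool, err := newDBPool(4, cfg, openDB)
	if err != nil {
		return err
	}
	conn := pool.getConn()   // borrow a connector; falls back to the singleton when the sync.Pool is empty
	defer pool.putConn(conn) // always hand it back
	if err := conn.connect(); err != nil { // the underlying *sql.DB is opened lazily here
		return err
	}
	return conn.conn.PingContext(ctx)
}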
62 changes: 40 additions & 22 deletions lightning/restore/tidb.go
@@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

tmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
@@ -137,39 +138,56 @@ func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema
}
task := logger.Begin(zap.InfoLevel, "create tables")
var sqlCreateStmts []string
errCh := make(chan error, 16) // buffered: at most 16 statements run at a time, so workers never block on send
var wg sync.WaitGroup
fnCreateTable := func(ctx context.Context, database, table, stmt string, logger log.Logger, errCh chan error) {
defer wg.Done()
select {
case <-ctx.Done():
errCh <- ctx.Err()
default:
tableLog := zap.String("table", common.UniqueTable(database, table))
errCh <- sqlExecutor.ExecuteWithLog(ctx, stmt, "create table", logger.With(tableLog))
}
}
childCtx, finishChild := context.WithCancel(ctx)
defer finishChild()
barrierOfConcurrency := 0
// collectBatch waits for every launched goroutine and gathers its result from errCh
collectBatch := func() {
wg.Wait()
for ; barrierOfConcurrency > 0; barrierOfConcurrency-- {
if e := <-errCh; e != nil && err == nil {
err = e
}
}
}
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))
sqlCreateStmts, err = createTableIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}
// TODO: maybe we should put these create statements into a transaction
for _, stmt := range sqlCreateStmts {
if ctxErr := ctx.Err(); ctxErr != nil {
err = ctxErr
break
}
// barrier: flush the running batch before it grows past 16 goroutines
if barrierOfConcurrency > 15 {
collectBatch()
if err != nil {
break
}
}
barrierOfConcurrency++
wg.Add(1)
go fnCreateTable(childCtx, database, tbl, stmt, logger, errCh)
}
// wait for the final (possibly partial) batch of this table
collectBatch()
if err != nil {
break
}
}
task.End(zap.ErrorLevel, err)
return errors.Trace(err)
}
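As a reading aid (hypothetical, not part of the commit): a self-contained sketch of the concurrency pattern the rewritten InitSchema loop is built around — run statements in batches of at most 16 goroutines, report each result on a buffered error channel, and use a sync.WaitGroup as the barrier between batches. The name runBatched and the fake exec callback are made up for the example.

package main

import (
	"context"
	"fmt"
	"sync"
)

// runBatched executes stmts with exec, at most batchSize at a time,
// and returns the first error it sees.
func runBatched(ctx context.Context, stmts []string, exec func(context.Context, string) error) error {
	const batchSize = 16
	var firstErr error
	var wg sync.WaitGroup
	for start := 0; start < len(stmts) && firstErr == nil; start += batchSize {
		end := start + batchSize
		if end > len(stmts) {
			end = len(stmts)
		}
		errCh := make(chan error, end-start) // buffered: workers never block on send
		for _, stmt := range stmts[start:end] {
			wg.Add(1)
			go func(stmt string) {
				defer wg.Done()
				errCh <- exec(ctx, stmt)
			}(stmt)
		}
		wg.Wait() // barrier: the whole batch finishes before the next one starts
		for i := start; i < end; i++ {
			if err := <-errCh; err != nil && firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

func main() {
	stmts := []string{"CREATE TABLE a (x INT)", "CREATE TABLE b (x INT)"}
	err := runBatched(context.Background(), stmts, func(_ context.Context, s string) error {
		fmt.Println("exec:", s)
		return nil
	})
	fmt.Println("err =", err)
}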
4 changes: 0 additions & 4 deletions lightning/worker/worker.go
@@ -63,7 +63,3 @@ func (pool *Pool) Recycle(worker *Worker) {
func (pool *Pool) HasWorker() bool {
return len(pool.workers) > 0
}

func (pool *Pool) GetLimit() int {
return pool.limit
}
