Skip to content

Commit

Permalink
Optimize the performance of bidirectional synchronous update table of…
Browse files Browse the repository at this point in the history
… _drainer_repl_mark (#903) (#909)
  • Loading branch information
sre-bot committed Feb 24, 2020
1 parent b49c75c commit 51b82e1
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 15 deletions.
2 changes: 2 additions & 0 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package loopbacksync
const (
//MarkTableName mark table name
MarkTableName = "retl._drainer_repl_mark"
//ID syncer worker coroutine id
ID = "id"
//ChannelID channel id
ChannelID = "channel_id"
//Val val
Expand Down
49 changes: 40 additions & 9 deletions pkg/loader/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
gosql "database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
Expand All @@ -32,20 +33,26 @@ import (
"golang.org/x/sync/errgroup"
)

var defaultBatchSize = 128
var (
defaultBatchSize = 128
defaultWorkerCount = 16
index int64
)

type executor struct {
db *gosql.DB
batchSize int
workerCount int
info *loopbacksync.LoopBackSync
queryHistogramVec *prometheus.HistogramVec
refreshTableInfo func(schema string, table string) (info *tableInfo, err error)
}

func newExecutor(db *gosql.DB) *executor {
exe := &executor{
db: db,
batchSize: defaultBatchSize,
db: db,
batchSize: defaultBatchSize,
workerCount: defaultWorkerCount,
}

return exe
Expand All @@ -65,6 +72,10 @@ func (e *executor) setSyncInfo(info *loopbacksync.LoopBackSync) {
e.info = info
}

func (e *executor) setWorkerCount(workerCount int) {
e.workerCount = workerCount
}

func (e *executor) withQueryHistogramVec(queryHistogramVec *prometheus.HistogramVec) *executor {
e.queryHistogramVec = queryHistogramVec
return e
Expand Down Expand Up @@ -119,16 +130,36 @@ func (e *executor) updateMark(channel string, tx *tx) error {
if e.info == nil {
return nil
}
status := 1
columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo)
var args []interface{}
sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val)
args = append(args, e.info.ChannelID, status, channel)
_, err := tx.autoRollbackExec(sql, args...)
sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID)
args = append(args, e.addIndex(), e.info.ChannelID)
_, err1 := tx.autoRollbackExec(sql, args...)
if err1 != nil {
return errors.Trace(err1)
}
return nil
}

func (e *executor) cleanChannelInfo() error {
if e.info == nil {
return nil
}
tx, err := e.begin()
if err != nil {
return errors.Trace(err)
}
return nil
var args []interface{}
sql := fmt.Sprintf("delete from %s where %s=? ", loopbacksync.MarkTableName, loopbacksync.ChannelID)
args = append(args, e.info.ChannelID)
_, err1 := tx.autoRollbackExec(sql, args...)
if err1 != nil {
return errors.Trace(err1)
}
err2 := tx.commit()
return errors.Trace(err2)
}
func (e *executor) addIndex() int64 {
return atomic.AddInt64(&index, 1) % ((int64)(e.workerCount))
}

// return a wrap of sql.Tx
Expand Down
50 changes: 49 additions & 1 deletion pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,16 +505,63 @@ func (s *loaderImpl) createMarkTable() error {
return nil
}

func (s *loaderImpl) initMarkTable() error {
if err := s.createMarkTable(); err != nil {
return errors.Trace(err)
}
return s.initMarkTableData()
}
func (s *loaderImpl) initMarkTableData() error {
tx, err := s.db.Begin()
if err != nil {
return errors.Trace(err)
}
status := 1
channel := ""
var builder strings.Builder
holder := "(?,?,?,?)"
columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo)
builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ")
for i := 0; i < s.workerCount; i++ {
if i > 0 {
builder.WriteByte(',')
}
builder.WriteString(holder)
}
var args []interface{}
for id := 0; id < s.workerCount; id++ {
args = append(args, id, s.loopBackSyncInfo.ChannelID, status, channel)
}
query := builder.String()
if _, err = tx.Exec(query, args...); err != nil {
log.Error("Exec fail, will rollback", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Auto rollback", zap.Error(rbErr))
}
return errors.Trace(err)
}
if err = tx.Commit(); err != nil {
return errors.Trace(err)
}
return nil
}

func (s *loaderImpl) cleanChannelInfo() {
executor := s.getExecutor()
_ = executor.cleanChannelInfo()
}

// Run will quit when meet any error, or all the txn are drained
func (s *loaderImpl) Run() error {
if s.loopBackSyncInfo != nil && s.loopBackSyncInfo.LoopbackControl {
if err := s.createMarkTable(); err != nil {
if err := s.initMarkTable(); err != nil {
return errors.Trace(err)
}
}
txnManager := newTxnManager(1024, s.input)
defer func() {
log.Info("Run()... in Loader quit")
s.cleanChannelInfo()
close(s.successTxn)
txnManager.Close()
}()
Expand Down Expand Up @@ -624,6 +671,7 @@ func (s *loaderImpl) getExecutor() *executor {
e = e.withRefreshTableInfo(s.refreshTableInfo)
}
e.setSyncInfo(s.loopBackSyncInfo)
e.setWorkerCount(s.workerCount)
if s.metrics != nil && s.metrics.QueryHistogramVec != nil {
e = e.withQueryHistogramVec(s.metrics.QueryHistogramVec)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/loader/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) {
}

func createMarkTableDDL() string {
sql := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo)
sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID)
return sql
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/loader/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,10 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
defer db.Close()
columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo)
sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val)
sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID)
mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta(sql)).
WithArgs(100, 1, "").WillReturnResult(sqlmock.NewResult(1, 1))
WithArgs(1, 100).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
e := newExecutor(db)
tx, err := e.begin()
Expand All @@ -261,6 +260,6 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) {
}
func (s *SQLSuite) TestCreateMarkTable(c *check.C) {
sql := createMarkTableDDL()
sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo)
sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID)
c.Assert(sql, check.Equals, sql1)
}

0 comments on commit 51b82e1

Please sign in to comment.