Skip to content

Commit

Permalink
Merge branch 'master' into fix_multi_rows
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Apr 27, 2020
2 parents 12f44ff + 507d39f commit afbd04f
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 3 deletions.
18 changes: 18 additions & 0 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"bytes"
"crypto/tls"
"fmt"
"net/http"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -260,9 +262,25 @@ func (c *Collector) reportErr(ctx context.Context, err error) {
}
}

// ref https://github.com/pingcap/tidb/pull/14954
// TiDB will write a fake ddl binlog like: select setval(`seq`.`sequence_name`, 1000)
// when the value of sequence is changed for replicate the value of the sequence.
// we CAN NOT query the job from the tikv according the job id.
// just skip this kind of binlog now.
func skipQueryJob(binlog *binlog.Binlog) bool {
q := binlog.GetDdlQuery()
return bytes.HasPrefix(q, []byte("select setval"))
}

func (c *Collector) syncBinlog(item *binlogItem) error {
binlog := item.binlog
// DO NOT replicate the value of sequence now.
if skipQueryJob(binlog) {
return nil
}

if binlog.DdlJobId > 0 {
log.Info("start query job", zap.Int64("id", binlog.DdlJobId), zap.Stringer("binlog", binlog))
msgPrefix := fmt.Sprintf("get ddl job by id %d error", binlog.DdlJobId)
var job *model.Job
for {
Expand Down
5 changes: 2 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable, model.ActionCreateView, model.ActionRecoverTable:
case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence, model.ActionRecoverTable:
table := job.BinlogInfo.TableInfo
if table == nil {
return "", "", "", errors.NotFoundf("table %d", job.TableID)
Expand All @@ -380,7 +380,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
Expand Down Expand Up @@ -422,7 +422,6 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down
17 changes: 17 additions & 0 deletions tests/sequence/drainer.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
data-dir = '/tmp/tidb_binlog_test/data.drainer'

[syncer]
txn-batch = 1
worker-count = 1
safe-mode = false
db-type = 'mysql'
replicate-do-db = ['seq']

[syncer.to]
host = '127.0.0.1'
user = 'root'
password = ''
port = 3306

[syncer.to.checkpoint]
schema = "seq_checkpoint"
29 changes: 29 additions & 0 deletions tests/sequence/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/sh

set -e

cd "$(dirname "$0")"

run_drainer &

sleep 3

run_sql 'CREATE DATABASE seq;'
run_sql 'CREATE SEQUENCE seq.sequence_name;'
run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence_name, val int);'

run_sql 'INSERT INTO seq.table_name(val) values(10);'

sleep 3

down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;'
check_contains 'count(*): 1'
check_contains 'sum(id): 1'
check_contains 'sum(val): 10'


run_sql 'DROP TABLE seq.table_name;'
run_sql 'DROP SEQUENCE seq.sequence_name;'


killall drainer

0 comments on commit afbd04f

Please sign in to comment.