Skip to content

Commit

Permalink
Handle create/drop sequence job type (#950)
Browse files Browse the repository at this point in the history
1. handle create/drop sequence job type
2. ref pingcap/tidb#14954
TiDB will write a fake ddl binlog like: select setval(`seq`.`sequence_name`, 1000)
when the value of sequence is changed for replicate the value of the sequence.
we CAN NOT query the job from the tikv according the job id.
just skip this kind of binlog now.
  • Loading branch information
july2993 authored Apr 24, 2020
1 parent 72626a9 commit 507d39f
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 3 deletions.
18 changes: 18 additions & 0 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"bytes"
"crypto/tls"
"fmt"
"net/http"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -260,9 +262,25 @@ func (c *Collector) reportErr(ctx context.Context, err error) {
}
}

// ref https://github.com/pingcap/tidb/pull/14954
// TiDB will write a fake ddl binlog like: select setval(`seq`.`sequence_name`, 1000)
// when the value of sequence is changed for replicate the value of the sequence.
// we CAN NOT query the job from the tikv according the job id.
// just skip this kind of binlog now.
func skipQueryJob(binlog *binlog.Binlog) bool {
q := binlog.GetDdlQuery()
return bytes.HasPrefix(q, []byte("select setval"))
}

func (c *Collector) syncBinlog(item *binlogItem) error {
binlog := item.binlog
// DO NOT replicate the value of sequence now.
if skipQueryJob(binlog) {
return nil
}

if binlog.DdlJobId > 0 {
log.Info("start query job", zap.Int64("id", binlog.DdlJobId), zap.Stringer("binlog", binlog))
msgPrefix := fmt.Sprintf("get ddl job by id %d error", binlog.DdlJobId)
var job *model.Job
for {
Expand Down
5 changes: 2 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable, model.ActionCreateView, model.ActionRecoverTable:
case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence, model.ActionRecoverTable:
table := job.BinlogInfo.TableInfo
if table == nil {
return "", "", "", errors.NotFoundf("table %d", job.TableID)
Expand All @@ -380,7 +380,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
Expand Down Expand Up @@ -422,7 +422,6 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down
17 changes: 17 additions & 0 deletions tests/sequence/drainer.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
data-dir = '/tmp/tidb_binlog_test/data.drainer'

[syncer]
txn-batch = 1
worker-count = 1
safe-mode = false
db-type = 'mysql'
replicate-do-db = ['seq']

[syncer.to]
host = '127.0.0.1'
user = 'root'
password = ''
port = 3306

[syncer.to.checkpoint]
schema = "seq_checkpoint"
29 changes: 29 additions & 0 deletions tests/sequence/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/sh

set -e

cd "$(dirname "$0")"

run_drainer &

sleep 3

run_sql 'CREATE DATABASE seq;'
run_sql 'CREATE SEQUENCE seq.sequence_name;'
run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence_name, val int);'

run_sql 'INSERT INTO seq.table_name(val) values(10);'

sleep 3

down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;'
check_contains 'count(*): 1'
check_contains 'sum(id): 1'
check_contains 'sum(val): 10'


run_sql 'DROP TABLE seq.table_name;'
run_sql 'DROP SEQUENCE seq.sequence_name;'


killall drainer

0 comments on commit 507d39f

Please sign in to comment.