Skip to content

Commit

Permalink
Merge branch 'master' into fix_multi_rows
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 authored May 14, 2020
2 parents afbd04f + 57bb598 commit f40177c
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 13 deletions.
2 changes: 2 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ func (cfg *Config) adjustConfig() error {
}

if cfg.SyncerCfg.DestDBType == "kafka" {
maxMsgSize = maxKafkaMsgSize

// get KafkaAddrs from zookeeper if ZkAddrs is setted
if cfg.SyncerCfg.To.ZKAddrs != "" {
zkClient, err := newZKFromConnectionString(cfg.SyncerCfg.To.ZKAddrs, time.Second*5, time.Second*60)
Expand Down
2 changes: 2 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(maxMsgSize, Equals, maxGrpcMsgSize)

cfg = NewConfig()
err = cfg.adjustConfig()
Expand Down Expand Up @@ -334,6 +335,7 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, defaultKafkaAddrs)
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
c.Assert(maxMsgSize, Equals, maxKafkaMsgSize)

// With Zookeeper address
cfg = NewConfig()
Expand Down
3 changes: 2 additions & 1 deletion drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C
col.StringValue = proto.String(str)

// numeric type
case "int", "bigint", "smallint", "tinyint":
// https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
case "int", "bigint", "smallint", "tinyint", "mediumint":
str := fmt.Sprintf("%v", datum.GetValue())
if mysql.HasUnsignedFlag(colInfo.Flag) {
val, err := strconv.ParseUint(str, 10, 64)
Expand Down
11 changes: 10 additions & 1 deletion drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package translator
import (
"fmt"
"io"
"strings"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string,
if isCreateDatabase {
sql += ";"
} else {
sql = fmt.Sprintf("use %s; %s;", schema, sql)
sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql)
}

pbBinlog.Tp = pb.BinlogType_DDL
Expand Down Expand Up @@ -303,3 +304,11 @@ func packEvent(schemaName, tableName string, tp pb.EventType, rowData [][]byte)

return event
}

func escapeName(name string) string {
return strings.Replace(name, "`", "``", -1)
}

func quoteName(name string) string {
return "`" + escapeName(name) + "`"
}
2 changes: 1 addition & 1 deletion drainer/translator/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (t *testPbSuite) TestDDL(c *check.C) {
c.Assert(err, check.IsNil)

c.Log("get ddl: ", string(pbBinog.GetDdlQuery()))
expected := fmt.Sprintf("use %s; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery()))
expected := fmt.Sprintf("use `%s`; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery()))
c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{
Tp: pb.BinlogType_DDL,
CommitTs: t.TiBinlog.GetCommitTs(),
Expand Down
12 changes: 9 additions & 3 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package drainer

import (
"fmt"
"math"
"net"
"net/url"
"os"
Expand All @@ -33,7 +34,12 @@ import (
)

const (
maxMsgSize = 1024 * 1024 * 1024
maxKafkaMsgSize = 1024 * 1024 * 1024
maxGrpcMsgSize = math.MaxInt32
)

var (
maxMsgSize = maxGrpcMsgSize
)

// taskGroup is a wrapper of `sync.WaitGroup`.
Expand Down Expand Up @@ -125,9 +131,9 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
}

func initializeSaramaGlobalConfig() {
sarama.MaxResponseSize = int32(maxMsgSize)
sarama.MaxResponseSize = int32(maxKafkaMsgSize)
// add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored
sarama.MaxRequestSize = int32(maxMsgSize) + 1
sarama.MaxRequestSize = int32(maxKafkaMsgSize) + 1
}

func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
Expand Down
5 changes: 3 additions & 2 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"crypto/tls"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
Expand All @@ -35,7 +36,7 @@ const (
defaultEtcdDialTimeout = 5 * time.Second
defaultEtcdURLs = "http://127.0.0.1:2379"
defaultListenAddr = "127.0.0.1:8250"
defautMaxKafkaSize = 1024 * 1024 * 1024
defautMaxMsgSize = math.MaxInt32 // max grpc message size
defaultHeartbeatInterval = 2
defaultGC = 7
defaultDataDir = "data.pump"
Expand Down Expand Up @@ -110,7 +111,7 @@ func NewConfig() *Config {

// global config
fs.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxKafkaSize, "max msg size producer produce into kafka")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxMsgSize, "max message size tidb produce into pump")
fs.Int64Var(new(int64), "binlog-file-size", 0, "DEPRECATED")
fs.BoolVar(new(bool), "enable-binlog-slice", false, "DEPRECATED")
fs.IntVar(new(int), "binlog-slice-size", 0, "DEPRECATED")
Expand Down
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func init() {
// it must be set before any real grpc operation.
grpc.EnableTracing = false
GlobalConfig = &globalConfig{
maxMsgSize: defautMaxKafkaSize,
maxMsgSize: defautMaxMsgSize,
}
}

Expand Down
17 changes: 17 additions & 0 deletions tests/partition/drainer.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
data-dir = '/tmp/tidb_binlog_test/data.drainer'

[syncer]
txn-batch = 1
worker-count = 1
safe-mode = false
db-type = 'mysql'
replicate-do-db = ['partition_test']

[syncer.to]
host = '127.0.0.1'
user = 'root'
password = ''
port = 3306

[syncer.to.checkpoint]
schema = "partition_test_checkpoint"
38 changes: 38 additions & 0 deletions tests/partition/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/sh

set -e

cd "$(dirname "$0")"

run_drainer &

sleep 3

run_sql 'create database partition_test;'
# range partition
run_sql 'create table partition_test.t1( a int ,b varchar(128) ) partition by range (a) (partition p0 values less than (3),partition p1 values less than (7));'
run_sql "insert into partition_test.t1 (a,b) values(1,'a'),(2,'b'),(3,'c'),(4,'d');"
run_sql "alter table partition_test.t1 add partition (partition p2 values less than (10));"
run_sql "insert into partition_test.t1 (a,b) values(5,'e'),(6,'f'),(7,'g'),(8,'h');"

# hash partition
run_sql 'create table partition_test.t2(a int ,b varchar(128) ) partition by hash (a) partitions 2;'
run_sql "insert into partition_test.t2 (a,b) values(1,'a'),(2,'b'),(3,'c'),(4,'d');"
run_sql 'truncate table partition_test.t2;'
run_sql "insert into partition_test.t2 (a,b) values(5,'e'),(6,'f'),(7,'g'),(8,'h');"

sleep 3

down_run_sql 'SELECT a, b FROM partition_test.t1 partition(p2) order by a'
check_contains 'a: 7'
check_contains 'b: g'
check_contains 'a: 8'
check_contains 'b: h'

down_run_sql 'select a, b from partition_test.t2 order by a limit 1'
check_contains 'a: 5'
check_contains 'b: e'

run_sql 'DROP database partition_test'

killall drainer
2 changes: 1 addition & 1 deletion tests/reparo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ batch = 10
host = "127.0.0.1"
user = "root"
password = ""
name = "reparo_test"
name = "reparo-test"
port = 4000
6 changes: 4 additions & 2 deletions tests/reparo/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ run_drainer "$args" &

GO111MODULE=on go build -o out

run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"
sleep 5

run_sql "CREATE DATABASE IF NOT EXISTS \`reparo-test\`"

./out -config ./config.toml > ${OUT_DIR-/tmp}/$TEST_NAME.out 2>&1

Expand All @@ -27,6 +29,6 @@ sleep 15
check_data ./sync_diff_inspector.toml

# clean up
run_sql "DROP DATABASE IF EXISTS \`reparo_test\`"
run_sql "DROP DATABASE IF EXISTS \`reparo-test\`"

killall drainer
2 changes: 1 addition & 1 deletion tests/reparo/sync_diff_inspector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tidb-instance-id = "source-1"
use-checksum=true

[[check-tables]]
schema = "reparo_test"
schema = "reparo-test"
tables = ["~.*"]

[target-db]
Expand Down

0 comments on commit f40177c

Please sign in to comment.