From dcc904712318c1a2d2a567ee99a7bfeb742809ca Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Wed, 6 May 2020 14:53:53 +0800
Subject: [PATCH 1/4] .*: update max message size for grpc (#960)

TiDB supports big transactions, and a binlog's size may be bigger than 1 GB,
so we need to raise gRPC's max message size.
---
 drainer/config.go      |  2 ++
 drainer/config_test.go |  2 ++
 drainer/util.go        | 12 +++++++++---
 pump/config.go         |  5 +++--
 pump/server.go         |  2 +-
 5 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/drainer/config.go b/drainer/config.go
index 11891cc8f..17e9e68b5 100644
--- a/drainer/config.go
+++ b/drainer/config.go
@@ -434,6 +434,8 @@ func (cfg *Config) adjustConfig() error {
     }
 
     if cfg.SyncerCfg.DestDBType == "kafka" {
+        maxMsgSize = maxKafkaMsgSize
+
         // get KafkaAddrs from zookeeper if ZkAddrs is setted
         if cfg.SyncerCfg.To.ZKAddrs != "" {
             zkClient, err := newZKFromConnectionString(cfg.SyncerCfg.To.ZKAddrs, time.Second*5, time.Second*60)
diff --git a/drainer/config_test.go b/drainer/config_test.go
index f0596250f..4c65cb27b 100644
--- a/drainer/config_test.go
+++ b/drainer/config_test.go
@@ -206,6 +206,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
     c.Assert(err, IsNil)
     c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
     c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
+    c.Assert(maxMsgSize, Equals, maxGrpcMsgSize)
 
     cfg = NewConfig()
     err = cfg.adjustConfig()
@@ -334,6 +335,7 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
     c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, defaultKafkaAddrs)
     c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
    c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
+    c.Assert(maxMsgSize, Equals, maxKafkaMsgSize)
 
     // With Zookeeper address
     cfg = NewConfig()
diff --git a/drainer/util.go b/drainer/util.go
index 9543cdf3c..f595ec6df 100644
--- a/drainer/util.go
+++ b/drainer/util.go
@@ -15,6 +15,7 @@ package drainer
 
 import (
     "fmt"
+    "math"
     "net"
     "net/url"
     "os"
@@ -33,7 +34,12 @@ import (
 )
 
 const (
-    maxMsgSize = 1024 * 1024 * 1024
+    maxKafkaMsgSize = 1024 * 1024 * 1024
+    maxGrpcMsgSize  = math.MaxInt32
+)
+
+var (
+    maxMsgSize = maxGrpcMsgSize
 )
 
 // taskGroup is a wrapper of `sync.WaitGroup`.
@@ -125,9 +131,9 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
 }
 
 func initializeSaramaGlobalConfig() {
-    sarama.MaxResponseSize = int32(maxMsgSize)
+    sarama.MaxResponseSize = int32(maxKafkaMsgSize)
     // add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored
-    sarama.MaxRequestSize = int32(maxMsgSize) + 1
+    sarama.MaxRequestSize = int32(maxKafkaMsgSize) + 1
 }
 
 func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
diff --git a/pump/config.go b/pump/config.go
index 9f8945b58..4838a4976 100644
--- a/pump/config.go
+++ b/pump/config.go
@@ -17,6 +17,7 @@ import (
     "crypto/tls"
     "flag"
     "fmt"
+    "math"
     "net"
     "net/url"
     "os"
@@ -35,7 +36,7 @@ const (
     defaultEtcdDialTimeout   = 5 * time.Second
     defaultEtcdURLs          = "http://127.0.0.1:2379"
     defaultListenAddr        = "127.0.0.1:8250"
-    defautMaxKafkaSize       = 1024 * 1024 * 1024
+    defautMaxMsgSize         = math.MaxInt32 // max grpc message size
     defaultHeartbeatInterval = 2
     defaultGC                = 7
     defaultDataDir           = "data.pump"
@@ -110,7 +111,7 @@ func NewConfig() *Config {
 
     // global config
     fs.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log")
-    fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxKafkaSize, "max msg size producer produce into kafka")
+    fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxMsgSize, "max message size tidb produces into pump")
     fs.Int64Var(new(int64), "binlog-file-size", 0, "DEPRECATED")
     fs.BoolVar(new(bool), "enable-binlog-slice", false, "DEPRECATED")
     fs.IntVar(new(int), "binlog-slice-size", 0, "DEPRECATED")
diff --git a/pump/server.go b/pump/server.go
index e074211ad..2e6d5034b 100644
--- a/pump/server.go
+++ b/pump/server.go
@@ -102,7 +102,7 @@ func init() {
     // it must be set before any real grpc operation.
     grpc.EnableTracing = false
 
     GlobalConfig = &globalConfig{
-        maxMsgSize: defautMaxKafkaSize,
+        maxMsgSize: defautMaxMsgSize,
     }
 }

From 12b6ed50d5a762dcfbda304a37a2f4310c27381c Mon Sep 17 00:00:00 2001
From: july2993
Date: Tue, 12 May 2020 13:42:35 +0800
Subject: [PATCH 2/4] Fix unknown type for mediumint (#962)

---
 drainer/translator/kafka.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go
index 3127b43bb..ec083ed04 100644
--- a/drainer/translator/kafka.go
+++ b/drainer/translator/kafka.go
@@ -229,7 +229,8 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C
         col.StringValue = proto.String(str)
 
     // numeric type
-    case "int", "bigint", "smallint", "tinyint":
+    // https://dev.mysql.com/doc/refman/8.0/en/integer-types.html
+    case "int", "bigint", "smallint", "tinyint", "mediumint":
         str := fmt.Sprintf("%v", datum.GetValue())
         if mysql.HasUnsignedFlag(colInfo.Flag) {
             val, err := strconv.ParseUint(str, 10, 64)

From a75036cf8933a581cac42c1007bf92c9e5417b90 Mon Sep 17 00:00:00 2001
From: july2993
Date: Tue, 12 May 2020 14:08:05 +0800
Subject: [PATCH 3/4] Fix unquoted db name in file data DDL (#961)

---
 drainer/translator/pb.go              | 11 ++++++++++-
 drainer/translator/pb_test.go         |  2 +-
 tests/reparo/config.toml              |  2 +-
 tests/reparo/run.sh                   |  6 ++++--
 tests/reparo/sync_diff_inspector.toml |  2 +-
 5 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go
index 83921a38f..6a95af189 100644
--- a/drainer/translator/pb.go
+++ b/drainer/translator/pb.go
@@ -16,6 +16,7 @@ package translator
 import (
     "fmt"
     "io"
+    "strings"
     "time"
 
     "github.com/golang/protobuf/proto"
@@ -48,7 +49,7 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string,
         if isCreateDatabase {
             sql += ";"
         } else {
-            sql = fmt.Sprintf("use %s; %s;", schema, sql)
+            sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql)
         }
 
         pbBinlog.Tp = pb.BinlogType_DDL
@@ -303,3 +304,11 @@ func packEvent(schemaName, tableName string, tp pb.EventType, rowData [][]byte)
 
     return event
 }
+
+func escapeName(name string) string {
+    return strings.Replace(name, "`", "``", -1)
+}
+
+func quoteName(name string) string {
+    return "`" + escapeName(name) + "`"
+}
diff --git a/drainer/translator/pb_test.go b/drainer/translator/pb_test.go
index 7b4cda3a5..ac4d1823d 100644
--- a/drainer/translator/pb_test.go
+++ b/drainer/translator/pb_test.go
@@ -36,7 +36,7 @@ func (t *testPbSuite) TestDDL(c *check.C) {
     c.Assert(err, check.IsNil)
 
     c.Log("get ddl: ", string(pbBinog.GetDdlQuery()))
-    expected := fmt.Sprintf("use %s; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery()))
+    expected := fmt.Sprintf("use `%s`; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery()))
     c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{
         Tp:       pb.BinlogType_DDL,
         CommitTs: t.TiBinlog.GetCommitTs(),
diff --git a/tests/reparo/config.toml b/tests/reparo/config.toml
index 5e7f551b4..41d1ee0c9 100644
--- a/tests/reparo/config.toml
+++ b/tests/reparo/config.toml
@@ -10,5 +10,5 @@ batch = 10
 host = "127.0.0.1"
 user = "root"
 password = ""
-name = "reparo_test"
+name = "reparo-test"
 port = 4000
diff --git a/tests/reparo/run.sh b/tests/reparo/run.sh
index eb4a00e31..1557e75c1 100755
--- a/tests/reparo/run.sh
+++ b/tests/reparo/run.sh
@@ -14,7 +14,9 @@ run_drainer "$args" &
 
 GO111MODULE=on go build -o out
 
-run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"
+sleep 5
+
+run_sql "CREATE DATABASE IF NOT EXISTS \`reparo-test\`"
 
 ./out -config ./config.toml > ${OUT_DIR-/tmp}/$TEST_NAME.out 2>&1
 
@@ -27,6 +29,6 @@ sleep 15
 check_data ./sync_diff_inspector.toml
 
 # clean up
-run_sql "DROP DATABASE IF EXISTS \`reparo_test\`"
+run_sql "DROP DATABASE IF EXISTS \`reparo-test\`"
 
 killall drainer
diff --git a/tests/reparo/sync_diff_inspector.toml b/tests/reparo/sync_diff_inspector.toml
index 6723f9056..c69b03335 100644
--- a/tests/reparo/sync_diff_inspector.toml
+++ b/tests/reparo/sync_diff_inspector.toml
@@ -22,7 +22,7 @@ tidb-instance-id = "source-1"
 use-checksum=true
 
 [[check-tables]]
-schema = "reparo_test"
+schema = "reparo-test"
 tables = ["~.*"]
 
 [target-db]

From 57bb59848cb0fcf8230d3ed0345fa9a9915cf3c5 Mon Sep 17 00:00:00 2001
From: HuaiyuXu <391585975@qq.com>
Date: Tue, 12 May 2020 15:17:51 +0800
Subject: [PATCH 4/4] test: add test cases for range/hash partition (#965)

* test: add test cases for range/hash partition
---
 tests/partition/drainer.toml | 17 ++++++++++++++++
 tests/partition/run.sh       | 38 ++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)
 create mode 100644 tests/partition/drainer.toml
 create mode 100755 tests/partition/run.sh

diff --git a/tests/partition/drainer.toml b/tests/partition/drainer.toml
new file mode 100644
index 000000000..3b6f59eb4
--- /dev/null
+++ b/tests/partition/drainer.toml
@@ -0,0 +1,17 @@
+data-dir = '/tmp/tidb_binlog_test/data.drainer'
+
+[syncer]
+txn-batch = 1
+worker-count = 1
+safe-mode = false
+db-type = 'mysql'
+replicate-do-db = ['partition_test']
+
+[syncer.to]
+host = '127.0.0.1'
+user = 'root'
+password = ''
+port = 3306
+
+[syncer.to.checkpoint]
+schema = "partition_test_checkpoint"
diff --git a/tests/partition/run.sh b/tests/partition/run.sh
new file mode 100755
index 000000000..8924dccdb
--- /dev/null
+++ b/tests/partition/run.sh
@@ -0,0 +1,38 @@
+#!/bin/sh
+
+set -e
+
+cd "$(dirname "$0")"
+
+run_drainer &
+
+sleep 3
+
+run_sql 'create database partition_test;'
+# range partition
+run_sql 'create table partition_test.t1( a int ,b varchar(128) ) partition by range (a) (partition p0 values less than (3),partition p1 values less than (7));'
+run_sql "insert into partition_test.t1 (a,b) values(1,'a'),(2,'b'),(3,'c'),(4,'d');"
+run_sql "alter table partition_test.t1 add partition (partition p2 values less than (10));"
+run_sql "insert into partition_test.t1 (a,b) values(5,'e'),(6,'f'),(7,'g'),(8,'h');"
+
+# hash partition
+run_sql 'create table partition_test.t2(a int ,b varchar(128) ) partition by hash (a) partitions 2;'
+run_sql "insert into partition_test.t2 (a,b) values(1,'a'),(2,'b'),(3,'c'),(4,'d');"
+run_sql 'truncate table partition_test.t2;'
+run_sql "insert into partition_test.t2 (a,b) values(5,'e'),(6,'f'),(7,'g'),(8,'h');"
+
+sleep 3
+
+down_run_sql 'SELECT a, b FROM partition_test.t1 partition(p2) order by a'
+check_contains 'a: 7'
+check_contains 'b: g'
+check_contains 'a: 8'
+check_contains 'b: h'
+
+down_run_sql 'select a, b from partition_test.t2 order by a limit 1'
+check_contains 'a: 5'
+check_contains 'b: e'
+
+run_sql 'DROP database partition_test'
+
+killall drainer
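
Reviewer note on PATCH 1/4 (illustrative only, not code from the patches): gRPC caps
messages at 4 MB by default, which is why oversized binlogs need an explicit limit.
The sketch below shows the stock google.golang.org/grpc options a client and server
can set to accept messages up to math.MaxInt32 bytes, matching the maxGrpcMsgSize /
defautMaxMsgSize value this series adopts. The address is hypothetical, and this is
not the actual pump/drainer wiring.

package main

import (
    "log"
    "math"

    "google.golang.org/grpc"
)

func main() {
    // Client side: raise the default 4 MB receive/send caps so a >1 GB
    // binlog does not fail with a ResourceExhausted error.
    conn, err := grpc.Dial(
        "127.0.0.1:8250", // hypothetical pump address
        grpc.WithInsecure(),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(math.MaxInt32),
            grpc.MaxCallSendMsgSize(math.MaxInt32),
        ),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Server side: the matching limits when constructing the server.
    _ = grpc.NewServer(
        grpc.MaxRecvMsgSize(math.MaxInt32),
        grpc.MaxSendMsgSize(math.MaxInt32),
    )
}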
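
Reviewer note on PATCH 3/4: the escapeName/quoteName helpers added to
drainer/translator/pb.go backtick-quote an identifier and double any embedded
backticks, which is why the reparo test can now use a database literally named
reparo-test. A minimal, self-contained demo, with the helpers copied verbatim
from the patch and hypothetical example names:

package main

import (
    "fmt"
    "strings"
)

// escapeName and quoteName are copied from the patch to drainer/translator/pb.go.
func escapeName(name string) string {
    return strings.Replace(name, "`", "``", -1)
}

func quoteName(name string) string {
    return "`" + escapeName(name) + "`"
}

func main() {
    fmt.Println(quoteName("reparo-test")) // `reparo-test`
    fmt.Println(quoteName("we`ird"))      // `we``ird`
}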