diff --git a/go.mod b/go.mod index 653421b5b..4222e42d7 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/pingcap/parser v0.0.0-20181210061630-27e9d3e251d4 // indirect github.com/pingcap/pd v2.0.5+incompatible github.com/pingcap/tidb v2.1.0-beta.0.20180823032518-ef6590e1899a+incompatible - github.com/pingcap/tidb-tools v2.1.1-0.20181130053235-0206fdab9ef8+incompatible + github.com/pingcap/tidb-tools v2.1.3-0.20190215110732-23405d82dbe6+incompatible github.com/pingcap/tipb v0.0.0-20180711115030-4141907f6909 github.com/pkg/errors v0.8.0 github.com/pmezard/go-difflib v1.0.0 // indirect @@ -80,7 +80,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be github.com/siddontang/go v0.0.0-20161005110831-1e9ce2a5ac40 - github.com/sirupsen/logrus v0.0.0-20180830201151-78fa2915c1fa // indirect + github.com/sirupsen/logrus v0.0.0-20180830201151-78fa2915c1fa github.com/soheilhy/cmux v0.1.2 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect github.com/stretchr/testify v1.2.2 // indirect @@ -110,7 +110,7 @@ require ( gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect - gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.0.0-20170407172122-cd8b52f8269e // indirect ) diff --git a/pkg/loader/translate.go b/pkg/loader/translate.go index fb7df72cf..b2714db45 100644 --- a/pkg/loader/translate.go +++ b/pkg/loader/translate.go @@ -34,7 +34,7 @@ func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) { dml.Values = make(map[string]interface{}) for i, col := range mut.Row.GetColumns() { name := table.ColumnInfo[i].Name - arg := columnToArg(col) + arg := columnToArg(table.ColumnInfo[i].GetMysqlType(), col) dml.Values[name] = arg } @@ -43,7 +43,7 @@ func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) { dml.OldValues = make(map[string]interface{}) for i, col := range mut.ChangeRow.GetColumns() { name := table.ColumnInfo[i].Name - arg := columnToArg(col) + arg := columnToArg(table.ColumnInfo[i].GetMysqlType(), col) dml.OldValues[name] = arg } } @@ -53,7 +53,7 @@ func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) { return } -func columnToArg(c *pb.Column) (arg interface{}) { +func columnToArg(mysqlType string, c *pb.Column) (arg interface{}) { if c.GetIsNull() { return nil } @@ -71,6 +71,13 @@ func columnToArg(c *pb.Column) (arg interface{}) { } if c.BytesValue != nil { + // https://github.com/go-sql-driver/mysql/issues/819 + // for downstream = mysql + // it work for tidb to use binary + if mysqlType == "json" { + var str string = string(c.GetBytesValue()) + return str + } return c.GetBytesValue() } diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/driver/reader/offset.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/driver/reader/offset.go index b8d7bb20a..0e3d4a19d 100644 --- a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/driver/reader/offset.go +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/driver/reader/offset.go @@ -148,7 +148,7 @@ func (ks *KafkaSeeker) seekOffset(topic string, partition int32, start int64, en } if endTS <= ts { - return sarama.OffsetNewest, nil + return end + 1, nil } return end, nil