diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go index 91e4668ca..6179fa60a 100644 --- a/drainer/translator/flash.go +++ b/drainer/translator/flash.go @@ -16,7 +16,6 @@ import ( "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" ) // flashTranslator translates TiDB binlog to flash sqls @@ -42,7 +41,6 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r version := makeInternalVersionValue(uint64(commitTS)) delFlag := makeInternalDelmarkValue(false) - colsTypeMap := toFlashColumnTypeMap(columns) columnList := genColumnList(columns) // addition 2 holder is for del flag and version columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2) @@ -50,34 +48,16 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r for _, row := range rows { //decode the pk value - remain, pk, err := codec.DecodeOne(row) - hashKey := pk.GetInt64() + pk, columnValues, err := insertRowToDatums(table, row) if err != nil { return nil, nil, nil, errors.Trace(err) } - columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - - if columnValues == nil { - columnValues = make(map[int64]types.Datum) - } + hashKey := pk.GetInt64() var vals []interface{} vals = append(vals, hashKey) for _, col := range columns { - if IsPKHandleColumn(table, col) { - columnValues[col.ID] = pk - pkVal, err := formatFlashData(&pk, &col.FieldType) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - vals = append(vals, pkVal) - continue - } - val, ok := columnValues[col.ID] if !ok { vals = append(vals, col.DefaultValue) diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index e1e8a1e41..9a4d425c5 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -14,7 +14,6 @@ import ( "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" ) // kafkaTranslator translates TiDB binlog to self-description protobuf @@ -129,38 +128,12 @@ func (p *kafkaTranslator) GenDDLSQL(sql string, schema string, commitTS int64) ( } func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) { + _, columnValues, err := insertRowToDatums(tableInfo, raw) columns := tableInfo.Columns - remain, pk, err := codec.DecodeOne(raw) - if err != nil { - log.Error(err) - err = errors.Trace(err) - return - } - - log.Debugf("decode pk: %+v", pk) - - colsTypeMap := util.ToColumnTypeMap(tableInfo.Columns) - columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local) - if err != nil { - log.Error(err) - err = errors.Trace(err) - return - } - - // log.Debugf("decodeRow: %+v\n", columnValues) - // maybe only the pk column value - if columnValues == nil { - columnValues = make(map[int64]types.Datum) - } - row = new(obinlog.Row) for _, col := range columns { - if IsPKHandleColumn(tableInfo, col) { - columnValues[col.ID] = pk - } - var column *obinlog.Column val, ok := columnValues[col.ID] if ok { @@ -175,10 +148,6 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e row.Columns = append(row.Columns, column) } - if len(columnValues) == 0 { - panic(errors.New("columnValues is nil")) - } - return } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index 9eb9b2515..bd5895b89 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -48,7 +48,6 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r keys := make([][]string, 0, len(rows)) values := make([][]interface{}, 0, len(rows)) - colsTypeMap := util.ToColumnTypeMap(columns) columnList := m.genColumnList(columns) columnPlaceholders := dml.GenColumnPlaceholders((len(columns))) @@ -59,29 +58,13 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r sql := fmt.Sprintf("%s into `%s`.`%s` (%s) values (%s);", insertStr, schema, table.Name, columnList, columnPlaceholders) for _, row := range rows { - //decode the pk value - remain, pk, err := codec.DecodeOne(row) + _, columnValues, err := insertRowToDatums(table, row) if err != nil { return nil, nil, nil, errors.Trace(err) } - columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - - if columnValues == nil { - columnValues = make(map[int64]types.Datum) - } - var vals []interface{} for _, col := range columns { - if IsPKHandleColumn(table, col) { - columnValues[col.ID] = pk - vals = append(vals, pk.GetValue()) - continue - } - val, ok := columnValues[col.ID] if !ok { vals = append(vals, col.DefaultValue) @@ -95,10 +78,6 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r } } - if len(columnValues) == 0 { - panic(errors.New("columnValues is nil")) - } - sqls = append(sqls, sql) values = append(values, vals) // generate dispatching key diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index b2f1a0d2e..3076d745c 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -35,24 +35,13 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows sqls := make([]string, 0, len(rows)) keys := make([][]string, 0, len(rows)) values := make([][]interface{}, 0, len(rows)) - colsTypeMap := util.ToColumnTypeMap(columns) for _, row := range rows { - //decode the pk value - remain, pk, err := codec.DecodeOne(row) - if err != nil { - return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name) - } - - columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local) + _, columnValues, err := insertRowToDatums(table, row) if err != nil { return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name) } - if columnValues == nil { - columnValues = make(map[int64]types.Datum) - } - var ( vals = make([]types.Datum, 0, len(columns)) cols = make([]string, 0, len(columns)) @@ -60,10 +49,6 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows mysqlTypes = make([]string, 0, len(columns)) ) for _, col := range columns { - if IsPKHandleColumn(table, col) { - columnValues[col.ID] = pk - } - cols = append(cols, col.Name.O) tps = append(tps, col.Tp) mysqlTypes = append(mysqlTypes, types.TypeToStr(col.Tp, col.Charset)) @@ -83,10 +68,6 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows } } - if len(columnValues) == 0 { - panic(errors.New("columnValues is nil")) - } - rowData, err := encodeRow(vals, cols, tps, mysqlTypes) if err != nil { return nil, nil, nil, errors.Trace(err) diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 4e289f886..425347a63 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -1,9 +1,15 @@ package translator import ( + "time" + "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-binlog/pkg/util" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" ) // OpType represents type of the operation @@ -69,3 +75,31 @@ func New(providerName string) (SQLTranslator, error) { return translator, nil } + +func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datums map[int64]types.Datum, err error) { + colsTypeMap := util.ToColumnTypeMap(table.Columns) + + // decode the pk value + var remain []byte + remain, pk, err = codec.DecodeOne(row) + if err != nil { + return types.Datum{}, nil, errors.Trace(err) + } + + datums, err = tablecodec.DecodeRow(remain, colsTypeMap, time.Local) + if err != nil { + return types.Datum{}, nil, errors.Trace(err) + } + + if datums == nil { + datums = make(map[int64]types.Datum) + } + + for _, col := range table.Columns { + if IsPKHandleColumn(table, col) { + datums[col.ID] = pk + } + } + + return +}