Skip to content

Commit

Permalink
translator/: Refactor code to decode insert data
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Feb 24, 2019
1 parent 1a2a892 commit 4de3fff
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 96 deletions.
24 changes: 2 additions & 22 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// flashTranslator translates TiDB binlog to flash sqls
Expand All @@ -42,42 +41,23 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

colsTypeMap := toFlashColumnTypeMap(columns)
columnList := genColumnList(columns)
// addition 2 holder is for del flag and version
columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2)
sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
hashKey := pk.GetInt64()
pk, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}
hashKey := pk.GetInt64()

var vals []interface{}
vals = append(vals, hashKey)
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
pkVal, err := formatFlashData(&pk, &col.FieldType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
vals = append(vals, pkVal)
continue
}

val, ok := columnValues[col.ID]
if !ok {
vals = append(vals, col.DefaultValue)
Expand Down
33 changes: 1 addition & 32 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// kafkaTranslator translates TiDB binlog to self-description protobuf
Expand Down Expand Up @@ -129,38 +128,12 @@ func (p *kafkaTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (
}

func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
_, columnValues, err := insertRowToDatums(tableInfo, raw)
columns := tableInfo.Columns

remain, pk, err := codec.DecodeOne(raw)
if err != nil {
log.Error(err)
err = errors.Trace(err)
return
}

log.Debugf("decode pk: %+v", pk)

colsTypeMap := util.ToColumnTypeMap(tableInfo.Columns)
columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
if err != nil {
log.Error(err)
err = errors.Trace(err)
return
}

// log.Debugf("decodeRow: %+v\n", columnValues)
// maybe only the pk column value
if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}

row = new(obinlog.Row)

for _, col := range columns {
if IsPKHandleColumn(tableInfo, col) {
columnValues[col.ID] = pk
}

var column *obinlog.Column
val, ok := columnValues[col.ID]
if ok {
Expand All @@ -175,10 +148,6 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
row.Columns = append(row.Columns, column)
}

if len(columnValues) == 0 {
panic(errors.New("columnValues is nil"))
}

return
}

Expand Down
23 changes: 1 addition & 22 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
keys := make([][]string, 0, len(rows))
values := make([][]interface{}, 0, len(rows))

colsTypeMap := util.ToColumnTypeMap(columns)
columnList := m.genColumnList(columns)
columnPlaceholders := dml.GenColumnPlaceholders((len(columns)))

Expand All @@ -59,29 +58,13 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
sql := fmt.Sprintf("%s into `%s`.`%s` (%s) values (%s);", insertStr, schema, table.Name, columnList, columnPlaceholders)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
_, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}

var vals []interface{}
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
vals = append(vals, pk.GetValue())
continue
}

val, ok := columnValues[col.ID]
if !ok {
vals = append(vals, col.DefaultValue)
Expand All @@ -95,10 +78,6 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
}
}

if len(columnValues) == 0 {
panic(errors.New("columnValues is nil"))
}

sqls = append(sqls, sql)
values = append(values, vals)
// generate dispatching key
Expand Down
21 changes: 1 addition & 20 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,20 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows
sqls := make([]string, 0, len(rows))
keys := make([][]string, 0, len(rows))
values := make([][]interface{}, 0, len(rows))
colsTypeMap := util.ToColumnTypeMap(columns)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
if err != nil {
return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
_, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}

var (
vals = make([]types.Datum, 0, len(columns))
cols = make([]string, 0, len(columns))
tps = make([]byte, 0, len(columns))
mysqlTypes = make([]string, 0, len(columns))
)
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
}

cols = append(cols, col.Name.O)
tps = append(tps, col.Tp)
mysqlTypes = append(mysqlTypes, types.TypeToStr(col.Tp, col.Charset))
Expand All @@ -83,10 +68,6 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows
}
}

if len(columnValues) == 0 {
panic(errors.New("columnValues is nil"))
}

rowData, err := encodeRow(vals, cols, tps, mysqlTypes)
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand Down
34 changes: 34 additions & 0 deletions drainer/translator/translator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package translator

import (
"time"

"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// OpType represents type of the operation
Expand Down Expand Up @@ -69,3 +75,31 @@ func New(providerName string) (SQLTranslator, error) {

return translator, nil
}

func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datums map[int64]types.Datum, err error) {
colsTypeMap := util.ToColumnTypeMap(table.Columns)

// decode the pk value
var remain []byte
remain, pk, err = codec.DecodeOne(row)
if err != nil {
return types.Datum{}, nil, errors.Trace(err)
}

datums, err = tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
if err != nil {
return types.Datum{}, nil, errors.Trace(err)
}

if datums == nil {
datums = make(map[int64]types.Datum)
}

for _, col := range table.Columns {
if IsPKHandleColumn(table, col) {
datums[col.ID] = pk
}
}

return
}

0 comments on commit 4de3fff

Please sign in to comment.