Skip to content

Commit

Permalink
change extension fields
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu committed May 7, 2022
1 parent 3f9c424 commit ed9cbe7
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 197 deletions.
35 changes: 28 additions & 7 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -243,7 +244,12 @@ type avroSchemaTop struct {
Fields []map[string]interface{} `json:"fields"`
}

const tidbType = "tidbType"
const (
tidbType = "tidb_type"
tidbOp = "_tidb_op"
tidbCommitTs = "_tidb_commit_ts"
tidbPhysicalTime = "_tidb_physical_time"
)

var type2TiDBType = map[byte]string{
mysql.TypeTiny: "INT",
Expand Down Expand Up @@ -310,7 +316,12 @@ func rowToAvroSchema(
}

for i, col := range columnInfo {
avroType, err := columnToAvroSchema(col, colInfos[i].Ft, decimalHandlingMode, bigintUnsignedHandlingMode)
avroType, err := columnToAvroSchema(
col,
colInfos[i].Ft,
decimalHandlingMode,
bigintUnsignedHandlingMode,
)
if err != nil {
return "", err
}
Expand All @@ -329,11 +340,15 @@ func rowToAvroSchema(
if enableTiDBExtension {
top.Fields = append(top.Fields,
map[string]interface{}{
"name": "tidbOp",
"name": tidbOp,
"type": "string",
},
map[string]interface{}{
"name": "tidbCommitTs",
"name": tidbCommitTs,
"type": "long",
},
map[string]interface{}{
"name": tidbPhysicalTime,
"type": "long",
},
)
Expand Down Expand Up @@ -361,7 +376,12 @@ func rowToAvroData(
if col == nil {
continue
}
data, str, err := columnToAvroData(col, colInfos[i].Ft, decimalHandlingMode, bigintUnsignedHandlingMode)
data, str, err := columnToAvroData(
col,
colInfos[i].Ft,
decimalHandlingMode,
bigintUnsignedHandlingMode,
)
if err != nil {
return nil, err
}
Expand All @@ -375,8 +395,9 @@ func rowToAvroData(
}

if enableTiDBExtension {
ret["tidbOp"] = operation
ret["tidbCommitTs"] = int64(commitTs)
ret[tidbOp] = operation
ret[tidbCommitTs] = int64(commitTs)
ret[tidbPhysicalTime] = oracle.ExtractPhysical(commitTs)
}

return ret, nil
Expand Down
Loading

0 comments on commit ed9cbe7

Please sign in to comment.