Skip to content

Commit

Permalink
codec(ticdc): canal-json decouple get value from java type and refact…
Browse files Browse the repository at this point in the history
…or unit test (#10123)

close #10122
  • Loading branch information
3AceShowHand authored Nov 22, 2023
1 parent 68dc49c commit 5921050
Show file tree
Hide file tree
Showing 15 changed files with 922 additions and 877 deletions.
3 changes: 0 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

Expand Down
12 changes: 0 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,8 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is zerofill
ZerofillFlag
)

// SetZeroFill sets ZerofillFlag
func (b *ColumnFlagType) SetZeroFill() {
(*util.Flag)(b).Add(util.Flag(ZerofillFlag))
}

// IsZerofill shows whether ZerofillFlag is set
func (b *ColumnFlagType) IsZerofill() bool {
return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
Expand Down
22 changes: 14 additions & 8 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -60,8 +59,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) {
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -84,12 +81,21 @@ func TestWriteEvents(t *testing.T) {
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
139 changes: 88 additions & 51 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -64,7 +63,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -77,10 +82,11 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
Partition: 1,
}
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -258,7 +264,13 @@ func TestBatchEncode_Group(t *testing.T) {
}

func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
Topic: "test",
Expand All @@ -278,9 +290,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand Down Expand Up @@ -325,8 +339,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -345,14 +357,24 @@ func TestBatchEncode_SendMessages(t *testing.T) {
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -362,10 +384,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -375,10 +398,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -388,10 +412,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -401,10 +426,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -414,10 +440,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -475,8 +502,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -491,14 +516,24 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -508,10 +543,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -521,10 +557,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
Loading

0 comments on commit 5921050

Please sign in to comment.