diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 90894625846..791c175efe4 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -391,12 +391,8 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { if err != nil { log.Panic("Error when serializing Canal packet", zap.Error(err)) } -<<<<<<< HEAD ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) -======= - ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil) ret.SetRowsCount(rowCount) ->>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) d.messages.Reset() d.resetPacket() return []*MQMessage{ret} diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 52587fb0214..a6c709a90bd 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -218,13 +218,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } -<<<<<<< HEAD - ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) -======= - m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable()) + m := NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table) m.IncRowsCount() ret[i] = m ->>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) } c.resolvedBuf = c.resolvedBuf[0:0] return ret diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index c59d48676db..0951a528aea 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -277,12 +277,8 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { return nil } -<<<<<<< HEAD ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) -======= - ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil) ret.SetRowsCount(d.batchSize) ->>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192)) d.Reset() return []*MQMessage{ret} }