Skip to content

Commit

Permalink
fix conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 18, 2022
1 parent b5768dc commit 23adaf7
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 13 deletions.
4 changes: 0 additions & 4 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,8 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
<<<<<<< HEAD
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
6 changes: 1 addition & 5 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
<<<<<<< HEAD
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
=======
m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
m := NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
m.IncRowsCount()
ret[i] = m
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,8 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
return nil
}

<<<<<<< HEAD
ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.Reset()
return []*MQMessage{ret}
}
Expand Down

0 comments on commit 23adaf7

Please sign in to comment.