Skip to content

Commit

Permalink
Fix the error that fail to seek the msg stream for kafka
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 23, 2023
1 parent 9ca2777 commit 79f3ff5
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (r *replicateChannelHandler) handlePack(pack *msgstream.MsgPack) *msgstream
}
if err != nil {
r.sendErrEvent(err)
log.Warn("fail to get partition info", zap.Any("msg", msg), zap.Error(err))
log.Warn("fail to get partition info", zap.Any("msg", msg.Type()), zap.Error(err))
return nil
}
if pChannel != info.PChannel {
Expand Down Expand Up @@ -721,7 +721,7 @@ func newReplicateChannelHandler(ctx context.Context,
return nil, err
}
subPositionType := mqwrapper.SubscriptionPositionUnknown
if sourceInfo.SeekPosition != nil {
if sourceInfo.SeekPosition == nil {
subPositionType = mqwrapper.SubscriptionPositionLatest
}
err = stream.AsConsumer(ctx, []string{sourceInfo.PChannelName}, sourceInfo.PChannelName+strconv.Itoa(rand.Int()), subPositionType)
Expand Down

0 comments on commit 79f3ff5

Please sign in to comment.