From b8a862dc82ec5e0c54d91d5f7587c6d7c3a6713a Mon Sep 17 00:00:00 2001 From: Ming Luo Date: Mon, 27 Apr 2020 10:59:35 -0400 Subject: [PATCH] allow empty payload for nonbatch message print out error message from MessageReceived --- pulsar/internal/commands.go | 2 +- pulsar/internal/connection.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 8798443bda..d9f2a1f783 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -114,7 +114,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { } func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { - if r.buffer.ReadableBytes() == 0 { + if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { return nil, nil, ErrEOM } if !r.batched { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index dace305620..d02dedad6b 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -527,7 +527,7 @@ func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) if consumer, ok := c.consumerHandler(consumerID); ok { err := consumer.MessageReceived(response, payload) if err != nil { - c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId) + c.log.WithField("consumerID", consumerID).WithError(err).Error("handle message Id: ", response.MessageId) } } else { c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)