Skip to content

Commit

Permalink
fix producer block (#326)
Browse files Browse the repository at this point in the history
Co-authored-by: 灰柯 <huike@tuya.com>
  • Loading branch information
LvBay and 灰柯 authored Jul 24, 2020
1 parent 65cb19a commit c75aa62
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
4 changes: 3 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
log: log.WithField("topic", options.topic),
}
pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
pc.log = pc.log.WithField("name", pc.name).
WithField("subscription", options.subscription).
WithField("consumerID", pc.consumerID)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)

err := pc.grabConn()
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)

if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
delete(c.listeners, consumerID)
c.DeleteConsumeHandler(consumerID)
} else {
c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
}
Expand Down
3 changes: 2 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
return nil, err
}

p.log = p.log.WithField("producer_name", p.producerName)
p.log = p.log.WithField("producer_name", p.producerName).
WithField("producerID", p.producerID)
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
atomic.StoreInt32(&p.state, producerReady)

Expand Down

0 comments on commit c75aa62

Please sign in to comment.