Skip to content

Commit

Permalink
Removed unacked messages tracker (apache#90)
Browse files Browse the repository at this point in the history
* Removed unacked messages tracker

* Fixed tests
  • Loading branch information
merlimat authored Nov 12, 2019
1 parent 163fba0 commit 367984a
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 284 deletions.
9 changes: 0 additions & 9 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package pulsar

import (
"context"
"time"
)

// Pair of a Consumer and Message
Expand Down Expand Up @@ -82,11 +81,6 @@ type ConsumerOptions struct {
// This properties will be visible in the topic stats
Properties map[string]string

// Set the timeout for unacked messages
// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
// Default is 0, which means message are not being replayed based on ack time
AckTimeout time.Duration

// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
Expand Down Expand Up @@ -128,9 +122,6 @@ type ConsumerOptions struct {

// Consumer is an interface that abstracts behavior of Pulsar's consumer
type Consumer interface {
// Topic get the topic for the consumer
Topic() string

// Subscription get a subscription for the consumer
Subscription() string

Expand Down
9 changes: 1 addition & 8 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import (
var ErrConsumerClosed = errors.New("consumer closed")

type consumer struct {
topic string

options ConsumerOptions

consumers []*partitionConsumer
Expand Down Expand Up @@ -88,7 +86,6 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (Consumer, error) {
consumer := &consumer{
topic: topic,
messageCh: messageCh,
errorCh: make(chan error),
log: log.WithField("topic", topic),
Expand Down Expand Up @@ -165,10 +162,6 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string,
return consumer, nil
}

func (c *consumer) Topic() string {
return c.topic
}

func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
Expand All @@ -177,7 +170,7 @@ func (c *consumer) Unsubscribe() error {
var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
errMsg += fmt.Sprintf("topic %s, subscription %s: %s", consumer.topic, c.Subscription(), err)
}
}
if errMsg != "" {
Expand Down
2 changes: 0 additions & 2 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func TestBatchMessageReceive(t *testing.T) {
SubscriptionName: subName,
})
assert.Nil(t, err)
assert.Equal(t, topicName, consumer.Topic())
count := 0

for i := 0; i < numOfMessages; i++ {
Expand Down Expand Up @@ -397,7 +396,6 @@ func TestConsumerReceiveTimeout(t *testing.T) {
Topic: topic,
SubscriptionName: "my-sub1",
Type: Shared,
AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer.Close()
Expand Down
199 changes: 0 additions & 199 deletions pulsar/unacked_msg_tracker.go

This file was deleted.

66 changes: 0 additions & 66 deletions pulsar/unacked_msg_tracker_test.go

This file was deleted.

0 comments on commit 367984a

Please sign in to comment.