diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go index be680cd..46a5a0c 100644 --- a/pkg/kafka/subscriber.go +++ b/pkg/kafka/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/IBM/sarama" @@ -21,7 +22,7 @@ type Subscriber struct { closing chan struct{} subscribersWg sync.WaitGroup - closed bool + closed uint32 } // NewSubscriber creates a new Kafka Subscriber. @@ -137,7 +138,7 @@ func DefaultSaramaSubscriberConfig() *sarama.Config { // // There are multiple subscribers spawned func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { - if s.closed { + if atomic.LoadUint32(&s.closed) == 1 { return nil, errors.New("subscriber closed") } @@ -474,11 +475,10 @@ func (s *Subscriber) createMessagesHandler(output chan *message.Message) message } func (s *Subscriber) Close() error { - if s.closed { + if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) { return nil } - s.closed = true close(s.closing) s.subscribersWg.Wait()