Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
Signed-off-by: albertteoh <albert.teoh@logz.io>
  • Loading branch information
albertteoh committed Oct 25, 2020
1 parent 5652c13 commit 592e1d0
Showing 1 changed file with 5 additions and 31 deletions.
36 changes: 5 additions & 31 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ type Consumer struct {
partitionsHeld int64
partitionsHeldGauge metrics.Gauge

messagesDoneChan chan string
errorsDoneChan chan string
doneWg sync.WaitGroup
doneWg sync.WaitGroup
}

type consumerState struct {
Expand All @@ -71,8 +69,6 @@ func New(params Params) (*Consumer, error) {
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
messagesDoneChan: make(chan string),
errorsDoneChan: make(chan string),
}, nil
}

Expand All @@ -92,20 +88,6 @@ func (c *Consumer) Start() {
go c.handleErrors(pc.Partition(), pc.Errors())
}
}()

// Expect to receive message and error handler "done" signals from each partition.
go waitForDoneSignals(c.messagesDoneChan, &c.doneWg, c.logger)
go waitForDoneSignals(c.errorsDoneChan, &c.doneWg, c.logger)
}

// waitForDoneSignals watches the doneChan for incoming "done" messages. If a message is received,
// the doneWg WaitGroup is decremented via a call to Done().
func waitForDoneSignals(doneChan <-chan string, doneWg *sync.WaitGroup, logger *zap.Logger) {
logger.Debug("Waiting for done signals")
for v := range doneChan {
logger.Debug("Received done signal", zap.String("msg", v))
doneWg.Done()
}
}

// Close closes the Consumer and underlying sarama consumer
Expand All @@ -120,15 +102,10 @@ func (c *Consumer) Close() error {
c.logger.Debug("Waiting for messages and errors to be handled")
c.doneWg.Wait()

c.logger.Debug("Closing message and error done channels")
close(c.messagesDoneChan)
close(c.errorsDoneChan)

return err
}

// handleMessages handles incoming Kafka messages on a channel. Upon the closure of the message channel,
// handleMessages will signal the messagesDoneChan to indicate the graceful shutdown of message handling is done.
// handleMessages handles incoming Kafka messages on a channel
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionMapLock.Lock()
Expand All @@ -141,7 +118,7 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
c.messagesDoneChan <- "HandleMessages done"
c.doneWg.Done()
}()

msgMetrics := c.newMsgMetrics(pc.Partition())
Expand Down Expand Up @@ -185,13 +162,10 @@ func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

// handleErrors handles incoming Kafka consumer errors on a channel. Upon the closure of the error channel,
// handleErrors will signal the errorsDoneChan to indicate the graceful shutdown of error handling is done.
// handleErrors handles incoming Kafka consumer errors on a channel
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
defer func() {
c.errorsDoneChan <- "HandleErrors done"
}()
defer c.doneWg.Done()

errMetrics := c.newErrMetrics(partition)
for err := range errChan {
Expand Down

0 comments on commit 592e1d0

Please sign in to comment.