diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 55261eabd82..ee0301765f2 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit } consumerParams := consumer.Params{ - InternalConsumer: saramaConsumer, - ProcessorFactory: *processorFactory, - MetricsFactory: metricsFactory, - Logger: logger, + InternalConsumer: saramaConsumer, + ProcessorFactory: *processorFactory, + MetricsFactory: metricsFactory, + Logger: logger, + DeadlockCheckInterval: options.DeadlockInterval, } return consumer.New(consumerParams) } diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 03ca9ecf7a1..5f983f3914a 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -29,10 +29,11 @@ import ( // Params are the parameters of a Consumer type Params struct { - ProcessorFactory ProcessorFactory - MetricsFactory metrics.Factory - Logger *zap.Logger - InternalConsumer consumer.Consumer + ProcessorFactory ProcessorFactory + MetricsFactory metrics.Factory + Logger *zap.Logger + InternalConsumer consumer.Consumer + DeadlockCheckInterval time.Duration } // Consumer uses sarama to consume and handle messages from kafka @@ -56,7 +57,7 @@ type consumerState struct { // New is a constructor for a Consumer func New(params Params) (*Consumer, error) { - deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, time.Minute) + deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval) return &Consumer{ metricsFactory: params.MetricsFactory, logger: params.Logger, diff --git a/cmd/ingester/app/consumer/deadlock_detector.go b/cmd/ingester/app/consumer/deadlock_detector.go index 4b5b2696742..ebb8a65ea70 100644 --- a/cmd/ingester/app/consumer/deadlock_detector.go +++ b/cmd/ingester/app/consumer/deadlock_detector.go @@ -52,12 +52,14 @@ type partitionDeadlockDetector struct { closePartition chan struct{} done chan struct{} incrementAllPartitionMsgCount func() + disabled bool } type allPartitionsDeadlockDetector struct { msgConsumed *uint64 logger *zap.Logger done chan struct{} + disabled bool } func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector { @@ -87,13 +89,18 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti closePartition: make(chan struct{}, 1), done: make(chan struct{}), logger: s.logger, + disabled: 0 == s.interval, incrementAllPartitionMsgCount: func() { s.allPartitionsDeadlockDetector.incrementMsgCount() }, } - go s.monitorForPartition(w, partition) + if w.disabled { + s.logger.Debug("Partition deadlock detector disabled") + } else { + go s.monitorForPartition(w, partition) + } return w } @@ -135,32 +142,40 @@ func (s *deadlockDetector) start() { msgConsumed: &msgConsumed, done: make(chan struct{}), logger: s.logger, + disabled: 0 == s.interval, } - go func() { + if detector.disabled { + s.logger.Debug("Global deadlock detector disabled") + } else { s.logger.Debug("Starting global deadlock detector") - ticker := time.NewTicker(s.interval) - defer ticker.Stop() - - for { - select { - case <-detector.done: - s.logger.Debug("Closing global ticker routine") - return - case <-ticker.C: - if atomic.LoadUint64(detector.msgConsumed) == 0 { - s.panicFunc(-1) - return // For tests + go func() { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-detector.done: + s.logger.Debug("Closing global ticker routine") + return + case <-ticker.C: + if atomic.LoadUint64(detector.msgConsumed) == 0 { + s.panicFunc(-1) + return // For tests + } + atomic.StoreUint64(detector.msgConsumed, 0) } - atomic.StoreUint64(detector.msgConsumed, 0) } - } - }() + }() + } s.allPartitionsDeadlockDetector = detector } func (s *deadlockDetector) close() { + if s.allPartitionsDeadlockDetector.disabled { + return + } s.logger.Debug("Closing all partitions deadlock detector") s.allPartitionsDeadlockDetector.done <- struct{}{} } @@ -174,6 +189,9 @@ func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} { } func (w *partitionDeadlockDetector) close() { + if w.disabled { + return + } w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition)) w.done <- struct{}{} } diff --git a/cmd/ingester/app/consumer/deadlock_detector_test.go b/cmd/ingester/app/consumer/deadlock_detector_test.go index 0cb31fbd1bf..3d736f77c87 100644 --- a/cmd/ingester/app/consumer/deadlock_detector_test.go +++ b/cmd/ingester/app/consumer/deadlock_detector_test.go @@ -112,3 +112,54 @@ func TestGlobalPanic(t *testing.T) { d.start() wg.Wait() } + +func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) { + l, _ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + d.start() + + time.Sleep(100 * time.Millisecond) + + d.close() +} + +func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) { + l, _ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + w := d.startMonitoringForPartition(1) + time.Sleep(100 * time.Millisecond) + + w.close() +} + +//same as TestNoClosingSignalIfMessagesProcessedInInterval but with disabled deadlock detector +func TestApiCompatibilityWhenDeadlockDetectorDisabled(t *testing.T) { + mf := metrics.NewLocalFactory(0) + l, _ := zap.NewDevelopment() + f := newDeadlockDetector(mf, l, 0) + f.start() + defer f.close() + + w := f.startMonitoringForPartition(1) + + w.incrementMsgCount() + w.incrementAllPartitionMsgCount() + assert.Zero(t, len(w.closePartitionChannel())) + w.close() +} diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 75c1eed814f..9ffbffac113 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/spf13/viper" @@ -43,6 +44,8 @@ const ( SuffixGroupID = ".group-id" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" + // SuffixDeadlockInterval is a suffix for deadlock detecor flag + SuffixDeadlockInterval = ".deadlockInterval" // SuffixParallelism is a suffix for the parallelism flag SuffixParallelism = ".parallelism" // SuffixHTTPPort is a suffix for the HTTP port @@ -58,6 +61,8 @@ const ( DefaultParallelism = 1000 // DefaultEncoding is the default span encoding DefaultEncoding = EncodingProto + // DefaultDeadlockInterval is the default deadlock interval + DefaultDeadlockInterval = 1 * time.Minute // DefaultHTTPPort is the default HTTP port (e.g. for /metrics) DefaultHTTPPort = 14271 // IngesterDefaultHealthCheckHTTPPort is the default HTTP Port for health check @@ -71,6 +76,7 @@ type Options struct { Encoding string // IngesterHTTPPort is the port that the ingester service listens in on for http requests IngesterHTTPPort int + DeadlockInterval time.Duration } // AddFlags adds flags for Builder @@ -99,6 +105,10 @@ func AddFlags(flagSet *flag.FlagSet) { ConfigPrefix+SuffixHTTPPort, DefaultHTTPPort, "The http port for the ingester service") + flagSet.Duration( + ConfigPrefix+SuffixDeadlockInterval, + DefaultDeadlockInterval, + "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.") } @@ -110,4 +120,6 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Encoding = v.GetString(KafkaConfigPrefix + SuffixEncoding) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort) + + o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval) } diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index f7be47c9096..c502581d3b1 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -16,6 +16,7 @@ package app import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -31,6 +32,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.group-id=group1", "--kafka.encoding=json", "--ingester.parallelism=5", + "--ingester.deadlockInterval=2m", "--ingester.http-port=2345"}) o.InitFromViper(v) @@ -38,6 +40,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, 5, o.Parallelism) + assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, EncodingJSON, o.Encoding) assert.Equal(t, 2345, o.IngesterHTTPPort) } @@ -53,4 +56,5 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, DefaultGroupID, o.GroupID) assert.Equal(t, DefaultParallelism, o.Parallelism) assert.Equal(t, DefaultEncoding, o.Encoding) + assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) }