diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index ff6164b1d81..454b489fad6 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit } consumerParams := consumer.Params{ - InternalConsumer: saramaConsumer, - ProcessorFactory: *processorFactory, - Factory: metricsFactory, - Logger: logger, + InternalConsumer: saramaConsumer, + ProcessorFactory: *processorFactory, + Factory: metricsFactory, + Logger: logger, + DeadlockCheckInterval: options.DeadlockInterval, } return consumer.New(consumerParams) } diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index c939eac60c4..aa899a06174 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -29,10 +29,11 @@ import ( // Params are the parameters of a Consumer type Params struct { - ProcessorFactory ProcessorFactory - Factory metrics.Factory - Logger *zap.Logger - InternalConsumer consumer.Consumer + ProcessorFactory ProcessorFactory + Factory metrics.Factory + Logger *zap.Logger + InternalConsumer consumer.Consumer + DeadlockCheckInterval time.Duration } // Consumer uses sarama to consume and handle messages from kafka @@ -55,7 +56,7 @@ type consumerState struct { // New is a constructor for a Consumer func New(params Params) (*Consumer, error) { - deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute) + deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, params.DeadlockCheckInterval) return &Consumer{ metricsFactory: params.Factory, logger: params.Logger, diff --git a/cmd/ingester/app/consumer/deadlock_detector.go b/cmd/ingester/app/consumer/deadlock_detector.go index 4b5b2696742..8e683671b40 100644 --- a/cmd/ingester/app/consumer/deadlock_detector.go +++ b/cmd/ingester/app/consumer/deadlock_detector.go @@ -93,7 +93,11 @@ 
func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti }, } - go s.monitorForPartition(w, partition) + if s.interval == 0 { + s.logger.Debug("Partition deadlock detector disabled") + } else { + go s.monitorForPartition(w, partition) + } return w } @@ -138,21 +142,25 @@ func (s *deadlockDetector) start() { } go func() { - s.logger.Debug("Starting global deadlock detector") - ticker := time.NewTicker(s.interval) - defer ticker.Stop() - - for { - select { - case <-detector.done: - s.logger.Debug("Closing global ticker routine") - return - case <-ticker.C: - if atomic.LoadUint64(detector.msgConsumed) == 0 { - s.panicFunc(-1) - return // For tests + if s.interval == 0 { + s.logger.Debug("Global deadlock detector disabled") + } else { + s.logger.Debug("Starting global deadlock detector") + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-detector.done: + s.logger.Debug("Closing global ticker routine") + return + case <-ticker.C: + if atomic.LoadUint64(detector.msgConsumed) == 0 { + s.panicFunc(-1) + return // For tests + } + atomic.StoreUint64(detector.msgConsumed, 0) } - atomic.StoreUint64(detector.msgConsumed, 0) } } }() diff --git a/cmd/ingester/app/consumer/deadlock_detector_test.go b/cmd/ingester/app/consumer/deadlock_detector_test.go index 0cb31fbd1bf..5b659d167e6 100644 --- a/cmd/ingester/app/consumer/deadlock_detector_test.go +++ b/cmd/ingester/app/consumer/deadlock_detector_test.go @@ -112,3 +112,34 @@ func TestGlobalPanic(t *testing.T) { d.start() wg.Wait() } + +func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) { + l, _ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + d.start() + + time.Sleep(100 * time.Millisecond) +} + +func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) { + l, 
_ := zap.NewDevelopment() + d := deadlockDetector{ + metricsFactory: metrics.NewLocalFactory(0), + logger: l, + interval: 0, + panicFunc: func(partition int32) { + t.Errorf("Should not panic when deadlock detector is disabled") + }, + } + + d.startMonitoringForPartition(1) + time.Sleep(100 * time.Millisecond) +} diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 2439189fcd8..e52ad3924b8 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/spf13/viper" @@ -43,6 +44,8 @@ const ( SuffixParallelism = ".parallelism" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" + // SuffixDeadlockInterval is a suffix for deadlock detector flag + SuffixDeadlockInterval = ".deadlockInterval" // DefaultBroker is the default kafka broker DefaultBroker = "127.0.0.1:9092" @@ -54,13 +57,16 @@ const ( DefaultParallelism = 1000 // DefaultEncoding is the default span encoding DefaultEncoding = EncodingProto + // DefaultDeadlockInterval is the default deadlock interval + DefaultDeadlockInterval = 1 * time.Minute ) // Options stores the configuration options for the Ingester type Options struct { kafkaConsumer.Configuration - Parallelism int - Encoding string + Parallelism int + Encoding string + DeadlockInterval time.Duration } // AddFlags adds flags for Builder @@ -85,6 +91,10 @@ func AddFlags(flagSet *flag.FlagSet) { ConfigPrefix+SuffixEncoding, DefaultEncoding, fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON)) + flagSet.String( + ConfigPrefix+SuffixDeadlockInterval, + DefaultDeadlockInterval.String(), + "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. 
Value of 0 disable deadlock check.") } // InitFromViper initializes Builder with properties from viper @@ -94,4 +104,11 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding) + + d, err := time.ParseDuration(v.GetString(ConfigPrefix + SuffixDeadlockInterval)) + if err != nil { + o.DeadlockInterval = DefaultDeadlockInterval + } else { + o.DeadlockInterval = d + } } diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 84a27b1c65b..bc92179299e 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -16,6 +16,7 @@ package app import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -30,6 +31,7 @@ func TestOptionsWithFlags(t *testing.T) { "--ingester.brokers=127.0.0.1:9092,0.0.0:1234", "--ingester.group-id=group1", "--ingester.parallelism=5", + "--ingester.deadlockInterval=2m", "--ingester.encoding=json"}) o.InitFromViper(v) @@ -37,6 +39,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, 5, o.Parallelism) + assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, EncodingJSON, o.Encoding) } @@ -51,4 +54,5 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, DefaultGroupID, o.GroupID) assert.Equal(t, DefaultParallelism, o.Parallelism) assert.Equal(t, DefaultEncoding, o.Encoding) + assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) }