Skip to content

Commit

Permalink
Configurable deadlock detector interval for ingester. Value of 0 disa…
Browse files Browse the repository at this point in the history
…bles deadlock_detector. #issue1225

Signed-off-by: Marek Chodor <marek.chodor@gmail.com>
  • Loading branch information
Chodor Marek committed Oct 22, 2018
1 parent 4a07b78 commit 363f112
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 26 deletions.
9 changes: 5 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
}

consumerParams := consumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
DeadlockCheckInterval: options.DeadlockInterval,
}
return consumer.New(consumerParams)
}
11 changes: 6 additions & 5 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
DeadlockCheckInterval time.Duration
}

// Consumer uses sarama to consume and handle messages from kafka
Expand All @@ -55,7 +56,7 @@ type consumerState struct {

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute)
deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
Expand Down
38 changes: 23 additions & 15 deletions cmd/ingester/app/consumer/deadlock_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti
},
}

go s.monitorForPartition(w, partition)
if s.interval == 0 {
s.logger.Debug("Partition deadlock detector disabled")
} else {
go s.monitorForPartition(w, partition)
}

return w
}
Expand Down Expand Up @@ -138,21 +142,25 @@ func (s *deadlockDetector) start() {
}

go func() {
s.logger.Debug("Starting global deadlock detector")
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-detector.done:
s.logger.Debug("Closing global ticker routine")
return
case <-ticker.C:
if atomic.LoadUint64(detector.msgConsumed) == 0 {
s.panicFunc(-1)
return // For tests
if s.interval == 0 {
s.logger.Debug("Global deadlock detector disabled")
} else {
s.logger.Debug("Starting global deadlock detector")
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-detector.done:
s.logger.Debug("Closing global ticker routine")
return
case <-ticker.C:
if atomic.LoadUint64(detector.msgConsumed) == 0 {
s.panicFunc(-1)
return // For tests
}
atomic.StoreUint64(detector.msgConsumed, 0)
}
atomic.StoreUint64(detector.msgConsumed, 0)
}
}
}()
Expand Down
31 changes: 31 additions & 0 deletions cmd/ingester/app/consumer/deadlock_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,34 @@ func TestGlobalPanic(t *testing.T) {
d.start()
wg.Wait()
}

// TestNoGlobalPanicIfDeadlockDetectorDisabled builds a deadlockDetector with a
// zero interval, calls start(), and then waits briefly. The test fails if the
// detector's panicFunc is invoked during that wait.
func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) {
	// Long enough for a wrongly started monitor to get a chance to fire.
	const settleTime = 100 * time.Millisecond

	logger, _ := zap.NewDevelopment()

	// Any invocation of the panic hook is a test failure.
	failOnPanic := func(int32) {
		t.Errorf("Should not panic when deadlock detector is disabled")
	}

	detector := deadlockDetector{
		metricsFactory: metrics.NewLocalFactory(0),
		logger:         logger,
		interval:       0, // zero interval is the "disabled" setting under test
		panicFunc:      failOnPanic,
	}

	detector.start()

	time.Sleep(settleTime)
}

// TestNoPanicForPartitionIfDeadlockDetectorDisabled builds a deadlockDetector
// with a zero interval, starts monitoring for partition 1, and then waits
// briefly. The test fails if the detector's panicFunc is invoked during that
// wait.
func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) {
	// Long enough for a wrongly started monitor to get a chance to fire.
	const settleTime = 100 * time.Millisecond

	logger, _ := zap.NewDevelopment()

	// Any invocation of the panic hook is a test failure.
	failOnPanic := func(int32) {
		t.Errorf("Should not panic when deadlock detector is disabled")
	}

	detector := deadlockDetector{
		metricsFactory: metrics.NewLocalFactory(0),
		logger:         logger,
		interval:       0, // zero interval is the "disabled" setting under test
		panicFunc:      failOnPanic,
	}

	detector.startMonitoringForPartition(1)
	time.Sleep(settleTime)
}
21 changes: 19 additions & 2 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/spf13/viper"

Expand All @@ -43,6 +44,8 @@ const (
SuffixParallelism = ".parallelism"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for the deadlock detector interval flag
SuffixDeadlockInterval = ".deadlockInterval"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
Expand All @@ -54,13 +57,16 @@ const (
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
)

// Options stores the configuration options for the Ingester
type Options struct {
kafkaConsumer.Configuration
Parallelism int
Encoding string
Parallelism int
Encoding string
DeadlockInterval time.Duration
}

// AddFlags adds flags for Builder
Expand All @@ -85,6 +91,10 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
flagSet.String(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval.String(),
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disable deadlock check.")
}

// InitFromViper initializes Builder with properties from viper
Expand All @@ -94,4 +104,11 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)

d, err := time.ParseDuration(v.GetString(ConfigPrefix + SuffixDeadlockInterval))
if err != nil {
o.DeadlockInterval = DefaultDeadlockInterval
} else {
o.DeadlockInterval = d
}
}
4 changes: 4 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -30,13 +31,15 @@ func TestOptionsWithFlags(t *testing.T) {
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
"--ingester.group-id=group1",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
"--ingester.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, EncodingJSON, o.Encoding)
}

Expand All @@ -51,4 +54,5 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultGroupID, o.GroupID)
assert.Equal(t, DefaultParallelism, o.Parallelism)
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
}

0 comments on commit 363f112

Please sign in to comment.