Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

fix panic caused by multiple closes of pluginFatal channel #1107

Merged
merged 1 commit into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/metrictank/metrictank.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
l "log"
Expand Down Expand Up @@ -402,12 +403,12 @@ func main() {
/***********************************
Start our inputs
***********************************/
pluginFatal := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
for _, plugin := range inputs {
if carbonPlugin, ok := plugin.(*inCarbon.Carbon); ok {
carbonPlugin.IntervalGetter(inCarbon.NewIndexIntervalGetter(metricIndex))
}
err = plugin.Start(input.NewDefaultHandler(metrics, metricIndex, plugin.Name()), pluginFatal)
err = plugin.Start(input.NewDefaultHandler(metrics, metricIndex, plugin.Name()), cancel)
if err != nil {
shutdown()
return
Expand Down Expand Up @@ -439,7 +440,7 @@ func main() {
select {
case sig := <-sigChan:
log.Infof("Received signal %q. Shutting down", sig)
case <-pluginFatal:
case <-ctx.Done():
log.Info("An input plugin signalled a fatal error. Shutting down")
}
shutdown()
Expand Down
7 changes: 4 additions & 3 deletions cmd/mt-kafka-mdm-sniff-out-of-order/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"math/rand"
Expand Down Expand Up @@ -211,14 +212,14 @@ func main() {
stats.NewDevnull() // make sure metrics don't pile up without getting discarded

mdm := inKafkaMdm.New()
pluginFatal := make(chan struct{})
mdm.Start(newInputOOOFinder(*format), pluginFatal)
ctx, cancel := context.WithCancel(context.Background())
mdm.Start(newInputOOOFinder(*format), cancel)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-sigChan:
log.Infof("Received signal %q. Shutting down", sig)
case <-pluginFatal:
case <-ctx.Done():
log.Info("Mdm input plugin signalled a fatal error. Shutting down")
}
mdm.Stop()
Expand Down
7 changes: 4 additions & 3 deletions cmd/mt-kafka-mdm-sniff/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"math/rand"
Expand Down Expand Up @@ -158,14 +159,14 @@ func main() {
stats.NewDevnull() // make sure metrics don't pile up without getting discarded

mdm := inKafkaMdm.New()
pluginFatal := make(chan struct{})
mdm.Start(newInputPrinter(*formatMd, *formatP), pluginFatal)
ctx, cancel := context.WithCancel(context.Background())
mdm.Start(newInputPrinter(*formatMd, *formatP), cancel)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-sigChan:
log.Infof("Received signal %q. Shutting down", sig)
case <-pluginFatal:
case <-ctx.Done():
log.Info("Mdm input plugin signalled a fatal error. Shutting down")
}
mdm.Stop()
Expand Down
3 changes: 2 additions & 1 deletion input/carbon/carbon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package carbon

import (
"bufio"
"context"
"flag"
"io"
"net"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (c *Carbon) IntervalGetter(i IntervalGetter) {
c.intervalGetter = i
}

func (c *Carbon) Start(handler input.Handler, fatal chan struct{}) error {
func (c *Carbon) Start(handler input.Handler, cancel context.CancelFunc) error {
c.Handler = handler
l, err := net.ListenTCP("tcp", c.addr)
if nil != err {
Expand Down
15 changes: 8 additions & 7 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafkamdm

import (
"context"
"flag"
"fmt"
"strconv"
Expand Down Expand Up @@ -37,7 +38,7 @@ type KafkaMdm struct {
// signal to PartitionConsumers to shutdown
stopConsuming chan struct{}
// signal to caller that it should shutdown
fatal chan struct{}
cancel context.CancelFunc
}

func (k *KafkaMdm) Name() string {
Expand Down Expand Up @@ -209,9 +210,9 @@ func New() *KafkaMdm {
return &k
}

func (k *KafkaMdm) Start(handler input.Handler, fatal chan struct{}) error {
func (k *KafkaMdm) Start(handler input.Handler, cancel context.CancelFunc) error {
k.Handler = handler
k.fatal = fatal
k.cancel = cancel
var err error
for _, topic := range topics {
for _, partition := range partitions {
Expand Down Expand Up @@ -288,7 +289,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
newest, err := k.tryGetOffset(topic, partition, sarama.OffsetNewest, 7, time.Second*10)
if err != nil {
log.Errorf("kafkamdm: %s", err.Error())
close(k.fatal)
k.cancel()
return
}
if currentOffset == sarama.OffsetNewest {
Expand All @@ -297,7 +298,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
currentOffset, err = k.tryGetOffset(topic, partition, sarama.OffsetOldest, 7, time.Second*10)
if err != nil {
log.Errorf("kafkamdm: %s", err.Error())
close(k.fatal)
k.cancel()
return
}
}
Expand All @@ -310,7 +311,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
pc, err := k.consumer.ConsumePartition(topic, partition, currentOffset)
if err != nil {
log.Errorf("kafkamdm: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
close(k.fatal)
k.cancel()
return
}
messages := pc.Messages()
Expand All @@ -324,7 +325,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
}
close(k.fatal)
k.cancel()
return
}
log.Debugf("kafkamdm: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
Expand Down
8 changes: 5 additions & 3 deletions input/plugin.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package input

import "context"

type Plugin interface {
Name() string
// Start starts the plugin.
// The plugin closes the fatal chan should any non-recoverable error occur after Start has returned.
// if Start returns an error, or the fatal chan is closed by the plugin,
// The plugin calls cancel should any non-recoverable error occur after Start has returned.
// if Start returns an error, or cancel is called by the plugin,
// the caller (e.g. main process) should shut down all its resources and exit.
// Note that upon fatal close, metrictank will call Stop() on all plugins, also the one that triggered it.
Start(handler Handler, fatal chan struct{}) error
Start(handler Handler, cancel context.CancelFunc) error
MaintainPriority()
ExplainPriority() interface{}
Stop() // Should block until shutdown is complete.
Expand Down
5 changes: 2 additions & 3 deletions input/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prometheus

import (
"context"
"flag"
"fmt"
"io/ioutil"
Expand All @@ -25,7 +26,6 @@ var (

type prometheusWriteHandler struct {
input.Handler
quit chan struct{}
}

func New() *prometheusWriteHandler {
Expand All @@ -36,9 +36,8 @@ func (p *prometheusWriteHandler) Name() string {
return "prometheus"
}

func (p *prometheusWriteHandler) Start(handler input.Handler, fatal chan struct{}) error {
func (p *prometheusWriteHandler) Start(handler input.Handler, cancel context.CancelFunc) error {
p.Handler = handler
p.quit = fatal
ConfigSetup()

mux := http.NewServeMux()
Expand Down