This repository was archived by the owner on Aug 23, 2023. It is now read-only.

notifier: deprecate NSQ ( rip :'( ) + add mt-kafka-persist-sniff tool #1161

Merged · 5 commits · Dec 7, 2018
Changes from 1 commit
introduce concept of a NotifierHandler
not to be confused with the previous type, which should just have been called Notifier.
The goal is to avoid tightly coupling the Notifier to the idx/metrics, so that upcoming tools can read the notifier data from kafka and print it.
Dieterbe committed Dec 4, 2018
commit 9f5e6486467da27a8e8ffd534ee7dbb64d14b86d
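For orientation, here is the split this commit introduces, restated as a minimal sketch. The two interfaces are copied from the mdata/notifier.go hunk below; the SavedChunk stub and the comments are simplified stand-ins, not the full metrictank definitions.

```go
package sketch

import "github.com/raintank/schema"

// SavedChunk is a trimmed stand-in for mdata.SavedChunk: which chunk
// (by key) was persisted, and its t0.
type SavedChunk struct {
	Key string
	T0  uint32
}

// Notifier is the outbound side: it broadcasts "chunk saved" messages
// to the rest of the cluster (kafka; NSQ is deprecated by this PR).
type Notifier interface {
	Send(SavedChunk)
}

// NotifierHandler is the inbound side: it receives raw metricPersist
// messages and decides what to do with them. Metrictank's default
// handler looks the metric up in the index and applies the save to the
// matching aggregation; a standalone tool can plug in a handler that
// merely prints what it sees, without needing an index at all.
type NotifierHandler interface {
	// Handle handles an incoming (encoded) persist message.
	Handle([]byte)
	// PartitionOf reports which kafka partition a metric belongs to,
	// for notifiers that need it when flushing.
	PartitionOf(key schema.MKey) (int32, bool)
}
```

In metrictank itself the two sides are wired together via notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)), as the metrictank.go hunk below shows.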
12 changes: 6 additions & 6 deletions cmd/metrictank/metrictank.go
@@ -112,7 +112,7 @@ func main() {
inKafkaMdm.ConfigSetup()
inPrometheus.ConfigSetup()

- // load config for cluster handlers
+ // load config for cluster notifiers
notifierNsq.ConfigSetup()

// load config for metricIndexers
@@ -387,18 +387,18 @@ func main() {
/***********************************
Initialize MetricPersist notifiers
***********************************/
- handlers := make([]mdata.NotifierHandler, 0)
+ var notifiers []mdata.Notifier
if notifierKafka.Enabled {
- // The notifierKafka handler will block here until it has processed the backlog of metricPersist messages.
+ // The notifierKafka notifiers will block here until it has processed the backlog of metricPersist messages.
// it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
- handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex))
+ notifiers = append(notifiers, notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)))
}

if notifierNsq.Enabled {
- handlers = append(handlers, notifierNsq.New(*instance, metrics, metricIndex))
+ notifiers = append(notifiers, notifierNsq.New(*instance, metrics, metricIndex))
}

- mdata.InitPersistNotifier(handlers...)
+ mdata.InitPersistNotifier(notifiers...)

/***********************************
Start our inputs
40 changes: 32 additions & 8 deletions mdata/notifier.go
@@ -12,13 +12,13 @@ import (
)

var (
- notifierHandlers []NotifierHandler
+ notifiers []Notifier

// metric cluster.notifier.all.messages-received is a counter of messages received from cluster notifiers
messagesReceived = stats.NewCounter32("cluster.notifier.all.messages-received")
)

- type NotifierHandler interface {
+ type Notifier interface {
Send(SavedChunk)
}

@@ -39,16 +39,40 @@ type SavedChunk struct {

func SendPersistMessage(key string, t0 uint32) {
sc := SavedChunk{Key: key, T0: t0}
- for _, h := range notifierHandlers {
+ for _, h := range notifiers {
h.Send(sc)
}
}

- func InitPersistNotifier(handlers ...NotifierHandler) {
- notifierHandlers = handlers
+ func InitPersistNotifier(not ...Notifier) {
+ notifiers = not
}

+ type NotifierHandler interface {
+ // Handle handles an incoming message
+ Handle([]byte)
+ // PartitionOf is used for notifiers that want to flush and need partition information for metrics
+ PartitionOf(key schema.MKey) (int32, bool)
+ }
+
+ type DefaultNotifierHandler struct {
+ idx idx.MetricIndex
+ metrics Metrics
+ }
+
+ func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler {
+ return DefaultNotifierHandler{
+ idx: idx,
+ metrics: metrics,
+ }
+ }
+
+ func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
+ def, ok := dn.idx.Get(key)
+ return def.Partition, ok
+ }
+
- func Handle(metrics Metrics, data []byte, idx idx.MetricIndex) {
+ func (dn DefaultNotifierHandler) Handle(data []byte) {
version := uint8(data[0])
if version == uint8(PersistMessageBatchV1) {
batch := PersistMessageBatch{}
@@ -66,12 +90,12 @@ func Handle(metrics Metrics, data []byte, idx idx.MetricIndex) {
}
// we only need to handle saves for series that we know about.
// if the series is not in the index, then we dont need to worry about it.
- def, ok := idx.Get(amkey.MKey)
+ def, ok := dn.idx.Get(amkey.MKey)
if !ok {
log.Debugf("notifier: skipping metric with MKey %s as it is not in the index", amkey.MKey)
continue
}
- agg := metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId)
+ agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId)
if amkey.Archive != 0 {
consolidator := consolidation.FromArchive(amkey.Archive.Method())
aggSpan := amkey.Archive.Span()
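This decoupling is what makes a tool like mt-kafka-persist-sniff possible: it can hand notifierKafka.New a handler with no index or metrics behind it. Below is a hypothetical print-only handler. It assumes the v1 wire format is a version byte followed by a JSON-encoded PersistMessageBatch, and that PersistMessageBatch carries Instance and SavedChunks fields; neither is shown in this hunk, so treat this as a sketch rather than the tool the PR actually adds.

```go
package sniff

import (
	"encoding/json"
	"fmt"

	"github.com/grafana/metrictank/mdata"
	"github.com/raintank/schema"
)

// printHandler implements mdata.NotifierHandler without an index or a
// metrics store: it decodes persist messages and prints them.
type printHandler struct{}

func (printHandler) Handle(data []byte) {
	// assumed framing: one version byte, then JSON (see Handle above)
	if len(data) == 0 || data[0] != uint8(mdata.PersistMessageBatchV1) {
		fmt.Println("skipping persist message with unknown version")
		return
	}
	var batch mdata.PersistMessageBatch // Instance/SavedChunks fields assumed
	if err := json.Unmarshal(data[1:], &batch); err != nil {
		fmt.Printf("failed to decode PersistMessageBatch: %s\n", err)
		return
	}
	for _, c := range batch.SavedChunks {
		fmt.Printf("instance=%s key=%s t0=%d\n", batch.Instance, c.Key, c.T0)
	}
}

// PartitionOf only matters on the producing (flush) side; a read-only
// sniffer can simply report that it doesn't know.
func (printHandler) PartitionOf(key schema.MKey) (int32, bool) {
	return 0, false
}
```

A sniff-style tool could then pass printHandler{} straight into notifierKafka.New(instance, printHandler{}) and reuse the existing consuming and backlog-processing machinery.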
16 changes: 6 additions & 10 deletions mdata/notifierKafka/notifierKafka.go
@@ -10,7 +10,6 @@ import (
"github.com/raintank/schema"

"github.com/Shopify/sarama"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/util"
log "github.com/sirupsen/logrus"
@@ -21,9 +20,8 @@ type NotifierKafka struct {
in chan mdata.SavedChunk
buf []mdata.SavedChunk
wg sync.WaitGroup
- idx idx.MetricIndex
- metrics mdata.Metrics
bPool *util.BufferPool
+ handler mdata.NotifierHandler
client sarama.Client
consumer sarama.Consumer
producer sarama.SyncProducer
@@ -33,7 +31,7 @@ type NotifierKafka struct {
stopConsuming chan struct{}
}

- func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierKafka {
+ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
client, err := sarama.NewClient(brokers, config)
if err != nil {
log.Fatalf("kafka-cluster: failed to start client: %s", err)
@@ -52,8 +50,6 @@ func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierK
c := NotifierKafka{
instance: instance,
in: make(chan mdata.SavedChunk),
- idx: idx,
- metrics: metrics,
bPool: util.NewBufferPool(),
client: client,
consumer: consumer,
@@ -137,7 +133,7 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, currentO
select {
case msg := <-messages:
log.Debugf("kafka-cluster: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
- mdata.Handle(c.metrics, msg.Value, c.idx)
+ c.handler.Handle(msg.Value)
currentOffset = msg.Offset
case <-ticker.C:
if startingUp && currentOffset >= bootTimeOffset {
@@ -207,7 +203,7 @@ func (c *NotifierKafka) flush() {
}

// In order to correctly route the saveMessages to the correct partition,
- // we cant send them in batches anymore.
+ // we can't send them in batches anymore.
payload := make([]*sarama.ProducerMessage, 0, len(c.buf))
var pMsg mdata.PersistMessageBatch
for i, msg := range c.buf {
@@ -217,7 +213,7 @@ func (c *NotifierKafka) flush() {
continue
}

- def, ok := c.idx.Get(amkey.MKey)
+ partition, ok := c.handler.PartitionOf(amkey.MKey)
if !ok {
log.Errorf("kafka-cluster: failed to lookup metricDef with id %s", msg.Key)
continue
@@ -234,7 +230,7 @@ func (c *NotifierKafka) flush() {
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(buf.Bytes()),
- Partition: def.Partition,
+ Partition: partition,
}
payload = append(payload, kafkaMsg)
}