diff --git a/alert/topics.go b/alert/topics.go index 7d2a22220..b34b0618e 100644 --- a/alert/topics.go +++ b/alert/topics.go @@ -11,19 +11,25 @@ import ( ) const ( - // eventBufferSize is the number of events to buffer to each handler per topic. - eventBufferSize = 5000 + // DefaultEventBufferSize is the default number of events to buffer to each handler per topic. + DefaultEventBufferSize = 5000 + MinimumEventBufferSize = 1000 ) type Topics struct { - mu sync.RWMutex - - topics map[string]*Topic + mu sync.RWMutex + eventBufferSize int + topics map[string]*Topic } -func NewTopics() *Topics { +// NewTopics creates a new Topics struct with a minimum bufferSize of 500. +func NewTopics(bufferSize int) *Topics { + if bufferSize < MinimumEventBufferSize { + bufferSize = DefaultEventBufferSize + } s := &Topics{ - topics: make(map[string]*Topic), + eventBufferSize: bufferSize, + topics: make(map[string]*Topic), } return s } @@ -54,7 +60,7 @@ func (s *Topics) RestoreTopic(id string, eventStates map[string]EventState) { defer s.mu.Unlock() t, ok := s.topics[id] if !ok { - t = newTopic(id) + t = s.newTopic(id) s.topics[id] = t } t.restoreEventStates(eventStates) @@ -65,7 +71,7 @@ func (s *Topics) UpdateEvent(id string, event EventState) { defer s.mu.Unlock() t, ok := s.topics[id] if !ok { - t = newTopic(id) + t = s.newTopic(id) s.topics[id] = t } t.updateEvent(event) @@ -93,7 +99,7 @@ func (s *Topics) Collect(event Event) error { // Check again if the topic was created, now that we have the write lock topic = s.topics[event.Topic] if topic == nil { - topic = newTopic(event.Topic) + topic = s.newTopic(event.Topic) s.topics[event.Topic] = topic } s.mu.Unlock() @@ -122,7 +128,7 @@ func (s *Topics) RegisterHandler(topic string, h Handler) { t, ok := s.topics[topic] if !ok { - t = newTopic(topic) + t = s.newTopic(topic) s.topics[topic] = t } t.addHandler(h) @@ -147,7 +153,7 @@ func (s *Topics) ReplaceHandler(topic string, oldH, newH Handler) { t, ok := s.topics[topic] if !ok { - t = newTopic(topic) + t = s.newTopic(topic) s.topics[topic] = t } @@ -185,12 +191,11 @@ func PatternMatch(pattern, id string) bool { } type Topic struct { - id string - - mu sync.RWMutex - - events map[string]*EventState - sorted []*EventState + id string + mu sync.RWMutex + bufferLength int + events map[string]*EventState + sorted []*EventState collected *expvar.Int statsKey string @@ -198,11 +203,12 @@ type Topic struct { handlers []*bufHandler } -func newTopic(id string) *Topic { +func (s *Topics) newTopic(id string) *Topic { t := &Topic{ - id: id, - events: make(map[string]*EventState), - collected: new(expvar.Int), + id: id, + events: make(map[string]*EventState), + collected: new(expvar.Int), + bufferLength: s.eventBufferSize, } statsKey, statsMap := vars.NewStatistic("topics", map[string]string{ "id": id, @@ -240,7 +246,7 @@ func (t *Topic) addHandler(h Handler) { return } } - hdlr := newHandler(h) + hdlr := newHandler(h, t.bufferLength) t.handlers = append(t.handlers, hdlr) } @@ -385,10 +391,13 @@ type bufHandler struct { wg sync.WaitGroup } -func newHandler(h Handler) *bufHandler { +func newHandler(h Handler, bufferSize int) *bufHandler { + if bufferSize < MinimumEventBufferSize { + bufferSize = DefaultEventBufferSize + } hdlr := &bufHandler{ h: h, - events: make(chan Event, eventBufferSize), + events: make(chan Event, bufferSize), aborting: make(chan struct{}), } hdlr.wg.Add(1) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index ec79dfc52..9e9b60cb3 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -13865,7 +13865,7 @@ func createTaskMaster(name string) (*kapacitor.TaskMaster, error) { tm.TaskStore = taskStore{} tm.DeadmanService = deadman{} tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler()) - as := alertservice.NewService(diagService.NewAlertServiceHandler(), nil) + as := alertservice.NewService(diagService.NewAlertServiceHandler(), nil, 0) as.StorageService = storagetest.New() as.HTTPDService = httpdService if err := as.Open(); err != nil { diff --git a/server/server.go b/server/server.go index 26979dc60..23ac1c21d 100644 --- a/server/server.go +++ b/server/server.go @@ -404,7 +404,7 @@ func (s *Server) appendConfigOverrideService() { func (s *Server) initAlertService() { d := s.DiagService.NewAlertServiceHandler() - srv := alert.NewService(d, s.DisabledHandlers) + srv := alert.NewService(d, s.DisabledHandlers, s.config.Alert.TopicBufferLength) srv.Commander = s.Commander srv.HTTPDService = s.HTTPDService diff --git a/services/alert/config.go b/services/alert/config.go index 485611dac..dbf3979b6 100644 --- a/services/alert/config.go +++ b/services/alert/config.go @@ -4,6 +4,7 @@ import ( "time" "github.com/influxdata/influxdb/toml" + "github.com/influxdata/kapacitor/alert" ) const ( @@ -12,12 +13,14 @@ const ( type Config struct { // Whether we persist the alert topics to BoltDB or not - PersistTopics bool `toml:"persist-topics"` + PersistTopics bool `toml:"persist-topics"` + TopicBufferLength int `toml:"topic-buffer-length"` } func NewConfig() Config { return Config{ - PersistTopics: true, + PersistTopics: true, + TopicBufferLength: alert.DefaultEventBufferSize, } } diff --git a/services/alert/service.go b/services/alert/service.go index 9497ecca6..245345e11 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -154,12 +154,12 @@ type Service struct { } } -func NewService(d Diagnostic, disabled map[string]struct{}) *Service { +func NewService(d Diagnostic, disabled map[string]struct{}, topicBufLen int) *Service { s := &Service{ disabled: disabled, handlers: make(map[string]map[string]handler), closedTopics: make(map[string]bool), - topics: alert.NewTopics(), + topics: alert.NewTopics(topicBufLen), diag: d, inhibitorLookup: alert.NewInhibitorLookup(), }