Skip to content

Commit

Permalink
[Heartbeat] Reimplement Scheduler / Fix recurring tasks with limit (e…
Browse files Browse the repository at this point in the history
…lastic#14569)

Fixes elastic#14567

Took the opportunity to re-implement the heartbeat scheduler with an eye toward clarity as the original code was tricky to debug. While there are more LOC here, there are also additional tests and some additional features that made sense to include as well. The net result IMHO is a reduction in complexity.

This changes the behavior to start monitors using the @every X syntax immediately, rather than waiting X duration. So, we'll call this both an enhancement and a bugfix.

Structurally the code now relies more heavily on the go runtime scheduler, in combination with a new throttler package which helps constrain the maximum number of executing goroutines run by the scheduler in an efficient way. There may be another way to build the throttler using buffered go routines, but I'm on the fence whether reworking it is worth it in any way.
  • Loading branch information
andrewvc authored Dec 2, 2019
1 parent 376eb7c commit fa63445
Show file tree
Hide file tree
Showing 16 changed files with 658 additions and 405 deletions.
4 changes: 3 additions & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

"github.com/elastic/beats/heartbeat/hbregistry"

"github.com/pkg/errors"

"github.com/elastic/beats/heartbeat/config"
Expand Down Expand Up @@ -63,7 +65,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

scheduler := scheduler.NewWithLocation(limit, location)
scheduler := scheduler.NewWithLocation(limit, hbregistry.SchedulerRegistry, location)

bt := &Heartbeat{
done: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Config struct {

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
type Scheduler struct {
Limit uint `config:"limit" validate:"min=0"`
Limit int64 `config:"limit" validate:"min=0"`
Location string `config:"location"`
}

Expand Down
29 changes: 29 additions & 0 deletions heartbeat/hbregistry/hbregistry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbregistry

import "github.com/elastic/beats/libbeat/monitoring"

// StatsRegistry contains a singleton instance of the heartbeat stats registry
var StatsRegistry = monitoring.Default.NewRegistry("heartbeat")

// SchedulerRegistry holds scheduler stats
var SchedulerRegistry = StatsRegistry.NewRegistry("scheduler")

// TelemetryRegistry contains a singleton instance of the heartbeat telemetry registry
var TelemetryRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("heartbeat")
8 changes: 5 additions & 3 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/monitoring"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -34,7 +36,7 @@ func TestMonitor(t *testing.T) {
reg := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1)
sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestDuplicateMonitorIDs(t *testing.T) {
reg := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1)
sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()
Expand Down Expand Up @@ -111,7 +113,7 @@ func TestCheckInvalidConfig(t *testing.T) {
reg := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1)
sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()
Expand Down
11 changes: 4 additions & 7 deletions heartbeat/monitors/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sort"
"strings"

"github.com/elastic/beats/heartbeat/hbregistry"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/plugin"
)

Expand All @@ -38,19 +38,16 @@ type pluginBuilder struct {

var pluginKey = "heartbeat.monitor"

var statsRegistry = monitoring.Default.NewRegistry("heartbeat")
var stateRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("heartbeat")

// stateGlobalRecorder records statistics across all plugin types
var stateGlobalRecorder = newRootGaugeRecorder(stateRegistry)
var stateGlobalRecorder = newRootGaugeRecorder(hbregistry.TelemetryRegistry)

func statsForPlugin(pluginName string) registryRecorder {
return multiRegistryRecorder{
recorders: []registryRecorder{
// state (telemetry)
newPluginGaugeRecorder(pluginName, stateRegistry),
newPluginGaugeRecorder(pluginName, hbregistry.TelemetryRegistry),
// Record global monitors / endpoints count
newPluginCountersRecorder(pluginName, statsRegistry),
newPluginCountersRecorder(pluginName, hbregistry.StatsRegistry),
// When stats for this plugin are updated, update the global stats as well
stateGlobalRecorder,
},
Expand Down
9 changes: 4 additions & 5 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitors

import (
"context"
"fmt"

"github.com/pkg/errors"
Expand All @@ -32,16 +33,14 @@ import (
"github.com/elastic/beats/libbeat/processors"
)

type taskCanceller func() error

// configuredJob represents a job combined with its config and any
// subsequent processors.
type configuredJob struct {
job jobs.Job
config jobConfig
monitor *Monitor
processors *processors.Processors
cancelFn taskCanceller
cancelFn context.CancelFunc
client beat.Client
}

Expand Down Expand Up @@ -89,7 +88,7 @@ func (e ProcessorsError) Error() string {
}

func (t *configuredJob) prepareSchedulerJob(job jobs.Job) scheduler.TaskFunc {
return func() []scheduler.TaskFunc {
return func(_ context.Context) []scheduler.TaskFunc {
return runPublishJob(job, t.client)
}
}
Expand Down Expand Up @@ -177,7 +176,7 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc {
// Without this only the last continuation will be executed len(conts) times
localCont := cont

contTasks[i] = func() []scheduler.TaskFunc {
contTasks[i] = func(_ context.Context) []scheduler.TaskFunc {
return runPublishJob(localCont, client)
}
}
Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitors

import (
"context"
"testing"

"github.com/elastic/go-lookslike/validator"
Expand Down Expand Up @@ -102,7 +103,7 @@ func Test_runPublishJob(t *testing.T) {
}
tf := queue[0]
queue = queue[1:]
conts := tf()
conts := tf(context.Background())
for _, cont := range conts {
queue = append(queue, cont)
}
Expand Down
32 changes: 4 additions & 28 deletions heartbeat/scheduler/util.go → heartbeat/scheduler/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,8 @@
// specific language governing permissions and limitations
// under the License.

/*
Package scheduler lets you run multi-stage tasks on a regular interval. These tasks are single functions that may spawn
an arbitrary number of continuations.
*/
package scheduler

import "sort"

type timeOrd []*job

func sortEntries(es []*job) {
sort.Sort(timeOrd(es))
}

func (b timeOrd) Len() int {
return len(b)
}

func (b timeOrd) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

// Less reports `earliest` time i should sort before j.
// zero time is not `earliest` time.
func (b timeOrd) Less(i, j int) bool {
if b[i].next.IsZero() {
return false
}
if b[j].next.IsZero() {
return true
}
return b[i].next.Before(b[j].next)
}
5 changes: 5 additions & 0 deletions heartbeat/scheduler/schedule/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ func (s *Schedule) Unpack(str string) error {
}
return err
}

// RunOnInit returns false for interval schedulers.
func (s *Schedule) RunOnInit() bool {
return false
}
5 changes: 5 additions & 0 deletions heartbeat/scheduler/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type intervalScheduler struct {
interval time.Duration
}

// RunOnInit returns true for interval schedulers.
func (s intervalScheduler) RunOnInit() bool {
return true
}

func Parse(in string) (*Schedule, error) {
every := "@every"

Expand Down
Loading

0 comments on commit fa63445

Please sign in to comment.