diff --git a/heartbeat/_meta/config/beat.reference.yml.tmpl b/heartbeat/_meta/config/beat.reference.yml.tmpl
index d4ad845228aa..e7bcf3d0eaa3 100644
--- a/heartbeat/_meta/config/beat.reference.yml.tmpl
+++ b/heartbeat/_meta/config/beat.reference.yml.tmpl
@@ -248,3 +248,14 @@ heartbeat.scheduler:
 
   # Set the scheduler it's time zone
   #location: ''
+
+heartbeat.jobs:
+  # Limit the number of concurrent monitors executed by heartbeat. This differs from
+  # heartbeat.scheduler.limit in that it maps to individual monitors rather than the
+  # subtasks of monitors. For non-browser monitors a subtask usually corresponds to a
+  # single file descriptor.
+  # This feature is most useful for the browser type
+  #browser.limit: 1
+  #http.limit: 10
+  #tcp.limit: 10
+  #icmp.limit: 10
\ No newline at end of file
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index dac3b877fd95..3ae0ed002bcb 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -249,6 +249,16 @@ heartbeat.scheduler:
   # Set the scheduler it's time zone
   #location: ''
 
+heartbeat.jobs:
+  # Limit the number of concurrent monitors executed by heartbeat. This differs from
+  # heartbeat.scheduler.limit in that it maps to individual monitors rather than the
+  # subtasks of monitors. For non-browser monitors a subtask usually corresponds to a
+  # single file descriptor.
+  # This feature is most useful for the browser type
+  #browser.limit: 1
+  #http.limit: 10
+  #tcp.limit: 10
+  #icmp.limit: 10
 # ================================== General ===================================
 
 # The name of the shipper that publishes the network data. It can be used to group
diff --git a/heartbeat/scheduler/schedjob.go b/heartbeat/scheduler/schedjob.go
new file mode 100644
index 000000000000..dc112ec03609
--- /dev/null
+++ b/heartbeat/scheduler/schedjob.go
@@ -0,0 +1,124 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package scheduler
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"golang.org/x/sync/semaphore"
+
+	"github.com/elastic/beats/v7/libbeat/common/atomic"
+)
+
+type schedJob struct {
+	id          string
+	ctx         context.Context
+	scheduler   *Scheduler
+	wg          *sync.WaitGroup
+	entrypoint  TaskFunc
+	jobLimitSem *semaphore.Weighted
+	activeTasks atomic.Int
+}
+
+// newSchedJob returns a schedJob for the given scheduler, job id, and job type.
+// It wires the job up to the scheduler's per-type job limit semaphore
+// (jobLimitSem) if one is configured for that type.
+func newSchedJob(ctx context.Context, s *Scheduler, id string, jobType string, task TaskFunc) *schedJob {
+	return &schedJob{
+		id:          id,
+		ctx:         ctx,
+		scheduler:   s,
+		jobLimitSem: s.jobLimitSem[jobType],
+		entrypoint:  task,
+		activeTasks: atomic.MakeInt(0),
+		wg:          &sync.WaitGroup{},
+	}
+}
+
+// run runs the entry point for the job, blocking until all of its subtasks
+// are completed. Subtasks are run in separate goroutines with as much
+// parallelism as the configured limits allow.
+// It returns the time execution began on the job's first task.
+func (sj *schedJob) run() (startedAt time.Time) {
+	sj.wg.Add(1)
+	sj.activeTasks.Inc()
+	if sj.jobLimitSem != nil {
+		sj.jobLimitSem.Acquire(sj.ctx, 1)
+	}
+
+	startedAt = sj.runTask(sj.entrypoint)
+
+	sj.wg.Wait()
+	return startedAt
+}
+
+// runTask runs an individual task and its continuations until none are left,
+// with as much parallelism as possible. Since task funcs can emit continuations
+// recursively we need a function that can execute recursively.
+// The job's wait group must already have its count incremented by one when this is called.
+func (sj *schedJob) runTask(task TaskFunc) time.Time {
+	defer sj.wg.Done()
+	defer sj.activeTasks.Dec()
+
+	// The accounting for waiting/active tasks is done using atomics.
+	// Absolute accuracy is not critical here so the gap between modifying waitingTasks and activeTasks is acceptable.
+	sj.scheduler.stats.waitingTasks.Inc()
+
+	// Acquire an execution slot in keeping with heartbeat.scheduler.limit;
+	// this should block until resources are available.
+	// In the case where the semaphore has free resources immediately
+	// it will not block and will not check the cancelled status of the
+	// context, which is OK, because we check it later anyway.
+	limitErr := sj.scheduler.limitSem.Acquire(sj.ctx, 1)
+	sj.scheduler.stats.waitingTasks.Dec()
+	if limitErr == nil {
+		defer sj.scheduler.limitSem.Release(1)
+	}
+
+	// Record the time this task started now that we have a resource to execute with
+	startedAt := time.Now()
+
+	// Check if the scheduler has been shut down. If so, exit early
+	select {
+	case <-sj.ctx.Done():
+		return startedAt
+	default:
+		sj.scheduler.stats.activeTasks.Inc()
+
+		continuations := task(sj.ctx)
+		sj.scheduler.stats.activeTasks.Dec()
+
+		sj.wg.Add(len(continuations))
+		sj.activeTasks.Add(len(continuations))
+		for _, cont := range continuations {
+			// Run continuations in parallel; note that each of these will acquire its own slot.
+			// We can discard the startedAt times for continuations as those are
+			// irrelevant
+			go sj.runTask(cont)
+		}
+		// There is always at least one active task (the current one). If that is all that remains,
+		// no other tasks are active or pending, and we can release the jobLimitSem.
+		if sj.jobLimitSem != nil && sj.activeTasks.Load() == 1 {
+			sj.jobLimitSem.Release(1)
+		}
+	}
+
+	return startedAt
+}
diff --git a/heartbeat/scheduler/schedjob_test.go b/heartbeat/scheduler/schedjob_test.go
new file mode 100644
index 000000000000..48f4bf5a18b6
--- /dev/null
+++ b/heartbeat/scheduler/schedjob_test.go
@@ -0,0 +1,122 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package scheduler
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/beats/v7/heartbeat/config"
+	batomic "github.com/elastic/beats/v7/libbeat/common/atomic"
+	"github.com/elastic/beats/v7/libbeat/monitoring"
+)
+
+func TestSchedJobRun(t *testing.T) {
+	cancelledCtx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	testCases := []struct {
+		name          string
+		jobCtx        context.Context
+		overLimit     bool
+		shouldRunTask bool
+	}{
+		{
+			"context not cancelled",
+			context.Background(),
+			false,
+			true,
+		},
+		{
+			"context cancelled",
+			cancelledCtx,
+			false,
+			false,
+		},
+		{
+			"context cancelled over limit",
+			cancelledCtx,
+			true,
+			false,
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			limit := int64(100)
+			s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime(), nil)
+
+			if testCase.overLimit {
+				s.limitSem.Acquire(context.Background(), limit)
+			}
+
+			wg := &sync.WaitGroup{}
+			wg.Add(1)
+			executed := batomic.MakeBool(false)
+
+			tf := func(ctx context.Context) []TaskFunc {
+				executed.Store(true)
+				return nil
+			}
+
+			beforeStart := time.Now()
+			sj := newSchedJob(testCase.jobCtx, s, "myid", "atype", tf)
+			startedAt := sj.run()
+
+			// This will panic in the case where we don't check s.limitSem.Acquire
+			// for an error value and release an unacquired resource in schedjob.go.
+			// In that case this will release one more resource than allowed, causing
+			// the panic.
+ if testCase.overLimit { + s.limitSem.Release(limit) + } + + require.Equal(t, testCase.shouldRunTask, executed.Load()) + require.True(t, startedAt.Equal(beforeStart) || startedAt.After(beforeStart)) + }) + } +} + +// testRecursiveForkingJob tests that a schedJob that splits into multiple parallel pieces executes without error +func TestRecursiveForkingJob(t *testing.T) { + s := NewWithLocation(1000, monitoring.NewRegistry(), tarawaTime(), map[string]config.JobLimit{ + "atype": {Limit: 1}, + }) + ran := batomic.NewInt(0) + + var terminalTf TaskFunc = func(ctx context.Context) []TaskFunc { + ran.Inc() + return nil + } + var forkingTf TaskFunc = func(ctx context.Context) []TaskFunc { + ran.Inc() + return []TaskFunc{ + terminalTf, terminalTf, terminalTf, + } + } + + sj := newSchedJob(context.Background(), s, "myid", "atype", forkingTf) + + sj.run() + require.Equal(t, 4, ran.Load()) + +} diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index 55731dd7d11f..3b6d34fa5795 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "math" - "sync" "time" "golang.org/x/sync/semaphore" @@ -207,7 +206,8 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType default: } s.stats.activeJobs.Inc() - lastRanAt = s.runRecursiveJob(jobCtx, entrypoint, jobType) + debugf("Job '%s' started", id) + lastRanAt := newSchedJob(jobCtx, s, id, jobType, entrypoint).run() s.stats.activeJobs.Dec() s.runOnce(sched.Next(lastRanAt), taskFn) debugf("Job '%v' returned at %v", id, time.Now()) @@ -241,68 +241,3 @@ func (s *Scheduler) runOnce(runAt time.Time, taskFn timerqueue.TimerTaskFn) { asyncTask := func(now time.Time) { go taskFn(now) } s.timerQueue.Push(runAt, asyncTask) } - -// runRecursiveJob runs the entry point for a job, blocking until all subtasks are completed. -// Subtasks are run in separate goroutines. -// returns the time execution began on its first task -func (s *Scheduler) runRecursiveJob(jobCtx context.Context, task TaskFunc, jobType string) (startedAt time.Time) { - wg := &sync.WaitGroup{} - jobSem := s.jobLimitSem[jobType] - if jobSem != nil { - jobSem.Acquire(jobCtx, 1) - } - wg.Add(1) - startedAt = s.runRecursiveTask(jobCtx, task, wg, jobSem) - wg.Wait() - return startedAt -} - -// runRecursiveTask runs an individual task and its continuations until none are left with as much parallelism as possible. -// Since task funcs can emit continuations recursively we need a function to execute -// recursively. -// The wait group passed into this function expects to already have its count incremented by one. -func (s *Scheduler) runRecursiveTask(jobCtx context.Context, task TaskFunc, wg *sync.WaitGroup, jobSem *semaphore.Weighted) (startedAt time.Time) { - defer wg.Done() - - // The accounting for waiting/active tasks is done using atomics. - // Absolute accuracy is not critical here so the gap between modifying waitingTasks and activeJobs is acceptable. - s.stats.waitingTasks.Inc() - - // Acquire an execution slot in keeping with heartbeat.scheduler.limit - // this should block until resources are available. - // In the case where the semaphore has free resources immediately - // it will not block and will not check the cancelled status of the - // context, which is OK, because we check it later anyway. 
- limitErr := s.limitSem.Acquire(jobCtx, 1) - s.stats.waitingTasks.Dec() - if limitErr == nil { - defer s.limitSem.Release(1) - } - - // Record the time this task started now that we have a resource to execute with - startedAt = time.Now() - - // Check if the scheduler has been shut down. If so, exit early - select { - case <-jobCtx.Done(): - return startedAt - default: - s.stats.activeTasks.Inc() - - continuations := task(jobCtx) - s.stats.activeTasks.Dec() - - wg.Add(len(continuations)) - for _, cont := range continuations { - // Run continuations in parallel, note that these each will acquire their own slots - // We can discard the started at times for continuations as those are - // irrelevant - go s.runRecursiveTask(jobCtx, cont, wg, jobSem) - } - if jobSem != nil && len(continuations) == 0 { - jobSem.Release(1) - } - } - - return startedAt -} diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index 0b699c2a7782..61c062184a40 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/config" - batomic "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/monitoring" ) @@ -177,71 +176,6 @@ func TestScheduler_Stop(t *testing.T) { assert.Equal(t, ErrAlreadyStopped, err) } -func TestScheduler_runRecursiveTask(t *testing.T) { - cancelledCtx, cancel := context.WithCancel(context.Background()) - cancel() - - testCases := []struct { - name string - jobCtx context.Context - overLimit bool - shouldRunTask bool - }{ - { - "context not cancelled", - context.Background(), - false, - true, - }, - { - "context cancelled", - cancelledCtx, - false, - false, - }, - { - "context cancelled over limit", - cancelledCtx, - true, - false, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - limit := int64(100) - s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime(), nil) - - if testCase.overLimit { - s.limitSem.Acquire(context.Background(), limit) - } - - wg := &sync.WaitGroup{} - wg.Add(1) - executed := batomic.MakeBool(false) - - tf := func(ctx context.Context) []TaskFunc { - executed.Store(true) - return nil - } - - beforeStart := time.Now() - startedAt := s.runRecursiveTask(testCase.jobCtx, tf, wg, nil) - - // This will panic in the case where we don't check s.limitSem.Acquire - // for an error value and released an unacquired resource in scheduler.go. - // In that case this will release one more resource than allowed causing - // the panic. 
- if testCase.overLimit { - s.limitSem.Release(limit) - } - - require.Equal(t, testCase.shouldRunTask, executed.Load()) - require.True(t, startedAt.Equal(beforeStart) || startedAt.After(beforeStart)) - }) - } -} - func makeTasks(num int, callback func()) TaskFunc { return func(ctx context.Context) []TaskFunc { callback() @@ -252,7 +186,7 @@ func makeTasks(num int, callback func()) TaskFunc { } } -func TestScheduler_runRecursiveJob(t *testing.T) { +func TestSchedTaskLimits(t *testing.T) { tests := []struct { name string numJobs int @@ -311,7 +245,8 @@ func TestScheduler_runRecursiveJob(t *testing.T) { taskArr = append(taskArr, num) }) go func(tff TaskFunc) { - s.runRecursiveJob(context.Background(), tff, jobType) + sj := newSchedJob(context.Background(), s, "myid", jobType, tff) + sj.run() wg.Done() }(tf) } diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index dac3b877fd95..3ae0ed002bcb 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -249,6 +249,16 @@ heartbeat.scheduler: # Set the scheduler it's time zone #location: '' +heartbeat.jobs: + # Limit the number of concurrent monitors executed by heartbeat. This differs from + # heartbeat.scheduler.limit in that it maps to individual monitors rather than the + # subtasks of monitors. For non-browser monitors a subtask usually corresponds to a + # single file descriptor. + # This feature is most useful for the browser type + #browser.limit: 1 + #http.limit: 10 + #tcp.limit: 10 + #icmp.limit: 10 # ================================== General =================================== # The name of the shipper that publishes the network data. It can be used to group