Skip to content

Commit

Permalink
Add metrics for how many times a worker is marked as terminating
Browse files Browse the repository at this point in the history
You could already run a query like this to get the number of worker
threads synchronizing against the scheduler:

    buildbarn_builder_in_memory_build_queue_workers_created_total
    -
    sum(buildbarn_builder_in_memory_build_queue_workers_removed_total) without (state)

That said, this expression does not show how many workers are actually
usable for running actions, because some of them may already be marked
as terminating. One may now use the following query to exclude workers
that are terminating:

    buildbarn_builder_in_memory_build_queue_workers_created_total
    -
    buildbarn_builder_in_memory_build_queue_workers_terminating_total

Relatedly, one can use the following query to obtain the number of
workers that are currently terminating:

    buildbarn_builder_in_memory_build_queue_workers_terminating_total
    -
    sum(buildbarn_builder_in_memory_build_queue_workers_removed_total) without (state)
  • Loading branch information
EdSchouten committed Dec 6, 2023
1 parent 022742f commit b5c0ea4
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pkg/scheduler/in_memory_build_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ var (
Help: "Number of workers created by Synchronize().",
},
[]string{"instance_name_prefix", "platform", "size_class"})
// inMemoryBuildQueueWorkersTerminatingTotal is exported as
// buildbarn_builder_in_memory_build_queue_workers_terminating_total.
// It counts workers that have entered the terminating state, labeled
// by instance name prefix, platform and size class.
inMemoryBuildQueueWorkersTerminatingTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "buildbarn",
Subsystem: "builder",
Name: "in_memory_build_queue_workers_terminating_total",
Help: "Number of workers that have entered the terminating state.",
},
[]string{"instance_name_prefix", "platform", "size_class"})
inMemoryBuildQueueWorkersRemovedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "buildbarn",
Expand Down Expand Up @@ -290,6 +298,7 @@ func NewInMemoryBuildQueue(contentAddressableStorage blobstore.BlobAccess, clock
prometheus.MustRegister(inMemoryBuildQueueTasksCompletedDurationSeconds)

prometheus.MustRegister(inMemoryBuildQueueWorkersCreatedTotal)
prometheus.MustRegister(inMemoryBuildQueueWorkersTerminatingTotal)
prometheus.MustRegister(inMemoryBuildQueueWorkersRemovedTotal)

prometheus.MustRegister(inMemoryBuildQueueWorkerInvocationStickinessRetained)
Expand Down Expand Up @@ -1178,7 +1187,7 @@ func (bq *InMemoryBuildQueue) TerminateWorkers(ctx context.Context, request *bui
for _, scq := range bq.sizeClassQueues {
for workerKey, w := range scq.workers {
if workerMatchesPattern(workerKey.getWorkerID(), request.WorkerIdPattern) {
w.terminating = true
scq.markWorkerTerminating(w)
if t := w.currentTask; t != nil {
// The task will be at the
// EXECUTING stage, so it can
Expand Down Expand Up @@ -1381,6 +1390,7 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin
tasksCompletedDurationSeconds: inMemoryBuildQueueTasksCompletedDurationSeconds.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr),

workersCreatedTotal: inMemoryBuildQueueWorkersCreatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr),
workersTerminatingTotal: inMemoryBuildQueueWorkersTerminatingTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr),
workersRemovedIdleTotal: inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Idle"),
workersRemovedExecutingTotal: inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Executing"),

Expand Down Expand Up @@ -1469,6 +1479,7 @@ type sizeClassQueue struct {
tasksCompletedDurationSeconds prometheus.Observer

workersCreatedTotal prometheus.Counter
workersTerminatingTotal prometheus.Counter
workersRemovedIdleTotal prometheus.Counter
workersRemovedExecutingTotal prometheus.Counter

Expand Down Expand Up @@ -1523,6 +1534,7 @@ func (scq *sizeClassQueue) remove(bq *InMemoryBuildQueue) {
// the InMemoryBuildQueue.
func (scq *sizeClassQueue) removeStaleWorker(bq *InMemoryBuildQueue, workerKey workerKey, removalTime time.Time) {
w := scq.workers[workerKey]
scq.markWorkerTerminating(w)
if t := w.currentTask; t == nil {
scq.workersRemovedIdleTotal.Inc()
} else {
Expand Down Expand Up @@ -1596,6 +1608,13 @@ func (scq *sizeClassQueue) incrementInvocationsCreatedTotal(depth int) {
scq.invocationsMetrics[depth].createdTotal.Inc()
}

// markWorkerTerminating flags the given worker as terminating. The
// workersTerminatingTotal counter is only incremented the first time a
// worker makes this transition, so calling this method repeatedly on
// the same worker does not cause it to be counted more than once.
func (scq *sizeClassQueue) markWorkerTerminating(w *worker) {
	if w.terminating {
		// Already accounted for by a previous call.
		return
	}
	w.terminating = true
	scq.workersTerminatingTotal.Inc()
}

// workerKey can be used as a key for maps to uniquely identify a worker
// within the domain of a certain platform. This key is used for looking
// up the state of a worker when synchronizing.
Expand Down

0 comments on commit b5c0ea4

Please sign in to comment.