Skip to content

Commit 10194f7

Browse files
owen-dashwanthgoli
andauthored
feat(block-scheduler): status page shows completed jobs (#15580)
Signed-off-by: Owen Diehl <ow.diehl@gmail.com> Co-authored-by: Ashwanth <iamashwanth@gmail.com>
1 parent 780173a commit 10194f7

7 files changed

+225
-0
lines changed

pkg/blockbuilder/scheduler/priority_queue.go

+20
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,23 @@ func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) {
210210
var zero V
211211
return zero, false
212212
}
213+
214+
// Range iterates over the elements in the buffer from oldest to newest
215+
// and calls the given function for each element.
216+
// If the function returns false, iteration stops.
217+
func (b *CircularBuffer[V]) Range(f func(V) bool) {
218+
if b.size == 0 {
219+
return
220+
}
221+
222+
// Start from head (oldest) and iterate to tail (newest)
223+
idx := b.head
224+
remaining := b.size
225+
for remaining > 0 {
226+
if !f(b.buffer[idx]) {
227+
return
228+
}
229+
idx = (idx + 1) % len(b.buffer)
230+
remaining--
231+
}
232+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package scheduler
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestCircularBuffer_Range(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
capacity int
13+
input []int
14+
want []int
15+
}{
16+
{
17+
name: "empty buffer",
18+
capacity: 3,
19+
input: []int{},
20+
want: []int{},
21+
},
22+
{
23+
name: "partially filled buffer",
24+
capacity: 3,
25+
input: []int{1, 2},
26+
want: []int{1, 2},
27+
},
28+
{
29+
name: "full buffer",
30+
capacity: 3,
31+
input: []int{1, 2, 3},
32+
want: []int{1, 2, 3},
33+
},
34+
{
35+
name: "buffer with eviction",
36+
capacity: 3,
37+
input: []int{1, 2, 3, 4, 5},
38+
want: []int{3, 4, 5}, // oldest elements (1,2) were evicted
39+
},
40+
{
41+
name: "buffer with multiple evictions",
42+
capacity: 2,
43+
input: []int{1, 2, 3, 4, 5},
44+
want: []int{4, 5}, // only newest elements remain
45+
},
46+
}
47+
48+
for _, tt := range tests {
49+
t.Run(tt.name, func(t *testing.T) {
50+
// Create and fill buffer
51+
buf := NewCircularBuffer[int](tt.capacity)
52+
for _, v := range tt.input {
53+
buf.Push(v)
54+
}
55+
56+
// Use Range to collect elements
57+
got := make([]int, 0)
58+
buf.Range(func(v int) bool {
59+
got = append(got, v)
60+
return true
61+
})
62+
63+
require.Equal(t, tt.want, got, "Range should iterate in order from oldest to newest")
64+
})
65+
}
66+
}
67+
68+
func TestCircularBuffer_Range_EarlyStop(t *testing.T) {
69+
buf := NewCircularBuffer[int](5)
70+
for i := 1; i <= 5; i++ {
71+
buf.Push(i)
72+
}
73+
74+
var got []int
75+
buf.Range(func(v int) bool {
76+
got = append(got, v)
77+
return v != 3 // stop after seeing 3
78+
})
79+
80+
require.Equal(t, []int{1, 2, 3}, got, "Range should stop when function returns false")
81+
}

pkg/blockbuilder/scheduler/queue.go

+12
Original file line numberDiff line numberDiff line change
@@ -382,3 +382,15 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
382382
}
383383
return jobs
384384
}
385+
386+
func (q *JobQueue) ListCompletedJobs() []JobWithMetadata {
387+
q.mu.RLock()
388+
defer q.mu.RUnlock()
389+
390+
jobs := make([]JobWithMetadata, 0, q.completed.Len())
391+
q.completed.Range(func(job *JobWithMetadata) bool {
392+
jobs = append(jobs, *job)
393+
return true
394+
})
395+
return jobs
396+
}

pkg/blockbuilder/scheduler/status.go

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F
2020
type jobQueue interface {
2121
ListPendingJobs() []JobWithMetadata
2222
ListInProgressJobs() []JobWithMetadata
23+
ListCompletedJobs() []JobWithMetadata
2324
}
2425

2526
type offsetReader interface {
@@ -63,12 +64,14 @@ func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
6364
data := struct {
6465
PendingJobs []JobWithMetadata
6566
InProgressJobs []JobWithMetadata
67+
CompletedJobs []JobWithMetadata
6668
Now time.Time
6769
PartitionInfo []partitionInfo
6870
}{
6971
Now: time.Now(),
7072
PendingJobs: pendingJobs,
7173
InProgressJobs: inProgressJobs,
74+
CompletedJobs: h.jobQueue.ListCompletedJobs(),
7275
}
7376

7477
for _, partitionOffset := range offsets {

pkg/blockbuilder/scheduler/status.gohtml

+32
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
</thead>
2323
<tbody>
2424
{{ range $i, $job := .PendingJobs }}
25+
<tr>
2526
<td>{{ .ID }}</td>
2627
<td>{{ .Priority }}</td>
2728
<td>{{ .Partition }}</td>
@@ -47,6 +48,7 @@
4748
</thead>
4849
<tbody>
4950
{{ range $i, $job := .InProgressJobs }}
51+
<tr>
5052
<td>{{ .ID }}</td>
5153
<td>{{ .Priority }}</td>
5254
<td>{{ .Partition }}</td>
@@ -58,6 +60,35 @@
5860
{{ end }}
5961
</tbody>
6062
</table>
63+
<h2>Completed Jobs</h2>
64+
<table width="100%" border="1">
65+
<thead>
66+
<tr>
67+
<th>ID</th>
68+
<th>Priority</th>
69+
<th>Partition</th>
70+
<th>Start Offset</th>
71+
<th>End Offset</th>
72+
<th>Status</th>
73+
<th>Start Timestamp</th>
74+
<th>Completion Timestamp</th>
75+
</tr>
76+
</thead>
77+
<tbody>
78+
{{ range $i, $job := .CompletedJobs }}
79+
<tr>
80+
<td>{{ .ID }}</td>
81+
<td>{{ .Priority }}</td>
82+
<td>{{ .Partition }}</td>
83+
<td>{{ .Offsets.Min }}</td>
84+
<td>{{ .Offsets.Max }}</td>
85+
<td>{{ .Status }}</td>
86+
<td>{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
87+
<td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
88+
</tr>
89+
{{ end }}
90+
</tbody>
91+
</table>
6192
<h3>Partition Lag</h2>
6293
<table width="100%" border="1">
6394
<thead>
@@ -70,6 +101,7 @@
70101
</thead>
71102
<tbody>
72103
{{ range .PartitionInfo }}
104+
<tr>
73105
<td>{{ .Partition }}</td>
74106
<td>{{ .Lag }}</td>
75107
<td>{{ .EndOffset }}</td>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
//go:build preview
2+
3+
package scheduler
4+
5+
import (
6+
"fmt"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/twmb/franz-go/pkg/kadm"
12+
13+
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
14+
)
15+
16+
// TestPreview is a utility test that runs a local server with the status page.
17+
// Run it with: go test -tags=preview -v -run TestPreview
18+
func TestPreview(t *testing.T) {
19+
// Setup mock data with varied timestamps
20+
now := time.Now()
21+
mockLister := &mockQueueLister{
22+
pendingJobs: []JobWithMetadata{
23+
{Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: now.Add(-2 * time.Hour), Priority: 23},
24+
{Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: now.Add(-1 * time.Hour), Priority: 42},
25+
{Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: now.Add(-30 * time.Minute), Priority: 11},
26+
},
27+
inProgressJobs: []JobWithMetadata{
28+
{Job: types.NewJob(44, types.Offsets{Min: 1, Max: 10}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour)},
29+
{Job: types.NewJob(55, types.Offsets{Min: 11, Max: 110}), StartTime: now.Add(-5 * time.Hour), UpdateTime: now.Add(-4 * time.Hour)},
30+
},
31+
completedJobs: []JobWithMetadata{
32+
{Job: types.NewJob(66, types.Offsets{Min: 1, Max: 50}), StartTime: now.Add(-8 * time.Hour), UpdateTime: now.Add(-7 * time.Hour), Status: types.JobStatusComplete},
33+
{Job: types.NewJob(77, types.Offsets{Min: 51, Max: 100}), StartTime: now.Add(-6 * time.Hour), UpdateTime: now.Add(-5 * time.Hour), Status: types.JobStatusComplete},
34+
{Job: types.NewJob(88, types.Offsets{Min: 101, Max: 150}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour), Status: types.JobStatusFailed},
35+
{Job: types.NewJob(99, types.Offsets{Min: 151, Max: 200}), StartTime: now.Add(-2 * time.Hour), UpdateTime: now.Add(-1 * time.Hour), Status: types.JobStatusComplete},
36+
},
37+
}
38+
39+
mockReader := &mockOffsetReader{
40+
groupLag: map[int32]kadm.GroupMemberLag{
41+
0: {
42+
Lag: 10,
43+
Partition: 3,
44+
End: kadm.ListedOffset{Offset: 100},
45+
Commit: kadm.Offset{At: 90},
46+
},
47+
1: {
48+
Lag: 0,
49+
Partition: 1,
50+
End: kadm.ListedOffset{Offset: 100},
51+
Commit: kadm.Offset{At: 100},
52+
},
53+
2: {
54+
Lag: 233,
55+
Partition: 2,
56+
End: kadm.ListedOffset{Offset: 333},
57+
Commit: kadm.Offset{At: 100},
58+
},
59+
},
60+
}
61+
62+
handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
63+
64+
// Start local server
65+
server := httptest.NewServer(handler)
66+
defer server.Close()
67+
68+
fmt.Printf("\n\n=== Preview server running ===\nOpen this URL in your browser:\n%s\nPress Ctrl+C to stop the server\n\n", server.URL)
69+
70+
// Keep server running until interrupted
71+
select {}
72+
}

pkg/blockbuilder/scheduler/status_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
type mockQueueLister struct {
1616
pendingJobs []JobWithMetadata
1717
inProgressJobs []JobWithMetadata
18+
completedJobs []JobWithMetadata
1819
}
1920

2021
func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata {
@@ -25,6 +26,10 @@ func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata {
2526
return m.inProgressJobs
2627
}
2728

29+
func (m *mockQueueLister) ListCompletedJobs() []JobWithMetadata {
30+
return m.completedJobs
31+
}
32+
2833
func TestStatusPageHandler_ServeHTTP(t *testing.T) {
2934
t.Skip("skipping. only added to inspect the generated status page.")
3035

0 commit comments

Comments
 (0)