Skip to content

Commit 3735add

Browse files
committed
fix: reconnect to the logs stream in dashboard after reboot
The log stream displayed in the dashboard stopped working when a node was rebooted. Rework the log data source to establish a per-node connection and use a retry loop to always reconnect until the dashboard is terminated. Print connection errors in the log stream in red. Closes #8388. Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
1 parent 9aa1e1b commit 3735add

File tree

6 files changed

+132
-71
lines changed

6 files changed

+132
-71
lines changed

internal/pkg/dashboard/components/logviewer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func NewLogViewer() *LogViewer {
2323
}
2424

2525
widget.logs.ScrollToEnd().
26+
SetDynamicColors(true).
2627
SetMaxLines(maxLogLines).
2728
SetText(noData).
2829
SetBorderPadding(0, 0, 1, 1).
@@ -54,7 +55,12 @@ func NewLogViewer() *LogViewer {
5455
}
5556

5657
// WriteLog writes the log line to the widget.
57-
func (widget *LogViewer) WriteLog(logLine string) {
58+
func (widget *LogViewer) WriteLog(logLine, logError string) {
59+
if logError != "" {
60+
logLine = "[red]" + tview.Escape(logError) + "[-]\n"
61+
} else {
62+
logLine = tview.Escape(logLine) + "\n"
63+
}
64+
5865
widget.logs.Write([]byte(logLine)) //nolint:errcheck
59-
widget.logs.Write([]byte("\n")) //nolint:errcheck
6066
}

internal/pkg/dashboard/dashboard.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type ResourceDataListener interface {
7070

7171
// LogDataListener is a listener which is notified when a log line is received.
type LogDataListener interface {
	// OnLogDataChange is called with the node the entry belongs to, the log line
	// and the error text carried by the same entry (see Dashboard.processLog).
	OnLogDataChange(node, logLine, logError string)
}
7575

7676
// NodeSelectListener is a listener which is notified when a node is selected.
@@ -376,10 +376,7 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error {
376376
defer d.resourceDataSource.Stop() //nolint:errcheck
377377

378378
// start logs data source
379-
if err := d.logDataSource.Start(ctx); err != nil {
380-
return err
381-
}
382-
379+
d.logDataSource.Start(ctx)
383380
defer d.logDataSource.Stop() //nolint:errcheck
384381

385382
lastLogTime := time.Now()
@@ -393,11 +390,11 @@ func (d *Dashboard) startDataHandler(ctx context.Context) func() error {
393390

394391
if time.Since(lastLogTime) < 50*time.Millisecond {
395392
d.app.QueueUpdate(func() {
396-
d.processLog(nodeAlias, nodeLog.Log)
393+
d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error)
397394
})
398395
} else {
399396
d.app.QueueUpdateDraw(func() {
400-
d.processLog(nodeAlias, nodeLog.Log)
397+
d.processLog(nodeAlias, nodeLog.Log, nodeLog.Error)
401398
})
402399
}
403400

@@ -461,9 +458,9 @@ func (d *Dashboard) processNodeResource(nodeResource resourcedata.Data) {
461458
}
462459

463460
// processLog re-renders the log components with new log data.
464-
func (d *Dashboard) processLog(node, line string) {
461+
func (d *Dashboard) processLog(node, logLine, logError string) {
465462
for _, component := range d.logDataListeners {
466-
component.OnLogDataChange(node, line)
463+
component.OnLogDataChange(node, logLine, logError)
467464
}
468465
}
469466

internal/pkg/dashboard/logdata/logdata.go

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@ package logdata
88
import (
99
"context"
1010
"errors"
11+
"fmt"
1112
"strings"
1213
"sync"
14+
"time"
1315

1416
"golang.org/x/sync/errgroup"
17+
"google.golang.org/grpc/codes"
18+
"google.golang.org/grpc/status"
1519

1620
"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
21+
"github.com/siderolabs/talos/internal/pkg/dashboard/util"
1722
"github.com/siderolabs/talos/pkg/machinery/api/common"
1823
"github.com/siderolabs/talos/pkg/machinery/client"
1924
)
2025

2126
// Data is a log line from a node.
type Data struct {
	// Node is the name of the node the entry belongs to.
	Node string
	// Log is the log line text; set for entries produced from stream data.
	Log string
	// Error is the error text; set for entries produced from a failed stream read.
	Error string
}
2632

2733
// Source is a data source for Kernel (dmesg) logs.
@@ -45,14 +51,10 @@ func NewSource(client *client.Client) *Source {
4551
}
4652

4753
// Start starts the data source.
48-
func (source *Source) Start(ctx context.Context) error {
49-
var err error
50-
54+
func (source *Source) Start(ctx context.Context) {
5155
source.once.Do(func() {
52-
err = source.start(ctx)
56+
source.start(ctx)
5357
})
54-
55-
return err
5658
}
5759

5860
// Stop stops the data source.
@@ -62,38 +64,70 @@ func (source *Source) Stop() error {
6264
return source.eg.Wait()
6365
}
6466

65-
func (source *Source) start(ctx context.Context) error {
67+
func (source *Source) start(ctx context.Context) {
6668
ctx, source.logCtxCancel = context.WithCancel(ctx)
6769

68-
dmesgStream, err := source.client.Dmesg(ctx, true, false)
69-
if err != nil {
70-
return err
70+
for _, nodeContext := range util.NodeContexts(ctx) {
71+
source.eg.Go(func() error {
72+
return source.tailNodeWithRetries(nodeContext.Ctx, nodeContext.Node)
73+
})
7174
}
75+
}
7276

73-
source.eg.Go(func() error {
74-
return helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, node string, multipleNodes bool) error {
75-
if len(data.Bytes) == 0 {
76-
return nil
77-
}
77+
func (source *Source) tailNodeWithRetries(ctx context.Context, node string) error {
78+
for {
79+
readErr := source.readDmesg(ctx, node)
80+
if errors.Is(readErr, context.Canceled) || status.Code(readErr) == codes.Canceled {
81+
return nil
82+
}
7883

79-
line := strings.TrimSpace(string(data.Bytes))
80-
if line == "" {
81-
return nil
82-
}
84+
if readErr != nil {
85+
source.LogCh <- Data{Node: node, Error: readErr.Error()}
86+
}
8387

84-
select {
85-
case <-ctx.Done():
86-
if errors.Is(ctx.Err(), context.Canceled) {
87-
return nil
88-
}
88+
// back off a bit before retrying
89+
sleepWithContext(ctx, 30*time.Second)
90+
}
91+
}
8992

90-
return ctx.Err()
91-
case source.LogCh <- Data{Node: node, Log: line}:
92-
}
93+
func (source *Source) readDmesg(ctx context.Context, node string) error {
94+
dmesgStream, err := source.client.Dmesg(ctx, true, false)
95+
if err != nil {
96+
return fmt.Errorf("dashboard: error opening dmesg stream: %w", err)
97+
}
9398

99+
readErr := helpers.ReadGRPCStream(dmesgStream, func(data *common.Data, _ string, _ bool) error {
100+
if len(data.Bytes) == 0 {
94101
return nil
95-
})
102+
}
103+
104+
line := strings.TrimSpace(string(data.Bytes))
105+
if line == "" {
106+
return nil
107+
}
108+
109+
select {
110+
case <-ctx.Done():
111+
return ctx.Err()
112+
case source.LogCh <- Data{Node: node, Log: line}:
113+
}
114+
115+
return nil
96116
})
117+
if readErr != nil {
118+
return fmt.Errorf("error reading dmesg stream: %w", readErr)
119+
}
97120

98121
return nil
99122
}
123+
124+
// sleepWithContext blocks until d has elapsed or ctx is done, whichever
// happens first.
func sleepWithContext(ctx context.Context, d time.Duration) {
	wait := time.NewTimer(d)
	// The timer is never reused, so stopping it on the way out is sufficient.
	defer wait.Stop()

	select {
	case <-wait.C:
	case <-ctx.Done():
	}
}

internal/pkg/dashboard/resourcedata/resourcedata.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ import (
1616
"github.com/cosi-project/runtime/pkg/state"
1717
"github.com/siderolabs/gen/channel"
1818
"golang.org/x/sync/errgroup"
19-
"google.golang.org/grpc/metadata"
2019

21-
"github.com/siderolabs/talos/pkg/machinery/client"
20+
"github.com/siderolabs/talos/internal/pkg/dashboard/util"
2221
"github.com/siderolabs/talos/pkg/machinery/constants"
2322
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
2423
"github.com/siderolabs/talos/pkg/machinery/resources/config"
@@ -70,10 +69,9 @@ func (source *Source) run(ctx context.Context) {
7069

7170
source.NodeResourceCh = source.ch
7271

73-
nodes := source.nodes(ctx)
74-
for _, node := range nodes {
72+
for _, nodeContext := range util.NodeContexts(ctx) {
7573
source.eg.Go(func() error {
76-
source.runResourceWatchWithRetries(ctx, node)
74+
source.runResourceWatchWithRetries(nodeContext.Ctx, nodeContext.Node)
7775

7876
return nil
7977
})
@@ -101,10 +99,6 @@ func (source *Source) runResourceWatchWithRetries(ctx context.Context, node stri
10199

102100
//nolint:gocyclo,cyclop
103101
func (source *Source) runResourceWatch(ctx context.Context, node string) error {
104-
if node != "" {
105-
ctx = client.WithNode(ctx, node)
106-
}
107-
108102
ctx, cancel := context.WithCancel(ctx)
109103
defer cancel()
110104

@@ -211,22 +205,3 @@ func (source *Source) runResourceWatch(ctx context.Context, node string) error {
211205
}
212206
}
213207
}
214-
215-
func (source *Source) nodes(ctx context.Context) []string {
216-
md, mdOk := metadata.FromOutgoingContext(ctx)
217-
if !mdOk {
218-
return []string{""} // local node
219-
}
220-
221-
nodeVal := md.Get("node")
222-
if len(nodeVal) > 0 {
223-
return []string{nodeVal[0]}
224-
}
225-
226-
nodesVal := md.Get("nodes")
227-
if len(nodesVal) == 0 {
228-
return []string{""} // local node
229-
}
230-
231-
return nodesVal
232-
}

internal/pkg/dashboard/summary.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ func (widget *SummaryGrid) OnResourceDataChange(nodeResource resourcedata.Data)
9191
}
9292

9393
// OnLogDataChange implements the LogDataListener interface.
94-
func (widget *SummaryGrid) OnLogDataChange(node string, logLine string) {
95-
widget.logViewer(node).WriteLog(logLine)
94+
func (widget *SummaryGrid) OnLogDataChange(node, logLine, logError string) {
95+
widget.logViewer(node).WriteLog(logLine, logError)
9696
}
9797

9898
func (widget *SummaryGrid) updateLogViewer() {

internal/pkg/dashboard/util/util.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
// Package util provides utility functions for the dashboard.
6+
package util
7+
8+
import (
9+
"context"
10+
11+
"google.golang.org/grpc/metadata"
12+
13+
"github.com/siderolabs/talos/pkg/machinery/client"
14+
)
15+
16+
// NodeContext contains the context.Context for a single node and the node name.
type NodeContext struct {
	// Ctx is the context to use for requests to this node.
	Ctx context.Context //nolint:containedctx
	// Node is the node name; it is empty when no node was found in the metadata.
	Node string
}
21+
22+
// NodeContexts returns a list of NodeContexts from the given context.
23+
//
24+
// It extracts the node names from the outgoing GRPC context metadata.
25+
// If the node name is not present in the metadata, context will be returned as-is with an empty node name.
26+
func NodeContexts(ctx context.Context) []NodeContext {
27+
md, mdOk := metadata.FromOutgoingContext(ctx)
28+
if !mdOk {
29+
return []NodeContext{{Ctx: ctx}}
30+
}
31+
32+
nodeVal := md.Get("node")
33+
if len(nodeVal) > 0 {
34+
return []NodeContext{{Ctx: ctx, Node: nodeVal[0]}}
35+
}
36+
37+
nodesVal := md.Get("nodes")
38+
if len(nodesVal) == 0 {
39+
return []NodeContext{{Ctx: ctx}}
40+
}
41+
42+
nodeContexts := make([]NodeContext, 0, len(nodesVal))
43+
44+
for _, node := range nodesVal {
45+
nodeContexts = append(nodeContexts, NodeContext{Ctx: client.WithNode(ctx, node), Node: node})
46+
}
47+
48+
return nodeContexts
49+
}

0 commit comments

Comments
 (0)