215 changes: 215 additions & 0 deletions pkg/monitortests/etcd/etcdloganalyzer/etcd_recorder_test.go
@@ -0,0 +1,215 @@
package etcdloganalyzer

import (
"strconv"
"testing"
"time"

"github.com/openshift/origin/pkg/monitor/monitorapi"
"github.com/openshift/origin/pkg/monitortestlibrary/podaccess"
"k8s.io/apimachinery/pkg/runtime"
)

// mockRecorder captures intervals added to it for testing
type mockRecorder struct {
	intervals monitorapi.Intervals
}

func (m *mockRecorder) AddIntervals(eventIntervals ...monitorapi.Interval) {
	m.intervals = append(m.intervals, eventIntervals...)
}

func (m *mockRecorder) StartInterval(interval monitorapi.Interval) int { return 0 }
func (m *mockRecorder) EndInterval(startedInterval int, t time.Time) *monitorapi.Interval {
	return nil
}
func (m *mockRecorder) Record(conditions ...monitorapi.Condition)                {}
func (m *mockRecorder) RecordAt(t time.Time, conditions ...monitorapi.Condition) {}
func (m *mockRecorder) RecordResource(resourceType string, obj runtime.Object)   {}
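
A compile-time interface assertion, a common Go idiom that is not part of this diff, would catch drift between the mock and monitorapi.RecorderWriter at build time instead of deep inside a test run:

	var _ monitorapi.RecorderWriter = &mockRecorder{}

If RecorderWriter gains a method the mock lacks, compilation fails on this line.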

func TestEtcdRecorderBatching(t *testing.T) {
	mock := &mockRecorder{}
	recorder := newEtcdRecorder(mock)

	baseTime := time.Date(2026, 2, 3, 10, 51, 0, 0, time.UTC)
	locator := monitorapi.Locator{
		Type: monitorapi.LocatorTypeContainer,
		Keys: map[monitorapi.LocatorKey]string{
			"node":      "node-1",
			"pod":       "etcd-node-1",
			"container": "etcd",
			"namespace": "openshift-etcd",
		},
	}

	// Simulate 5 "apply request took too long" messages in the same minute
	for i := 0; i < 5; i++ {
		ts := baseTime.Add(time.Duration(i*10) * time.Second).Format(time.RFC3339)
		logLine := podaccess.LogLineContent{
			Locator: locator,
			Instant: baseTime.Add(time.Duration(i*10) * time.Second),
			Line:    `{"level":"warn","ts":"` + ts + `","msg":"apply request took too long"}`,
		}
		recorder.HandleLogLine(logLine)
	}

	// Simulate 3 "slow fdatasync" messages in the same minute
	for i := 0; i < 3; i++ {
		ts := baseTime.Add(time.Duration(i*15) * time.Second).Format(time.RFC3339)
		logLine := podaccess.LogLineContent{
			Locator: locator,
			Instant: baseTime.Add(time.Duration(i*15) * time.Second),
			Line:    `{"level":"warn","ts":"` + ts + `","msg":"slow fdatasync"}`,
		}
		recorder.HandleLogLine(logLine)
	}

	// Simulate 2 "apply request took too long" messages in a different minute
	for i := 0; i < 2; i++ {
		ts := baseTime.Add(time.Minute).Add(time.Duration(i*10) * time.Second).Format(time.RFC3339)
		logLine := podaccess.LogLineContent{
			Locator: locator,
			Instant: baseTime.Add(time.Minute).Add(time.Duration(i*10) * time.Second),
			Line:    `{"level":"warn","ts":"` + ts + `","msg":"apply request took too long"}`,
		}
		recorder.HandleLogLine(logLine)
	}

	// Before flush, no intervals should be recorded
	if len(mock.intervals) != 0 {
		t.Errorf("Expected 0 intervals before Flush, got %d", len(mock.intervals))
	}

	// Flush the batches
	recorder.Flush()

	// After flush, we should have 3 batched intervals:
	// - "apply request took too long" at 10:51 with count=5
	// - "slow fdatasync" at 10:51 with count=3
	// - "apply request took too long" at 10:52 with count=2
	if len(mock.intervals) != 3 {
		t.Errorf("Expected 3 intervals after Flush, got %d", len(mock.intervals))
	}

	// Verify the total count across all batches equals the original count
	totalCount := 0
	for _, interval := range mock.intervals {
		countStr := interval.Message.Annotations[monitorapi.AnnotationCount]
		count, err := strconv.Atoi(countStr)
		if err != nil {
			t.Errorf("Invalid count annotation %q: %v", countStr, err)
			continue
		}
		totalCount += count

		// Verify Display is true
		if !interval.Display {
			t.Errorf("Expected Display=true for batched interval")
		}

		// Verify source is EtcdLog
		if interval.Source != monitorapi.SourceEtcdLog {
			t.Errorf("Expected source EtcdLog, got %s", interval.Source)
		}

		// Verify time is minute-aligned
		if interval.From.Second() != 0 || interval.From.Nanosecond() != 0 {
			t.Errorf("Expected minute-aligned From time, got %v", interval.From)
		}
	}

	expectedTotalCount := 5 + 3 + 2
	if totalCount != expectedTotalCount {
		t.Errorf("Expected total count %d, got %d", expectedTotalCount, totalCount)
	}

	// Verify the locator includes the etcd-event key for chart separation
	for _, interval := range mock.intervals {
		eventKey, ok := interval.Locator.Keys["etcd-event"]
		if !ok {
			t.Errorf("Expected locator to have etcd-event key, got keys: %v", interval.Locator.Keys)
			continue
		}
		// Verify the event key is one of the expected values
		validKeys := map[string]bool{
			"apply-slow":     true,
			"slow-fdatasync": true,
		}
		if !validKeys[eventKey] {
			t.Errorf("Unexpected etcd-event key: %s", eventKey)
		}
	}
}
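
The minute-alignment check above depends on time.Truncate rounding an instant down to the start of its minute. A minimal standalone sketch of the bucketing behavior the test expects (illustrative values only, not from the PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	a := time.Date(2026, 2, 3, 10, 51, 10, 0, time.UTC)
	b := time.Date(2026, 2, 3, 10, 51, 40, 0, time.UTC)
	c := time.Date(2026, 2, 3, 10, 52, 10, 0, time.UTC)

	// a and b share the 10:51 bucket; c falls into 10:52 and gets its own batch.
	fmt.Println(a.Truncate(time.Minute).Equal(b.Truncate(time.Minute))) // true
	fmt.Println(a.Truncate(time.Minute).Equal(c.Truncate(time.Minute))) // false
}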

func TestEtcdRecorderLeadershipMessagesNotBatched(t *testing.T) {
	mock := &mockRecorder{}
	recorder := newEtcdRecorder(mock)

	baseTime := time.Date(2026, 2, 3, 10, 51, 0, 0, time.UTC)
	locator := monitorapi.Locator{
		Type: monitorapi.LocatorTypeContainer,
		Keys: map[monitorapi.LocatorKey]string{
			"node":      "node-1",
			"pod":       "etcd-node-1",
			"container": "etcd",
			"namespace": "openshift-etcd",
		},
	}

	// Leadership messages should be recorded immediately, not batched
	logLine := podaccess.LogLineContent{
		Locator: locator,
		Instant: baseTime,
		Line:    `{"level":"info","ts":"2026-02-03T10:51:00Z","msg":"raft.node: abc123 elected leader def456 at term 5"}`,
	}
	recorder.HandleLogLine(logLine)

	// Leadership message should be recorded immediately (not in batch)
	if len(mock.intervals) != 1 {
		t.Errorf("Expected 1 interval for leadership message, got %d", len(mock.intervals))
	}

	// Verify it's a leadership event, not a batched etcd log
	if len(mock.intervals) > 0 {
		if mock.intervals[0].Source != monitorapi.SourceEtcdLeadership {
			t.Errorf("Expected source EtcdLeadership, got %s", mock.intervals[0].Source)
		}
		if mock.intervals[0].Message.Reason != "LeaderFound" {
			t.Errorf("Expected reason LeaderFound, got %s", mock.intervals[0].Message.Reason)
		}
	}
}

func TestEtcdRecorderFlushClearsBatches(t *testing.T) {
	mock := &mockRecorder{}
	recorder := newEtcdRecorder(mock)

	baseTime := time.Date(2026, 2, 3, 10, 51, 0, 0, time.UTC)
	locator := monitorapi.Locator{
		Type: monitorapi.LocatorTypeContainer,
		Keys: map[monitorapi.LocatorKey]string{
			"node": "node-1",
		},
	}

	// Add some log lines
	ts := baseTime.Format(time.RFC3339)
	logLine := podaccess.LogLineContent{
		Locator: locator,
		Instant: baseTime,
		Line:    `{"level":"warn","ts":"` + ts + `","msg":"apply request took too long"}`,
	}
	recorder.HandleLogLine(logLine)

	// First flush
	recorder.Flush()
	if len(mock.intervals) != 1 {
		t.Errorf("Expected 1 interval after first Flush, got %d", len(mock.intervals))
	}

	// Second flush should not add any more intervals (batches were cleared)
	recorder.Flush()
	if len(mock.intervals) != 1 {
		t.Errorf("Expected still 1 interval after second Flush, got %d", len(mock.intervals))
	}
}
116 changes: 97 additions & 19 deletions pkg/monitortests/etcd/etcdloganalyzer/monitortest.go
@@ -7,6 +7,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"time"

configv1 "github.com/openshift/api/config/v1"
@@ -34,6 +35,7 @@ type etcdLogAnalyzer struct {
	stopCollection     context.CancelFunc
	finishedCollecting chan struct{}
	dualReplica        bool // true if running on DualReplica topology where etcd runs externally
+	etcdRecorder       *etcdRecorder
}

func NewEtcdLogAnalyzer() monitortestframework.MonitorTest {
@@ -72,7 +74,7 @@ func (w *etcdLogAnalyzer) PrepareCollection(ctx context.Context, adminRESTConfig
		return nil
	}

-	logToIntervalConverter := newEtcdRecorder(recorder)
+	w.etcdRecorder = newEtcdRecorder(recorder)
	kubeClient, err := kubernetes.NewForConfig(w.adminRESTConfig)
	if err != nil {
		return err
@@ -91,7 +93,7 @@
		labels.NewSelector().Add(*etcdLabel),
		"openshift-etcd",
		"etcd",
-		logToIntervalConverter,
+		w.etcdRecorder,
		namespaceScopedCoreInformers.Pods(),
	)

@@ -113,6 +115,11 @@ func (w *etcdLogAnalyzer) CollectData(ctx context.Context, storageDir string, be
	// wait until we're drained
	<-w.finishedCollecting

+	// Flush any pending batched intervals
+	if w.etcdRecorder != nil {
+		w.etcdRecorder.Flush()
+	}
+
	return nil, nil, nil
}
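
CollectData blocks on finishedCollecting before calling Flush, so every HandleLogLine call happens before the flush and each batch is emitted exactly once with its final count. A toy sketch of that drain-then-read ordering (names hypothetical, assuming a single consumer goroutine as the streamer):

package main

import "fmt"

func main() {
	lines := make(chan string)
	counts := map[string]int{}
	done := make(chan struct{})

	go func() {
		defer close(done)
		for msg := range lines {
			counts[msg]++ // batch while the stream is live
		}
	}()

	lines <- "apply request took too long"
	lines <- "apply request took too long"
	close(lines)

	<-done // drain first, mirroring <-w.finishedCollecting
	fmt.Println(counts["apply request took too long"]) // 2: totals are final only after the drain
}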

@@ -230,26 +237,83 @@ func (*etcdLogAnalyzer) Cleanup(ctx context.Context) error {
	return nil
}

+// batchKey uniquely identifies a batch of similar etcd log intervals
+type batchKey struct {
+	locator      string
+	humanMessage string
+	minuteBucket int64 // Unix timestamp truncated to minute
+}
+
+// batchEntry holds the data for a batch of intervals
+type batchEntry struct {
+	locator    monitorapi.Locator
+	level      monitorapi.IntervalLevel
+	message    string
+	eventKey   string // short key for the event type (e.g., "apply-slow")
+	fromMinute time.Time
+	count      int
+}
+
type etcdRecorder struct {
	recorder monitorapi.RecorderWriter
	// TODO this limits our ability to have custom messages, we probably want something better
	subStrings []subStringLevel
+
+	// batches tracks ongoing batches of high-volume log messages
+	batchMu sync.Mutex
+	batches map[batchKey]*batchEntry
}

-func newEtcdRecorder(recorder monitorapi.RecorderWriter) etcdRecorder {
-	return etcdRecorder{
+func newEtcdRecorder(recorder monitorapi.RecorderWriter) *etcdRecorder {
+	return &etcdRecorder{
		recorder: recorder,
		subStrings: []subStringLevel{
-			{"slow fdatasync", monitorapi.Warning},
-			{"dropped internal Raft message since sending buffer is full", monitorapi.Warning},
-			{"waiting for ReadIndex response took too long, retrying", monitorapi.Warning},
-			{"apply request took too long", monitorapi.Warning},
-			{"is starting a new election", monitorapi.Info},
+			{"slow fdatasync", monitorapi.Warning, "slow-fdatasync"},
+			{"dropped internal Raft message since sending buffer is full", monitorapi.Warning, "raft-buffer-full"},
+			{"waiting for ReadIndex response took too long, retrying", monitorapi.Warning, "readindex-slow"},
+			{"apply request took too long", monitorapi.Warning, "apply-slow"},
+			{"is starting a new election", monitorapi.Info, "election-start"},
		},
+		batches: make(map[batchKey]*batchEntry),
	}
}

+// Flush emits all pending batched intervals to the recorder.
+// This should be called when log collection is complete.
+func (g *etcdRecorder) Flush() {
+	g.batchMu.Lock()
+	defer g.batchMu.Unlock()
+
+	for _, batch := range g.batches {
+		// Create a new locator with the event key added so different event types
+		// appear on separate lines in the timeline chart
+		locatorWithKey := monitorapi.Locator{
+			Type: batch.locator.Type,
+			Keys: make(map[monitorapi.LocatorKey]string, len(batch.locator.Keys)+1),
+		}
+		for k, v := range batch.locator.Keys {
+			locatorWithKey.Keys[k] = v
+		}
+		locatorWithKey.Keys["etcd-event"] = batch.eventKey
+
+		g.recorder.AddIntervals(
+			monitorapi.NewInterval(monitorapi.SourceEtcdLog, batch.level).
+				Locator(locatorWithKey).
+				Message(
+					monitorapi.NewMessage().
+						WithAnnotation(monitorapi.AnnotationCount, fmt.Sprintf("%d", batch.count)).
+						HumanMessage(batch.message),
+				).
+				Display().
+				Build(batch.fromMinute, batch.fromMinute.Add(time.Minute)),
+		)
+	}
+
+	// Clear the batches
+	g.batches = make(map[batchKey]*batchEntry)
+}

-func (g etcdRecorder) HandleLogLine(logLine podaccess.LogLineContent) {
+func (g *etcdRecorder) HandleLogLine(logLine podaccess.LogLineContent) {
	line := logLine.Line
	parsedLine := etcdLogLine{}
	err := json.Unmarshal([]byte(line), &parsedLine)
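
batchKey can serve directly as a map key because all of its fields are comparable; lines that share a locator string, message, and minute collapse onto one entry. A reduced illustration of that grouping (the locator string and values are hypothetical, not the PR's OldLocator() output):

package main

import (
	"fmt"
	"time"
)

// Same shape as batchKey in the diff: comparable fields only.
type batchKey struct {
	locator      string
	humanMessage string
	minuteBucket int64
}

func main() {
	counts := map[batchKey]int{}
	for _, sec := range []int{5, 25, 45} {
		ts := time.Date(2026, 2, 3, 10, 51, sec, 0, time.UTC)
		counts[batchKey{
			locator:      "ns/openshift-etcd pod/etcd-node-1", // hypothetical locator string
			humanMessage: "apply request took too long",
			minuteBucket: ts.Truncate(time.Minute).Unix(),
		}]++
	}
	fmt.Println(len(counts)) // 1: all three lines batched into a single entry
}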
@@ -263,15 +327,29 @@ func (g etcdRecorder) HandleLogLine(logLine podaccess.LogLineContent) {
			continue
		}

-		g.recorder.AddIntervals(
-			monitorapi.NewInterval(monitorapi.SourceEtcdLog, monitorapi.Warning).
-				Locator(logLine.Locator).
-				Message(
-					monitorapi.NewMessage().
-						HumanMessage(parsedLine.Msg),
-				).
-				Display().
-				Build(parsedLine.Timestamp, parsedLine.Timestamp.Add(1*time.Second)))
+		// Batch these high-volume intervals instead of recording each one individually.
+		// They will be flushed as aggregated intervals when collection ends.
+		minuteBucket := parsedLine.Timestamp.Truncate(time.Minute)
+		key := batchKey{
+			locator:      logLine.Locator.OldLocator(),
+			humanMessage: parsedLine.Msg,
+			minuteBucket: minuteBucket.Unix(),
+		}
+
+		g.batchMu.Lock()
+		if existing, ok := g.batches[key]; ok {
+			existing.count++
+		} else {
+			g.batches[key] = &batchEntry{
+				locator:    logLine.Locator,
+				level:      substring.level,
+				message:    parsedLine.Msg,
+				eventKey:   substring.key,
+				fromMinute: minuteBucket,
+				count:      1,
+			}
+		}
+		g.batchMu.Unlock()
	}

	var etcdSource monitorapi.IntervalSource = monitorapi.SourceEtcdLeadership
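
The lock, increment-or-insert, unlock sequence in HandleLogLine is the standard guarded-map idiom; it assumes (reasonably, though the diff does not show it) that log lines from several pod streams may arrive on different goroutines. A minimal sketch of the pattern with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

type batcher struct {
	mu     sync.Mutex
	counts map[string]int
}

func (b *batcher) add(key string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.counts[key]++ // missing keys start at zero, so this inserts or increments
}

func main() {
	b := &batcher{counts: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.add("apply-slow")
		}()
	}
	wg.Wait()
	fmt.Println(b.counts["apply-slow"]) // always 100; without the mutex this would race
}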