Skip to content

Commit c5356cd

Browse files
authored
[cmd/opampsupervisor] chore: Store RemoteConfigStatus in persistent state (#40467)
1 parent 0245843 commit c5356cd

File tree

7 files changed

+269
-102
lines changed

7 files changed

+269
-102
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Supervisor now persists the remote config status to disk, which allows it to report the remote config status more accurately after a restart. It also now reports a healthy status when the agent is intentionally not run because of an empty config (a previously introduced performance optimization).
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40467]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/e2e_test.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,6 @@ func TestSupervisorStartsCollectorWithRemoteConfigAndExecParams(t *testing.T) {
457457

458458
// create server
459459
var agentConfig atomic.Value
460-
var remoteConfigStatus atomic.Value
461460
server := newOpAMPServer(
462461
t,
463462
defaultConnectingHandler,
@@ -469,9 +468,6 @@ func TestSupervisorStartsCollectorWithRemoteConfigAndExecParams(t *testing.T) {
469468
agentConfig.Store(string(config.Body))
470469
}
471470
}
472-
if message.RemoteConfigStatus != nil {
473-
remoteConfigStatus.Store(message.RemoteConfigStatus)
474-
}
475471
return &protobufs.ServerToAgent{}
476472
},
477473
})
@@ -502,12 +498,6 @@ func TestSupervisorStartsCollectorWithRemoteConfigAndExecParams(t *testing.T) {
502498

503499
waitForSupervisorConnection(server.supervisorConnected, true)
504500

505-
// verify that the supervisor reports the remote config as applied and the correct hash
506-
require.Eventually(t, func() bool {
507-
status := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
508-
return status != nil && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED && bytes.Equal(status.LastRemoteConfigHash, hash)
509-
}, 20*time.Second, 500*time.Millisecond, "Remote config status never became applied")
510-
511501
for _, port := range []int{healthcheckPort, secondHealthcheckPort} {
512502
require.Eventually(t, func() bool {
513503
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", port))
@@ -1364,6 +1354,7 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
13641354
tempDir := t.TempDir()
13651355

13661356
var agentConfig atomic.Value
1357+
var initialRemoteConfigStatus atomic.Value
13671358
initialServer := newOpAMPServer(
13681359
t,
13691360
defaultConnectingHandler,
@@ -1375,6 +1366,9 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
13751366
agentConfig.Store(string(config.Body))
13761367
}
13771368
}
1369+
if message.RemoteConfigStatus != nil {
1370+
initialRemoteConfigStatus.Store(message.RemoteConfigStatus)
1371+
}
13781372
return &protobufs.ServerToAgent{}
13791373
},
13801374
})
@@ -1385,16 +1379,16 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
13851379

13861380
waitForSupervisorConnection(initialServer.supervisorConnected, true)
13871381

1388-
cfg, hash, _, _ := createSimplePipelineCollectorConf(t)
1382+
simplePipelineCFG, simplePipelineHash, _, _ := createSimplePipelineCollectorConf(t)
13891383

13901384
initialServer.sendToSupervisor(&protobufs.ServerToAgent{
13911385
RemoteConfig: &protobufs.AgentRemoteConfig{
13921386
Config: &protobufs.AgentConfigMap{
13931387
ConfigMap: map[string]*protobufs.AgentConfigFile{
1394-
"": {Body: cfg.Bytes()},
1388+
"": {Body: simplePipelineCFG.Bytes()},
13951389
},
13961390
},
1397-
ConfigHash: hash,
1391+
ConfigHash: simplePipelineHash,
13981392
},
13991393
})
14001394

@@ -1404,10 +1398,17 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
14041398
return err == nil
14051399
}, 5*time.Second, 250*time.Millisecond, "Config file was not written to persistent storage directory")
14061400

1401+
// wait for remote config status to be applied
1402+
require.Eventually(t, func() bool {
1403+
status := initialRemoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
1404+
return status != nil && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED && bytes.Equal(status.LastRemoteConfigHash, simplePipelineHash)
1405+
}, 20*time.Second, 500*time.Millisecond, "Initial remote config never became applied")
1406+
14071407
agentConfig.Store("")
14081408
s.Shutdown()
14091409
initialServer.shutdown()
14101410

1411+
var remoteConfigStatus atomic.Value
14111412
newServer := newOpAMPServer(
14121413
t,
14131414
defaultConnectingHandler,
@@ -1419,6 +1420,9 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
14191420
agentConfig.Store(string(config.Body))
14201421
}
14211422
}
1423+
if message.RemoteConfigStatus != nil {
1424+
remoteConfigStatus.Store(message.RemoteConfigStatus)
1425+
}
14221426
return &protobufs.ServerToAgent{}
14231427
},
14241428
})
@@ -1435,6 +1439,12 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
14351439
Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
14361440
})
14371441

1442+
// verify that the supervisor reports the existing remote config status
1443+
require.Eventually(t, func() bool {
1444+
status := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
1445+
return status != nil && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED && bytes.Equal(status.LastRemoteConfigHash, simplePipelineHash)
1446+
}, 20*time.Second, 500*time.Millisecond, "Remote config status never became applied")
1447+
14381448
// Check that the new Supervisor instance starts with the configuration from the last received remote config
14391449
require.Eventually(t, func() bool {
14401450
loadedConfig, ok := agentConfig.Load().(string)
@@ -1635,6 +1645,8 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
16351645

16361646
func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
16371647
agentCfgChan := make(chan string, 1)
1648+
var healthReport atomic.Value
1649+
var remoteConfigStatus atomic.Value
16381650
server := newOpAMPServer(
16391651
t,
16401652
defaultConnectingHandler,
@@ -1649,7 +1661,12 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
16491661
}
16501662
}
16511663
}
1652-
1664+
if message.Health != nil {
1665+
healthReport.Store(message.Health)
1666+
}
1667+
if message.RemoteConfigStatus != nil {
1668+
remoteConfigStatus.Store(message.RemoteConfigStatus)
1669+
}
16531670
return &protobufs.ServerToAgent{}
16541671
},
16551672
})
@@ -1723,6 +1740,18 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
17231740
assert.ErrorContains(tt, err, "No connection could be made")
17241741
}
17251742
}, 3*time.Second, 250*time.Millisecond)
1743+
1744+
// Verify we have a healthy status (if the agent had been run with the empty config, it would be healthy)
1745+
require.Eventually(t, func() bool {
1746+
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
1747+
return ok && health.Healthy
1748+
}, 3*time.Second, 250*time.Millisecond)
1749+
1750+
// Verify the status is set to APPLIED (if the agent had been run with the empty config, it would be APPLIED)
1751+
require.Eventually(t, func() bool {
1752+
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
1753+
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
1754+
}, 3*time.Second, 250*time.Millisecond)
17261755
}
17271756

17281757
type logEntry struct {
29.6 MB
Binary file not shown.

cmd/opampsupervisor/supervisor/persistence.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,68 @@
44
package supervisor
55

66
import (
7+
"encoding/hex"
78
"errors"
89
"os"
910

1011
"github.com/google/uuid"
12+
"github.com/open-telemetry/opamp-go/protobufs"
13+
"go.uber.org/zap"
1114
"gopkg.in/yaml.v3"
1215
)
1316

1417
// persistentState represents persistent state for the supervisor
1518
type persistentState struct {
16-
InstanceID uuid.UUID `yaml:"instance_id"`
19+
InstanceID uuid.UUID `yaml:"instance_id"`
20+
LastRemoteConfigStatus *RemoteConfigStatus `yaml:"last_remote_config_status"`
1721

1822
// Path to the config file that the state should be saved to.
1923
// This is not marshaled.
20-
configPath string `yaml:"-"`
24+
configPath string `yaml:"-"`
25+
logger *zap.Logger `yaml:"-"`
26+
}
27+
28+
// RemoteConfigStatus is a custom struct that is used to marshal/unmarshal the remote config status.
29+
// LastRemoteConfigHash is a hex encoded string of the last remote config hash for human readability.
30+
type RemoteConfigStatus struct {
31+
// Status is the status of the last remote config.
32+
Status protobufs.RemoteConfigStatuses `yaml:"status"`
33+
// LastRemoteConfigHash is a hex encoded string of the last remote config hash for human readability.
34+
LastRemoteConfigHash string `yaml:"last_remote_config_hash"`
35+
// ErrorMessage is the error message of the last remote config.
36+
ErrorMessage string `yaml:"error_message"`
2137
}
2238

2339
func (p *persistentState) SetInstanceID(id uuid.UUID) error {
2440
p.InstanceID = id
2541
return p.writeState()
2642
}
2743

44+
func (p *persistentState) SetLastRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
45+
p.LastRemoteConfigStatus = &RemoteConfigStatus{
46+
Status: status.Status,
47+
LastRemoteConfigHash: hex.EncodeToString(status.LastRemoteConfigHash),
48+
ErrorMessage: status.ErrorMessage,
49+
}
50+
return p.writeState()
51+
}
52+
53+
func (p *persistentState) GetLastRemoteConfigStatus() *protobufs.RemoteConfigStatus {
54+
if p.LastRemoteConfigStatus == nil {
55+
return nil
56+
}
57+
lastRemoteConfigHash, err := hex.DecodeString(p.LastRemoteConfigStatus.LastRemoteConfigHash)
58+
if err != nil {
59+
p.logger.Error("Failed to decode last remote config hash, returning empty status", zap.Error(err))
60+
return nil
61+
}
62+
return &protobufs.RemoteConfigStatus{
63+
Status: p.LastRemoteConfigStatus.Status,
64+
LastRemoteConfigHash: lastRemoteConfigHash,
65+
ErrorMessage: p.LastRemoteConfigStatus.ErrorMessage,
66+
}
67+
}
68+
2869
func (p *persistentState) writeState() error {
2970
by, err := yaml.Marshal(p)
3071
if err != nil {
@@ -36,19 +77,19 @@ func (p *persistentState) writeState() error {
3677

3778
// loadOrCreatePersistentState attempts to load the persistent state from disk. If it doesn't
3879
// exist, a new persistent state file is created.
39-
func loadOrCreatePersistentState(file string) (*persistentState, error) {
40-
state, err := loadPersistentState(file)
80+
func loadOrCreatePersistentState(file string, logger *zap.Logger) (*persistentState, error) {
81+
state, err := loadPersistentState(file, logger)
4182
switch {
4283
case errors.Is(err, os.ErrNotExist):
43-
return createNewPersistentState(file)
84+
return createNewPersistentState(file, logger)
4485
case err != nil:
4586
return nil, err
4687
default:
4788
return state, nil
4889
}
4990
}
5091

51-
func loadPersistentState(file string) (*persistentState, error) {
92+
func loadPersistentState(file string, logger *zap.Logger) (*persistentState, error) {
5293
var state *persistentState
5394

5495
by, err := os.ReadFile(file)
@@ -61,11 +102,12 @@ func loadPersistentState(file string) (*persistentState, error) {
61102
}
62103

63104
state.configPath = file
105+
state.logger = logger
64106

65107
return state, nil
66108
}
67109

68-
func createNewPersistentState(file string) (*persistentState, error) {
110+
func createNewPersistentState(file string, logger *zap.Logger) (*persistentState, error) {
69111
id, err := uuid.NewV7()
70112
if err != nil {
71113
return nil, err
@@ -74,6 +116,7 @@ func createNewPersistentState(file string) (*persistentState, error) {
74116
p := &persistentState{
75117
InstanceID: id,
76118
configPath: file,
119+
logger: logger,
77120
}
78121

79122
return p, p.writeState()

0 commit comments

Comments
 (0)