Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/upgrade/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Rollback(ctx context.Context, log *logger.Logger, c client.Client, topDirPa
}

// revert active commit
if err := UpdateActiveCommit(log, topDirPath, prevHash); err != nil {
if err := UpdateActiveCommit(log, topDirPath, prevHash, os.WriteFile); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/agent/application/upgrade/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ func createUpdateMarker(t *testing.T, log *logger.Logger, topDir, newAgentVersio
hash: oldAgentHash,
versionedHome: oldAgentVersionedHome,
}

markUpgrade := markUpgradeProvider(UpdateActiveCommit, os.WriteFile)
err := markUpgrade(log,
paths.DataFrom(topDir),
newAgentInstall,
Expand Down
77 changes: 41 additions & 36 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"encoding/json"
goerrors "errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -196,50 +197,54 @@ type agentInstall struct {
versionedHome string
}

// markUpgrade marks update happened so we can handle grace period
func markUpgrade(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {

if len(previousAgent.hash) > hashLen {
previousAgent.hash = previousAgent.hash[:hashLen]
}

marker := &UpdateMarker{
Version: agent.version,
Hash: agent.hash,
VersionedHome: agent.versionedHome,
UpdatedOn: time.Now(),
PrevVersion: previousAgent.version,
PrevHash: previousAgent.hash,
PrevVersionedHome: previousAgent.versionedHome,
Action: action,
Details: upgradeDetails,
DesiredOutcome: desiredOutcome,
}

markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}
type updateActiveCommitFunc func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error

markerPath := markerFilePath(dataDirPath)
log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash)
if err := os.WriteFile(markerPath, markerBytes, 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath))
}
// markUpgrade marks update happened so we can handle grace period
func markUpgradeProvider(updateActiveCommit updateActiveCommitFunc, writeFile writeFileFunc) markUpgradeFunc {
return func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {

if len(previousAgent.hash) > hashLen {
previousAgent.hash = previousAgent.hash[:hashLen]
}

marker := &UpdateMarker{
Version: agent.version,
Hash: agent.hash,
VersionedHome: agent.versionedHome,
UpdatedOn: time.Now(),
PrevVersion: previousAgent.version,
PrevHash: previousAgent.hash,
PrevVersionedHome: previousAgent.versionedHome,
Action: action,
Details: upgradeDetails,
DesiredOutcome: desiredOutcome,
}

markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}

markerPath := markerFilePath(dataDirPath)
log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash)
if err := writeFile(markerPath, markerBytes, 0600); err != nil {
return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath)))
}

if err := updateActiveCommit(log, paths.Top(), agent.hash, writeFile); err != nil {
return err
}

if err := UpdateActiveCommit(log, paths.Top(), agent.hash); err != nil {
return err
return nil
}

return nil
}

// UpdateActiveCommit updates active.commit file to point to active version.
func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string) error {
func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
activeCommitPath := filepath.Join(topDirPath, agentCommitFile)
log.Infow("Updating active commit", "file.path", activeCommitPath, "hash", hash)
if err := os.WriteFile(activeCommitPath, []byte(hash), 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath))
if err := writeFile(activeCommitPath, []byte(hash), 0600); err != nil {
return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath)))
}

return nil
Expand Down
97 changes: 97 additions & 0 deletions internal/pkg/agent/application/upgrade/step_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
package upgrade

import (
"errors"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
)

func TestSaveAndLoadMarker_NoLoss(t *testing.T) {
Expand Down Expand Up @@ -260,3 +264,96 @@ desired_outcome: true
})
}
}

func TestMarkUpgrade(t *testing.T) {
log, _ := loggertest.New("test")
agent := agentInstall{
version: "8.5.0",
hash: "abc123",
versionedHome: "home/v8.5.0",
}
previousAgent := agentInstall{
version: "8.4.0",
hash: "xyz789",
versionedHome: "home/v8.4.0",
}
action := &fleetapi.ActionUpgrade{
ActionID: "action-123",
ActionType: "UPGRADE",
Data: fleetapi.ActionUpgradeData{
Version: "8.5.0",
SourceURI: "https://example.com/upgrade",
},
}
upgradeDetails := details.NewDetails("8.5.0", details.StateScheduled, "action-123")
desiredOutcome := OUTCOME_UPGRADE

testError := errors.New("test error")

type testCase struct {
fileName string
expectedError error
markUpgrade markUpgradeFunc
}

testCases := map[string]testCase{
"should return error if it fails updating the active commit file": {
fileName: "commit",
expectedError: testError,
markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
return testError
}, func(name string, data []byte, perm os.FileMode) error {
return nil
}),
},
"should return error if it fails writing to marker file": {
fileName: "marker",
expectedError: testError,
markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
return nil
}, func(name string, data []byte, perm os.FileMode) error {
return testError
}),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
baseDir := t.TempDir()
paths.SetTop(baseDir)

err := tc.markUpgrade(log, paths.Data(), agent, previousAgent, action, upgradeDetails, desiredOutcome)
require.Error(t, err)
require.ErrorIs(t, err, tc.expectedError)
})
}
}

func TestUpdateActiveCommit(t *testing.T) {
log, _ := loggertest.New("test")
testError := errors.New("test error")
testCases := map[string]struct {
expectedError error
writeFileFunc writeFileFunc
}{
"should return error if it fails writing to file": {
expectedError: testError,
writeFileFunc: func(name string, data []byte, perm os.FileMode) error {
return testError
},
},
"should not return error if it writes to file": {
expectedError: nil,
writeFileFunc: func(name string, data []byte, perm os.FileMode) error {
return nil
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
err := UpdateActiveCommit(log, paths.Top(), "hash", tc.writeFileFunc)
require.ErrorIs(t, err, tc.expectedError)
})
}

}
21 changes: 15 additions & 6 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type unpackHandler interface {
type copyActionStoreFunc func(log *logger.Logger, newHome string) error
type copyRunDirectoryFunc func(log *logger.Logger, oldRunPath, newRunPath string) error
type fileDirCopyFunc func(from, to string, opts ...copy.Options) error
type markUpgradeFunc func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error
type changeSymlinkFunc func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error
type rollbackInstallFunc func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error

// Types used to abstract stdlib functions
type mkdirAllFunc func(name string, perm fs.FileMode) error
Expand All @@ -96,6 +99,9 @@ type Upgrader struct {
extractAgentVersion func(metadata packageMetadata, upgradeVersion string) agentVersion
copyActionStore copyActionStoreFunc
copyRunDirectory copyRunDirectoryFunc
markUpgrade markUpgradeFunc
changeSymlink changeSymlinkFunc
rollbackInstall rollbackInstallFunc
}

// IsUpgradeable when agent is installed and running as a service or flag was provided.
Expand All @@ -119,6 +125,9 @@ func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo info.A
extractAgentVersion: extractAgentVersion,
copyActionStore: copyActionStoreProvider(os.ReadFile, os.WriteFile),
copyRunDirectory: copyRunDirectoryProvider(os.MkdirAll, copy.Copy),
markUpgrade: markUpgradeProvider(UpdateActiveCommit, os.WriteFile),
changeSymlink: changeSymlink,
rollbackInstall: rollbackInstall,
}, nil
}

Expand Down Expand Up @@ -350,9 +359,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
return nil, fmt.Errorf("calculating home path relative to top, home: %q top: %q : %w", paths.Home(), paths.Top(), err)
}

if err := changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
if err := u.changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
u.log.Errorw("Rolling back: changing symlink failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

Expand All @@ -375,13 +384,13 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
versionedHome: currentVersionedHome,
}

if err := markUpgrade(u.log,
if err := u.markUpgrade(u.log,
paths.Data(), // data dir to place the marker in
current, // new agent version data
previous, // old agent version data
action, det, OUTCOME_UPGRADE); err != nil {
u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

Expand All @@ -390,14 +399,14 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
var watcherCmd *exec.Cmd
if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil {
u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
if watcherWaitErr != nil {
killWatcherErr := watcherCmd.Process.Kill()
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(watcherWaitErr, killWatcherErr, rollbackErr)
}

Expand Down
75 changes: 75 additions & 0 deletions internal/pkg/agent/application/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,81 @@ func TestUpgradeErrorHandling(t *testing.T) {
}
},
},
"should return error if changeSymlink fails": {
isDiskSpaceErrorResult: false,
expectedError: testError,
upgraderMocker: func(upgrader *Upgrader) {
upgrader.artifactDownloader = &mockArtifactDownloader{}
upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion {
return agentVersion{
version: upgradeVersion,
snapshot: false,
hash: metadata.hash,
}
}
upgrader.unpacker = &mockUnpacker{
returnPackageMetadata: packageMetadata{
manifest: &v1.PackageManifest{},
hash: "hash",
},
returnUnpackResult: UnpackResult{
Hash: "hash",
VersionedHome: "versionedHome",
},
}
upgrader.copyActionStore = func(log *logger.Logger, newHome string) error {
return nil
}
upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error {
return nil
}
upgrader.rollbackInstall = func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error {
return nil
}
upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error {
return testError
}
},
},
"should return error if markUpgrade fails": {
isDiskSpaceErrorResult: false,
expectedError: testError,
upgraderMocker: func(upgrader *Upgrader) {
upgrader.artifactDownloader = &mockArtifactDownloader{}
upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion {
return agentVersion{
version: upgradeVersion,
snapshot: false,
hash: metadata.hash,
}
}
upgrader.unpacker = &mockUnpacker{
returnPackageMetadata: packageMetadata{
manifest: &v1.PackageManifest{},
hash: "hash",
},
returnUnpackResult: UnpackResult{
Hash: "hash",
VersionedHome: "versionedHome",
},
}
upgrader.copyActionStore = func(log *logger.Logger, newHome string) error {
return nil
}
upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error {
return nil
}
upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error {
return nil
}
upgrader.rollbackInstall = func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error {
return nil
}
upgrader.markUpgrade = func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {
return testError
}
},
},
"should add disk space error to the error chain if downloadArtifact fails with disk space error": {
isDiskSpaceErrorResult: true,
expectedError: upgradeErrors.ErrInsufficientDiskSpace,
Expand Down
Loading