From aa74c915b271569c17912ce5f628f74233519c84 Mon Sep 17 00:00:00 2001 From: Kaan Yalti Date: Mon, 15 Sep 2025 18:29:21 +0300 Subject: [PATCH] Enhancement/5235 wrap errors when marking upgrade (#9366) * enhancement(5235): added markUpgradeFunc to abstract markUpgrade * enhancement(5235): using markUpgradeFunc in Upgrade function * enhancement(5235): wrapping writeFile errors in markUpgrade enhancement(5235): added goerrors in step_mark * enhancement(5235): using writeFile wrapper in markUpgrade * enhancement(5235): added mark upgrade tests enhancement(5235): updated step mark test to mock writeFile * enhancement(5235): updated markUpgrade function so that it can be tested, injecting dependencies. updated relevant tests * enhancement(5235): updated use of markUpgrade in rollback and rollback tests * enhancement(5235): abstracted markupgrade from upgrader, added relevant types and updated the upgrader struct. added test case for mark upgrade error handling in the upgrade function * enhancement(5235): added abstractions for changesymlink and rollbackInstall in upgrader, updated error handling tests to use these abstractions * enhancement(5235): added error handling test for changesymlink (cherry picked from commit 2e4e777312bf49f9f5214262ef2991299977de3e) --- .../pkg/agent/application/upgrade/rollback.go | 2 +- .../application/upgrade/rollback_test.go | 2 + .../agent/application/upgrade/step_mark.go | 77 ++++++++------- .../application/upgrade/step_mark_test.go | 97 +++++++++++++++++++ .../pkg/agent/application/upgrade/upgrade.go | 21 ++-- .../agent/application/upgrade/upgrade_test.go | 75 ++++++++++++++ 6 files changed, 231 insertions(+), 43 deletions(-) diff --git a/internal/pkg/agent/application/upgrade/rollback.go b/internal/pkg/agent/application/upgrade/rollback.go index b562ab323b6..ef0ed1b9516 100644 --- a/internal/pkg/agent/application/upgrade/rollback.go +++ b/internal/pkg/agent/application/upgrade/rollback.go @@ -52,7 +52,7 @@ func Rollback(ctx 
context.Context, log *logger.Logger, c client.Client, topDirPa } // revert active commit - if err := UpdateActiveCommit(log, topDirPath, prevHash); err != nil { + if err := UpdateActiveCommit(log, topDirPath, prevHash, os.WriteFile); err != nil { return err } diff --git a/internal/pkg/agent/application/upgrade/rollback_test.go b/internal/pkg/agent/application/upgrade/rollback_test.go index 664eb72978b..2cb23c25ad0 100644 --- a/internal/pkg/agent/application/upgrade/rollback_test.go +++ b/internal/pkg/agent/application/upgrade/rollback_test.go @@ -504,6 +504,8 @@ func createUpdateMarker(t *testing.T, log *logger.Logger, topDir, newAgentVersio hash: oldAgentHash, versionedHome: oldAgentVersionedHome, } + + markUpgrade := markUpgradeProvider(UpdateActiveCommit, os.WriteFile) err := markUpgrade(log, paths.DataFrom(topDir), newAgentInstall, diff --git a/internal/pkg/agent/application/upgrade/step_mark.go b/internal/pkg/agent/application/upgrade/step_mark.go index 106d12ec09f..8f81bbd43f7 100644 --- a/internal/pkg/agent/application/upgrade/step_mark.go +++ b/internal/pkg/agent/application/upgrade/step_mark.go @@ -6,6 +6,7 @@ package upgrade import ( "encoding/json" + goerrors "errors" "fmt" "os" "path/filepath" @@ -196,50 +197,54 @@ type agentInstall struct { versionedHome string } -// markUpgrade marks update happened so we can handle grace period -func markUpgrade(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error { - - if len(previousAgent.hash) > hashLen { - previousAgent.hash = previousAgent.hash[:hashLen] - } - - marker := &UpdateMarker{ - Version: agent.version, - Hash: agent.hash, - VersionedHome: agent.versionedHome, - UpdatedOn: time.Now(), - PrevVersion: previousAgent.version, - PrevHash: previousAgent.hash, - PrevVersionedHome: previousAgent.versionedHome, - Action: action, - Details: upgradeDetails, - DesiredOutcome: desiredOutcome, - 
} - - markerBytes, err := yaml.Marshal(newMarkerSerializer(marker)) - if err != nil { - return errors.New(err, errors.TypeConfig, "failed to parse marker file") - } +type updateActiveCommitFunc func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error - markerPath := markerFilePath(dataDirPath) - log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash) - if err := os.WriteFile(markerPath, markerBytes, 0600); err != nil { - return errors.New(err, errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath)) - } +// markUpgrade marks update happened so we can handle grace period +func markUpgradeProvider(updateActiveCommit updateActiveCommitFunc, writeFile writeFileFunc) markUpgradeFunc { + return func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error { + + if len(previousAgent.hash) > hashLen { + previousAgent.hash = previousAgent.hash[:hashLen] + } + + marker := &UpdateMarker{ + Version: agent.version, + Hash: agent.hash, + VersionedHome: agent.versionedHome, + UpdatedOn: time.Now(), + PrevVersion: previousAgent.version, + PrevHash: previousAgent.hash, + PrevVersionedHome: previousAgent.versionedHome, + Action: action, + Details: upgradeDetails, + DesiredOutcome: desiredOutcome, + } + + markerBytes, err := yaml.Marshal(newMarkerSerializer(marker)) + if err != nil { + return errors.New(err, errors.TypeConfig, "failed to parse marker file") + } + + markerPath := markerFilePath(dataDirPath) + log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash) + if err := writeFile(markerPath, markerBytes, 0600); err != nil { + return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath))) + } + 
+ if err := updateActiveCommit(log, paths.Top(), agent.hash, writeFile); err != nil { + return err + } - if err := UpdateActiveCommit(log, paths.Top(), agent.hash); err != nil { - return err + return nil } - - return nil } // UpdateActiveCommit updates active.commit file to point to active version. -func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string) error { +func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error { activeCommitPath := filepath.Join(topDirPath, agentCommitFile) log.Infow("Updating active commit", "file.path", activeCommitPath, "hash", hash) - if err := os.WriteFile(activeCommitPath, []byte(hash), 0600); err != nil { - return errors.New(err, errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath)) + if err := writeFile(activeCommitPath, []byte(hash), 0600); err != nil { + return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath))) } return nil diff --git a/internal/pkg/agent/application/upgrade/step_mark_test.go b/internal/pkg/agent/application/upgrade/step_mark_test.go index fc1731e7b24..34ac01e4888 100644 --- a/internal/pkg/agent/application/upgrade/step_mark_test.go +++ b/internal/pkg/agent/application/upgrade/step_mark_test.go @@ -5,6 +5,7 @@ package upgrade import ( + "errors" "os" "path/filepath" "testing" @@ -12,8 +13,11 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" ) func TestSaveAndLoadMarker_NoLoss(t *testing.T) { @@ -260,3 +264,96 @@ desired_outcome: true }) } } + +func TestMarkUpgrade(t *testing.T) { + log, _ := 
loggertest.New("test") + agent := agentInstall{ + version: "8.5.0", + hash: "abc123", + versionedHome: "home/v8.5.0", + } + previousAgent := agentInstall{ + version: "8.4.0", + hash: "xyz789", + versionedHome: "home/v8.4.0", + } + action := &fleetapi.ActionUpgrade{ + ActionID: "action-123", + ActionType: "UPGRADE", + Data: fleetapi.ActionUpgradeData{ + Version: "8.5.0", + SourceURI: "https://example.com/upgrade", + }, + } + upgradeDetails := details.NewDetails("8.5.0", details.StateScheduled, "action-123") + desiredOutcome := OUTCOME_UPGRADE + + testError := errors.New("test error") + + type testCase struct { + fileName string + expectedError error + markUpgrade markUpgradeFunc + } + + testCases := map[string]testCase{ + "should return error if it fails updating the active commit file": { + fileName: "commit", + expectedError: testError, + markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error { + return testError + }, func(name string, data []byte, perm os.FileMode) error { + return nil + }), + }, + "should return error if it fails writing to marker file": { + fileName: "marker", + expectedError: testError, + markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error { + return nil + }, func(name string, data []byte, perm os.FileMode) error { + return testError + }), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + baseDir := t.TempDir() + paths.SetTop(baseDir) + + err := tc.markUpgrade(log, paths.Data(), agent, previousAgent, action, upgradeDetails, desiredOutcome) + require.Error(t, err) + require.ErrorIs(t, err, tc.expectedError) + }) + } +} + +func TestUpdateActiveCommit(t *testing.T) { + log, _ := loggertest.New("test") + testError := errors.New("test error") + testCases := map[string]struct { + expectedError error + writeFileFunc writeFileFunc + }{ + "should return error if it fails writing to file": { + 
expectedError: testError, + writeFileFunc: func(name string, data []byte, perm os.FileMode) error { + return testError + }, + }, + "should not return error if it writes to file": { + expectedError: nil, + writeFileFunc: func(name string, data []byte, perm os.FileMode) error { + return nil + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := UpdateActiveCommit(log, paths.Top(), "hash", tc.writeFileFunc) + require.ErrorIs(t, err, tc.expectedError) + }) + } + +} diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index 6a00ac00a82..c30adb3489b 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -74,6 +74,9 @@ type unpackHandler interface { type copyActionStoreFunc func(log *logger.Logger, newHome string) error type copyRunDirectoryFunc func(log *logger.Logger, oldRunPath, newRunPath string) error type fileDirCopyFunc func(from, to string, opts ...copy.Options) error +type markUpgradeFunc func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error +type changeSymlinkFunc func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error +type rollbackInstallFunc func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error // Types used to abstract stdlib functions type mkdirAllFunc func(name string, perm fs.FileMode) error @@ -96,6 +99,9 @@ type Upgrader struct { extractAgentVersion func(metadata packageMetadata, upgradeVersion string) agentVersion copyActionStore copyActionStoreFunc copyRunDirectory copyRunDirectoryFunc + markUpgrade markUpgradeFunc + changeSymlink changeSymlinkFunc + rollbackInstall rollbackInstallFunc } // IsUpgradeable when agent is installed and running as a service or flag was provided. 
@@ -119,6 +125,9 @@ func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo info.A extractAgentVersion: extractAgentVersion, copyActionStore: copyActionStoreProvider(os.ReadFile, os.WriteFile), copyRunDirectory: copyRunDirectoryProvider(os.MkdirAll, copy.Copy), + markUpgrade: markUpgradeProvider(UpdateActiveCommit, os.WriteFile), + changeSymlink: changeSymlink, + rollbackInstall: rollbackInstall, }, nil } @@ -350,9 +359,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string return nil, fmt.Errorf("calculating home path relative to top, home: %q top: %q : %w", paths.Home(), paths.Top(), err) } - if err := changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil { + if err := u.changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil { u.log.Errorw("Rolling back: changing symlink failed", "error.message", err) - rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) + rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) return nil, goerrors.Join(err, rollbackErr) } @@ -375,13 +384,13 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string versionedHome: currentVersionedHome, } - if err := markUpgrade(u.log, + if err := u.markUpgrade(u.log, paths.Data(), // data dir to place the marker in current, // new agent version data previous, // old agent version data action, det, OUTCOME_UPGRADE); err != nil { u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err) - rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) + rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) return nil, goerrors.Join(err, rollbackErr) } @@ -390,14 +399,14 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string var watcherCmd *exec.Cmd if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil { 
u.log.Errorw("Rolling back: starting watcher failed", "error.message", err) - rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) + rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) return nil, goerrors.Join(err, rollbackErr) } watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime) if watcherWaitErr != nil { killWatcherErr := watcherCmd.Process.Kill() - rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) + rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome) return nil, goerrors.Join(watcherWaitErr, killWatcherErr, rollbackErr) } diff --git a/internal/pkg/agent/application/upgrade/upgrade_test.go b/internal/pkg/agent/application/upgrade/upgrade_test.go index 227580a0f0c..f170d33e113 100644 --- a/internal/pkg/agent/application/upgrade/upgrade_test.go +++ b/internal/pkg/agent/application/upgrade/upgrade_test.go @@ -1438,6 +1438,81 @@ func TestUpgradeErrorHandling(t *testing.T) { } }, }, + "should return error if changeSymlink fails": { + isDiskSpaceErrorResult: false, + expectedError: testError, + upgraderMocker: func(upgrader *Upgrader) { + upgrader.artifactDownloader = &mockArtifactDownloader{} + upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion { + return agentVersion{ + version: upgradeVersion, + snapshot: false, + hash: metadata.hash, + } + } + upgrader.unpacker = &mockUnpacker{ + returnPackageMetadata: packageMetadata{ + manifest: &v1.PackageManifest{}, + hash: "hash", + }, + returnUnpackResult: UnpackResult{ + Hash: "hash", + VersionedHome: "versionedHome", + }, + } + upgrader.copyActionStore = func(log *logger.Logger, newHome string) error { + return nil + } + upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error { + return nil + } + upgrader.rollbackInstall = func(ctx context.Context, 
log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error { + return nil + } + upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error { + return testError + } + }, + }, + "should return error if markUpgrade fails": { + isDiskSpaceErrorResult: false, + expectedError: testError, + upgraderMocker: func(upgrader *Upgrader) { + upgrader.artifactDownloader = &mockArtifactDownloader{} + upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion { + return agentVersion{ + version: upgradeVersion, + snapshot: false, + hash: metadata.hash, + } + } + upgrader.unpacker = &mockUnpacker{ + returnPackageMetadata: packageMetadata{ + manifest: &v1.PackageManifest{}, + hash: "hash", + }, + returnUnpackResult: UnpackResult{ + Hash: "hash", + VersionedHome: "versionedHome", + }, + } + upgrader.copyActionStore = func(log *logger.Logger, newHome string) error { + return nil + } + upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error { + return nil + } + upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error { + return nil + } + upgrader.rollbackInstall = func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error { + return nil + } + upgrader.markUpgrade = func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error { + return testError + } + }, + }, "should add disk space error to the error chain if downloadArtifact fails with disk space error": { isDiskSpaceErrorResult: true, expectedError: upgradeErrors.ErrInsufficientDiskSpace,