Skip to content

Commit

Permalink
Improve logging for agent upgrades.
Browse files Browse the repository at this point in the history
Attempt to log every major step, and ensure upgrade errors are always
logged.
  • Loading branch information
cmacknz committed Sep 25, 2022
1 parent 28fab94 commit 4008440
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, acker store.Fle
}

_, err := h.upgrader.Upgrade(ctx, &upgradeAction{action}, true)
if err != nil {
// Always log upgrade failures at the error level. Action errors are logged at debug level
// by default higher up the stack in ActionDispatcher.Dispatch()
h.log.Errorw("Upgrade action failed", "error.message", err,
"action.version", action.Version, "action.source_uri", action.SourceURI, "action.id", action.ActionID,
"action.start_time", action.StartTime, "action.expiration", action.ActionExpiration)
}

return err
}

Expand Down
10 changes: 7 additions & 3 deletions internal/pkg/agent/application/upgrade/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import (
"github.com/hashicorp/go-multierror"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// preUpgradeCleanup will remove files that do not have the passed version number from the downloads directory.
func preUpgradeCleanup(version string) error {
files, err := os.ReadDir(paths.Downloads())
// cleanNonMatchingVersionsFromDownloads will remove files that do not have the passed version number from the downloads directory.
func cleanNonMatchingVersionsFromDownloads(log *logger.Logger, version string) error {
downloadsPath := paths.Downloads()
log.Infow("Cleaning up non-matching downloaded versions", "version", version, "downloads.path", downloadsPath)

files, err := os.ReadDir(downloadsPath)
if err != nil {
return fmt.Errorf("unable to read directory %q: %w", paths.Downloads(), err)
}
Expand Down
16 changes: 15 additions & 1 deletion internal/pkg/agent/application/upgrade/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"path/filepath"
"testing"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/core/logger"

"github.com/stretchr/testify/require"
)
Expand All @@ -31,7 +33,8 @@ func setupDir(t *testing.T) {

func TestPreUpgradeCleanup(t *testing.T) {
setupDir(t)
err := preUpgradeCleanup("8.4.0")
log := newErrorLogger(t)
err := cleanNonMatchingVersionsFromDownloads(log, "8.4.0")
require.NoError(t, err)

files, err := os.ReadDir(paths.Downloads())
Expand All @@ -42,3 +45,14 @@ func TestPreUpgradeCleanup(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte("hello, world!"), p)
}

func newErrorLogger(t *testing.T) *logger.Logger {
t.Helper()

loggerCfg := logger.DefaultLoggingConfig()
loggerCfg.Level = logp.ErrorLevel

log, err := logger.NewFromConfig("", loggerCfg, false)
require.NoError(t, err)
return log
}
7 changes: 4 additions & 3 deletions internal/pkg/agent/application/upgrade/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ const (
)

// Rollback rollbacks to previous version which was functioning before upgrade.
func Rollback(ctx context.Context, prevHash, currentHash string) error {
func Rollback(ctx context.Context, log *logger.Logger, prevHash string, currentHash string) error {
// change symlink
if err := ChangeSymlink(ctx, prevHash); err != nil {
if err := ChangeSymlink(ctx, log, prevHash); err != nil {
return err
}

// revert active commit
if err := UpdateActiveCommit(prevHash); err != nil {
if err := UpdateActiveCommit(log, prevHash); err != nil {
return err
}

Expand Down Expand Up @@ -113,6 +113,7 @@ func InvokeWatcher(log *logger.Logger) error {
}
}()

log.Debugw("Starting upgrade watcher", "path", cmd.Path, "args", cmd.Args, "env", cmd.Env, "dir", cmd.Dir)
return cmd.Start()
}

Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/agent/application/upgrade/step_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
}
}

u.log.Infow("Downloading upgrade artifact", "version", version,
"source_uri", settings.SourceURI, "drop_path", settings.DropPath,
"target_path", settings.TargetDirectory, "install_path", settings.InstallPath)

verifier, err := newVerifier(version, u.log, &settings)
if err != nil {
return "", errors.New(err, "initiating verifier")
Expand Down
9 changes: 6 additions & 3 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const markerFilename = ".update-marker"
Expand Down Expand Up @@ -91,7 +92,7 @@ func newMarkerSerializer(m *UpdateMarker) *updateMarkerSerializer {
}

// markUpgrade marks update happened so we can handle grace period
func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) error {
func (u *Upgrader) markUpgrade(_ context.Context, log *logger.Logger, hash string, action Action) error {
prevVersion := release.Version()
prevHash := release.Commit()
if len(prevHash) > hashLen {
Expand All @@ -112,20 +113,22 @@ func (u *Upgrader) markUpgrade(_ context.Context, hash string, action Action) er
}

markerPath := markerFilePath()
log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", prevHash)
if err := ioutil.WriteFile(markerPath, markerBytes, 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath))
}

if err := UpdateActiveCommit(hash); err != nil {
if err := UpdateActiveCommit(log, hash); err != nil {
return err
}

return nil
}

// UpdateActiveCommit updates active.commit file to point to active version.
func UpdateActiveCommit(hash string) error {
func UpdateActiveCommit(log *logger.Logger, hash string) error {
activeCommitPath := filepath.Join(paths.Top(), agentCommitFile)
log.Infow("Updating active commit", "file.path", activeCommitPath, "hash", hash)
if err := ioutil.WriteFile(activeCommitPath, []byte(hash), 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath))
}
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/application/upgrade/step_relink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
"github.com/elastic/elastic-agent-libs/file"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// ChangeSymlink updates symlink paths to match current version.
func ChangeSymlink(ctx context.Context, targetHash string) error {
func ChangeSymlink(ctx context.Context, log *logger.Logger, targetHash string) error {
// create symlink to elastic-agent-{hash}
hashedDir := fmt.Sprintf("%s-%s", agentName, targetHash)

Expand All @@ -31,6 +32,7 @@ func ChangeSymlink(ctx context.Context, targetHash string) error {
}

prevNewPath := prevSymlinkPath()
log.Infow("Changing symlink", "symlink_path", symlinkPath, "new_path", newPath, "prev_path", prevNewPath)

// remove symlink to avoid upgrade failures
if err := os.Remove(prevNewPath); !os.IsNotExist(err) {
Expand Down
16 changes: 12 additions & 4 deletions internal/pkg/agent/application/upgrade/step_unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// unpack unpacks archive correctly, skips root (symlink, config...) unpacks data/*
Expand All @@ -30,18 +31,21 @@ func (u *Upgrader) unpack(ctx context.Context, version, archivePath string) (str
var hash string
var err error
if runtime.GOOS == "windows" {
hash, err = unzip(version, archivePath)
hash, err = unzip(u.log, version, archivePath)
} else {
hash, err = untar(version, archivePath)
hash, err = untar(u.log, version, archivePath)
}

if err != nil {
u.log.Infow("Failed to unpack upgrade artifact", "error.message", err, "version", version, "file.path", archivePath, "hash", hash)
return "", err
}

u.log.Infow("Unpacked upgrade artifact", "version", version, "file.path", archivePath, "hash", hash)
return hash, nil
}

func unzip(version, archivePath string) (string, error) {
func unzip(log *logger.Logger, version string, archivePath string) (string, error) {
var hash, rootDir string
r, err := zip.OpenReader(archivePath)
if err != nil {
Expand Down Expand Up @@ -82,8 +86,10 @@ func unzip(version, archivePath string) (string, error) {
path := filepath.Join(paths.Data(), strings.TrimPrefix(fileName, "data/"))

if f.FileInfo().IsDir() {
log.Debugw("Unpacking directory", "archive", "zip", "file.path", path)
os.MkdirAll(path, f.Mode())
} else {
log.Debugw("Unpacking file", "archive", "zip", "file.path", path)
os.MkdirAll(filepath.Dir(path), f.Mode())
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
Expand Down Expand Up @@ -119,7 +125,7 @@ func unzip(version, archivePath string) (string, error) {
return hash, nil
}

func untar(version, archivePath string) (string, error) {
func untar(log *logger.Logger, version string, archivePath string) (string, error) {
r, err := os.Open(archivePath)
if err != nil {
return "", errors.New(fmt.Sprintf("artifact for 'elastic-agent' version '%s' could not be found at '%s'", version, archivePath), errors.TypeFilesystem, errors.M(errors.MetaKeyPath, archivePath))
Expand Down Expand Up @@ -183,6 +189,7 @@ func untar(version, archivePath string) (string, error) {
mode := fi.Mode()
switch {
case mode.IsRegular():
log.Debugw("Unpacking file", "archive", "tar", "file.path", abs)
// just to be sure, it should already be created by Dir type
if err := os.MkdirAll(filepath.Dir(abs), 0755); err != nil {
return "", errors.New(err, "TarInstaller: creating directory for file "+abs, errors.TypeFilesystem, errors.M(errors.MetaKeyPath, abs))
Expand All @@ -201,6 +208,7 @@ func untar(version, archivePath string) (string, error) {
return "", fmt.Errorf("TarInstaller: error writing to %s: %w", abs, err)
}
case mode.IsDir():
log.Debugw("Unpacking directory", "archive", "tar", "file.path", abs)
if err := os.MkdirAll(abs, 0755); err != nil {
return "", errors.New(err, "TarInstaller: creating directory for file "+abs, errors.TypeFilesystem, errors.M(errors.MetaKeyPath, abs))
}
Expand Down
48 changes: 29 additions & 19 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (u *Upgrader) Upgradeable() bool {
// Upgrade upgrades running agent, function returns shutdown callback if some needs to be executed for cases when
// reexec is called by caller.
func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ reexec.ShutdownCallbackFn, err error) {
u.log.Infow("Upgrading agent", "version", a.Version(), "source_uri", a.SourceURI())
span, ctx := apm.StartSpan(ctx, "upgrade", "app.internal")
defer span.End()
// report failed
Expand All @@ -126,9 +127,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
"running under control of the systems supervisor")
}

err = preUpgradeCleanup(u.agentInfo.Version())
err = cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version())
if err != nil {
u.log.Errorf("Unable to clean downloads dir %q before update: %v", paths.Downloads(), err)
u.log.Errorw("Unable to clean downloads before update", "error.message", err, "downloads.path", paths.Downloads())
}

if u.caps != nil {
Expand All @@ -142,10 +143,10 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
sourceURI := u.sourceURI(a.SourceURI())
archivePath, err := u.downloadArtifact(ctx, a.Version(), sourceURI)
if err != nil {
// Run the same preUpgradeCleanup task to get rid of any newly downloaded files
// Run the same pre-upgrade cleanup task to get rid of any newly downloaded files
// This may have an issue if users are upgrading to the same version number.
if dErr := preUpgradeCleanup(u.agentInfo.Version()); dErr != nil {
u.log.Errorf("Unable to remove file after verification failure: %v", dErr)
if dErr := cleanNonMatchingVersionsFromDownloads(u.log, u.agentInfo.Version()); dErr != nil {
u.log.Errorw("Unable to remove file after verification failure", "error.message", dErr)
}
return nil, err
}
Expand All @@ -169,39 +170,47 @@ func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (_ ree
return nil, nil
}

if err := copyActionStore(newHash); err != nil {
if err := copyActionStore(u.log, newHash); err != nil {
return nil, errors.New(err, "failed to copy action store")
}

if err := ChangeSymlink(ctx, newHash); err != nil {
rollbackInstall(ctx, newHash)
if err := ChangeSymlink(ctx, u.log, newHash); err != nil {
u.log.Errorw("Rolling back: changing symlink failed", "error.message", err)
rollbackInstall(ctx, u.log, newHash)
return nil, err
}

if err := u.markUpgrade(ctx, newHash, a); err != nil {
rollbackInstall(ctx, newHash)
if err := u.markUpgrade(ctx, u.log, newHash, a); err != nil {
u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
rollbackInstall(ctx, u.log, newHash)
return nil, err
}

if err := InvokeWatcher(u.log); err != nil {
rollbackInstall(ctx, newHash)
u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
rollbackInstall(ctx, u.log, newHash)
return nil, errors.New("failed to invoke rollback watcher", err)
}

cb := shutdownCallback(u.log, paths.Home(), release.Version(), a.Version(), release.TrimCommit(newHash))
trimmedNewHash := release.TrimCommit(newHash)
cb := shutdownCallback(u.log, paths.Home(), release.Version(), a.Version(), trimmedNewHash)
if reexecNow {
u.log.Infow("Removing downloads directory", "file.path", paths.Downloads(), "rexec", reexecNow)
err = os.RemoveAll(paths.Downloads())
if err != nil {
u.log.Errorf("Unable to clean downloads dir %q after update: %v", paths.Downloads(), err)
u.log.Errorw("Unable to clean downloads after update", "error.message", err, "downloads.path", paths.Downloads())
}
u.log.Infow("Restarting after upgrade", "new_version", release.Version(), "prev_version", a.Version(),
"hash", trimmedNewHash, "home", paths.Home())
u.reexec.ReExec(cb)
return nil, nil
}

// Clean everything from the downloads dir
u.log.Infow("Removing downloads directory", "file.path", paths.Downloads(), "rexec", reexecNow)
err = os.RemoveAll(paths.Downloads())
if err != nil {
u.log.Errorf("Unable to clean downloads dir %q after update: %v", paths.Downloads(), err)
u.log.Errorw("Unable to clean downloads after update", "error.message", err, "file.path", paths.Downloads())
}

return cb, nil
Expand Down Expand Up @@ -283,19 +292,20 @@ func (u *Upgrader) reportUpdating(version string) {
)
}

func rollbackInstall(ctx context.Context, hash string) {
func rollbackInstall(ctx context.Context, log *logger.Logger, hash string) {
os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
_ = ChangeSymlink(ctx, release.ShortCommit())
_ = ChangeSymlink(ctx, log, release.ShortCommit())
}

func copyActionStore(newHash string) error {
func copyActionStore(log *logger.Logger, newHash string) error {
// copies legacy action_store.yml, state.yml and state.enc encrypted file if exists
storePaths := []string{paths.AgentActionStoreFile(), paths.AgentStateStoreYmlFile(), paths.AgentStateStoreFile()}
newHome := filepath.Join(filepath.Dir(paths.Home()), fmt.Sprintf("%s-%s", agentName, newHash))
log.Infow("Copying action store", "new_home_path", newHome)

for _, currentActionStorePath := range storePaths {
newHome := filepath.Join(filepath.Dir(paths.Home()), fmt.Sprintf("%s-%s", agentName, newHash))
newActionStorePath := filepath.Join(newHome, filepath.Base(currentActionStorePath))

log.Debugw("Copying action store path", "from", currentActionStorePath, "to", newActionStorePath)
currentActionStore, err := ioutil.ReadFile(currentActionStorePath)
if os.IsNotExist(err) {
// nothing to copy
Expand Down
Loading

0 comments on commit 4008440

Please sign in to comment.