Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Add upgrade CLI to initiate upgrade of Agent locally #21425

Merged
merged 7 commits into from
Oct 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
12 changes: 9 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/warn"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
Expand All @@ -25,8 +26,12 @@ type reexecManager interface {
ReExec(argOverrides ...string)
}

type upgraderControl interface {
SetUpgrader(upgrader *upgrade.Upgrader)
}

// New creates a new Agent and bootstrap the required subsystem.
func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Application, error) {
func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upgraderControl) (Application, error) {
// Load configuration from disk to understand in which mode of operation
// we must start the elastic-agent, the mode of operation cannot be changed without restarting the
// elastic-agent.
Expand All @@ -39,14 +44,15 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager) (Appli
return nil, err
}

return createApplication(log, pathConfigFile, rawConfig, reexec)
return createApplication(log, pathConfigFile, rawConfig, reexec, uc)
}

func createApplication(
log *logger.Logger,
pathConfigFile string,
rawConfig *config.Config,
reexec reexecManager,
uc upgraderControl,
) (Application, error) {
warn.LogNotGA(log)
log.Info("Detecting execution mode")
Expand All @@ -59,7 +65,7 @@ func createApplication(

if isStandalone(cfg.Fleet) {
log.Info("Agent is managed locally")
return newLocal(ctx, log, pathConfigFile, rawConfig)
return newLocal(ctx, log, pathConfigFile, rawConfig, reexec, uc)
}

log.Info("Agent is managed by Fleet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,21 @@ func (h *handlerUpgrade) Handle(ctx context.Context, a action, acker fleetAcker)
return fmt.Errorf("invalid type, expected ActionUpgrade and received %T", a)
}

return h.upgrader.Upgrade(ctx, action)
return h.upgrader.Upgrade(ctx, &upgradeAction{action}, true)
}

type upgradeAction struct {
*fleetapi.ActionUpgrade
}

func (a *upgradeAction) Version() string {
return a.ActionUpgrade.Version
}

func (a *upgradeAction) SourceURI() string {
return a.ActionUpgrade.SourceURI
}

func (a *upgradeAction) FleetAction() *fleetapi.ActionUpgrade {
return a.ActionUpgrade
}
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -60,6 +61,8 @@ func newLocal(
log *logger.Logger,
pathConfigFile string,
rawConfig *config.Config,
reexec reexecManager,
uc upgraderControl,
) (*Local, error) {
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
Expand Down Expand Up @@ -135,6 +138,17 @@ func newLocal(

localApplication.source = cfgSource

// create a upgrader to use in local mode
upgrader := upgrade.NewUpgrader(
agentInfo,
cfg.Settings.DownloadConfig,
log,
[]context.CancelFunc{localApplication.cancelCtxFn},
reexec,
newNoopAcker(),
reporter)
uc.SetUpgrader(upgrader)

return localApplication, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"context"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
downloader "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download/localremote"
Expand All @@ -16,7 +17,13 @@ func (u *Upgrader) downloadArtifact(ctx context.Context, version, sourceURI stri
// do not update source config
settings := *u.settings
if sourceURI != "" {
settings.SourceURI = sourceURI
if strings.HasPrefix(sourceURI, "file://") {
// update the DropPath so the fs.Downloader can download from this
// path instead of looking into the installed downloads directory
settings.DropPath = strings.TrimPrefix(sourceURI, "file://")
} else {
settings.SourceURI = sourceURI
}
}

allowEmptyPgp, pgp := release.PGP()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type updateMarker struct {
}

// markUpgrade marks update happened so we can handle grace period
func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetapi.ActionUpgrade) error {
func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action Action) error {
prevVersion := release.Version()
prevHash := release.Commit()
if len(prevHash) > hashLen {
Expand All @@ -49,7 +49,7 @@ func (h *Upgrader) markUpgrade(ctx context.Context, hash string, action *fleetap
UpdatedOn: time.Now(),
PrevVersion: prevVersion,
PrevHash: prevHash,
Action: action,
Action: action.FleetAction(),
}

markerBytes, err := yaml.Marshal(marker)
Expand Down
37 changes: 27 additions & 10 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ type Upgrader struct {
upgradeable bool
}

// Action is the upgrade action state.
type Action interface {
// Version to upgrade to.
Version() string
// SourceURI for download.
SourceURI() string
// FleetAction is the action from fleet that started the action (optional).
FleetAction() *fleetapi.ActionUpgrade
}

type reexecManager interface {
ReExec(argOverrides ...string)
}
Expand All @@ -60,13 +70,14 @@ type stateReporter interface {
// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
return &Upgrader{
agentInfo: agentInfo,
settings: settings,
log: log,
closers: closers,
reexec: reexec,
acker: a,
reporter: r,
upgradeable: getUpgradable(),
upgradeable: getUpgradeable(),
}
}

Expand All @@ -76,11 +87,13 @@ func (u *Upgrader) Upgradeable() bool {
}

// Upgrade upgrades running agent
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) {
func (u *Upgrader) Upgrade(ctx context.Context, a Action, reexecNow bool) (err error) {
// report failed
defer func() {
if err != nil {
u.reportFailure(ctx, a, err)
if action := a.FleetAction(); action != nil {
u.reportFailure(ctx, action, err)
}
}
}()

Expand All @@ -90,15 +103,15 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err
"running under control of the systems supervisor")
}

u.reportUpdating(a.Version)
u.reportUpdating(a.Version())

sourceURI, err := u.sourceURI(a.Version, a.SourceURI)
archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI)
sourceURI, err := u.sourceURI(a.Version(), a.SourceURI())
archivePath, err := u.downloadArtifact(ctx, a.Version(), sourceURI)
if err != nil {
return err
}

newHash, err := u.unpack(ctx, a.Version, archivePath)
newHash, err := u.unpack(ctx, a.Version(), archivePath)
if err != nil {
return err
}
Expand All @@ -109,7 +122,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err

if strings.HasPrefix(release.Commit(), newHash) {
// not an error
u.ackAction(ctx, a)
if action := a.FleetAction(); action != nil {
u.ackAction(ctx, action)
}
u.log.Warn("upgrading to same version")
return nil
}
Expand All @@ -128,7 +143,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err
return err
}

u.reexec.ReExec()
if reexecNow {
u.reexec.ReExec()
}
return nil
}

Expand Down Expand Up @@ -224,7 +241,7 @@ func rollbackInstall(hash string) {
os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
}

func getUpgradable() bool {
func getUpgradeable() bool {
// only upgradeable if running from Agent installer and running under the
// control of the system supervisor (or built specifically with upgrading enabled)
return release.Upgradeable() || (install.RunningInstalled() && install.RunningUnderSupervisor())
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {
cmd.AddCommand(run)
cmd.AddCommand(newInstallCommandWithArgs(flags, args, streams))
cmd.AddCommand(newUninstallCommandWithArgs(flags, args, streams))
cmd.AddCommand(newUpgradeCommandWithArgs(flags, args, streams))
cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams))
cmd.AddCommand(newInspectCommandWithArgs(flags, args, streams))

Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se
rex := reexec.NewManager(rexLogger, execPath)

// start the control listener
control := server.New(logger.Named("control"), rex)
control := server.New(logger.Named("control"), rex, nil)
if err := control.Start(); err != nil {
return err
}
defer control.Stop()

app, err := application.New(logger, pathConfigFile, rex)
app, err := application.New(logger, pathConfigFile, rex, control)
if err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cmd

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
)

func newUpgradeCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade <version>",
Short: "Upgrade the currently running Elastic Agent to the specified version",
Args: cobra.ExactArgs(1),
Run: func(c *cobra.Command, args []string) {
if err := upgradeCmd(streams, c, flags, args); err != nil {
fmt.Fprintf(streams.Err, "%v\n", err)
os.Exit(1)
}
},
}

cmd.Flags().StringP("source-uri", "s", "", "Source URI to download the new version from")

return cmd
}

func upgradeCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error {
fmt.Fprintln(streams.Out, "The upgrade process of Elastic Agent is currently EXPERIMENTAL and should not be used in production")

version := args[0]
sourceURI, _ := cmd.Flags().GetString("source-uri")

c := client.New()
err := c.Connect(context.Background())
if err != nil {
return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address()))
}
defer c.Disconnect()
version, err = c.Upgrade(context.Background(), version, sourceURI)
if err != nil {
return errors.New(err, "Failed trigger upgrade of daemon")
}
fmt.Fprintf(streams.Out, "Upgrade triggered to version %s, Elastic Agent is currently restarting\n", version)
return nil
}
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t), nil)
srv := server.New(newErrorLogger(t), nil, nil)
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()
Expand Down
Loading