Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V2] Enable support for shippers #1527

Merged
merged 17 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ fleet.enc.lock
# Files generated with the bump version automations
*.bck


# agent
build/
elastic-agent
Expand All @@ -54,9 +53,5 @@ elastic-agent.yml.*
fleet.yml
fleet.yml.lock
fleet.yml.old
internal/pkg/agent/application/fleet.yml
internal/pkg/agent/transpiler/tests/exec-1.0-darwin-x86_64/exec
pkg/component/fake/fake

# VSCode
/.vscode
pkg/component/fake/component/component
pkg/component/fake/shipper/shipper
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add experimental support for running the elastic-agent-shipper

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 1527

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 219
2 changes: 2 additions & 0 deletions control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ message DiagnosticAgentResponse {

// DiagnosticUnitRequest specifies a specific unit to gather diagnostics from.
message DiagnosticUnitRequest {
// ID of the component.
string component_id = 1;
// Type of unit.
UnitType unit_type = 2;
// ID of the unit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
}

state := h.coord.State(false)
unit, ok := findUnitFromInputType(state, action.InputType)
comp, unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.ActionType, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err = h.coord.PerformAction(ctx, unit, action.InputType, params)
res, err = h.coord.PerformAction(ctx, comp, unit, action.InputType, params)
}
end := time.Now().UTC()

Expand Down Expand Up @@ -151,13 +151,13 @@ func readMapString(m map[string]interface{}, key string, def string) string {
return def
}

func findUnitFromInputType(state coordinator.State, inputType string) (component.Unit, bool) {
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {
return unit, true
return comp.Component, unit, true
}
}
}
return component.Unit{}, false
return component.Component{}, component.Unit{}, false
}
13 changes: 7 additions & 6 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type RuntimeManager interface {
State() []runtime.ComponentComponentState

// PerformAction executes an action on a unit.
PerformAction(ctx context.Context, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)
PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)

// SubscribeAll provides an interface to watch for changes in all components.
SubscribeAll(context.Context) *runtime.SubscriptionAll

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
PerformDiagnostics(context.Context, ...component.Unit) []runtime.ComponentUnitDiagnostic
PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
}

// ConfigChange provides an interface for receiving a new configuration.
Expand Down Expand Up @@ -285,19 +285,20 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
return nil
}

// AckUpgrade performs acknowledgement for upgrade.
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
return c.upgradeMgr.Ack(ctx, acker)
}

// PerformAction executes an action on a unit.
func (c *Coordinator) PerformAction(ctx context.Context, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
return c.runtimeMgr.PerformAction(ctx, unit, name, params)
func (c *Coordinator) PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
return c.runtimeMgr.PerformAction(ctx, comp, unit, name, params)
}

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
func (c *Coordinator) PerformDiagnostics(ctx context.Context, units ...component.Unit) []runtime.ComponentUnitDiagnostic {
return c.runtimeMgr.PerformDiagnostics(ctx, units...)
func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
return c.runtimeMgr.PerformDiagnostics(ctx, req...)
}

// Run runs the coordinator.
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{
func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier {
return func(comps []component.Component, _ map[string]interface{}) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == fleetServer {
if comp.InputSpec != nil && comp.InputSpec.InputType == fleetServer {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeOutput && unit.Config.Type == elasticsearch {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &serverCfg.Output.Elasticsearch)
Expand Down Expand Up @@ -89,7 +89,7 @@ func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) co
func EndpointComponentModifier(fleetCfg *configuration.FleetAgentConfig) coordinator.ComponentsModifier {
return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == endpoint {
if comp.InputSpec != nil && comp.InputSpec.InputType == endpoint {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeInput && unit.Config.Type == endpoint {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), map[string]interface{}{"fleet": fleetCfg})
Expand Down
10 changes: 9 additions & 1 deletion internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,19 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
component := item.Component
state := item.State

var shipperReference *fleetapi.CheckinShipperReference
if component.Shipper != nil {
shipperReference = &fleetapi.CheckinShipperReference{
ComponentID: component.Shipper.ComponentID,
UnitID: component.Shipper.UnitID,
}
}
checkinComponent := fleetapi.CheckinComponent{
ID: component.ID,
Type: component.Spec.InputType,
Type: component.Type(),
Status: stateString(state.State),
Message: state.Message,
Shipper: shipperReference,
}

if state.Units != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (m *managedConfigManager) waitForFleetServer(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case compState := <-sub.Ch():
if compState.Component.Spec.InputType == "fleet-server" {
if compState.Component.InputSpec != nil && compState.Component.InputSpec.InputType == "fleet-server" {
if fleetServerRunning(compState.State) {
m.log.With("state", compState.State).Debugf("Fleet Server is running")
return nil
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = "/opt/Elastic/Agent"

// SocketPath is the socket path used when installed.
SocketPath = "unix:///run/elastic-agent.sock"
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = "unix:///run/elastic-agent.sock"

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = "unix:///run/elastic-agent-%s-pipe.sock"

// ServiceName is the service name when installed.
ServiceName = "elastic-agent"
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = "/Library/Elastic/Agent"

// SocketPath is the socket path used when installed.
SocketPath = "unix:///var/run/elastic-agent.sock"
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = "unix:///var/run/elastic-agent.sock"

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = "unix:///var/run/elastic-agent-%s-pipe.sock"

// ServiceName is the service name when installed.
ServiceName = "co.elastic.elastic-agent"
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = `C:\Program Files\Elastic\Agent`

// SocketPath is the socket path used when installed.
SocketPath = `\\.\pipe\elastic-agent-system`
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = `\\.\pipe\elastic-agent-system`

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = `\\.\pipe\elastic-agent-%s-pipe.sock`

// ServiceName is the service name when installed.
ServiceName = "Elastic Agent"
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
return fmt.Errorf("unable to find unit with ID: %s/%s", compID, unitID)
}
if !opts.showSpec {
comp.Spec = component.InputRuntimeSpec{}
comp.InputSpec = nil
comp.ShipperSpec = nil
}
if !opts.showConfig {
for key, unit := range comp.Units {
Expand All @@ -314,7 +315,8 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
// Hide runtime specification unless toggled on.
if !opts.showSpec {
for i, comp := range comps {
comp.Spec = component.InputRuntimeSpec{}
comp.InputSpec = nil
comp.ShipperSpec = nil
comps[i] = comp
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func Address() string {
// when installed the control address is fixed
if info.RunningInstalled() {
return paths.SocketPath
return paths.ControlSocketPath
}

// unix socket path must be less than 104 characters
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func Address() string {
// when installed the control address is fixed
if info.RunningInstalled() {
return paths.SocketPath
return paths.ControlSocketPath
}

// not install, adjust the path based on data path
Expand Down
11 changes: 7 additions & 4 deletions internal/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Version struct {
Snapshot bool `json:"snapshot" yaml:"snapshot"`
}

// ComponentVersionInfo is the version information for the component.
type ComponentVersionInfo struct {
// Name of the component.
Name string `json:"name" yaml:"name"`
Expand Down Expand Up @@ -115,8 +116,9 @@ type DiagnosticFileResult struct {

// DiagnosticUnitRequest allows a specific unit to be targeted for diagnostics.
type DiagnosticUnitRequest struct {
UnitID string
UnitType UnitType
ComponentID string
UnitID string
UnitType UnitType
}

// DiagnosticUnitResult is a set of results for a unit.
Expand Down Expand Up @@ -308,8 +310,9 @@ func (c *client) DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitReq
reqs := make([]*cproto.DiagnosticUnitRequest, 0, len(units))
for _, u := range units {
reqs = append(reqs, &cproto.DiagnosticUnitRequest{
UnitType: u.UnitType,
UnitId: u.UnitID,
ComponentId: u.ComponentID,
UnitType: u.UnitType,
UnitId: u.UnitID,
})
}

Expand Down
Loading