Skip to content

Commit

Permalink
Create prototype shipper-beat (#35318)
Browse files Browse the repository at this point in the history
* first pass at shipper beat

* finish shutdown, cleanup code

* cleanup management code

* try to make linter happy

* fix tests

* use built-in acker, clean up server code

* fix merge

* cleanup

* add check for shipper mode in processors

* add headers

* fix duration in windows

* fix linter

* clean up server

* cleanup, use env var for shipper mode

* remove debug statements
  • Loading branch information
fearful-symmetry authored May 11, 2023
1 parent e2bfb2f commit 327b5fe
Show file tree
Hide file tree
Showing 14 changed files with 857 additions and 28 deletions.
3 changes: 3 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ type Pipeline interface {
Connect() (Client, error)
}

// PipelineConnector is an alias for the Pipeline interface.
type PipelineConnector = Pipeline

// Client holds a connection to the beats publisher pipeline.
type Client interface {
	// Publish sends a single event into the pipeline.
	Publish(Event)
	// PublishAll sends every event in the given slice into the pipeline.
	PublishAll([]Event)
	// Close terminates the connection to the pipeline.
	Close() error
}
Expand Down
27 changes: 18 additions & 9 deletions x-pack/filebeat/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,25 @@ import (
)

func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}
var modules []map[string]interface{}
var err error
if rawIn.Type == "shipper" { // place filebeat in "shipper mode", with one filebeat input per agent config input
modules, err = management.CreateShipperInput(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating shipper config from raw expected config: %w", err)
}
} else {
modules, err = management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}

// Extract the module name from the stream-level type
// these types are defined in the elastic-agent's specfiles
for iter := range modules {
if _, ok := modules[iter]["type"]; !ok {
modules[iter]["type"] = rawIn.Type
// Extract the module name from the stream-level type
// these types are defined in the elastic-agent's specfiles
for iter := range modules {
if _, ok := modules[iter]["type"]; !ok {
modules[iter]["type"] = rawIn.Type
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cmd

import (
"fmt"
"os"

fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
Expand Down Expand Up @@ -44,6 +45,13 @@ func defaultProcessors() []mapstr.M {
// - add_cloud_metadata: ~
// - add_docker_metadata: ~
// - add_kubernetes_metadata: ~

// This gets called early enough that the CLI handling isn't properly initialized yet,
// so use an environment variable.
shipperEnv := os.Getenv("SHIPPER_MODE")
if shipperEnv == "True" {
return []mapstr.M{}
}
return []mapstr.M{
{
"add_host_metadata": mapstr.M{
Expand All @@ -54,4 +62,5 @@ func defaultProcessors() []mapstr.M {
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
}

}
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/beats/v7/x-pack/filebeat/input/shipper"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand All @@ -38,5 +39,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
awss3.Plugin(store),
awscloudwatch.Plugin(),
lumberjack.Plugin(),
shipper.Plugin(log, store),
}
}
30 changes: 30 additions & 0 deletions x-pack/filebeat/input/shipper/acker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package shipper

import (
"sync/atomic"
)

// shipperAcker tracks the number of events acknowledged by the pipeline,
// exposing the running count as a persisted index for the gRPC server.
type shipperAcker struct {
	// persistedIndex is updated and read atomically; never access it
	// without the sync/atomic helpers.
	persistedIndex uint64
}

// newShipperAcker returns a shipperAcker with its persistedIndex starting at zero.
func newShipperAcker() *shipperAcker {
	return &shipperAcker{persistedIndex: 0}
}

// Track updates the input's persistedIndex by adding total to it.
// Despite the name, "total" here means an incremental total, i.e.
// the total number of events that are being acknowledged by this callback, not the total that have been sent overall.
// The acked parameter includes only those events that were successfully sent upstream rather than dropped by processors, etc.,
// but since we don't make that distinction in persistedIndex we can probably ignore it.
func (acker *shipperAcker) Track(_ int, total int) {
	atomic.AddUint64(&acker.persistedIndex, uint64(total))
}

// PersistedIndex returns the current persisted-index value.
// The counter is written concurrently by Track via atomic.AddUint64, so it
// must be read with atomic.LoadUint64 as well; the previous plain read was a
// data race (and undefined per the Go memory model) when called while events
// were being acknowledged.
func (acker *shipperAcker) PersistedIndex() uint64 {
	return atomic.LoadUint64(&acker.persistedIndex)
}
63 changes: 63 additions & 0 deletions x-pack/filebeat/input/shipper/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package shipper

import (
"time"

"github.com/elastic/elastic-agent-libs/config"
)

// Instance represents all the config needed to start a single shipper input.
// Because a beat works fundamentally differently from the old shipper, we don't
// have to deal with async config that's being pieced together: this one config
// object received on create has both the input and output config.
type Instance struct {
	// Conn is the config for the shipper's gRPC input.
	Conn  ConnectionConfig `config:",inline"`
	Input InputConfig      `config:",inline"`
}

// ConnectionConfig is the shipper-relevant portion of the config received from
// input units: the gRPC server address, the timeout for the initial gRPC setup,
// and the TLS settings for the connection.
type ConnectionConfig struct {
	Server         string        `config:"server"`
	InitialTimeout time.Duration `config:"grpc_setup_timeout"`
	TLS            TLS           `config:"ssl"`
}

// TLS is TLS-specific shipper client settings: the trusted certificate
// authorities plus the client certificate/key pair.
type TLS struct {
	CAs  []string `config:"certificate_authorities"`
	Cert string   `config:"certificate"`
	Key  string   `config:"key"`
}

// InputConfig represents the config for a shipper input. This is the complete config for that input, mirrored and sent to us.
// This is more or less the same as the proto.UnitExpectedConfig type, but that doesn't have `config` struct tags,
// so for the sake of quick prototyping we're just (roughly) duplicating the structure here, minus any fields the shipper doesn't need (for now).
type InputConfig struct {
	ID         string     `config:"id"`
	Type       string     `config:"type"`
	Name       string     `config:"name"`
	DataStream DataStream `config:"data_stream"`
	// Streams is left mostly unparsed for now; once we have a better idea of
	// how per-stream processors work, we can find a better way to unpack this.
	Streams []Stream `config:"streams"`
}

// DataStream represents the datastream metadata from an input:
// the dataset, type, and namespace that identify the target data stream.
type DataStream struct {
	Dataset   string `config:"dataset"`
	Type      string `config:"type"`
	Namespace string `config:"namespace"`
}

// Stream represents a single stream present inside an input.
// The full stream config is largely unpredictable and varies by input type;
// we're just grabbing the fields the shipper needs.
type Stream struct {
	ID string `config:"id"`
	// Processors are kept as raw config objects to be unpacked later.
	Processors []*config.C `config:"processors"`
	Index      string      `config:"index"`
}
Loading

0 comments on commit 327b5fe

Please sign in to comment.