From 327b5fe8215c73b526b9528b91aa68b26ccb843f Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Thu, 11 May 2023 13:00:48 -0700 Subject: [PATCH] Create prototype shipper-beat (#35318) * first pass at shipper beat * finish shutdown, cleanup code * cleanup management code * try to make linter happy * fix tests * use built-in acker, clean up server code * fix merge * cleanup * add check for shipper mode in processors * add headers * fix duration in windows * fix linter * clean up server * cleanup, use env var for shipper mode * remove debug statements --- libbeat/beat/pipeline.go | 3 + x-pack/filebeat/cmd/agent.go | 27 +- x-pack/filebeat/cmd/root.go | 9 + .../input/default-inputs/inputs_other.go | 2 + x-pack/filebeat/input/shipper/acker.go | 30 ++ x-pack/filebeat/input/shipper/config.go | 63 ++++ x-pack/filebeat/input/shipper/input.go | 283 ++++++++++++++++++ x-pack/filebeat/input/shipper/server.go | 223 ++++++++++++++ x-pack/filebeat/input/shipper/srv_unix.go | 17 ++ x-pack/filebeat/input/shipper/srv_windows.go | 80 +++++ .../filebeat/input/shipper/tools/test_unix.go | 27 ++ .../input/shipper/tools/test_windows.go | 27 ++ x-pack/filebeat/input/shipper/tools/tools.go | 17 ++ x-pack/libbeat/management/generate.go | 77 +++-- 14 files changed, 857 insertions(+), 28 deletions(-) create mode 100644 x-pack/filebeat/input/shipper/acker.go create mode 100644 x-pack/filebeat/input/shipper/config.go create mode 100644 x-pack/filebeat/input/shipper/input.go create mode 100644 x-pack/filebeat/input/shipper/server.go create mode 100644 x-pack/filebeat/input/shipper/srv_unix.go create mode 100644 x-pack/filebeat/input/shipper/srv_windows.go create mode 100644 x-pack/filebeat/input/shipper/tools/test_unix.go create mode 100644 x-pack/filebeat/input/shipper/tools/test_windows.go create mode 100644 x-pack/filebeat/input/shipper/tools/tools.go diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index aed27b3e2ed1..7b957099f6cc 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -30,11 +30,14 @@ type Pipeline interface { Connect() (Client, error) } +// PipelineConnector wraps the Pipeline interface type PipelineConnector = Pipeline // Client holds a connection to the beats publisher pipeline type Client interface { + // Publish the event Publish(Event) + // PublishAll events specified in the Event array PublishAll([]Event) Close() error } diff --git a/x-pack/filebeat/cmd/agent.go b/x-pack/filebeat/cmd/agent.go index 9a3f787eab69..e7125ab0ffab 100644 --- a/x-pack/filebeat/cmd/agent.go +++ b/x-pack/filebeat/cmd/agent.go @@ -14,16 +14,25 @@ import ( ) func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { - modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) - if err != nil { - return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) - } + var modules []map[string]interface{} + var err error + if rawIn.Type == "shipper" { // place filebeat in "shipper mode", with one filebeat input per agent config input + modules, err = management.CreateShipperInput(rawIn, "logs", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating shipper config from raw expected config: %w", err) + } + } else { + modules, err = management.CreateInputsFromStreams(rawIn, "logs", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } - // Extract the module name from the stream-level type - // these types are defined in the elastic-agent's specfiles - for iter := range modules { - if _, ok := modules[iter]["type"]; !ok { - modules[iter]["type"] = rawIn.Type + // Extract the module name from the stream-level type + // these types are defined in the elastic-agent's specfiles + for iter := range modules { + if _, ok := modules[iter]["type"]; !ok { + modules[iter]["type"] = rawIn.Type + } } } diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index 9489fdb8a934..77dadde7b162 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -6,6 +6,7 @@ package cmd import ( "fmt" + "os" fbcmd "github.com/elastic/beats/v7/filebeat/cmd" cmd "github.com/elastic/beats/v7/libbeat/cmd" @@ -44,6 +45,13 @@ func defaultProcessors() []mapstr.M { // - add_cloud_metadata: ~ // - add_docker_metadata: ~ // - add_kubernetes_metadata: ~ + + // This gets called early enough that the CLI handling isn't properly initialized yet, + // so use an environment variable. + shipperEnv := os.Getenv("SHIPPER_MODE") + if shipperEnv == "True" { + return []mapstr.M{} + } return []mapstr.M{ { "add_host_metadata": mapstr.M{ @@ -54,4 +62,5 @@ func defaultProcessors() []mapstr.M { {"add_docker_metadata": nil}, {"add_kubernetes_metadata": nil}, } + } diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index 3677a9d7eb64..cd8c58236889 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" + "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper" "github.com/elastic/elastic-agent-libs/logp" ) @@ -38,5 +39,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 awss3.Plugin(store), awscloudwatch.Plugin(), lumberjack.Plugin(), + shipper.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/shipper/acker.go b/x-pack/filebeat/input/shipper/acker.go new file mode 100644 index 000000000000..8bfd856828f4 --- /dev/null +++ b/x-pack/filebeat/input/shipper/acker.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package shipper + +import ( + "sync/atomic" +) + +type shipperAcker struct { + persistedIndex uint64 +} + +func newShipperAcker() *shipperAcker { + return &shipperAcker{persistedIndex: 0} +} + +// Update the input's persistedIndex by adding total to it. +// Despite the name, "total" here means an incremental total, i.e. +// the total number of events that are being acknowledged by this callback, not the total that have been sent overall. +// The acked parameter includes only those events that were successfully sent upstream rather than dropped by processors, etc., +// but since we don't make that distinction in persistedIndex we can probably ignore it. +func (acker *shipperAcker) Track(_ int, total int) { + atomic.AddUint64(&acker.persistedIndex, uint64(total)) +} + +func (acker *shipperAcker) PersistedIndex() uint64 { + return acker.persistedIndex +} diff --git a/x-pack/filebeat/input/shipper/config.go b/x-pack/filebeat/input/shipper/config.go new file mode 100644 index 000000000000..e28217d8712c --- /dev/null +++ b/x-pack/filebeat/input/shipper/config.go @@ -0,0 +1,63 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package shipper + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/config" +) + +// Instance represents all the config needed to start a single shipper input +// because a beat works fundamentally differently from the old shipper, we dont have to deal with async config that's being pieced together, +// this one config object recievewd on create has both the input and output config +type Instance struct { + // config for the shipper's gRPC input + Conn ConnectionConfig `config:",inline"` + Input InputConfig `config:",inline"` +} + +// ConnectionConfig is the shipper-relevant portion of the config received from input units +type ConnectionConfig struct { + Server string `config:"server"` + InitialTimeout time.Duration `config:"grpc_setup_timeout"` + TLS TLS `config:"ssl"` +} + +// TLS is TLS-specific shipper client settings +type TLS struct { + CAs []string `config:"certificate_authorities"` + Cert string `config:"certificate"` + Key string `config:"key"` +} + +// InputConfig represents the config for a shipper input. This is the complete config for that input, mirrored and sent to us. +// This is more or less the same as the the proto.UnitExpectedConfig type, but that doesn't have `config` struct tags, +// so for the sake of quick prototyping we're just (roughly) duplicating the structure here, minus any fields the shipper doesn't need (for now) +type InputConfig struct { + ID string `config:"id"` + Type string `config:"type"` + Name string `config:"name"` + DataStream DataStream `config:"data_stream"` + // for now don't try to parse the streams, + // once we have a better idea of how per-stream processors work, we can find a better way to unpack this + Streams []Stream `config:"streams"` +} + +// DataStream represents the datastream metadata from an input +type DataStream struct { + Dataset string `config:"dataset"` + Type string `config:"type"` + Namespace string `config:"namespace"` +} + +// Stream represents a single stream present inside an input. +// this field is largely unpredictable and varies by input type, +// we're just grabbing the fields the shipper needs. +type Stream struct { + ID string `config:"id"` + Processors []*config.C `config:"processors"` + Index string `config:"index"` +} diff --git a/x-pack/filebeat/input/shipper/input.go b/x-pack/filebeat/input/shipper/input.go new file mode 100644 index 000000000000..213e14be0eb6 --- /dev/null +++ b/x-pack/filebeat/input/shipper/input.go @@ -0,0 +1,283 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package shipper + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper/tools" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" + "github.com/elastic/go-concert/unison" + + "github.com/docker/go-units" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + inputName = "shipper" +) + +// Plugin registers the input +func Plugin(log *logp.Logger, _ inputcursor.StateStore) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Experimental, + Manager: NewInputManager(log), + } +} + +// InputManager wraps one stateless input manager +type InputManager struct { + log *logp.Logger +} + +// NewInputManager creates a new shipper input manager +func NewInputManager(log *logp.Logger) *InputManager { + + log.Infof("creating new InputManager") + return &InputManager{ + log: log.Named("shipper-beat"), + } +} + +// Init initializes the manager +// not sure if the shipper needs to do anything at this point? +func (im *InputManager) Init(_ unison.Group, _ v2.Mode) error { + return nil +} + +// Create creates the input from a given config +// in an attempt to speed things up, this will create the processors from the config before we have access to the pipeline to create the clients +func (im *InputManager) Create(cfg *config.C) (v2.Input, error) { + + config := Instance{} + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("error unpacking config: %w", err) + } + // strip the config we get from agent + config.Conn.Server = strings.TrimPrefix(config.Conn.Server, "unix://") + // following lines are helpful for debugging config, + // will be useful as we figure out how to integrate with agent + + // raw := mapstr.M{} + // err := cfg.Unpack(&raw) + // if err != nil { + // return nil, fmt.Errorf("error unpacking debug config: %w", err) + // } + // im.log.Infof("creating a new shipper input with config: %s", raw.String()) + // im.log.Infof("parsed config as: %#v", config) + + // create a mapping of streams + // when we get a new event, we use this to decide what processors to use + streamDataMap := map[string]streamData{} + for _, stream := range config.Input.Streams { + // convert to an actual processor used by the client + procList, err := processors.New(stream.Processors) + if err != nil { + return nil, fmt.Errorf("error creating processors for input: %w", err) + } + im.log.Infof("created processors for %s: %s", stream.ID, procList.String()) + streamDataMap[stream.ID] = streamData{index: stream.Index, processors: procList} + } + return &shipperInput{log: im.log, cfg: config, srvMut: &sync.Mutex{}, streams: streamDataMap, shipperSrv: config.Conn.Server, acker: newShipperAcker()}, nil +} + +// shipperInput is the main runtime object for the shipper +type shipperInput struct { + log *logp.Logger + cfg Instance + streams map[string]streamData + + server *grpc.Server + shipper *ShipperServer + // TODO: we probably don't need this, and can just fetch the config + shipperSrv string + srvMut *sync.Mutex + + acker *shipperAcker + + // incrementing counter that serves as an event ID + eventIDInc uint64 +} + +// all the data associated with a given stream that the shipper needs access to. +type streamData struct { + index string + client beat.Client + processors beat.ProcessorList +} + +func (in *shipperInput) Name() string { return inputName } + +func (in *shipperInput) Test(ctx v2.TestContext) error { + return nil +} + +// Stop the shipper +func (in *shipperInput) Stop() { + in.log.Infof("shipper shutting down") + //stop individual clients + for streamID, stream := range in.streams { + err := stream.client.Close() + if err != nil { + in.log.Infof("error closing client for stream: %s: %w", streamID, stream) + } + } + in.srvMut.Lock() + defer in.srvMut.Unlock() + if in.shipper != nil { + err := in.shipper.Close() + if err != nil { + in.log.Debugf("Error stopping shipper input: %s", err) + } + in.shipper = nil + } + if in.server != nil { + in.server.GracefulStop() + in.server = nil + + } + err := os.Remove(in.shipperSrv) + if err != nil { + in.log.Debugf("error removing unix socket for grpc listener during shutdown: %s", err) + } +} + +// Run the shipper +func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { + in.log.Infof("Running shipper input") + // create clients ahead of time + for streamID, streamProc := range in.streams { + client, err := pipeline.ConnectWith(beat.ClientConfig{ + PublishMode: beat.GuaranteedSend, + EventListener: acker.TrackingCounter(in.acker.Track), + Processing: beat.ProcessingConfig{ + Processor: streamProc.processors, + }, + CloseRef: inputContext.Cancelation, + }) + if err != nil { + return fmt.Errorf("error creating client for stream %s: %w", streamID, err) + } + in.log.Infof("Creating beat client for stream %s", streamID) + + newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors} + in.streams[streamID] = newStreamData + } + + //setup gRPC + err := in.setupgRPC(pipeline) + if err != nil { + return fmt.Errorf("error starting shipper gRPC server: %w", err) + } + in.log.Infof("done setting up gRPC server") + + // wait for shutdown + <-inputContext.Cancelation.Done() + + in.Stop() + + return nil +} + +func (in *shipperInput) setupgRPC(pipeline beat.Pipeline) error { + in.log.Infof("initializing grpc server at %s", in.shipperSrv) + // Currently no TLS until we figure out mTLS issues in agent/shipper + creds := insecure.NewCredentials() + opts := []grpc.ServerOption{ + grpc.Creds(creds), + grpc.MaxRecvMsgSize(64 * units.MiB), + } + + var err error + in.server = grpc.NewServer(opts...) + in.shipper, err = NewShipperServer(pipeline, in) + if err != nil { + return fmt.Errorf("error creating shipper server: %w", err) + } + + pb.RegisterProducerServer(in.server, in.shipper) + + in.srvMut.Lock() + + // treat most of these checking errors as "soft" errors + // Try to make the environment clean, but trust newListener() to fail if it can't just start. + + // paranoid checking, make sure we have the base directory. + dir := filepath.Dir(in.shipperSrv) + err = os.MkdirAll(dir, 0o755) + if err != nil { + in.log.Warnf("could not create directory for unix socket %s: %w", dir, err) + } + + // on linux, net.Listen will fail if the file already exists + err = os.Remove(in.shipperSrv) + if err != nil && !errors.Is(err, os.ErrNotExist) { + in.log.Warnf("could not remove pre-existing socket at %s: %w", in.shipperSrv, err) + } + + lis, err := newListener(in.log, in.shipperSrv) + if err != nil { + in.srvMut.Unlock() + return fmt.Errorf("failed to listen on %s: %w", in.shipperSrv, err) + } + + go func() { + in.log.Infof("gRPC listening on %s", in.shipperSrv) + err = in.server.Serve(lis) + if err != nil { + in.log.Errorf("gRPC server shut down with error: %s", err) + } + }() + + // make sure connection is up before mutex is released; + // if close() on the socket is called before it's started, it will trigger a race. + defer in.srvMut.Unlock() + con, err := tools.DialTestAddr(in.shipperSrv, in.cfg.Conn.InitialTimeout) + if err != nil { + // this will stop the other go routine in the wait group + in.server.Stop() + return fmt.Errorf("failed to test connection with the gRPC server on %s: %w", in.shipperSrv, err) + } + _ = con.Close() + + return nil +} + +func (in *shipperInput) sendEvent(event *messages.Event) (uint64, error) { + //look for matching processor config + stream, ok := in.streams[event.Source.StreamId] + if !ok { + return 0, fmt.Errorf("could not find data stream associated with ID '%s'", event.Source.StreamId) + } + + evt := beat.Event{ + Timestamp: event.Timestamp.AsTime(), + Fields: helpers.AsMap(event.Fields), + Meta: helpers.AsMap(event.Metadata), + } + atomic.AddUint64(&in.eventIDInc, 1) + + stream.client.Publish(evt) + + return in.eventIDInc, nil +} diff --git a/x-pack/filebeat/input/shipper/server.go b/x-pack/filebeat/input/shipper/server.go new file mode 100644 index 000000000000..3350a235782f --- /dev/null +++ b/x-pack/filebeat/input/shipper/server.go @@ -0,0 +1,223 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package shipper + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/gofrs/uuid" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" +) + +// ShipperServer handles the actual gRPC server and associated connections +type ShipperServer struct { + logger *logp.Logger + pipeline beat.Pipeline + + uuid string + + close *sync.Once + ctx context.Context + stop func() + + strictMode bool + + beatInput *shipperInput + + pb.UnimplementedProducerServer +} + +// NewShipperServer creates a new server instance for handling gRPC endpoints. +// publisher can be set to nil, in which case the SetOutput() method must be called. +func NewShipperServer(pipeline beat.Pipeline, shipper *shipperInput) (*ShipperServer, error) { + log := logp.NewLogger("shipper-server") + + id, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("error generating shipper UUID: %w", err) + } + + srv := ShipperServer{ + uuid: id.String(), + logger: log, + pipeline: pipeline, + close: &sync.Once{}, + beatInput: shipper, + strictMode: false, + } + + srv.ctx, srv.stop = context.WithCancel(context.Background()) + + return &srv, nil +} + +// PublishEvents is the server implementation of the gRPC PublishEvents call. +func (serv *ShipperServer) PublishEvents(_ context.Context, req *messages.PublishRequest) (*messages.PublishReply, error) { + resp := &messages.PublishReply{ + Uuid: serv.uuid, + } + // the value in the request is optional + if req.Uuid != "" && req.Uuid != serv.uuid { + serv.logger.Debugf("shipper UUID does not match, all events rejected. Expected = %s, actual = %s", serv.uuid, req.Uuid) + return resp, status.Error(codes.FailedPrecondition, fmt.Sprintf("UUID does not match. Expected = %s, actual = %s", serv.uuid, req.Uuid)) + } + + if len(req.Events) == 0 { + return nil, status.Error(codes.InvalidArgument, "publish request must contain at least one event") + } + + if serv.strictMode { + for _, e := range req.Events { + err := serv.validateEvent(e) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + } + } + + var accIdx uint64 + var err error + for _, evt := range req.Events { + accIdx, err = serv.beatInput.sendEvent(evt) + if err != nil { + serv.logger.Errorf("error sending event: %s", err) + } else { + resp.AcceptedCount++ + } + + } + resp.AcceptedIndex = accIdx + serv.logger. + Debugf("finished publishing a batch. Events = %d, accepted = %d, accepted index = %d", + len(req.Events), + resp.AcceptedCount, + resp.AcceptedIndex, + ) + + return resp, nil +} + +// PersistedIndex implementation. Will track and send the oldest unacked event in the queue. +func (serv *ShipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { + serv.logger.Debug("new subscriber for persisted index change") + defer serv.logger.Debug("unsubscribed from persisted index change") + + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: serv.beatInput.acker.PersistedIndex(), + }) + if err != nil { + return fmt.Errorf("error sending index reply: %w", err) + } + + pollingIntervalDur := req.PollingInterval.AsDuration() + if pollingIntervalDur == 0 { + return nil + } + + ticker := time.NewTicker(pollingIntervalDur) + defer ticker.Stop() + + for { + select { + case <-producer.Context().Done(): + return fmt.Errorf("producer context: %w", producer.Context().Err()) + + case <-serv.ctx.Done(): + return fmt.Errorf("server is stopped: %w", serv.ctx.Err()) + + case <-ticker.C: + serv.logger.Infof("persistedIndex=%d", serv.beatInput.acker.PersistedIndex()) + err = producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: serv.beatInput.acker.PersistedIndex(), + }) + if err != nil { + return fmt.Errorf("failed to send the update: %w", err) + } + } + } + +} + +// Close the server connection +func (serv *ShipperServer) Close() error { + return nil +} + +func (serv *ShipperServer) validateEvent(m *messages.Event) error { + var msgs []string + + if err := m.Timestamp.CheckValid(); err != nil { + msgs = append(msgs, fmt.Sprintf("timestamp: %s", err)) + } + + if err := serv.validateDataStream(m.DataStream); err != nil { + msgs = append(msgs, fmt.Sprintf("datastream: %s", err)) + } + + if err := serv.validateSource(m.Source); err != nil { + msgs = append(msgs, fmt.Sprintf("source: %s", err)) + } + + if len(msgs) == 0 { + return nil + } + + return errors.New(strings.Join(msgs, "; ")) +} + +func (serv *ShipperServer) validateSource(s *messages.Source) error { + if s == nil { + return fmt.Errorf("cannot be nil") + } + + var msgs []string + if s.InputId == "" { + msgs = append(msgs, "input_id is a required field") + } + + if len(msgs) == 0 { + return nil + } + + return errors.New(strings.Join(msgs, "; ")) +} + +func (serv *ShipperServer) validateDataStream(ds *messages.DataStream) error { + if ds == nil { + return fmt.Errorf("cannot be nil") + } + + var msgs []string + if ds.Dataset == "" { + msgs = append(msgs, "dataset is a required field") + } + if ds.Namespace == "" { + msgs = append(msgs, "namespace is a required field") + } + if ds.Type == "" { + msgs = append(msgs, "type is a required field") + } + + if len(msgs) == 0 { + return nil + } + + return errors.New(strings.Join(msgs, "; ")) +} diff --git a/x-pack/filebeat/input/shipper/srv_unix.go b/x-pack/filebeat/input/shipper/srv_unix.go new file mode 100644 index 000000000000..2fc49b3a6cc0 --- /dev/null +++ b/x-pack/filebeat/input/shipper/srv_unix.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !windows + +package shipper + +import ( + "net" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func newListener(_ *logp.Logger, addr string) (net.Listener, error) { + return net.Listen("unix", addr) +} diff --git a/x-pack/filebeat/input/shipper/srv_windows.go b/x-pack/filebeat/input/shipper/srv_windows.go new file mode 100644 index 000000000000..053533bf6c00 --- /dev/null +++ b/x-pack/filebeat/input/shipper/srv_windows.go @@ -0,0 +1,80 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build windows + +package shipper + +import ( + "fmt" + "net" + "os/user" + "strings" + + "github.com/elastic/elastic-agent-libs/api/npipe" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + NTAUTHORITY_SYSTEM = "S-1-5-18" + ADMINISTRATORS_GROUP = "S-1-5-32-544" +) + +func newListener(log *logp.Logger, addr string) (net.Listener, error) { + sd, err := securityDescriptor(log) + if err != nil { + return nil, err + } + return npipe.NewListener(addr, sd) +} + +func securityDescriptor(log *logp.Logger) (string, error) { + u, err := user.Current() + if err != nil { + return "", fmt.Errorf("failed to get current user: %w", err) + } + // Named pipe security and access rights. + // We create the pipe and the specific users should only be able to write to it. + // See docs: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-security-and-access-rights + // String definition: https://docs.microsoft.com/en-us/windows/win32/secauthz/ace-strings + // Give generic read/write access to the specified user. + descriptor := "D:P(A;;GA;;;" + u.Uid + ")" + + if isAdmin, err := isWindowsAdmin(u); err != nil { + // do not fail, agent would end up in a loop, continue with limited permissions + log.Warnf("failed to detect admin: %w", err) + } else if isAdmin { + // running as SYSTEM, include Administrators group so Administrators can talk over + // the named pipe to the running Elastic Agent system process + // https://support.microsoft.com/en-us/help/243330/well-known-security-identifiers-in-windows-operating-systems + descriptor += "(A;;GA;;;" + ADMINISTRATORS_GROUP + ")" + } + return descriptor, nil +} + +func isWindowsAdmin(u *user.User) (bool, error) { + if u.Username == "NT AUTHORITY\\SYSTEM" { + return true, nil + } + + if equalsSystemGroup(u.Uid) || equalsSystemGroup(u.Gid) { + return true, nil + } + + groups, err := u.GroupIds() + if err != nil { + return false, fmt.Errorf("failed to get current user groups: %w", err) + } + + for _, groupSid := range groups { + if equalsSystemGroup(groupSid) { + return true, nil + } + } + + return false, nil +} + +func equalsSystemGroup(s string) bool { + return strings.EqualFold(s, NTAUTHORITY_SYSTEM) || strings.EqualFold(s, ADMINISTRATORS_GROUP) +} diff --git a/x-pack/filebeat/input/shipper/tools/test_unix.go b/x-pack/filebeat/input/shipper/tools/test_unix.go new file mode 100644 index 000000000000..c092029767f9 --- /dev/null +++ b/x-pack/filebeat/input/shipper/tools/test_unix.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build !windows + +package tools + +import ( + "net" + "net/url" + "path/filepath" + "time" +) + +// DialTestAddr dials the address with the operating specific function +func DialTestAddr(addr string, timeout time.Duration) (net.Conn, error) { + dailer := net.Dialer{Timeout: timeout} + return dailer.Dial("unix", addr) +} + +// GenerateTestAddr creates a grpc address that is specific to the operating system +func GenerateTestAddr(path string) string { + var socket url.URL + socket.Scheme = "unix" + socket.Path = filepath.Join(path, "grpc.sock") + return socket.String() +} diff --git a/x-pack/filebeat/input/shipper/tools/test_windows.go b/x-pack/filebeat/input/shipper/tools/test_windows.go new file mode 100644 index 000000000000..0808b237aa26 --- /dev/null +++ b/x-pack/filebeat/input/shipper/tools/test_windows.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build windows + +package tools + +import ( + "crypto/sha256" + "fmt" + "net" + "time" + + "github.com/Microsoft/go-winio" +) + +// DialTestAddr dials the address with the operating specific function +func DialTestAddr(addr string, timeout time.Duration) (net.Conn, error) { + return winio.DialPipe(addr, &timeout) +} + +// GenerateTestAddr creates a grpc address that is specific to the operating system +func GenerateTestAddr(path string) string { + // entire string cannot be longer than 256 characters, path + // should be unique for each test + return fmt.Sprintf(`\\.\pipe\shipper-%x-pipe`, sha256.Sum256([]byte(path))) +} diff --git a/x-pack/filebeat/input/shipper/tools/tools.go b/x-pack/filebeat/input/shipper/tools/tools.go new file mode 100644 index 000000000000..9b434c83cb15 --- /dev/null +++ b/x-pack/filebeat/input/shipper/tools/tools.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build tools +// +build tools + +package tools + +import ( + // mage notice will fail without this, since it'll try and fetch this with `go install` + _ "go.elastic.co/go-licence-detector" + + _ "github.com/elastic/elastic-agent-libs/dev-tools/mage" + + _ "gotest.tools/gotestsum/cmd" +) diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index ab80ae43686b..6677bcfcf644 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -68,7 +68,7 @@ func (r *TransformRegister) Transform( // CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values // that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/ // This also performs the basic task of inserting module-level add_field processors into the inputs/modules. -func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { +func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { // should this be an error? if raw.GetStreams() == nil { return []map[string]interface{}{}, nil @@ -77,14 +77,24 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag for iter, stream := range raw.GetStreams() { streamSource := raw.GetStreams()[iter].GetSource().AsMap() + streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) + if err != nil { + return nil, fmt.Errorf("error creating stream rules: %w", err) + } - streamSource = injectIndexStream(inputType, raw, stream, streamSource) + inputs[iter] = streamSource + } - // the order of building the processors is important - // prepend is used to ensure that the processors defined directly on the stream - // come last in the order as they take priority over the others as they are the - // most specific to this one stream + return inputs, nil +} +// CreateShipperInput is a modified version of CreateInputsFromStreams made for forwarding input units to the shipper beat +// this does not create separate inputs for each stream, and instead passes it along as a single input, with just the processors added +func CreateShipperInput(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { + inputs := make([]map[string]interface{}, len(raw.GetStreams())) + for iter, stream := range raw.GetStreams() { + streamSource := raw.GetStreams()[iter].GetSource().AsMap() + streamSource = injectIndexStream(defaultDataStreamType, raw, stream, streamSource) // 1. global processors streamSource = injectGlobalProcesssors(raw, streamSource) @@ -95,21 +105,16 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag } // 3. stream processors - streamSource, err = injectStreamProcessors(raw, inputType, stream, streamSource, defaultProcessors) + streamSource, err = injectStreamProcessors(raw, defaultDataStreamType, stream, streamSource, defaultProcessors) if err != nil { return nil, fmt.Errorf("Error injecting stream processors: %w", err) } - - // now the order of the processors on this input is as follows - // 1. stream processors - // 2. agentInfo processors - // 3. global processors - // 4. stream specific processors - inputs[iter] = streamSource } + rawMap := raw.Source.AsMap() + rawMap["streams"] = inputs - return inputs, nil + return []map[string]interface{}{rawMap}, nil } // CreateReloadConfigFromInputs turns a raw input/module list into the ConfigWithMeta type used by the reloader interface @@ -131,6 +136,40 @@ func CreateReloadConfigFromInputs(raw []map[string]interface{}) ([]*reload.Confi // config injection // =========== +// convinence method for wrapping all the stream transformations needed by the shipper and other inputs +func createStreamRules(raw *proto.UnitExpectedConfig, streamSource map[string]interface{}, stream *proto.Stream, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) (map[string]interface{}, error) { + + streamSource = injectIndexStream(defaultDataStreamType, raw, stream, streamSource) + + // the order of building the processors is important + // prepend is used to ensure that the processors defined directly on the stream + // come last in the order as they take priority over the others as they are the + // most specific to this one stream + + // 1. global processors + streamSource = injectGlobalProcesssors(raw, streamSource) + + // 2. agentInfo + streamSource, err := injectAgentInfoRule(streamSource, agentInfo) + if err != nil { + return nil, fmt.Errorf("Error injecting agent processors: %w", err) + } + + // 3. stream processors + streamSource, err = injectStreamProcessors(raw, defaultDataStreamType, stream, streamSource, defaultProcessors) + if err != nil { + return nil, fmt.Errorf("Error injecting stream processors: %w", err) + } + + // now the order of the processors on this input is as follows + // 1. stream processors + // 2. agentInfo processors + // 3. global processors + // 4. stream specific processors + + return streamSource, nil +} + // Emulates the InjectAgentInfoRule and InjectHeadersRule ast rules // adds processors for agent-related metadata func injectAgentInfoRule(inputs map[string]interface{}, agentInfo *client.AgentInfo) (map[string]interface{}, error) { @@ -173,8 +212,8 @@ func injectGlobalProcesssors(expected *proto.UnitExpectedConfig, stream map[stri // injectIndexStream is an emulation of the InjectIndexProcessor AST code // this adds the `index` field, based on the data_stream info we get from the config -func injectIndexStream(dataStreamType string, expected *proto.UnitExpectedConfig, streamExpected *proto.Stream, stream map[string]interface{}) map[string]interface{} { - streamType, dataset, namespace := metadataFromDatastreamValues(dataStreamType, expected, streamExpected) +func injectIndexStream(defaultDataStreamType string, expected *proto.UnitExpectedConfig, streamExpected *proto.Stream, stream map[string]interface{}) map[string]interface{} { + streamType, dataset, namespace := metadataFromDatastreamValues(defaultDataStreamType, expected, streamExpected) index := fmt.Sprintf("%s-%s-%s", streamType, dataset, namespace) stream["index"] = index return stream @@ -182,10 +221,10 @@ func injectIndexStream(dataStreamType string, expected *proto.UnitExpectedConfig // injectStreamProcessors is an emulation of the InjectStreamProcessorRule AST code // this adds a variety of processors for metadata related to the dataset and input config. -func injectStreamProcessors(expected *proto.UnitExpectedConfig, dataStreamType string, streamExpected *proto.Stream, stream map[string]interface{}, defaultProcessors []mapstr.M) (map[string]interface{}, error) { +func injectStreamProcessors(expected *proto.UnitExpectedConfig, defaultDataStreamType string, streamExpected *proto.Stream, stream map[string]interface{}, defaultProcessors []mapstr.M) (map[string]interface{}, error) { // 1. start by "repairing" config to add any missing fields // logic from datastreamTypeFromInputNode - procInputType, procInputDataset, procInputNamespace := metadataFromDatastreamValues(dataStreamType, expected, streamExpected) + procInputType, procInputDataset, procInputNamespace := metadataFromDatastreamValues(defaultDataStreamType, expected, streamExpected) var processors = []interface{}{}