From cabda46f9add3b2e3cac88c2cd78c5ebfefe4543 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 17 May 2024 14:24:05 -0400 Subject: [PATCH] Remove shipper (#39584) This commit removes all code related to the shipper and fixes lint warnings. The deprecated `ioutil.ReadDir` has been replaced by `io.ReadDir`. A single for loop is used to iterate over the dirEntries and if there is an error reading the FileInfo from a DirEntry, we log it and continue to the next dirEntry. This effectively adds more resilience to the original code that would not process any segment if the call to `ioutil.ReadDir` had failed. --- NOTICE.txt | 163 +---- go.mod | 3 +- go.sum | 2 - libbeat/outputs/elasticsearch/client_test.go | 5 +- libbeat/outputs/outest/batch.go | 2 - libbeat/outputs/shipper/README.md | 82 --- libbeat/outputs/shipper/api/shipper_mock.go | 108 --- libbeat/outputs/shipper/config.go | 64 -- libbeat/outputs/shipper/shipper.go | 449 ------------ libbeat/outputs/shipper/shipper_test.go | 652 ------------------ libbeat/publisher/event.go | 11 - libbeat/publisher/includes/includes.go | 1 - libbeat/publisher/pipeline/ttl_batch.go | 5 - libbeat/publisher/pipeline/ttl_batch_test.go | 10 - .../queue/diskqueue/benchmark_test.go | 47 +- libbeat/publisher/queue/diskqueue/config.go | 3 - libbeat/publisher/queue/diskqueue/consumer.go | 3 - libbeat/publisher/queue/diskqueue/queue.go | 8 +- libbeat/publisher/queue/diskqueue/segments.go | 23 +- .../publisher/queue/diskqueue/serialize.go | 21 +- .../queue/diskqueue/serialize_test.go | 109 --- libbeat/publisher/queue/memqueue/broker.go | 9 - .../publisher/queue/memqueue/queue_test.go | 38 - libbeat/publisher/queue/queue.go | 4 - x-pack/filebeat/cmd/agent.go | 25 +- x-pack/filebeat/cmd/root.go | 8 - .../input/default-inputs/inputs_other.go | 2 - .../input/default-inputs/inputs_windows.go | 2 - x-pack/filebeat/input/shipper/acker.go | 30 - x-pack/filebeat/input/shipper/config.go | 63 -- x-pack/filebeat/input/shipper/input.go | 283 -------- x-pack/filebeat/input/shipper/server.go | 223 ------ x-pack/filebeat/input/shipper/srv_unix.go | 17 - x-pack/filebeat/input/shipper/srv_windows.go | 80 --- .../filebeat/input/shipper/tools/test_unix.go | 27 - .../input/shipper/tools/test_windows.go | 27 - x-pack/filebeat/input/shipper/tools/tools.go | 17 - .../tests/integration/shipper_test.go | 270 -------- x-pack/libbeat/management/generate.go | 31 +- 39 files changed, 67 insertions(+), 2860 deletions(-) delete mode 100644 libbeat/outputs/shipper/README.md delete mode 100644 libbeat/outputs/shipper/api/shipper_mock.go delete mode 100644 libbeat/outputs/shipper/config.go delete mode 100644 libbeat/outputs/shipper/shipper.go delete mode 100644 libbeat/outputs/shipper/shipper_test.go delete mode 100644 libbeat/publisher/queue/diskqueue/serialize_test.go delete mode 100644 x-pack/filebeat/input/shipper/acker.go delete mode 100644 x-pack/filebeat/input/shipper/config.go delete mode 100644 x-pack/filebeat/input/shipper/input.go delete mode 100644 x-pack/filebeat/input/shipper/server.go delete mode 100644 x-pack/filebeat/input/shipper/srv_unix.go delete mode 100644 x-pack/filebeat/input/shipper/srv_windows.go delete mode 100644 x-pack/filebeat/input/shipper/tools/test_unix.go delete mode 100644 x-pack/filebeat/input/shipper/tools/test_windows.go delete mode 100644 x-pack/filebeat/input/shipper/tools/tools.go delete mode 100644 x-pack/filebeat/tests/integration/shipper_test.go diff --git a/NOTICE.txt b/NOTICE.txt index e636d4324a4..aad81b518ac 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13178,109 +13178,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/elastic/elastic-agent-shipper-client -Version: v0.5.1-0.20230228231646-f04347b666f3 -Licence type (autodetected): Elastic --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.5.1-0.20230228231646-f04347b666f3/LICENSE.txt: - -Elastic License 2.0 - -URL: https://www.elastic.co/licensing/elastic-license - -## Acceptance - -By using the software, you agree to all of the terms and conditions below. - -## Copyright License - -The licensor grants you a non-exclusive, royalty-free, worldwide, -non-sublicensable, non-transferable license to use, copy, distribute, make -available, and prepare derivative works of the software, in each case subject to -the limitations and conditions below. - -## Limitations - -You may not provide the software to third parties as a hosted or managed -service, where the service provides users with access to any substantial set of -the features or functionality of the software. - -You may not move, change, disable, or circumvent the license key functionality -in the software, and you may not remove or obscure any functionality in the -software that is protected by the license key. - -You may not alter, remove, or obscure any licensing, copyright, or other notices -of the licensor in the software. Any use of the licensor’s trademarks is subject -to applicable law. - -## Patents - -The licensor grants you a license, under any patent claims the licensor can -license, or becomes able to license, to make, have made, use, sell, offer for -sale, import and have imported the software, in each case subject to the -limitations and conditions in this license. This license does not cover any -patent claims that you cause to be infringed by modifications or additions to -the software. If you or your company make any written claim that the software -infringes or contributes to infringement of any patent, your patent license for -the software granted under these terms ends immediately. If your company makes -such a claim, your patent license ends immediately for work on behalf of your -company. - -## Notices - -You must ensure that anyone who gets a copy of any part of the software from you -also gets a copy of these terms. - -If you modify the software, you must include in any modified copies of the -software prominent notices stating that you have modified the software. - -## No Other Rights - -These terms do not imply any licenses other than those expressly granted in -these terms. - -## Termination - -If you use the software in violation of these terms, such use is not licensed, -and your licenses will automatically terminate. If the licensor provides you -with a notice of your violation, and you cease all violation of this license no -later than 30 days after you receive that notice, your licenses will be -reinstated retroactively. However, if you violate these terms after such -reinstatement, any additional violation of these terms will cause your licenses -to terminate automatically and permanently. - -## No Liability - -*As far as the law allows, the software comes as is, without any warranty or -condition, and the licensor will not be liable to you for any damages arising -out of these terms or the use or nature of the software, under any kind of -legal claim.* - -## Definitions - -The **licensor** is the entity offering these terms, and the **software** is the -software the licensor makes available under these terms, including any portion -of it. - -**you** refers to the individual or entity agreeing to these terms. - -**your company** is any legal entity, sole proprietorship, or other kind of -organization that you work for, plus all organizations that have control over, -are under the control of, or are under common control with that -organization. **control** means ownership of substantially all the assets of an -entity, or the power to direct its management and policies by vote, contract, or -otherwise. Control can be direct or indirect. - -**your licenses** are all the licenses granted to you for the software under -these terms. - -**use** means anything you do with the software requiring one of your licenses. - -**trademark** means trademarks, service marks, and similar rights. - - -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics Version: v0.9.2 @@ -22830,36 +22727,6 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : github.com/sergi/go-diff -Version: v1.3.1 -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/sergi/go-diff@v1.3.1/LICENSE: - -Copyright (c) 2012-2016 The go-diff Authors. All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining a -copy of this software and associated documentation files (the "Software"), -to deal in the Software without restriction, including without limitation -the rights to use, copy, modify, merge, publish, distribute, sublicense, -and/or sell copies of the Software, and to permit persons to whom the -Software is furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included -in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. - - - -------------------------------------------------------------------------------- Dependency : github.com/shirou/gopsutil/v3 Version: v3.22.10 @@ -51122,6 +50989,36 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/sergi/go-diff +Version: v1.3.1 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/sergi/go-diff@v1.3.1/LICENSE: + +Copyright (c) 2012-2016 The go-diff Authors. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. + + + -------------------------------------------------------------------------------- Dependency : github.com/shirou/gopsutil Version: v3.21.11+incompatible diff --git a/go.mod b/go.mod index 8b4a980154b..8d278dae027 100644 --- a/go.mod +++ b/go.mod @@ -207,7 +207,6 @@ require ( github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.6.14 github.com/elastic/elastic-agent-libs v0.9.8 - github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 github.com/elastic/elastic-agent-system-metrics v0.9.2 github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/elastic/mito v1.11.0 @@ -227,7 +226,6 @@ require ( github.com/otiai10/copy v1.12.0 github.com/pierrec/lz4/v4 v4.1.18 github.com/pkg/xattr v0.4.9 - github.com/sergi/go-diff v1.3.1 github.com/shirou/gopsutil/v3 v3.22.10 github.com/tklauser/go-sysconf v0.3.10 go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 @@ -366,6 +364,7 @@ require ( github.com/rootless-containers/rootlesskit v1.1.0 // indirect github.com/sanathkr/go-yaml v0.0.0-20170819195128-ed9d249f429b // indirect github.com/segmentio/asm v1.2.0 // indirect + github.com/sergi/go-diff v1.3.1 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect diff --git a/go.sum b/go.sum index d1affeb4e8f..e91e46ce2c6 100644 --- a/go.sum +++ b/go.sum @@ -556,8 +556,6 @@ github.com/elastic/elastic-agent-client/v7 v7.9.0 h1:ryNbISIg4tTRT9KA0MYOa+fxW0C github.com/elastic/elastic-agent-client/v7 v7.9.0/go.mod h1:/AeiwX9zxG99eUNrLhpApTpwmE71Qwuh4ozObn7a0ss= github.com/elastic/elastic-agent-libs v0.9.8 h1:fwl3hp0gNmKkuERcUQTwe4cyIK6M0jJkv16EIsB6Apw= github.com/elastic/elastic-agent-libs v0.9.8/go.mod h1:xhHF9jeWhPzKPtEHN+epKjdiZi0bCbACLxwkp1aHMpc= -github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U= -github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI= github.com/elastic/elastic-agent-system-metrics v0.9.2 h1:/tvTKOt55EerU0WwGFoDhBlyWLgxyv7d8xCbny0bciw= github.com/elastic/elastic-agent-system-metrics v0.9.2/go.mod h1:VfJnKw4Jqrd9ddljXCwaGKJgN+7ADyyGk089NaXVsf0= github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 28033ff3cb2..429b600d11a 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -70,9 +70,8 @@ func (bm *batchMock) ACK() { func (bm *batchMock) Drop() { bm.drop = true } -func (bm *batchMock) Retry() { panic("unimplemented") } -func (bm *batchMock) Cancelled() { panic("unimplemented") } -func (bm *batchMock) FreeEntries() {} +func (bm *batchMock) Retry() { panic("unimplemented") } +func (bm *batchMock) Cancelled() { panic("unimplemented") } func (bm *batchMock) SplitRetry() bool { if bm.canSplit { bm.didSplit = true diff --git a/libbeat/outputs/outest/batch.go b/libbeat/outputs/outest/batch.go index f0e6838e8fd..f8106a9f5db 100644 --- a/libbeat/outputs/outest/batch.go +++ b/libbeat/outputs/outest/batch.go @@ -77,8 +77,6 @@ func (b *Batch) SplitRetry() bool { return len(b.events) > 1 } -func (b *Batch) FreeEntries() {} - func (b *Batch) Cancelled() { b.doSignal(BatchSignal{Tag: BatchCancelled}) } diff --git a/libbeat/outputs/shipper/README.md b/libbeat/outputs/shipper/README.md deleted file mode 100644 index dec05449e73..00000000000 --- a/libbeat/outputs/shipper/README.md +++ /dev/null @@ -1,82 +0,0 @@ -# Shipper - -The Shipper output sends events to a shipper service via gRPC. - -WARNING: This output is experimental and is not supposed to be used by anyone other than developers. - -To use this output, edit the beat's configuration file and enable the shipper output by adding `output.shipper`. - -Example configuration: - -```yaml -output.shipper: - server: "localhost:50051" - ssl: - enabled: true - certificate_authorities: ["/etc/client/ca.pem"] - certificate: "/etc/client/cert.pem" - ssl.key: "/etc/client/cert.key" - timeout: 30 - max_retries: 3 - bulk_max_size: 50 - ack_polling_interval: '5ms' - backoff: - init: 1 - max: 60 -``` - -## Configuration options - -You can specify the following `output.shipper` options in the beat's config file: - -### `server` - -The address of the gRPC server where the shipper service is hosted. - -### `ssl` - -Configuration options for SSL parameters like the root CA for gRPC connections. -See [configuration-ssl](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html) for more information. - -### `timeout` - -The number of seconds to wait for responses from the gRPC server before timing -out. The default is 30 (seconds). - -### `max_retries` - -The number of times to retry publishing an event after a publishing failure. -After the specified number of retries, the events are typically dropped. - -Set `max_retries` to a value less than 0 to retry until all events are published. - -The default is 3. - -### `bulk_max_size` - -The maximum number of events to buffer internally during publishing. The default is 50. - -Specifying a larger batch size may add some latency and buffering during publishing. - -Setting `bulk_max_size` to values less than or equal to 0 disables the -splitting of batches. When splitting is disabled, the queue decides on the -number of events to be contained in a batch. - -### `ack_polling_interval` - -The minimal interval for getting persisted index updates from the shipper server. Batches of events are acknowledged asynchronously in the background. If after the `ack_polling_interval` duration the persisted index value changed all batches pending acknowledgment will be checked against the new value and acknowledged if `persisted_index` >= `accepted_index`. - -The default value is `5ms`, cannot be set to a value less then the default. - -### `backoff.init` - -The number of seconds to wait before trying to republish to the shipper -after a network error. After waiting `backoff.init` seconds, {beatname_uc} -tries to republish. If the attempt fails, the backoff timer is increased -exponentially up to `backoff.max`. After a successful publish, the backoff -timer is reset. The default is 1s. - -### `backoff.max` - -The maximum number of seconds to wait before attempting to republish to -the shipper after a network error. The default is 60s. diff --git a/libbeat/outputs/shipper/api/shipper_mock.go b/libbeat/outputs/shipper/api/shipper_mock.go deleted file mode 100644 index 52e49a72e32..00000000000 --- a/libbeat/outputs/shipper/api/shipper_mock.go +++ /dev/null @@ -1,108 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package api - -import ( - context "context" - "errors" - "time" - - pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" - - "github.com/gofrs/uuid" -) - -func NewProducerMock(cap int) *ProducerMock { - id, _ := uuid.NewV4() - return &ProducerMock{ - uuid: id.String(), - Q: make([]*messages.Event, 0, cap), - } -} - -type ProducerMock struct { - pb.UnimplementedProducerServer - Q []*messages.Event - uuid string - AcceptedCount uint32 - persistedIndex uint64 - ErrorCallback func(events []*messages.Event) error -} - -func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishRequest) (*messages.PublishReply, error) { - if p.ErrorCallback != nil { - if err := p.ErrorCallback(r.Events); err != nil { - return nil, err - } - } - - if r.Uuid != p.uuid { - return nil, errors.New("UUID does not match") - } - - resp := &messages.PublishReply{} - - for _, e := range r.Events { - if len(p.Q) == cap(p.Q) { - return resp, nil - } - - p.Q = append(p.Q, e) - resp.AcceptedCount++ - if resp.AcceptedCount == p.AcceptedCount { - break - } - } - - resp.AcceptedIndex = uint64(len(p.Q)) - - return resp, nil -} - -func (p *ProducerMock) Persist(count uint64) { - p.persistedIndex = count -} - -func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - err := producer.Send(&messages.PersistedIndexReply{ - Uuid: p.uuid, - PersistedIndex: p.persistedIndex, - }) - if err != nil { - return err - } - - if !req.PollingInterval.IsValid() || req.PollingInterval.AsDuration() == 0 { - return nil - } - - ticker := time.NewTicker(req.PollingInterval.AsDuration()) - defer ticker.Stop() - - for range ticker.C { - err = producer.Send(&messages.PersistedIndexReply{ - Uuid: p.uuid, - PersistedIndex: p.persistedIndex, - }) - if err != nil { - return err - } - } - return nil -} diff --git a/libbeat/outputs/shipper/config.go b/libbeat/outputs/shipper/config.go deleted file mode 100644 index a4c71e05da2..00000000000 --- a/libbeat/outputs/shipper/config.go +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package shipper - -import ( - "time" - - "github.com/elastic/elastic-agent-libs/transport/tlscommon" -) - -type backoffConfig struct { - Init time.Duration `config:"init" validate:"nonzero"` - Max time.Duration `config:"max" validate:"nonzero"` -} - -type Config struct { - // Server address in the format of host:port, e.g. `localhost:50051` - Server string `config:"server"` - // TLS/SSL configurationf or secure connection - TLS *tlscommon.Config `config:"ssl"` - // Timeout of a single batch publishing request - Timeout time.Duration `config:"timeout" validate:"min=1"` - // MaxRetries is how many times the same batch is attempted to be sent - MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` - // BulkMaxSize max amount of events in a single batch - BulkMaxSize int `config:"bulk_max_size"` - // AckPollingInterval is a minimal interval for getting persisted index updates from the shipper server. - // Batches of events are acknowledged asynchronously in the background. - // If after the `AckPollingInterval` duration the persisted index value changed - // all batches pending acknowledgment will be checked against the new value - // and acknowledged if `persisted_index` >= `accepted_index`. - AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"` - // Backoff strategy for the shipper output - Backoff backoffConfig `config:"backoff"` -} - -func defaultConfig() Config { - return Config{ - TLS: nil, - Timeout: 30 * time.Second, - MaxRetries: 3, - BulkMaxSize: 50, - AckPollingInterval: 5 * time.Millisecond, - Backoff: backoffConfig{ - Init: 1 * time.Second, - Max: 60 * time.Second, - }, - } -} diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go deleted file mode 100644 index 83955a80f4c..00000000000 --- a/libbeat/outputs/shipper/shipper.go +++ /dev/null @@ -1,449 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package shipper - -import ( - "context" - "fmt" - "sync" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/outputs" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - - "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" - sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" - - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/transport/tlscommon" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" -) - -type pendingBatch struct { - batch publisher.Batch - index uint64 - eventCount int - droppedCount int -} - -type shipper struct { - log *logp.Logger - observer outputs.Observer - - config Config - - conn *grpc.ClientConn - client sc.ProducerClient - ackClient sc.Producer_PersistedIndexClient - - serverID string - - // The publish function sends to ackLoopChan to notify the ack worker of - // new pending batches - ackBatchChan chan pendingBatch - - // The ack RPC listener sends to ackIndexChan to notify the ack worker - // of the new persisted index - ackIndexChan chan uint64 - - // ackWaitGroup is used to synchronize the shutdown of the ack listener - // and the ack worker when a connection is closed. - ackWaitGroup sync.WaitGroup - - // ackCancel cancels the context for the ack listener and the ack worker, - // notifying them to shut down. - ackCancel context.CancelFunc -} - -func init() { - outputs.RegisterType("shipper", makeShipper) -} - -// shipperProcessor serves as a wrapper for testing Publish() calls with alternate marshalling callbacks -var shipperProcessor = toShipperEvent - -func makeShipper( - _ outputs.IndexManager, - beat beat.Info, - observer outputs.Observer, - cfg *conf.C, -) (outputs.Group, error) { - - config := defaultConfig() - err := cfg.Unpack(&config) - if err != nil { - return outputs.Fail(err) - } - - s := &shipper{ - log: logp.NewLogger("shipper"), - observer: observer, - config: config, - } - - swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max) - - return outputs.Group{ - Clients: []outputs.Client{swb}, - Retry: config.MaxRetries, - QueueFactory: memqueue.FactoryForSettings( - memqueue.Settings{ - Events: config.BulkMaxSize * 2, - MaxGetRequest: config.BulkMaxSize, - FlushTimeout: 0, - }), - }, nil -} - -// Connect establishes connection to the shipper server and implements `outputs.Connectable`. -func (s *shipper) Connect() error { - tls, err := tlscommon.LoadTLSConfig(s.config.TLS) - if err != nil { - return fmt.Errorf("invalid shipper TLS configuration: %w", err) - } - - var creds credentials.TransportCredentials - if s.config.TLS != nil && s.config.TLS.Enabled != nil && *s.config.TLS.Enabled { - creds = credentials.NewTLS(tls.ToConfig()) - } else { - creds = insecure.NewCredentials() - } - - opts := []grpc.DialOption{ - grpc.WithConnectParams(grpc.ConnectParams{ - MinConnectTimeout: s.config.Timeout, - }), - grpc.WithBlock(), - grpc.WithTransportCredentials(creds), - } - - s.log.Debugf("trying to connect to %s...", s.config.Server) - - ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout) - defer cancel() - conn, err := grpc.DialContext(ctx, s.config.Server, opts...) - if err != nil { - return fmt.Errorf("shipper connection failed with: %w", err) - } - - s.conn = conn - s.client = sc.NewProducerClient(conn) - - return s.startACKLoop() -} - -// Publish converts and sends a batch of events to the shipper server. -// Also, implements `outputs.Client` -func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error { - err := s.publish(ctx, batch) - if err != nil { - // If there was an error then we are dropping our connection. - s.Close() - } - return err -} - -func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error { - if s.conn == nil { - return fmt.Errorf("connection is not established") - } - - events := batch.Events() - s.observer.NewBatch(len(events)) - - toSend := make([]*messages.Event, 0, len(events)) - - s.log.Debugf("converting %d events to protobuf...", len(events)) - - droppedCount := 0 - - for i, e := range events { - converted, err := shipperProcessor(e) - if err != nil { - // conversion errors are not recoverable, so we have to drop the event completely - s.log.Errorf("%d/%d: %q, dropped", i+1, len(events), err) - droppedCount++ - continue - } - - toSend = append(toSend, converted) - } - - convertedCount := len(toSend) - - s.observer.Dropped(droppedCount) - s.log.Debugf("%d events converted to protobuf, %d dropped", convertedCount, droppedCount) - - var lastAcceptedIndex uint64 - - ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) - defer cancel() - - for len(toSend) > 0 { - publishReply, err := s.client.PublishEvents(ctx, &messages.PublishRequest{ - Uuid: s.serverID, - Events: toSend, - }) - - if err != nil { - if status.Code(err) == codes.ResourceExhausted { - // This error can only come from the gRPC connection, and - // most likely indicates this request exceeds the shipper's - // RPC size limit. Split the batch if possible, otherwise we - // need to drop it. - if batch.SplitRetry() { - // Report that we split a batch - s.observer.Split() - } else { - batch.Drop() - s.observer.Dropped(len(events)) - s.log.Errorf("dropping %d events because of RPC failure: %v", len(events), err) - } - return nil - } - // All other known errors are, in theory, retryable once the - // RPC connection is successfully restarted, and don't depend on - // the contents of the request. We should be cautious, though: if an - // error is deterministic based on the contents of a publish - // request, then cancelling here (instead of dropping or retrying) - // will cause an infinite retry loop, wedging the pipeline. - - batch.Cancelled() // does not decrease the TTL - s.observer.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events - return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err) - } - - // with a correct server implementation should never happen, this error is not recoverable - if int(publishReply.AcceptedCount) > len(toSend) { - return fmt.Errorf( - "server returned unexpected results, expected maximum accepted items %d, got %d", - len(toSend), - publishReply.AcceptedCount, - ) - } - toSend = toSend[publishReply.AcceptedCount:] - lastAcceptedIndex = publishReply.AcceptedIndex - s.log.Debugf("%d events have been accepted during a publish request", publishReply.AcceptedCount) - } - - s.log.Debugf("total of %d events have been accepted from batch, %d dropped", convertedCount, droppedCount) - - // We've sent as much as we can to the shipper, release the batch's events and - // save it in the queue of batches awaiting acknowledgment. - batch.FreeEntries() - s.ackBatchChan <- pendingBatch{ - batch: batch, - index: lastAcceptedIndex, - eventCount: len(events), - droppedCount: droppedCount, - } - - return nil -} - -// Close closes the connection to the shipper server. -// Also, implements `outputs.Client` -func (s *shipper) Close() error { - if s.conn == nil { - return fmt.Errorf("connection is not established") - } - s.ackCancel() - s.ackWaitGroup.Wait() - - err := s.conn.Close() - s.conn = nil - s.client = nil - - return err -} - -// String implements `outputs.Client` -func (s *shipper) String() string { - return "shipper" -} - -func (s *shipper) startACKLoop() error { - ctx, cancel := context.WithCancel(context.Background()) - s.ackCancel = cancel - - indexClient, err := s.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{ - PollingInterval: durationpb.New(s.config.AckPollingInterval), - }) - if err != nil { - return fmt.Errorf("failed to connect to the server: %w", err) - } - indexReply, err := indexClient.Recv() - if err != nil { - return fmt.Errorf("failed to fetch server information: %w", err) - } - s.serverID = indexReply.GetUuid() - - s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID) - - s.ackClient = indexClient - s.ackBatchChan = make(chan pendingBatch) - s.ackIndexChan = make(chan uint64) - s.ackWaitGroup.Add(2) - - go func() { - s.ackWorker(ctx) - s.ackWaitGroup.Done() - }() - - go func() { - err := s.ackListener(ctx) - s.ackWaitGroup.Done() - if err != nil { - s.log.Errorf("acknowledgment listener stopped: %s", err) - - // Shut down the connection and clear the output metadata. - // This will not propagate back to the pipeline immediately, - // but the next time Publish is called it will return an error - // because there is no connection, which will signal the pipeline - // to try to revive this output worker via Connect(). - s.Close() - } - }() - - return nil -} - -// ackListener's only job is to listen to the persisted index RPC stream -// and forward its values to the ack worker. -func (s *shipper) ackListener(ctx context.Context) error { - s.log.Debugf("starting acknowledgment listener with server %s", s.serverID) - for { - indexReply, err := s.ackClient.Recv() - if err != nil { - select { - case <-ctx.Done(): - // If our context has been closed, this is an intentional closed - // connection, so don't return the error. - return nil - default: - // If the context itself is not closed then this means a real - // connection error. - return fmt.Errorf("ack listener closed connection: %w", err) - } - } - s.ackIndexChan <- indexReply.PersistedIndex - } -} - -// ackWorker listens for newly published batches awaiting acknowledgment, -// and for new persisted indexes that should be forwarded to already-published -// batches. -func (s *shipper) ackWorker(ctx context.Context) { - s.log.Debugf("starting acknowledgment loop with server %s", s.serverID) - - pending := []pendingBatch{} - for { - select { - case <-ctx.Done(): - // If there are any pending batches left when the ack loop returns, then - // they will never be acknowledged, so send the cancel signal. - for _, p := range pending { - p.batch.Cancelled() - } - return - - case newBatch := <-s.ackBatchChan: - pending = append(pending, newBatch) - - case newIndex := <-s.ackIndexChan: - lastProcessed := 0 - for _, p := range pending { - // if we met a batch that is ahead of the persisted index - // we stop iterating and wait for another update from the server. - // The latest pending batch has the max(AcceptedIndex). - if p.index > newIndex { - break - } - - p.batch.ACK() - ackedCount := p.eventCount - p.droppedCount - s.observer.Acked(ackedCount) - s.log.Debugf("%d events have been acknowledged, %d dropped", ackedCount, p.droppedCount) - lastProcessed++ - } - // so we don't perform any manipulation when the pending list is empty - // or none of the batches were acknowledged by this persisted index update - if lastProcessed != 0 { - remaining := len(pending) - lastProcessed - copy(pending[0:], pending[lastProcessed:]) - pending = pending[:remaining] - } - } - } -} - -func toShipperEvent(e publisher.Event) (*messages.Event, error) { - meta, err := helpers.NewValue(e.Content.Meta) - if err != nil { - return nil, fmt.Errorf("failed to convert event metadata to protobuf: %w", err) - } - - fields, err := helpers.NewValue(e.Content.Fields) - if err != nil { - return nil, fmt.Errorf("failed to convert event fields to protobuf: %w", err) - } - - source := &messages.Source{} - ds := &messages.DataStream{} - - inputIDVal, err := e.Content.Meta.GetValue("input_id") - if err == nil { - source.InputId, _ = inputIDVal.(string) - } - - streamIDVal, err := e.Content.Meta.GetValue("stream_id") - if err == nil { - source.StreamId, _ = streamIDVal.(string) - } - - dsType, err := e.Content.Fields.GetValue("data_stream.type") - if err == nil { - ds.Type, _ = dsType.(string) - } - dsNamespace, err := e.Content.Fields.GetValue("data_stream.namespace") - if err == nil { - ds.Namespace, _ = dsNamespace.(string) - } - dsDataset, err := e.Content.Fields.GetValue("data_stream.dataset") - if err == nil { - ds.Dataset, _ = dsDataset.(string) - } - - return &messages.Event{ - Timestamp: timestamppb.New(e.Content.Timestamp), - Metadata: meta.GetStructValue(), - Fields: fields.GetStructValue(), - Source: source, - DataStream: ds, - }, nil -} diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go deleted file mode 100644 index e26d44635af..00000000000 --- a/libbeat/outputs/shipper/shipper_test.go +++ /dev/null @@ -1,652 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package shipper - -import ( - "context" - "errors" - "fmt" - "net" - "reflect" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/outputs" - "github.com/elastic/beats/v7/libbeat/outputs/outest" - "github.com/elastic/beats/v7/libbeat/outputs/shipper/api" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/pipeline" - "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" - pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" -) - -func TestToShipperEvent(t *testing.T) { - ts := time.Now().Truncate(time.Second) - - cases := []struct { - name string - value publisher.Event - exp *messages.Event - expErr string - }{ - { - name: "successfully converts an event without source and data stream", - value: publisher.Event{ - Content: beat.Event{ - Timestamp: ts, - Meta: mapstr.M{ - "metafield": 42, - }, - Fields: mapstr.M{ - "field": "117", - }, - }, - }, - exp: &messages.Event{ - Timestamp: timestamppb.New(ts), - Source: &messages.Source{}, - DataStream: &messages.DataStream{}, - Metadata: protoStruct(t, map[string]interface{}{ - "metafield": 42, - }), - Fields: protoStruct(t, map[string]interface{}{ - "field": "117", - }), - }, - }, - { - name: "successfully converts an event with source and data stream", - value: publisher.Event{ - Content: beat.Event{ - Timestamp: ts, - Meta: mapstr.M{ - "metafield": 42, - "input_id": "input", - "stream_id": "stream", - }, - Fields: mapstr.M{ - "field": "117", - "data_stream": mapstr.M{ - "type": "ds-type", - "namespace": "ds-namespace", - "dataset": "ds-dataset", - }, - }, - }, - }, - exp: &messages.Event{ - Timestamp: timestamppb.New(ts), - Source: &messages.Source{ - InputId: "input", - StreamId: "stream", - }, - DataStream: &messages.DataStream{ - Type: "ds-type", - Namespace: "ds-namespace", - Dataset: "ds-dataset", - }, - Metadata: protoStruct(t, map[string]interface{}{ - "metafield": 42, - "input_id": "input", - "stream_id": "stream", - }), - Fields: protoStruct(t, map[string]interface{}{ - "field": "117", - "data_stream": map[string]interface{}{ - "type": "ds-type", - "namespace": "ds-namespace", - "dataset": "ds-dataset", - }, - }), - }, - }, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - converted, err := toShipperEvent(tc.value) - if tc.expErr != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tc.expErr) - require.Nil(t, converted) - return - } - requireEqualProto(t, tc.exp, converted) - }) - } -} - -func TestPublish(t *testing.T) { - events := []beat.Event{ - { - Meta: mapstr.M{"event": "first"}, - Fields: mapstr.M{"a": "b"}, - }, - { - Meta: nil, // see failMarshal() - Fields: mapstr.M{"a": "b"}, - }, - { - Meta: mapstr.M{"event": "third"}, - Fields: mapstr.M{"e": "f"}, - }, - } - - cases := []struct { - name string - events []beat.Event - expSignals []outest.BatchSignal - serverError error - expError string - // note: this sets the queue size used by the mock output - // if the mock shipper receives more than this count of events, the test will fail - qSize int - observerExpected *TestObserver - marshalMethod func(e publisher.Event) (*messages.Event, error) - }{ - { - name: "sends a batch", - events: events, - marshalMethod: toShipperEvent, - expSignals: []outest.BatchSignal{ - {Tag: outest.BatchACK}, - }, - qSize: 3, - observerExpected: &TestObserver{batch: 3, acked: 3}, - }, - { - name: "retries not accepted events", - events: events, - expSignals: []outest.BatchSignal{ - {Tag: outest.BatchACK}, - }, - marshalMethod: failMarshal, // emulate a dropped event - qSize: 3, - observerExpected: &TestObserver{batch: 3, dropped: 1, acked: 2}, - }, - { - name: "cancels the batch if server error", - events: events, - expSignals: []outest.BatchSignal{ - {Tag: outest.BatchCancelled}, - }, - marshalMethod: toShipperEvent, - qSize: 3, - observerExpected: &TestObserver{cancelled: 3, batch: 3}, - serverError: errors.New("some error"), - expError: "failed to publish the batch to the shipper, none of the 3 events were accepted", - }, - { - name: "splits the batch on resource exceeded error", - events: events, - expSignals: []outest.BatchSignal{ - {Tag: outest.BatchSplitRetry}, - }, - marshalMethod: toShipperEvent, - qSize: 3, - observerExpected: &TestObserver{batch: 3, split: 1}, - serverError: status.Error(codes.ResourceExhausted, "rpc size limit exceeded"), - }, - { - name: "drops an unsplittable batch on resource exceeded error", - events: events[:1], // only 1 event so SplitRetry returns false - expSignals: []outest.BatchSignal{ - {Tag: outest.BatchSplitRetry}, - {Tag: outest.BatchDrop}, - }, - marshalMethod: toShipperEvent, - qSize: 1, - observerExpected: &TestObserver{batch: 1, dropped: 1}, - serverError: status.Error(codes.ResourceExhausted, "rpc size limit exceeded"), - }, - } - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - if tc.marshalMethod != nil { - shipperProcessor = tc.marshalMethod - } - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - addr, producer, stop := runServer( - t, tc.qSize, constErrorCallback(tc.serverError), "localhost:0") - defer stop() - - cfg, err := config.NewConfigFrom(map[string]interface{}{ - "server": addr, - }) - require.NoError(t, err) - observer := &TestObserver{} - - client := createShipperClient(t, cfg, observer) - - batch := outest.NewBatch(tc.events...) - - err = client.Publish(ctx, batch) - if tc.expError != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tc.expError) - } else { - require.NoError(t, err) - producer.Persist(uint64(tc.qSize)) // always persisted all published events - } - - assert.Eventually(t, func() bool { - // there is a background routine that checks acknowledgments, - // it should eventually change the status of the batch - return reflect.DeepEqual(tc.expSignals, batch.Signals) - }, 100*time.Millisecond, 10*time.Millisecond) - require.Equal(t, tc.expSignals, batch.Signals) - if tc.observerExpected != nil { - require.Equal(t, tc.observerExpected, observer) - } - }) - } - // reset marshaler - shipperProcessor = toShipperEvent - - t.Run("cancels the batch when a different server responds", func(t *testing.T) { - t.Skip("Flaky test: https://github.com/elastic/beats/issues/34984") - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - addr, _, stop := runServer(t, 5, nil, "localhost:0") - defer stop() - - cfg, err := config.NewConfigFrom(map[string]interface{}{ - "server": addr, - "timeout": 5, // 5 sec - "backoff": map[string]interface{}{ - "init": "10ms", - "max": "5s", - }, - }) - require.NoError(t, err) - observer := &TestObserver{} - client := createShipperClient(t, cfg, observer) - - // Should accept the batch and put it to the pending list - batch := outest.NewBatch(events...) - err = client.Publish(ctx, batch) - require.NoError(t, err) - - // Replace the server (would change the ID) - stop() - - _, _, stop = runServer(t, 5, nil, addr) - defer stop() - err = client.Connect() - require.NoError(t, err) - - expSignals := []outest.BatchSignal{ - { - Tag: outest.BatchCancelled, - }, - } - assert.Eventually(t, func() bool { - // there is a background routine that checks acknowledgments, - // it should eventually cancel the batch because the IDs don't match - return reflect.DeepEqual(expSignals, batch.Signals) - }, 100*time.Millisecond, 10*time.Millisecond) - require.Equal(t, expSignals, batch.Signals) - }) - - t.Run("acks multiple batches", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - addr, producer, stop := runServer(t, 9, nil, "localhost:0") - defer stop() - - cfg, err := config.NewConfigFrom(map[string]interface{}{ - "server": addr, - "timeout": 5, // 5 sec - "backoff": map[string]interface{}{ - "init": "10ms", - "max": "5s", - }, - }) - require.NoError(t, err) - observer := &TestObserver{} - expectedObserver := &TestObserver{batch: 9, acked: 9} - client := createShipperClient(t, cfg, observer) - - // Should accept the batch and put it to the pending list - batch1 := outest.NewBatch(events...) - err = client.Publish(ctx, batch1) - require.NoError(t, err) - - batch2 := outest.NewBatch(events...) - err = client.Publish(ctx, batch2) - require.NoError(t, err) - - batch3 := outest.NewBatch(events...) - err = client.Publish(ctx, batch3) - require.NoError(t, err) - - expSignals := []outest.BatchSignal{ - { - Tag: outest.BatchACK, - }, - } - - producer.Persist(9) // 2 events per batch, 3 batches - - assert.Eventually(t, func() bool { - // there is a background routine that checks acknowledgments, - // it should eventually send expected signals - return reflect.DeepEqual(expSignals, batch1.Signals) && - reflect.DeepEqual(expSignals, batch2.Signals) && - reflect.DeepEqual(expSignals, batch3.Signals) - }, 100*time.Millisecond, 10*time.Millisecond) - require.Equal(t, expSignals, batch1.Signals, "batch1") - require.Equal(t, expSignals, batch2.Signals, "batch2") - require.Equal(t, expSignals, batch3.Signals, "batch3") - require.Equal(t, expectedObserver, observer) - }) - - t.Run("live batches where all events are too large to ingest", func(t *testing.T) { - // This tests recursive retry using live `ttlBatch` structs instead of mocks - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - errCallback := constErrorCallback(status.Error(codes.ResourceExhausted, "rpc size limit exceeded")) - addr, _, stop := runServer(t, 9, errCallback, "localhost:0") - defer stop() - cfg, err := config.NewConfigFrom(map[string]interface{}{ - "server": addr, - }) - require.NoError(t, err) - observer := &TestObserver{} - - client := createShipperClient(t, cfg, observer) - - // Since we retry directly instead of going through a live pipeline, - // the Publish call is synchronous and we can track state by modifying - // local variables directly. - retryCount := 0 - done := false - batch := pipeline.NewBatchForTesting( - []publisher.Event{ - {Content: events[0]}, {Content: events[1]}, {Content: events[2]}, - }, - func(b publisher.Batch) { - // Retry by sending directly back to Publish. In a live - // pipeline, this would be sent through eventConsumer first - // before calling Publish on the next free output worker. - retryCount++ - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish shouldn't return an error") - }, - func() { done = true }, - ) - err = client.Publish(ctx, batch) - assert.NoError(t, err, "Publish shouldn't return an error") - - // For three events there should be four retries in total: - // {[event1], [event2, event3]}, then {[event2], [event3]}. - // "done" should be true because after splitting into individual - // events, all 3 will fail and be dropped. - assert.Equal(t, 4, retryCount, "three-event batch should produce four total retries") - assert.True(t, done, "batch should be done after Publish") - - // "batch" adds up all events passed into publish, including repeats, - // so it should be 3 + 2 + 1 + 1 + 1 = 8 - expectedObserver := &TestObserver{split: 2, dropped: 3, batch: 8} - require.Equal(t, expectedObserver, observer) - }) - - t.Run("live batches where only one event is too large to ingest", func(t *testing.T) { - // This tests retry using live `ttlBatch` structs instead of mocks, - // where one event is too large too ingest but the others are ok. - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - errCallback := func(batchEvents []*messages.Event) error { - // Treat only the first event (which contains the metadata - // string "first") as too large to ingest, and accept otherwise. - for _, e := range batchEvents { - if strings.Contains(e.String(), "\"first\"") { - return status.Error(codes.ResourceExhausted, "rpc size limit exceeded") - } - } - return nil - } - addr, _, stop := runServer(t, 9, errCallback, "localhost:0") - defer stop() - cfg, err := config.NewConfigFrom(map[string]interface{}{ - "server": addr, - }) - require.NoError(t, err) - observer := &TestObserver{} - - client := createShipperClient(t, cfg, observer) - - // Since we retry directly instead of going through a live pipeline, - // the Publish call is synchronous and we can track state by modifying - // local variables directly. - retryCount := 0 - done := false - batch := pipeline.NewBatchForTesting( - []publisher.Event{ - {Content: events[0]}, {Content: events[1]}, {Content: events[2]}, - }, - func(b publisher.Batch) { - // Retry by sending directly back to Publish. In a live - // pipeline, this would be sent through eventConsumer first - // before calling Publish on the next free output worker. - retryCount++ - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish shouldn't return an error") - }, - func() { done = true }, - ) - err = client.Publish(ctx, batch) - assert.NoError(t, err, "Publish shouldn't return an error") - - // Only the first event is too large -- it will be retried by - // itself and the other batch will succeed, so retryCount should - // be 2. - // "done" should be false because the shipper output doesn't call done - // until upstream ingestion is confirmed via PersistedIndex. - assert.Equal(t, 2, retryCount, "three-event batch should produce four total retries") - assert.False(t, done, "batch should be acknowledged after Publish") - - // "batch" adds up all events passed into publish, including repeats, - // so it should be 3 + 1 + 2 = 6 - expectedObserver := &TestObserver{split: 1, dropped: 1, batch: 6} - require.Equal(t, expectedObserver, observer) - }) -} - -// BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed. -func BenchmarkToShipperEvent(b *testing.B) { - ts := time.Date(2022, time.July, 8, 16, 00, 00, 00, time.UTC) - str := strings.Repeat("somelongstring", 100) - - // This event causes to go through every code path during the event conversion - e := publisher.Event{Content: beat.Event{ - Timestamp: ts, - Meta: mapstr.M{ - "input_id": "someinputid", - "stream_id": "somestreamid", - "data_stream": mapstr.M{ - "type": "logs", - "namespace": "default", - "dataset": "default", - }, - "number": 42, - "string": str, - "time": ts, - "bytes": []byte(str), - "list": []interface{}{str, str, str}, - "nil": nil, - }, - Fields: mapstr.M{ - "inner": mapstr.M{ - "number": 42, - "string": str, - "time": ts, - "bytes": []byte(str), - "list": []interface{}{str, str, str}, - "nil": nil, - }, - "number": 42, - "string": str, - "time": ts, - "bytes": []byte(str), - "list": []interface{}{str, str, str}, - "nil": nil, - }, - }} - - for i := 0; i < b.N; i++ { - pe, err := toShipperEvent(e) - require.NoError(b, err) - bytes, err := proto.Marshal(pe) - require.NoError(b, err) - require.NotEmpty(b, bytes) - } -} - -// runServer mocks the shipper mock server for testing -// `qSize` is a slice of the event buffer in the mock -// `err` is a preset error that the server will serve to the client -// `listenAddr` is the address for the server to listen -// returns `actualAddr` where the listener actually is and the `stop` function to stop the server -func runServer( - t *testing.T, - qSize int, - errCallback func([]*messages.Event) error, - listenAddr string, -) (actualAddr string, mock *api.ProducerMock, stop func()) { - producer := api.NewProducerMock(qSize) - producer.ErrorCallback = errCallback - grpcServer := grpc.NewServer() - pb.RegisterProducerServer(grpcServer, producer) - - listener, err := net.Listen("tcp", listenAddr) - require.NoError(t, err) - go func() { - _ = grpcServer.Serve(listener) - }() - - actualAddr = listener.Addr().String() - stop = func() { - grpcServer.Stop() - listener.Close() - } - - return actualAddr, producer, stop -} - -func constErrorCallback(err error) func([]*messages.Event) error { - return func(_ []*messages.Event) error { - return err - } -} - -func createShipperClient(t *testing.T, cfg *config.C, observer outputs.Observer) outputs.NetworkClient { - group, err := makeShipper( - nil, - beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, - observer, - cfg, - ) - require.NoError(t, err) - require.Len(t, group.Clients, 1) - - client := group.Clients[0].(outputs.NetworkClient) - - err = client.Connect() - require.NoError(t, err) - - return client -} - -func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct { - s, err := helpers.NewStruct(values) - require.NoError(t, err) - return s -} - -func requireEqualProto(t *testing.T, expected, actual proto.Message) { - require.True( - t, - proto.Equal(expected, actual), - fmt.Sprintf("These two protobuf messages are not equal:\nexpected: %v\nactual: %v", expected, actual), - ) -} - -// emulates the toShipperEvent, but looks for a nil meta field, and throws an error -func failMarshal(e publisher.Event) (*messages.Event, error) { - if e.Content.Meta == nil { - return nil, fmt.Errorf("nil meta field") - } - return toShipperEvent(e) -} - -// mock test observer for tracking events - -type TestObserver struct { - acked int - dropped int - cancelled int - batch int - duplicate int - failed int - split int - - writeError error - readError error - - writeBytes int - readBytes int - - errTooMany int -} - -func (to *TestObserver) NewBatch(batch int) { to.batch += batch } -func (to *TestObserver) Acked(acked int) { to.acked += acked } -func (to *TestObserver) ReportLatency(_ time.Duration) {} -func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate } -func (to *TestObserver) Failed(failed int) { to.failed += failed } -func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped } -func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled } -func (to *TestObserver) Split() { to.split++ } -func (to *TestObserver) WriteError(we error) { to.writeError = we } -func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } -func (to *TestObserver) ReadError(re error) { to.readError = re } -func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb } -func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err } diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go index 77ab6716f99..efd5220740e 100644 --- a/libbeat/publisher/event.go +++ b/libbeat/publisher/event.go @@ -47,17 +47,6 @@ type Batch interface { // batch.Drop() if necessary). SplitRetry() bool - // Release the internal pointer to this batch's events but do not yet - // acknowledge this batch. This exists specifically for the shipper output, - // where there is potentially a long gap between when events are handed off - // to the shipper and when they are acknowledged upstream; during that time, - // we need to preserve batch metadata for producer end-to-end acknowledgments, - // but we do not need the events themselves since they are already queued by - // the shipper. It is only guaranteed to release event pointers when using the - // proxy queue. - // Never call this on a batch that might be retried. - FreeEntries() - // Send was aborted, try again but don't decrease the batch's TTL counter. Cancelled() } diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index c1e2d02e3cf..ccb69d8e475 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -28,7 +28,6 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/kafka" _ "github.com/elastic/beats/v7/libbeat/outputs/logstash" _ "github.com/elastic/beats/v7/libbeat/outputs/redis" - _ "github.com/elastic/beats/v7/libbeat/outputs/shipper" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" ) diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index dcc2790f231..dab77fa5659 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -77,7 +77,6 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { events = append(events, event) } } - original.FreeEntries() b := &ttlBatch{ done: original.Done, @@ -166,10 +165,6 @@ func (b *ttlBatch) RetryEvents(events []publisher.Event) { b.Retry() } -func (b *ttlBatch) FreeEntries() { - b.events = nil -} - // reduceTTL reduces the time to live for all events that have no 'guaranteed' // sending requirements. reduceTTL returns true if the batch is still alive. func (b *ttlBatch) reduceTTL() bool { diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go index 4c5207acbb0..769ccc37c35 100644 --- a/libbeat/publisher/pipeline/ttl_batch_test.go +++ b/libbeat/publisher/pipeline/ttl_batch_test.go @@ -112,12 +112,6 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) { require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback") } -func TestNewBatchFreesEvents(t *testing.T) { - queueBatch := &mockQueueBatch{} - _ = newBatch(nil, queueBatch, 0) - assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch") -} - type mockQueueBatch struct { freeEntriesCalled int } @@ -133,10 +127,6 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry { return fmt.Sprintf("event %v", i) } -func (b *mockQueueBatch) FreeEntries() { - b.freeEntriesCalled++ -} - type mockRetryer struct { batches []*ttlBatch } diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 1ac91e57ce1..7665c4fd780 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -34,14 +34,11 @@ import ( "testing" "time" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" ) var ( @@ -61,28 +58,12 @@ var ( ) // makePublisherEvent creates a sample publisher.Event, using a random message from msgs list -func makePublisherEvent() publisher.Event { +func makePublisherEvent(r *rand.Rand) publisher.Event { return publisher.Event{ Content: beat.Event{ Timestamp: eventTime, Fields: mapstr.M{ - "message": msgs[rand.Intn(len(msgs))], - }, - }, - } -} - -// makeMessagesEvent creates a sample *messages.Event, using a random message from msgs list -func makeMessagesEvent() *messages.Event { - return &messages.Event{ - Timestamp: timestamppb.New(eventTime), - Fields: &messages.Struct{ - Data: map[string]*messages.Value{ - "message": { - Kind: &messages.Value_StringValue{ - StringValue: msgs[rand.Intn(len(msgs))], - }, - }, + "message": msgs[r.Intn(len(msgs))], }, }, } @@ -99,7 +80,6 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue s.EncryptionKey = []byte("testtesttesttest") } s.UseCompression = compress - s.UseProtobuf = protobuf q, err := NewQueue(logp.L(), nil, s, nil) if err != nil { panic(err) @@ -116,14 +96,9 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue return q, p } -func publishEvents(p queue.Producer, num int, protobuf bool) { +func publishEvents(r *rand.Rand, p queue.Producer, num int) { for i := 0; i < num; i++ { - var e queue.Entry - if protobuf { - e = makeMessagesEvent() - } else { - e = makePublisherEvent() - } + e := makePublisherEvent(r) _, ok := p.Publish(e) if !ok { panic("didn't publish") @@ -149,15 +124,15 @@ func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error { // produceAndConsume generates and publishes events in a go routine, in // the main go routine it consumes and acks them. This interleaves // publish and consume. -func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error { - go publishEvents(p, num_events, protobuf) +func produceAndConsume(r *rand.Rand, p queue.Producer, q *diskQueue, num_events int, batch_size int) error { + go publishEvents(r, p, num_events) return getAndAckEvents(q, num_events, batch_size) } // produceThenConsume generates and publishes events, when all events // are published it consumes and acks them. -func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error { - publishEvents(p, num_events, protobuf) +func produceThenConsume(r *rand.Rand, p queue.Producer, q *diskQueue, num_events int, batch_size int) error { + publishEvents(r, p, num_events) return getAndAckEvents(q, num_events, batch_size) } @@ -169,15 +144,15 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, for n := 0; n < b.N; n++ { b.StopTimer() - rand.Seed(1) + r := rand.New(rand.NewSource(1)) q, p := setup(b, encrypt, compress, protobuf) b.StartTimer() if async { - if err = produceAndConsume(p, q, num_events, batch_size, protobuf); err != nil { + if err = produceAndConsume(r, p, q, num_events, batch_size); err != nil { break } } else { - if err = produceThenConsume(p, q, num_events, batch_size, protobuf); err != nil { + if err = produceThenConsume(r, p, q, num_events, batch_size); err != nil { break } } diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index 33af260fc85..56e9d35e08d 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -69,9 +69,6 @@ type Settings struct { // UseCompression enables or disables LZ4 compression UseCompression bool - - // UseProtobuf enables protobuf serialization instead of CBOR - UseProtobuf bool } // userConfig holds the parameters for a disk queue that are configurable diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 0ebdcef5ad3..55098b10fa8 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -90,9 +90,6 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry { return batch.frames[i].event } -func (batch *diskQueueBatch) FreeEntries() { -} - func (batch *diskQueueBatch) Done() { batch.queue.acks.addFrames(batch.frames) } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 5c04f9a0385..4fedcfa6a6e 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -289,16 +289,10 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig { } func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { - var serializationFormat SerializationFormat - if dq.settings.UseProtobuf { - serializationFormat = SerializationProtobuf - } else { - serializationFormat = SerializationCBOR - } return &diskQueueProducer{ queue: dq, config: cfg, - encoder: newEventEncoder(serializationFormat), + encoder: newEventEncoder(SerializationCBOR), done: make(chan struct{}), } } diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index cc73ec17d8e..0460fc4431a 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "os" "path" "sort" @@ -170,13 +169,19 @@ func (s bySegmentID) Less(i, j int) bool { return s[i].id < s[j].id } // Scan the given path for segment files, and return them in a list // ordered by segment id. func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, error) { - files, err := ioutil.ReadDir(pathStr) + dirEntries, err := os.ReadDir(pathStr) if err != nil { - return nil, fmt.Errorf("couldn't read queue directory '%s': %w", pathStr, err) + return nil, fmt.Errorf("could not read queue directory '%s': %w", pathStr, err) } segments := []*queueSegment{} - for _, file := range files { + for _, dirEntry := range dirEntries { + file, err := dirEntry.Info() + if err != nil { + logger.Errorf("could not get info for file '%s', skipping. Error: %w", dirEntry.Name(), err) + continue + } + components := strings.Split(file.Name(), ".") if len(components) == 2 && strings.ToLower(components[1]) == "seg" { // Parse the id as base-10 64-bit unsigned int. We ignore file names that @@ -251,11 +256,7 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, // Version 1 is CBOR, Version 2 could be CBOR or ProtoBuf, the // options control which if header.version > 0 { - if (header.options & ENABLE_PROTOBUF) == ENABLE_PROTOBUF { - sr.serializationFormat = SerializationProtobuf - } else { - sr.serializationFormat = SerializationCBOR - } + sr.serializationFormat = SerializationCBOR } if (header.options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION { @@ -297,10 +298,6 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, options = options | ENABLE_COMPRESSION } - if queueSettings.UseProtobuf { - options = options | ENABLE_PROTOBUF - } - sw := &segmentWriter{} sw.dst = file diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 0269fde07e2..d7e50b8f850 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -25,13 +25,10 @@ import ( "fmt" "time" - "google.golang.org/protobuf/proto" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/go-structform/cborl" "github.com/elastic/go-structform/gotype" "github.com/elastic/go-structform/json" @@ -40,9 +37,8 @@ import ( type SerializationFormat int const ( - SerializationJSON SerializationFormat = iota // 0 - SerializationCBOR // 1 - SerializationProtobuf // 2 + SerializationJSON SerializationFormat = iota // 0 + SerializationCBOR // 1 ) type eventEncoder struct { @@ -98,11 +94,6 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { return nil, fmt.Errorf("incompatible serialization for type %T. Only CBOR is supported", v) } return e.encode_publisher_event(v) - case *messages.Event: - if e.serializationFormat != SerializationProtobuf { - return nil, fmt.Errorf("incompatible serialization for type %T. Only Protobuf is supported", v) - } - return proto.Marshal(v) default: return nil, fmt.Errorf("no known serialization format for type %T", v) } @@ -160,8 +151,6 @@ func (d *eventDecoder) Decode() (interface{}, error) { switch d.serializationFormat { case SerializationJSON, SerializationCBOR: return d.decodeJSONAndCBOR() - case SerializationProtobuf: - return d.decodeProtobuf() default: return nil, fmt.Errorf("unknown serialization format: %d", d.serializationFormat) } @@ -200,9 +189,3 @@ func (d *eventDecoder) decodeJSONAndCBOR() (publisher.Event, error) { }, }, nil } - -func (d *eventDecoder) decodeProtobuf() (*messages.Event, error) { - e := messages.Event{} - err := proto.Unmarshal(d.buf, &e) - return &e, err -} diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go deleted file mode 100644 index 2ee5de6de6d..00000000000 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" -) - -// A test to make sure serialization works correctly on multi-byte characters. -func TestSerialize(t *testing.T) { - tests := map[string]struct { - value string - format SerializationFormat - }{ - "Ascii only, CBOR": { - value: "{\"name\": \"Momotaro\"}", - format: SerializationCBOR, - }, - "Multi-byte, CBOR": { - value: "{\"name\": \"桃太郎\"}", - format: SerializationCBOR, - }, - "Ascii only, Protobuf": { - value: "{\"name\": \"Momotaro\"}", - format: SerializationProtobuf, - }, - "Multi-byte, Protobuf": { - value: "{\"name\": \"桃太郎\"}", - format: SerializationProtobuf, - }, - } - - for name, tc := range tests { - encoder := newEventEncoder(tc.format) - var event interface{} - switch tc.format { - case SerializationCBOR: - event = publisher.Event{ - Content: beat.Event{ - Fields: mapstr.M{ - "test_field": tc.value, - }, - }, - } - case SerializationProtobuf: - event = &messages.Event{ - Fields: &messages.Struct{ - Data: map[string]*messages.Value{ - "test_field": { - Kind: &messages.Value_StringValue{ - StringValue: tc.value, - }, - }, - }, - }, - } - } - serialized, err := encoder.encode(event) - assert.NoErrorf(t, err, "%s: Couldn't encode event, error: %v", name, err) - - // Use decoder to decode the serialized bytes. - decoder := newEventDecoder() - decoder.serializationFormat = tc.format - buf := decoder.Buffer(len(serialized)) - copy(buf, serialized) - decoded, err := decoder.Decode() - require.NoErrorf(t, err, "%s: Couldn't decode event", name) - - switch tc.format { - case SerializationCBOR: - event, ok := decoded.(publisher.Event) - require.True(t, ok) - decodedValue, err := event.Content.Fields.GetValue("test_field") - assert.NoErrorf(t, err, "%s: Couldn't get 'test_field'", name) - assert.Equal(t, tc.value, decodedValue) - case SerializationProtobuf: - event, ok := decoded.(*messages.Event) - require.True(t, ok) - d := event.GetFields().GetData() - test_field, prs := d["test_field"] - assert.Truef(t, prs, "'test_field' was not present in decoded event data") - decodedValue := test_field.GetStringValue() - assert.Equal(t, tc.value, decodedValue) - } - } -} diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 23569f02150..a42215f48a6 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -417,15 +417,6 @@ func (b *batch) Entry(i int) queue.Entry { return b.rawEntry(i).event } -func (b *batch) FreeEntries() { - // This signals that the event data has been copied out of the batch, and is - // safe to free from the queue buffer, so set all the event pointers to nil. - for i := 0; i < b.count; i++ { - index := (b.start + i) % len(b.queue.buf) - b.queue.buf[index].event = nil - } -} - func (b *batch) Done() { b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index df2d16d0dec..41228046c53 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -438,44 +438,6 @@ func TestEntryIDs(t *testing.T) { }) } -func TestBatchFreeEntries(t *testing.T) { - const queueSize = 10 - const batchSize = 5 - // 1. Add 10 events to the queue, request two batches with 5 events each - // 2. Make sure the queue buffer has 10 non-nil events - // 3. Call FreeEntries on the second batch - // 4. Make sure only events 6-10 are nil - // 5. Call FreeEntries on the first batch - // 6. Make sure all events are nil - testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil) - producer := testQueue.Producer(queue.ProducerConfig{}) - for i := 0; i < queueSize; i++ { - _, ok := producer.Publish(i) - require.True(t, ok, "Queue publish must succeed") - } - batch1, err := testQueue.Get(batchSize) - require.NoError(t, err, "Queue read must succeed") - require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request") - batch2, err := testQueue.Get(batchSize) - require.NoError(t, err, "Queue read must succeed") - require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request") - // Slight concurrency subtlety: we check events are non-nil after the queue - // reads, since if we do it before we have no way to be sure the insert - // has been completed. - for i := 0; i < queueSize; i++ { - require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil") - } - batch2.FreeEntries() - for i := 0; i < batchSize; i++ { - require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i) - require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i) - } - batch1.FreeEntries() - for i := 0; i < queueSize; i++ { - require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches") - } -} - // producerACKWaiter is a helper that can listen to queue producer callbacks // and wait on them from the test thread, so we can test the queue's asynchronous // behavior without relying on time.Sleep. diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index e691c2888f6..8758c055945 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -148,10 +148,6 @@ type Producer interface { type Batch interface { Count() int Entry(i int) Entry - // Release the internal references to the contained events, if - // supported (the disk queue does not yet implement it). - // Count() and Entry() cannot be used after this call. - FreeEntries() Done() } diff --git a/x-pack/filebeat/cmd/agent.go b/x-pack/filebeat/cmd/agent.go index e7125ab0ffa..4183ddc06f5 100644 --- a/x-pack/filebeat/cmd/agent.go +++ b/x-pack/filebeat/cmd/agent.go @@ -16,23 +16,16 @@ import ( func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { var modules []map[string]interface{} var err error - if rawIn.Type == "shipper" { // place filebeat in "shipper mode", with one filebeat input per agent config input - modules, err = management.CreateShipperInput(rawIn, "logs", agentInfo) - if err != nil { - return nil, fmt.Errorf("error creating shipper config from raw expected config: %w", err) - } - } else { - modules, err = management.CreateInputsFromStreams(rawIn, "logs", agentInfo) - if err != nil { - return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) - } + modules, err = management.CreateInputsFromStreams(rawIn, "logs", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } - // Extract the module name from the stream-level type - // these types are defined in the elastic-agent's specfiles - for iter := range modules { - if _, ok := modules[iter]["type"]; !ok { - modules[iter]["type"] = rawIn.Type - } + // Extract the module name from the stream-level type + // these types are defined in the elastic-agent's specfiles + for iter := range modules { + if _, ok := modules[iter]["type"]; !ok { + modules[iter]["type"] = rawIn.Type } } diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index b348647b508..66e200e7078 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -6,7 +6,6 @@ package cmd import ( "fmt" - "os" "github.com/spf13/cobra" @@ -52,12 +51,6 @@ func defaultProcessors() []mapstr.M { // - add_docker_metadata: ~ // - add_kubernetes_metadata: ~ - // This gets called early enough that the CLI handling isn't properly initialized yet, - // so use an environment variable. - shipperEnv := os.Getenv("SHIPPER_MODE") - if shipperEnv == "True" { - return []mapstr.M{} - } return []mapstr.M{ { "add_host_metadata": mapstr.M{ @@ -68,5 +61,4 @@ func defaultProcessors() []mapstr.M { {"add_docker_metadata": nil}, {"add_kubernetes_metadata": nil}, } - } diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index e53538fbcef..2fa63535dbb 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce" - "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper" "github.com/elastic/beats/v7/x-pack/filebeat/input/websocket" "github.com/elastic/elastic-agent-libs/logp" ) @@ -43,7 +42,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 awscloudwatch.Plugin(), lumberjack.Plugin(), salesforce.Plugin(log, store), - shipper.Plugin(log, store), websocket.Plugin(log, store), netflow.Plugin(log), benchmark.Plugin(), diff --git a/x-pack/filebeat/input/default-inputs/inputs_windows.go b/x-pack/filebeat/input/default-inputs/inputs_windows.go index 821131c8bc2..7ec003623b6 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_windows.go +++ b/x-pack/filebeat/input/default-inputs/inputs_windows.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" - "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper" "github.com/elastic/elastic-agent-libs/logp" ) @@ -40,7 +39,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 awss3.Plugin(store), awscloudwatch.Plugin(), lumberjack.Plugin(), - shipper.Plugin(log, store), etw.Plugin(), netflow.Plugin(log), } diff --git a/x-pack/filebeat/input/shipper/acker.go b/x-pack/filebeat/input/shipper/acker.go deleted file mode 100644 index 8bfd856828f..00000000000 --- a/x-pack/filebeat/input/shipper/acker.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package shipper - -import ( - "sync/atomic" -) - -type shipperAcker struct { - persistedIndex uint64 -} - -func newShipperAcker() *shipperAcker { - return &shipperAcker{persistedIndex: 0} -} - -// Update the input's persistedIndex by adding total to it. -// Despite the name, "total" here means an incremental total, i.e. -// the total number of events that are being acknowledged by this callback, not the total that have been sent overall. -// The acked parameter includes only those events that were successfully sent upstream rather than dropped by processors, etc., -// but since we don't make that distinction in persistedIndex we can probably ignore it. -func (acker *shipperAcker) Track(_ int, total int) { - atomic.AddUint64(&acker.persistedIndex, uint64(total)) -} - -func (acker *shipperAcker) PersistedIndex() uint64 { - return acker.persistedIndex -} diff --git a/x-pack/filebeat/input/shipper/config.go b/x-pack/filebeat/input/shipper/config.go deleted file mode 100644 index e28217d8712..00000000000 --- a/x-pack/filebeat/input/shipper/config.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package shipper - -import ( - "time" - - "github.com/elastic/elastic-agent-libs/config" -) - -// Instance represents all the config needed to start a single shipper input -// because a beat works fundamentally differently from the old shipper, we dont have to deal with async config that's being pieced together, -// this one config object recievewd on create has both the input and output config -type Instance struct { - // config for the shipper's gRPC input - Conn ConnectionConfig `config:",inline"` - Input InputConfig `config:",inline"` -} - -// ConnectionConfig is the shipper-relevant portion of the config received from input units -type ConnectionConfig struct { - Server string `config:"server"` - InitialTimeout time.Duration `config:"grpc_setup_timeout"` - TLS TLS `config:"ssl"` -} - -// TLS is TLS-specific shipper client settings -type TLS struct { - CAs []string `config:"certificate_authorities"` - Cert string `config:"certificate"` - Key string `config:"key"` -} - -// InputConfig represents the config for a shipper input. This is the complete config for that input, mirrored and sent to us. -// This is more or less the same as the the proto.UnitExpectedConfig type, but that doesn't have `config` struct tags, -// so for the sake of quick prototyping we're just (roughly) duplicating the structure here, minus any fields the shipper doesn't need (for now) -type InputConfig struct { - ID string `config:"id"` - Type string `config:"type"` - Name string `config:"name"` - DataStream DataStream `config:"data_stream"` - // for now don't try to parse the streams, - // once we have a better idea of how per-stream processors work, we can find a better way to unpack this - Streams []Stream `config:"streams"` -} - -// DataStream represents the datastream metadata from an input -type DataStream struct { - Dataset string `config:"dataset"` - Type string `config:"type"` - Namespace string `config:"namespace"` -} - -// Stream represents a single stream present inside an input. -// this field is largely unpredictable and varies by input type, -// we're just grabbing the fields the shipper needs. -type Stream struct { - ID string `config:"id"` - Processors []*config.C `config:"processors"` - Index string `config:"index"` -} diff --git a/x-pack/filebeat/input/shipper/input.go b/x-pack/filebeat/input/shipper/input.go deleted file mode 100644 index e1bda9a8768..00000000000 --- a/x-pack/filebeat/input/shipper/input.go +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package shipper - -import ( - "errors" - "fmt" - "os" - "path/filepath" - "strings" - "sync" - "sync/atomic" - - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/acker" - "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper/tools" - "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" - pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" - "github.com/elastic/go-concert/unison" - - "github.com/docker/go-units" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -const ( - inputName = "shipper" -) - -// Plugin registers the input -func Plugin(log *logp.Logger, _ inputcursor.StateStore) v2.Plugin { - return v2.Plugin{ - Name: inputName, - Stability: feature.Experimental, - Manager: NewInputManager(log), - } -} - -// InputManager wraps one stateless input manager -type InputManager struct { - log *logp.Logger -} - -// NewInputManager creates a new shipper input manager -func NewInputManager(log *logp.Logger) *InputManager { - log.Infof("creating new InputManager") - return &InputManager{ - log: log.Named("shipper-beat"), - } -} - -// Init initializes the manager -// not sure if the shipper needs to do anything at this point? -func (im *InputManager) Init(_ unison.Group) error { - return nil -} - -// Create creates the input from a given config -// in an attempt to speed things up, this will create the processors from the config before we have access to the pipeline to create the clients -func (im *InputManager) Create(cfg *config.C) (v2.Input, error) { - config := Instance{} - if err := cfg.Unpack(&config); err != nil { - return nil, fmt.Errorf("error unpacking config: %w", err) - } - // strip the config we get from agent - config.Conn.Server = strings.TrimPrefix(config.Conn.Server, "unix://") - // following lines are helpful for debugging config, - // will be useful as we figure out how to integrate with agent - - // raw := mapstr.M{} - // err := cfg.Unpack(&raw) - // if err != nil { - // return nil, fmt.Errorf("error unpacking debug config: %w", err) - // } - // im.log.Infof("creating a new shipper input with config: %s", raw.String()) - // im.log.Infof("parsed config as: %#v", config) - - // create a mapping of streams - // when we get a new event, we use this to decide what processors to use - streamDataMap := map[string]streamData{} - for _, stream := range config.Input.Streams { - // convert to an actual processor used by the client - procList, err := processors.New(stream.Processors) - if err != nil { - return nil, fmt.Errorf("error creating processors for input: %w", err) - } - im.log.Infof("created processors for %s: %s", stream.ID, procList.String()) - streamDataMap[stream.ID] = streamData{index: stream.Index, processors: procList} - } - return &shipperInput{log: im.log, cfg: config, srvMut: &sync.Mutex{}, streams: streamDataMap, shipperSrv: config.Conn.Server, acker: newShipperAcker()}, nil -} - -// shipperInput is the main runtime object for the shipper -type shipperInput struct { - log *logp.Logger - cfg Instance - streams map[string]streamData - - server *grpc.Server - shipper *ShipperServer - // TODO: we probably don't need this, and can just fetch the config - shipperSrv string - srvMut *sync.Mutex - - acker *shipperAcker - - // incrementing counter that serves as an event ID - eventIDInc uint64 -} - -// all the data associated with a given stream that the shipper needs access to. -type streamData struct { - index string - client beat.Client - processors beat.ProcessorList -} - -func (in *shipperInput) Name() string { return inputName } - -func (in *shipperInput) Test(ctx v2.TestContext) error { - return nil -} - -// Stop the shipper -func (in *shipperInput) Stop() { - in.log.Infof("shipper shutting down") - // stop individual clients - for streamID, stream := range in.streams { - err := stream.client.Close() - if err != nil { - in.log.Infof("error closing client for stream: %s: %w", streamID, stream) - } - } - in.srvMut.Lock() - defer in.srvMut.Unlock() - if in.shipper != nil { - err := in.shipper.Close() - if err != nil { - in.log.Debugf("Error stopping shipper input: %s", err) - } - in.shipper = nil - } - if in.server != nil { - in.server.GracefulStop() - in.server = nil - - } - err := os.Remove(in.shipperSrv) - if err != nil { - in.log.Debugf("error removing unix socket for grpc listener during shutdown: %s", err) - } -} - -// Run the shipper -func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { - in.log.Infof("Running shipper input") - // create clients ahead of time - for streamID, streamProc := range in.streams { - client, err := pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventListener: acker.TrackingCounter(in.acker.Track), - Processing: beat.ProcessingConfig{ - Processor: streamProc.processors, - DisableHost: true, - DisableType: true, - }, - }) - if err != nil { - return fmt.Errorf("error creating client for stream %s: %w", streamID, err) - } - defer client.Close() - in.log.Infof("Creating beat client for stream %s", streamID) - - newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors} - in.streams[streamID] = newStreamData - } - - // setup gRPC - err := in.setupgRPC(pipeline) - if err != nil { - return fmt.Errorf("error starting shipper gRPC server: %w", err) - } - in.log.Infof("done setting up gRPC server") - - // wait for shutdown - <-inputContext.Cancelation.Done() - - in.Stop() - - return nil -} - -func (in *shipperInput) setupgRPC(pipeline beat.Pipeline) error { - in.log.Infof("initializing grpc server at %s", in.shipperSrv) - // Currently no TLS until we figure out mTLS issues in agent/shipper - creds := insecure.NewCredentials() - opts := []grpc.ServerOption{ - grpc.Creds(creds), - grpc.MaxRecvMsgSize(64 * units.MiB), - } - - var err error - in.server = grpc.NewServer(opts...) - in.shipper, err = NewShipperServer(pipeline, in) - if err != nil { - return fmt.Errorf("error creating shipper server: %w", err) - } - - pb.RegisterProducerServer(in.server, in.shipper) - - in.srvMut.Lock() - - // treat most of these checking errors as "soft" errors - // Try to make the environment clean, but trust newListener() to fail if it can't just start. - - // paranoid checking, make sure we have the base directory. - dir := filepath.Dir(in.shipperSrv) - err = os.MkdirAll(dir, 0o755) - if err != nil { - in.log.Warnf("could not create directory for unix socket %s: %w", dir, err) - } - - // on linux, net.Listen will fail if the file already exists - err = os.Remove(in.shipperSrv) - if err != nil && !errors.Is(err, os.ErrNotExist) { - in.log.Warnf("could not remove pre-existing socket at %s: %w", in.shipperSrv, err) - } - - lis, err := newListener(in.log, in.shipperSrv) - if err != nil { - in.srvMut.Unlock() - return fmt.Errorf("failed to listen on %s: %w", in.shipperSrv, err) - } - - go func() { - in.log.Infof("gRPC listening on %s", in.shipperSrv) - err = in.server.Serve(lis) - if err != nil { - in.log.Errorf("gRPC server shut down with error: %s", err) - } - }() - - // make sure connection is up before mutex is released; - // if close() on the socket is called before it's started, it will trigger a race. - defer in.srvMut.Unlock() - con, err := tools.DialTestAddr(in.shipperSrv, in.cfg.Conn.InitialTimeout) - if err != nil { - // this will stop the other go routine in the wait group - in.server.Stop() - return fmt.Errorf("failed to test connection with the gRPC server on %s: %w", in.shipperSrv, err) - } - _ = con.Close() - - return nil -} - -func (in *shipperInput) sendEvent(event *messages.Event) (uint64, error) { - // look for matching processor config - stream, ok := in.streams[event.Source.StreamId] - if !ok { - return 0, fmt.Errorf("could not find data stream associated with ID '%s'", event.Source.StreamId) - } - - evt := beat.Event{ - Timestamp: event.Timestamp.AsTime(), - Fields: helpers.AsMap(event.Fields), - Meta: helpers.AsMap(event.Metadata), - } - atomic.AddUint64(&in.eventIDInc, 1) - - stream.client.Publish(evt) - - return in.eventIDInc, nil -} diff --git a/x-pack/filebeat/input/shipper/server.go b/x-pack/filebeat/input/shipper/server.go deleted file mode 100644 index 3350a235782..00000000000 --- a/x-pack/filebeat/input/shipper/server.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package shipper - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - "time" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "github.com/gofrs/uuid" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" - - pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" -) - -// ShipperServer handles the actual gRPC server and associated connections -type ShipperServer struct { - logger *logp.Logger - pipeline beat.Pipeline - - uuid string - - close *sync.Once - ctx context.Context - stop func() - - strictMode bool - - beatInput *shipperInput - - pb.UnimplementedProducerServer -} - -// NewShipperServer creates a new server instance for handling gRPC endpoints. -// publisher can be set to nil, in which case the SetOutput() method must be called. -func NewShipperServer(pipeline beat.Pipeline, shipper *shipperInput) (*ShipperServer, error) { - log := logp.NewLogger("shipper-server") - - id, err := uuid.NewV4() - if err != nil { - return nil, fmt.Errorf("error generating shipper UUID: %w", err) - } - - srv := ShipperServer{ - uuid: id.String(), - logger: log, - pipeline: pipeline, - close: &sync.Once{}, - beatInput: shipper, - strictMode: false, - } - - srv.ctx, srv.stop = context.WithCancel(context.Background()) - - return &srv, nil -} - -// PublishEvents is the server implementation of the gRPC PublishEvents call. -func (serv *ShipperServer) PublishEvents(_ context.Context, req *messages.PublishRequest) (*messages.PublishReply, error) { - resp := &messages.PublishReply{ - Uuid: serv.uuid, - } - // the value in the request is optional - if req.Uuid != "" && req.Uuid != serv.uuid { - serv.logger.Debugf("shipper UUID does not match, all events rejected. Expected = %s, actual = %s", serv.uuid, req.Uuid) - return resp, status.Error(codes.FailedPrecondition, fmt.Sprintf("UUID does not match. Expected = %s, actual = %s", serv.uuid, req.Uuid)) - } - - if len(req.Events) == 0 { - return nil, status.Error(codes.InvalidArgument, "publish request must contain at least one event") - } - - if serv.strictMode { - for _, e := range req.Events { - err := serv.validateEvent(e) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - } - } - - var accIdx uint64 - var err error - for _, evt := range req.Events { - accIdx, err = serv.beatInput.sendEvent(evt) - if err != nil { - serv.logger.Errorf("error sending event: %s", err) - } else { - resp.AcceptedCount++ - } - - } - resp.AcceptedIndex = accIdx - serv.logger. - Debugf("finished publishing a batch. Events = %d, accepted = %d, accepted index = %d", - len(req.Events), - resp.AcceptedCount, - resp.AcceptedIndex, - ) - - return resp, nil -} - -// PersistedIndex implementation. Will track and send the oldest unacked event in the queue. -func (serv *ShipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - serv.logger.Debug("new subscriber for persisted index change") - defer serv.logger.Debug("unsubscribed from persisted index change") - - err := producer.Send(&messages.PersistedIndexReply{ - Uuid: serv.uuid, - PersistedIndex: serv.beatInput.acker.PersistedIndex(), - }) - if err != nil { - return fmt.Errorf("error sending index reply: %w", err) - } - - pollingIntervalDur := req.PollingInterval.AsDuration() - if pollingIntervalDur == 0 { - return nil - } - - ticker := time.NewTicker(pollingIntervalDur) - defer ticker.Stop() - - for { - select { - case <-producer.Context().Done(): - return fmt.Errorf("producer context: %w", producer.Context().Err()) - - case <-serv.ctx.Done(): - return fmt.Errorf("server is stopped: %w", serv.ctx.Err()) - - case <-ticker.C: - serv.logger.Infof("persistedIndex=%d", serv.beatInput.acker.PersistedIndex()) - err = producer.Send(&messages.PersistedIndexReply{ - Uuid: serv.uuid, - PersistedIndex: serv.beatInput.acker.PersistedIndex(), - }) - if err != nil { - return fmt.Errorf("failed to send the update: %w", err) - } - } - } - -} - -// Close the server connection -func (serv *ShipperServer) Close() error { - return nil -} - -func (serv *ShipperServer) validateEvent(m *messages.Event) error { - var msgs []string - - if err := m.Timestamp.CheckValid(); err != nil { - msgs = append(msgs, fmt.Sprintf("timestamp: %s", err)) - } - - if err := serv.validateDataStream(m.DataStream); err != nil { - msgs = append(msgs, fmt.Sprintf("datastream: %s", err)) - } - - if err := serv.validateSource(m.Source); err != nil { - msgs = append(msgs, fmt.Sprintf("source: %s", err)) - } - - if len(msgs) == 0 { - return nil - } - - return errors.New(strings.Join(msgs, "; ")) -} - -func (serv *ShipperServer) validateSource(s *messages.Source) error { - if s == nil { - return fmt.Errorf("cannot be nil") - } - - var msgs []string - if s.InputId == "" { - msgs = append(msgs, "input_id is a required field") - } - - if len(msgs) == 0 { - return nil - } - - return errors.New(strings.Join(msgs, "; ")) -} - -func (serv *ShipperServer) validateDataStream(ds *messages.DataStream) error { - if ds == nil { - return fmt.Errorf("cannot be nil") - } - - var msgs []string - if ds.Dataset == "" { - msgs = append(msgs, "dataset is a required field") - } - if ds.Namespace == "" { - msgs = append(msgs, "namespace is a required field") - } - if ds.Type == "" { - msgs = append(msgs, "type is a required field") - } - - if len(msgs) == 0 { - return nil - } - - return errors.New(strings.Join(msgs, "; ")) -} diff --git a/x-pack/filebeat/input/shipper/srv_unix.go b/x-pack/filebeat/input/shipper/srv_unix.go deleted file mode 100644 index 2fc49b3a6cc..00000000000 --- a/x-pack/filebeat/input/shipper/srv_unix.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !windows - -package shipper - -import ( - "net" - - "github.com/elastic/elastic-agent-libs/logp" -) - -func newListener(_ *logp.Logger, addr string) (net.Listener, error) { - return net.Listen("unix", addr) -} diff --git a/x-pack/filebeat/input/shipper/srv_windows.go b/x-pack/filebeat/input/shipper/srv_windows.go deleted file mode 100644 index 053533bf6c0..00000000000 --- a/x-pack/filebeat/input/shipper/srv_windows.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. -//go:build windows - -package shipper - -import ( - "fmt" - "net" - "os/user" - "strings" - - "github.com/elastic/elastic-agent-libs/api/npipe" - "github.com/elastic/elastic-agent-libs/logp" -) - -const ( - NTAUTHORITY_SYSTEM = "S-1-5-18" - ADMINISTRATORS_GROUP = "S-1-5-32-544" -) - -func newListener(log *logp.Logger, addr string) (net.Listener, error) { - sd, err := securityDescriptor(log) - if err != nil { - return nil, err - } - return npipe.NewListener(addr, sd) -} - -func securityDescriptor(log *logp.Logger) (string, error) { - u, err := user.Current() - if err != nil { - return "", fmt.Errorf("failed to get current user: %w", err) - } - // Named pipe security and access rights. - // We create the pipe and the specific users should only be able to write to it. - // See docs: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-security-and-access-rights - // String definition: https://docs.microsoft.com/en-us/windows/win32/secauthz/ace-strings - // Give generic read/write access to the specified user. - descriptor := "D:P(A;;GA;;;" + u.Uid + ")" - - if isAdmin, err := isWindowsAdmin(u); err != nil { - // do not fail, agent would end up in a loop, continue with limited permissions - log.Warnf("failed to detect admin: %w", err) - } else if isAdmin { - // running as SYSTEM, include Administrators group so Administrators can talk over - // the named pipe to the running Elastic Agent system process - // https://support.microsoft.com/en-us/help/243330/well-known-security-identifiers-in-windows-operating-systems - descriptor += "(A;;GA;;;" + ADMINISTRATORS_GROUP + ")" - } - return descriptor, nil -} - -func isWindowsAdmin(u *user.User) (bool, error) { - if u.Username == "NT AUTHORITY\\SYSTEM" { - return true, nil - } - - if equalsSystemGroup(u.Uid) || equalsSystemGroup(u.Gid) { - return true, nil - } - - groups, err := u.GroupIds() - if err != nil { - return false, fmt.Errorf("failed to get current user groups: %w", err) - } - - for _, groupSid := range groups { - if equalsSystemGroup(groupSid) { - return true, nil - } - } - - return false, nil -} - -func equalsSystemGroup(s string) bool { - return strings.EqualFold(s, NTAUTHORITY_SYSTEM) || strings.EqualFold(s, ADMINISTRATORS_GROUP) -} diff --git a/x-pack/filebeat/input/shipper/tools/test_unix.go b/x-pack/filebeat/input/shipper/tools/test_unix.go deleted file mode 100644 index c092029767f..00000000000 --- a/x-pack/filebeat/input/shipper/tools/test_unix.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. -//go:build !windows - -package tools - -import ( - "net" - "net/url" - "path/filepath" - "time" -) - -// DialTestAddr dials the address with the operating specific function -func DialTestAddr(addr string, timeout time.Duration) (net.Conn, error) { - dailer := net.Dialer{Timeout: timeout} - return dailer.Dial("unix", addr) -} - -// GenerateTestAddr creates a grpc address that is specific to the operating system -func GenerateTestAddr(path string) string { - var socket url.URL - socket.Scheme = "unix" - socket.Path = filepath.Join(path, "grpc.sock") - return socket.String() -} diff --git a/x-pack/filebeat/input/shipper/tools/test_windows.go b/x-pack/filebeat/input/shipper/tools/test_windows.go deleted file mode 100644 index 0808b237aa2..00000000000 --- a/x-pack/filebeat/input/shipper/tools/test_windows.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. -//go:build windows - -package tools - -import ( - "crypto/sha256" - "fmt" - "net" - "time" - - "github.com/Microsoft/go-winio" -) - -// DialTestAddr dials the address with the operating specific function -func DialTestAddr(addr string, timeout time.Duration) (net.Conn, error) { - return winio.DialPipe(addr, &timeout) -} - -// GenerateTestAddr creates a grpc address that is specific to the operating system -func GenerateTestAddr(path string) string { - // entire string cannot be longer than 256 characters, path - // should be unique for each test - return fmt.Sprintf(`\\.\pipe\shipper-%x-pipe`, sha256.Sum256([]byte(path))) -} diff --git a/x-pack/filebeat/input/shipper/tools/tools.go b/x-pack/filebeat/input/shipper/tools/tools.go deleted file mode 100644 index 9b434c83cb1..00000000000 --- a/x-pack/filebeat/input/shipper/tools/tools.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build tools -// +build tools - -package tools - -import ( - // mage notice will fail without this, since it'll try and fetch this with `go install` - _ "go.elastic.co/go-licence-detector" - - _ "github.com/elastic/elastic-agent-libs/dev-tools/mage" - - _ "gotest.tools/gotestsum/cmd" -) diff --git a/x-pack/filebeat/tests/integration/shipper_test.go b/x-pack/filebeat/tests/integration/shipper_test.go deleted file mode 100644 index 803258bd8cc..00000000000 --- a/x-pack/filebeat/tests/integration/shipper_test.go +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !windows - -package integration - -import ( - "context" - "crypto/rand" - "encoding/json" - "fmt" - "os" - "path/filepath" - "testing" - "time" - - "github.com/sergi/go-diff/diffmatchpatch" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" - "github.com/elastic/go-elasticsearch/v8/typedapi/types" - "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator" -) - -func TestShipperInputOutput(t *testing.T) { - integration.EnsureESIsRunning(t) - esURL := integration.GetESURL(t, "http") - esPassword, ok := esURL.User.Password() - require.True(t, ok, "ES didn't have a password") - kURL, kUserInfo := integration.GetKibana(t) - kPassword, ok := kUserInfo.Password() - require.True(t, ok, "Kibana didn't have a password") - - gRpcPath := filepath.Join(os.TempDir(), "grpc") - - // create file to ingest with a unique message - inputFilePath := filepath.Join(t.TempDir(), "test.log") - inputFile, err := os.Create(inputFilePath) - require.NoError(t, err, "error creating input test file") - uniqVal := make([]byte, 16) - _, err = rand.Read(uniqVal) - uniqMsg := fmt.Sprintf("%X", uniqVal) - require.NoError(t, err, "error getting a unique random value") - _, err = inputFile.Write([]byte(uniqMsg)) - require.NoError(t, err, "error writing input test file") - _, err = inputFile.Write([]byte("\n")) - require.NoError(t, err, "error writing new line") - err = inputFile.Close() - require.NoError(t, err, "error closing input test file") - - // Elasticsearch client - esCfg := elasticsearch.Config{ - Addresses: []string{esURL.String()}, - Username: esURL.User.Username(), - Password: esPassword, - } - es, err := elasticsearch.NewTypedClient(esCfg) - require.NoError(t, err, "error creating new es client") - - cfg := `filebeat.inputs: -- type: filestream - id: my-filestream-id - paths: - - %s -output.elasticsearch: - hosts: - - %s - username: %s - password: %s - allow_older_versions: true -setup.kibana: - hosts: %s - username: %s - password: %s -logging.level: debug - -queue.mem: - events: 100 - flush.min_events: 0 -processors: -- add_fields: - target: data_stream - fields: - type: logs - namespace: generic - dataset: generic -- add_fields: - target: host - fields: - name: %s -- add_fields: - target: agent - fields: - type: metricbeat -` - // check that file can be ingested normally and found in elasticsearch - filebeat := NewFilebeat(t) - filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg)) - filebeat.Start() - filebeat.WaitForLogs("Publish event: ", 10*time.Second) - filebeat.WaitForLogs("PublishEvents: ", 10*time.Second) - // It can take a few seconds for a doc to show up in a search - require.Eventually(t, func() bool { - res, err := es.Search(). - Index(".ds-filebeat-*"). - Request(&search.Request{ - Query: &types.Query{ - Match: map[string]types.MatchQuery{ - "message": { - Query: uniqMsg, - Operator: &operator.And, - }, - }, - }, - }).Do(context.Background()) - require.NoError(t, err, "error doing search request: %s", err) - return res.Hits.Total.Value == 1 - }, 30*time.Second, 250*time.Millisecond, "never found document") - - shipperCfg := `filebeat.inputs: -- type: shipper - server: unix://%s - id: my-shipper-id - data_stream: - data_set: generic - type: log - namespace: generic - streams: - - id: stream-id -output.elasticsearch: - hosts: - - %s - username: %s - password: %s - allow_older_versions: true -setup.kibana: - hosts: %s - username: %s - password: %s -logging.level: debug -queue.mem: - events: 100 - flush.min_events: 0 -` - // start a shipper filebeat, wait until gRPC service starts - shipper := NewFilebeat(t) - shipper.WriteConfigFile(fmt.Sprintf(shipperCfg, gRpcPath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword)) - shipper.Start() - shipper.WaitForLogs("done setting up gRPC server", 30*time.Second) - - fb2shipperCfg := `filebeat.inputs: -- type: filestream - id: my-filestream-id - paths: - - %s -output.shipper: - server: unix://%s -setup.kibana: - hosts: %s - username: %s - password: %s -logging.level: debug -queue.mem: - events: 100 - flush.min_events: 0 -processors: -- script: - lang: javascript - source: > - function process(event) { - event.Put("@metadata.stream_id", "stream-id"); - } -- add_fields: - target: data_stream - fields: - type: logs - namespace: generic - dataset: generic -- add_fields: - target: host - fields: - name: %s -- add_fields: - target: agent - fields: - type: metricbeat -` - // start filebeat with shipper output, make doc is ingested into elasticsearch - fb2shipper := NewFilebeat(t) - fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg)) - fb2shipper.Start() - fb2shipper.WaitForLogs("Publish event: ", 10*time.Second) - fb2shipper.WaitForLogs("events to protobuf", 10*time.Second) - require.Eventually(t, func() bool { - res, err := es.Search(). - Index(".ds-filebeat-*"). - Request(&search.Request{ - Query: &types.Query{ - Match: map[string]types.MatchQuery{ - "message": { - Query: uniqMsg, - Operator: &operator.And, - }, - }, - }, - }).Do(context.Background()) - require.NoError(t, err, "error doing search request: %s", err) - return res.Hits.Total.Value == 2 - }, 30*time.Second, 250*time.Millisecond, "never found 2 documents") - - res, err := es.Search(). - Index(".ds-filebeat-*"). - Request(&search.Request{ - Query: &types.Query{ - Match: map[string]types.MatchQuery{ - "message": { - Query: uniqMsg, - Operator: &operator.And, - }, - }, - }, - }).Do(context.Background()) - require.NoError(t, err, "error doing search request: %s", err) - require.Equal(t, int64(2), res.Hits.Total.Value) - diff, err := diffDocs(res.Hits.Hits[0].Source_, - res.Hits.Hits[1].Source_) - require.NoError(t, err, "error diffing docs") - if len(diff) != 0 { - t.Fatalf("docs differ:\n:%s\n", diff) - } -} - -func diffDocs(doc1 json.RawMessage, doc2 json.RawMessage) (string, error) { - fieldsToDrop := []string{ - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "elastic_agent.id", - } - var d1 map[string]interface{} - var d2 map[string]interface{} - - if err := json.Unmarshal(doc1, &d1); err != nil { - return "", err - } - - if err := json.Unmarshal(doc2, &d2); err != nil { - return "", err - } - f1 := mapstr.M(d1).Flatten() - f2 := mapstr.M(d2).Flatten() - - for _, key := range fieldsToDrop { - _ = f1.Delete(key) - _ = f2.Delete(key) - } - - dmp := diffmatchpatch.New() - diffs := dmp.DiffMain(f1.StringToPrint(), f2.StringToPrint(), false) - - if len(diffs) != 1 { - return dmp.DiffPrettyText(diffs), nil - } - return "", nil -} diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 59537e06686..34b9aaa7075 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -118,35 +118,6 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp return inputs, nil } -// CreateShipperInput is a modified version of CreateInputsFromStreams made for forwarding input units to the shipper beat -// this does not create separate inputs for each stream, and instead passes it along as a single input, with just the processors added -func CreateShipperInput(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { - inputs := make([]map[string]interface{}, len(raw.GetStreams())) - for iter, stream := range raw.GetStreams() { - streamSource := raw.GetStreams()[iter].GetSource().AsMap() - streamSource = injectIndexStream(defaultDataStreamType, raw, stream, streamSource) - // 1. global processors - streamSource = injectGlobalProcesssors(raw, streamSource) - - // 2. agentInfo - streamSource, err := injectAgentInfoRule(streamSource, agentInfo) - if err != nil { - return nil, fmt.Errorf("Error injecting agent processors: %w", err) - } - - // 3. stream processors - streamSource, err = injectStreamProcessors(raw, defaultDataStreamType, stream, streamSource, defaultProcessors) - if err != nil { - return nil, fmt.Errorf("Error injecting stream processors: %w", err) - } - inputs[iter] = streamSource - } - rawMap := raw.Source.AsMap() - rawMap["streams"] = inputs - - return []map[string]interface{}{rawMap}, nil -} - // CreateReloadConfigFromInputs turns a raw input/module list into the ConfigWithMeta type used by the reloader interface func CreateReloadConfigFromInputs(raw []map[string]interface{}) ([]*reload.ConfigWithMeta, error) { // format for the reloadable list needed bythe cm.Reload() method @@ -384,7 +355,7 @@ func groupByOutputs(outCfg *proto.UnitExpectedConfig) (*reload.ConfigWithMeta, e // We still need to emulate the InjectHeadersRule AST code, // I don't think we can get the `Headers()` data reported by the AgentInfo() sourceMap := outCfg.GetSource().AsMap() - outputType := outCfg.GetType() //nolint:typecheck // this is used, linter just doesn't seem to see it + outputType := outCfg.GetType() if outputType == "" { return nil, fmt.Errorf("output config does not have a configured type field") }