From 1638907f4bcc778437d068d47bd17a8ed55ded26 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 12 Jul 2022 18:13:44 +0200 Subject: [PATCH 01/18] Implement the shipper gRPC server All the endpoints are actually functioning now. --- go.mod | 2 +- go.sum | 4 +- server/config.go | 25 ++++ server/notifications.go | 95 +++++++++++++ server/run.go | 9 +- server/server.go | 294 ++++++++++++++++++++++++++++++++++++---- 6 files changed, 399 insertions(+), 30 deletions(-) create mode 100644 server/config.go create mode 100644 server/notifications.go diff --git a/go.mod b/go.mod index b5ead15..db38d16 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( require ( github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220603155004-ac7e079a9403 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220607160924-1a71765a8bbe - github.com/elastic/elastic-agent-shipper-client v0.1.0 + github.com/elastic/elastic-agent-shipper-client v0.2.0 github.com/elastic/go-ucfg v0.8.5 github.com/magefile/mage v1.13.0 github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index 70c77db..f78d2e8 100644 --- a/go.sum +++ b/go.sum @@ -465,8 +465,8 @@ github.com/elastic/elastic-agent-libs v0.2.2/go.mod h1:1xDLBhIqBIjhJ7lr2s+xRFFkQ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.7 h1:JvjEJVyN7ERr2uDVs2jV+tXJaJUgFAjiqUN15EBWMJI= github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= -github.com/elastic/elastic-agent-shipper-client v0.1.0 h1:tyDQyK2XY9Ss8a7QcHXrGRWnXBjtbwVafb98na4HVdA= -github.com/elastic/elastic-agent-shipper-client v0.1.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= +github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg= +github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.3.1/go.mod h1:RIYhJOS7mUeyIthfOSqmmbEILYSzaDWLi5zQ70bQo+o= github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= github.com/elastic/go-libaudit/v2 v2.3.1-0.20220523121157-87f0a814a1c0/go.mod h1:GOkMRbzKV7ePrMOy+k6gGF0QvulQ16Cr38b60oirv8U= diff --git a/server/config.go b/server/config.go new file mode 100644 index 0000000..8febec1 --- /dev/null +++ b/server/config.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "errors" + "time" +) + +type ShipperServerConfig struct { + // PollingInterval is an interval for polling queue metrics and updating the indices. + // Must be greater than 0 + PollingInterval time.Duration +} + +// Validate validates the server configuration. +func (c ShipperServerConfig) Validate() error { + if c.PollingInterval <= 0 { + return errors.New("polling interval must be greater than 0") + } + + return nil +} diff --git a/server/notifications.go b/server/notifications.go new file mode 100644 index 0000000..b8ebc2e --- /dev/null +++ b/server/notifications.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "sync" + + "github.com/gofrs/uuid" +) + +const ( + // how many notifications can be in a notification buffer. + notificationBufferSize = 16 +) + +type change struct { + // persistedIndex is a changed persisted index or `nil` if the value has not changed. + persistedIndex *int64 +} + +// Any returns true if there is a change. +func (c change) Any() bool { + return c.persistedIndex != nil +} + +type notifications struct { + subscribers map[uuid.UUID]chan change + mutex *sync.Mutex +} + +func (n *notifications) subscribe() (out <-chan change, stop func(), err error) { + ch := make(chan change, notificationBufferSize) + id, err := uuid.NewV4() + if err != nil { + return nil, nil, err + } + + n.mutex.Lock() + n.subscribers[id] = ch + n.mutex.Unlock() + + stop = func() { + n.mutex.Lock() + // we must block till the end of the function so `shutdown` + // does not try to close closed channels and panics + defer n.mutex.Unlock() + + // `shutdown` could shut down the notifications before this is executed + _, ok := n.subscribers[id] + if ok { + delete(n.subscribers, id) + close(ch) + // drain the channel's buffer + for range ch { + } + } + } + + return ch, stop, nil +} + +// notify notifies all the subscribers. +// This function should be used asynchronously. +// If one of subscriber channel's buffer is exhausted it might block +// until the subscriber reads the previous notification or unsubscribes. +func (n *notifications) notify(ctx context.Context, value change) { + n.mutex.Lock() + defer n.mutex.Unlock() + + for _, ch := range n.subscribers { + select { + case <-ctx.Done(): + return + case ch <- value: + continue + } + } +} + +// shutdown closes all the subscriber channels and drains them. +func (n *notifications) shutdown() { + n.mutex.Lock() + defer n.mutex.Unlock() + + for _, ch := range n.subscribers { + close(ch) + // drain the channel buffer + for range ch { + } + } + n.subscribers = make(map[uuid.UUID]chan change) +} diff --git a/server/run.go b/server/run.go index 2f97827..e46d057 100644 --- a/server/run.go +++ b/server/run.go @@ -12,6 +12,7 @@ import ( "os/signal" "sync" "syscall" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -104,9 +105,11 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error { opts = []grpc.ServerOption{grpc.Creds(creds)} } grpcServer := grpc.NewServer(opts...) - r := shipperServer{ - logger: log, - queue: queue, + r, err := NewShipperServer(queue, ShipperServerConfig{ + PollingInterval: 100 * time.Millisecond, // TODO make proper configuration + }) + if err != nil { + return fmt.Errorf("failed to initialise the server: %w", err) } pb.RegisterProducerServer(grpcServer, r) diff --git a/server/server.go b/server/server.go index c402f92..7bd30e8 100644 --- a/server/server.go +++ b/server/server.go @@ -6,53 +6,299 @@ package server import ( "context" + "errors" "fmt" - - pbts "google.golang.org/protobuf/types/known/timestamppb" + "io" + "sync" + "sync/atomic" + "time" "github.com/elastic/elastic-agent-libs/logp" pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + + "github.com/gofrs/uuid" ) +// ShipperServer contains all the gRPC operations for the shipper endpoints. +type ShipperServer interface { + pb.ProducerServer + io.Closer +} + type shipperServer struct { logger *logp.Logger + queue *queue.Queue + cfg ShipperServerConfig + + uuid string + acceptedIndex int64 + persistedIndex int64 - queue *queue.Queue + polling polling + notifications notifications + + close *sync.Once pb.UnimplementedProducerServer } -// PublishEvents is the server implementation of the gRPC PublishEvents call -func (serv shipperServer) PublishEvents(_ context.Context, req *messages.PublishRequest) (*messages.PublishReply, error) { - results := []*messages.EventResult{} - for _, evt := range req.Events { - serv.logger.Infof("Got event %s: %#v", evt.EventId, evt.Fields.Data) - err := serv.queue.Publish(evt) - if err != nil { - // If we couldn't accept any events, return the error directly. Otherwise, - // just return success on however many events we were able to handle. - if len(results) == 0 { - return nil, err +type polling struct { + ctx context.Context + stop func() + stopped chan struct{} +} + +// NewShipperServer creates a new server instance for handling gRPC endpoints. +func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, error) { + if q == nil { + return nil, errors.New("queue cannot be nil") + } + + err := cfg.Validate() + if err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + + id, err := uuid.NewV4() + if err != nil { + return nil, err + } + + s := shipperServer{ + uuid: id.String(), + logger: logp.NewLogger("shipper-server"), + queue: q, + cfg: cfg, + polling: polling{ + stopped: make(chan struct{}), + }, + notifications: notifications{ + subscribers: make(map[uuid.UUID]chan change), + mutex: &sync.Mutex{}, + }, + close: &sync.Once{}, + } + + s.polling.ctx, s.polling.stop = context.WithCancel(context.Background()) + s.startPolling() + + return &s, nil +} + +// GetAcceptedIndex atomically reads the accepted index +func (serv *shipperServer) GetAcceptedIndex() int64 { + return atomic.LoadInt64(&serv.acceptedIndex) +} + +// GetPersistedIndex atomically reads the persisted index +func (serv *shipperServer) GetPersistedIndex() int64 { + return atomic.LoadInt64(&serv.persistedIndex) +} + +// PublishEvents is the server implementation of the gRPC PublishEvents call. +func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.PublishRequest) (*messages.PublishReply, error) { + resp := &messages.PublishReply{ + Uuid: serv.uuid, + } + + // the value in the request is optional + if req.Uuid != "" && req.Uuid != serv.uuid { + resp.AcceptedIndex = serv.GetAcceptedIndex() + resp.PersistedIndex = serv.GetPersistedIndex() + serv.logger. + With( + "expected", serv.uuid, + "actual", req.Uuid, + ). + Debugf("shipper UUID does not match, all events rejected") + + return resp, nil + } + + for _, e := range req.Events { + err := serv.queue.Publish(e) + if err == nil { + resp.AcceptedIndex++ + continue + } + + log := serv.logger. + With( + "event_count", len(req.Events), + "accepted_count", resp.AcceptedCount, + ) + + if errors.Is(err, queue.ErrQueueIsFull) { + log.Warnf("queue is full, not all events accepted") + } else { + err = fmt.Errorf("failed to enqueue an event: %w", err) + serv.logger.Error(err) + } + + break + } + + // it is cheaper to check than always using atomic + if resp.AcceptedCount > 0 { + atomic.AddInt64(&serv.acceptedIndex, int64(resp.AcceptedCount)) + } + resp.AcceptedIndex = serv.GetAcceptedIndex() + resp.PersistedIndex = serv.GetPersistedIndex() + + serv.logger. + With( + "event_count", len(req.Events), + "accepted_count", resp.AcceptedCount, + "accepted_index", resp.AcceptedIndex, + "persisted_index", resp.PersistedIndex, + ). + Debugf("finished publishing a batch") + + return resp, nil +} + +// PublishEvents is the server implementation of the gRPC PersistedIndex call. +func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { + // first we reply with current values and then subscribe to notifications and start streaming + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: serv.GetPersistedIndex(), + }) + if err != nil { + return err + } + + serv.logger.Debug("subscribing client for persisted index change notification...") + ch, stop, err := serv.notifications.subscribe() + if err != nil { + return err + } + + serv.logger.Debug("client subscribed for persisted index change notifications") + // defer works in a LIFO order + defer serv.logger.Debug("client unsubscribed from persisted index change notifications") + defer stop() + + for { + select { + + case change := <-ch: + if change.persistedIndex == nil { + continue + } + + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: *change.persistedIndex, + }) + if err != nil { + return fmt.Errorf("failed to send the update: %w", err) } - break + + case <-producer.Context().Done(): + return fmt.Errorf("producer context: %w", producer.Context().Err()) + + case <-serv.polling.ctx.Done(): + return fmt.Errorf("server context: %w", serv.polling.ctx.Err()) } - res := messages.EventResult{EventId: evt.EventId, Timestamp: pbts.Now()} - results = append(results, &res) } - return &messages.PublishReply{Results: results}, nil } -// StreamAcknowledgements is the server implementation of the gRPC StreamAcknowledgements call -func (serv shipperServer) StreamAcknowledgements(streamReq *messages.StreamAcksRequest, prd pb.Producer_StreamAcknowledgementsServer) error { +// Close implements the Closer interface +func (serv *shipperServer) Close() error { + serv.close.Do(func() { + // polling must be stopped first, otherwise it would try to write + // a notification to a closed channel and this would cause a panic + serv.polling.stop() + <-serv.polling.stopped + + serv.logger.Debug("shutting down all notifications...") + serv.notifications.shutdown() + serv.logger.Debug("all notifications have been shut down") + }) + + return nil +} + +func (serv *shipperServer) startPolling() { + go func() { + ticker := time.NewTicker(serv.cfg.PollingInterval) + defer ticker.Stop() - // we have no outputs now, so just send a single dummy event - evt := messages.StreamAcksReply{Acks: []*messages.Acknowledgement{{Timestamp: pbts.Now(), EventId: streamReq.Source.GetInputId()}}} - err := prd.Send(&evt) + for { + select { + case <-serv.polling.ctx.Done(): + err := serv.polling.ctx.Err() + if err != nil && errors.Is(err, context.Canceled) { + serv.logger.Error(err) + } + close(serv.polling.stopped) // signaling back to `Close` + return + + case <-ticker.C: + serv.logger.Debug("updating indices...") + err := serv.updateIndices(serv.polling.ctx) + if err != nil { + serv.logger.Errorf("failed to update indices: %s", err) + } else { + serv.logger.Debug("successfully updated indices.") + } + } + } + }() +} + +// updateIndices updates in-memory indices and notifies subscribers if necessary. +// TODO: this is a temporary implementation until the queue supports the `persisted_index`. +func (serv *shipperServer) updateIndices(ctx context.Context) error { + + // TODO: for now we calculate an approximate value + // we cannot rely on this value and needs to be changed when the queue is ready. + metrics, err := serv.queue.Metrics() if err != nil { - return fmt.Errorf("error in StreamAcknowledgements: %w", err) + return fmt.Errorf("failed to fetch queue metrics: %w", err) } + + log := serv.logger + c := change{} + + if metrics.UnackedConsumedEvents.Exists() { + oldPersistedIndex := serv.GetPersistedIndex() + persistedIndex := serv.GetAcceptedIndex() - int64(metrics.UnackedConsumedEvents.ValueOr(0)) + + if persistedIndex != oldPersistedIndex { + atomic.SwapInt64(&serv.persistedIndex, persistedIndex) + // register the change + c.persistedIndex = &persistedIndex + } + log = log.With("persisted_index", persistedIndex) + } + + log.Debug("indices have been updated") + + if c.Any() { + log := serv.logger.With( + "persisted_index", pint64String(c.persistedIndex), + "subscribers_count", len(serv.notifications.subscribers), + ) + + // this must be async because the loop in `notifyChange` can block on a receiver channel + go func() { + log.Debug("notifying about change...") + serv.notifications.notify(ctx, c) + log.Debug("finished notifying about change") + }() + } + return nil } + +func pint64String(i *int64) string { + if i == nil { + return "nil" + } + return fmt.Sprintf("%d", *i) +} From 8665574ea664eb4ce019a1085909db85125eb074 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 13 Jul 2022 17:57:29 +0200 Subject: [PATCH 02/18] Address comments * Add server.Close to the signal processing * Demote the queue full message to the debug level * Add a channel closed channel for increased safety --- server/run.go | 5 +++-- server/server.go | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/run.go b/server/run.go index e46d057..69fdc93 100644 --- a/server/run.go +++ b/server/run.go @@ -105,13 +105,13 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error { opts = []grpc.ServerOption{grpc.Creds(creds)} } grpcServer := grpc.NewServer(opts...) - r, err := NewShipperServer(queue, ShipperServerConfig{ + shipperServer, err := NewShipperServer(queue, ShipperServerConfig{ PollingInterval: 100 * time.Millisecond, // TODO make proper configuration }) if err != nil { return fmt.Errorf("failed to initialise the server: %w", err) } - pb.RegisterProducerServer(grpcServer, r) + pb.RegisterProducerServer(grpcServer, shipperServer) shutdownFunc := func() { grpcServer.GracefulStop() @@ -121,6 +121,7 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error { // We call Wait to give it a chance to finish with events // it has already read. out.Wait() + shipperServer.Close() } handleShutdown(shutdownFunc, c.shutdownInit) log.Debugf("gRPC server is listening on port %d", cfg.Port) diff --git a/server/server.go b/server/server.go index 7bd30e8..a5d61f5 100644 --- a/server/server.go +++ b/server/server.go @@ -131,7 +131,7 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis ) if errors.Is(err, queue.ErrQueueIsFull) { - log.Warnf("queue is full, not all events accepted") + log.Debugf("queue is full, not all events accepted") } else { err = fmt.Errorf("failed to enqueue an event: %w", err) serv.logger.Error(err) @@ -184,8 +184,8 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p for { select { - case change := <-ch: - if change.persistedIndex == nil { + case change, closed := <-ch: + if closed || change.persistedIndex == nil { continue } From d33a0656fc641428a4185ddb0d1e1a31678ebdb9 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 13 Jul 2022 17:59:17 +0200 Subject: [PATCH 03/18] Update go.mod --- NOTICE.txt | 138 ++++++++++++++++++++++++++--------------------------- go.mod | 4 +- 2 files changed, 71 insertions(+), 71 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 57fdd6c..a416bb4 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -478,11 +478,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-shipper-client -Version: v0.1.0 +Version: v0.2.0 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.1.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt: Elastic License 2.0 @@ -790,6 +790,36 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.5/ limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/gofrs/uuid +Version: v4.2.0+incompatible +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gofrs/uuid@v4.2.0+incompatible/LICENSE: + +Copyright (C) 2013-2018 by Maxim Bublis + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/magefile/mage Version: v1.13.0 @@ -1677,43 +1707,6 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.47.0/LIC limitations under the License. --------------------------------------------------------------------------------- -Dependency : google.golang.org/protobuf -Version: v1.28.0 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.28.0/LICENSE: - -Copyright (c) 2018 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - ================================================================================ @@ -18399,36 +18392,6 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : github.com/gofrs/uuid -Version: v4.2.0+incompatible -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/gofrs/uuid@v4.2.0+incompatible/LICENSE: - -Copyright (C) 2013-2018 by Maxim Bublis - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - -------------------------------------------------------------------------------- Dependency : github.com/gogo/protobuf Version: v1.3.2 @@ -42635,6 +42598,43 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc/cmd/protoc- limitations under the License. +-------------------------------------------------------------------------------- +Dependency : google.golang.org/protobuf +Version: v1.28.0 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.28.0/LICENSE: + +Copyright (c) 2018 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : gopkg.in/alecthomas/kingpin.v2 Version: v2.2.6 diff --git a/go.mod b/go.mod index db38d16..3e76414 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/spf13/cobra v1.3.0 google.golang.org/genproto v0.0.0-20220602131408-e326c6e8e9c8 // indirect google.golang.org/grpc v1.47.0 - google.golang.org/protobuf v1.28.0 + google.golang.org/protobuf v1.28.0 // indirect ) require ( @@ -15,6 +15,7 @@ require ( github.com/elastic/elastic-agent-client/v7 v7.0.0-20220607160924-1a71765a8bbe github.com/elastic/elastic-agent-shipper-client v0.2.0 github.com/elastic/go-ucfg v0.8.5 + github.com/gofrs/uuid v4.2.0+incompatible github.com/magefile/mage v1.13.0 github.com/stretchr/testify v1.7.0 go.elastic.co/go-licence-detector v0.5.0 @@ -31,7 +32,6 @@ require ( github.com/elastic/go-windows v1.0.1 // indirect github.com/fatih/color v1.13.0 // indirect github.com/gobuffalo/here v0.6.0 // indirect - github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/licenseclassifier v0.0.0-20200402202327-879cb1424de0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect From 91efec6aeaf6c29c5624db96b05fd42836fc5b81 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 12:48:17 +0200 Subject: [PATCH 04/18] Make changes according to the possible queue and API changes --- go.mod | 2 ++ go.sum | 2 -- queue/queue.go | 8 ++++++++ server/notifications.go | 2 +- server/server.go | 43 ++++++++++++++--------------------------- 5 files changed, 25 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 3e76414..2e50a22 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( google.golang.org/protobuf v1.28.0 // indirect ) +replace github.com/elastic/elastic-agent-shipper-client => /Users/rdner/Projects/elastic-agent-shipper-client + require ( github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220603155004-ac7e079a9403 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220607160924-1a71765a8bbe diff --git a/go.sum b/go.sum index f78d2e8..36706b0 100644 --- a/go.sum +++ b/go.sum @@ -465,8 +465,6 @@ github.com/elastic/elastic-agent-libs v0.2.2/go.mod h1:1xDLBhIqBIjhJ7lr2s+xRFFkQ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.7 h1:JvjEJVyN7ERr2uDVs2jV+tXJaJUgFAjiqUN15EBWMJI= github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= -github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg= -github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.3.1/go.mod h1:RIYhJOS7mUeyIthfOSqmmbEILYSzaDWLi5zQ70bQo+o= github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= github.com/elastic/go-libaudit/v2 v2.3.1-0.20220523121157-87f0a814a1c0/go.mod h1:GOkMRbzKV7ePrMOy+k6gGF0QvulQ16Cr38b60oirv8U= diff --git a/queue/queue.go b/queue/queue.go index d1a7d87..1f49632 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -70,6 +70,14 @@ func (queue *Queue) Get(eventCount int) (beatsqueue.Batch, error) { return queue.eventQueue.Get(eventCount) } +func (queue *Queue) AcceptedIndex() uint64 { + return 0 +} + +func (queue *Queue) PersistedIndex() uint64 { + return 0 +} + func (queue *Queue) Close() { queue.eventQueue.Close() } diff --git a/server/notifications.go b/server/notifications.go index b8ebc2e..cd370f9 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -18,7 +18,7 @@ const ( type change struct { // persistedIndex is a changed persisted index or `nil` if the value has not changed. - persistedIndex *int64 + persistedIndex *uint64 } // Any returns true if there is a change. diff --git a/server/server.go b/server/server.go index a5d61f5..007ac3d 100644 --- a/server/server.go +++ b/server/server.go @@ -33,8 +33,7 @@ type shipperServer struct { cfg ShipperServerConfig uuid string - acceptedIndex int64 - persistedIndex int64 + persistedIndex uint64 polling polling notifications notifications @@ -88,13 +87,13 @@ func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, e } // GetAcceptedIndex atomically reads the accepted index -func (serv *shipperServer) GetAcceptedIndex() int64 { - return atomic.LoadInt64(&serv.acceptedIndex) +func (serv *shipperServer) GetAcceptedIndex() uint64 { + return serv.queue.AcceptedIndex() } // GetPersistedIndex atomically reads the persisted index -func (serv *shipperServer) GetPersistedIndex() int64 { - return atomic.LoadInt64(&serv.persistedIndex) +func (serv *shipperServer) GetPersistedIndex() uint64 { + return atomic.LoadUint64(&serv.persistedIndex) } // PublishEvents is the server implementation of the gRPC PublishEvents call. @@ -140,10 +139,6 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis break } - // it is cheaper to check than always using atomic - if resp.AcceptedCount > 0 { - atomic.AddInt64(&serv.acceptedIndex, int64(resp.AcceptedCount)) - } resp.AcceptedIndex = serv.GetAcceptedIndex() resp.PersistedIndex = serv.GetPersistedIndex() @@ -254,34 +249,24 @@ func (serv *shipperServer) startPolling() { // updateIndices updates in-memory indices and notifies subscribers if necessary. // TODO: this is a temporary implementation until the queue supports the `persisted_index`. func (serv *shipperServer) updateIndices(ctx context.Context) error { - - // TODO: for now we calculate an approximate value - // we cannot rely on this value and needs to be changed when the queue is ready. - metrics, err := serv.queue.Metrics() - if err != nil { - return fmt.Errorf("failed to fetch queue metrics: %w", err) - } - log := serv.logger c := change{} - if metrics.UnackedConsumedEvents.Exists() { - oldPersistedIndex := serv.GetPersistedIndex() - persistedIndex := serv.GetAcceptedIndex() - int64(metrics.UnackedConsumedEvents.ValueOr(0)) + oldPersistedIndex := serv.GetPersistedIndex() + persistedIndex := serv.queue.PersistedIndex() - if persistedIndex != oldPersistedIndex { - atomic.SwapInt64(&serv.persistedIndex, persistedIndex) - // register the change - c.persistedIndex = &persistedIndex - } - log = log.With("persisted_index", persistedIndex) + if persistedIndex != oldPersistedIndex { + atomic.StoreUint64(&serv.persistedIndex, persistedIndex) + // register the change + c.persistedIndex = &persistedIndex } + log = log.With("persisted_index", persistedIndex) log.Debug("indices have been updated") if c.Any() { log := serv.logger.With( - "persisted_index", pint64String(c.persistedIndex), + "persisted_index", puint64String(c.persistedIndex), "subscribers_count", len(serv.notifications.subscribers), ) @@ -296,7 +281,7 @@ func (serv *shipperServer) updateIndices(ctx context.Context) error { return nil } -func pint64String(i *int64) string { +func puint64String(i *uint64) string { if i == nil { return "nil" } From 2dc4e1f3cd78f4e83f5b260dc84af98b88663b60 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 16:05:46 +0200 Subject: [PATCH 05/18] Update client --- NOTICE.txt | 4 ++-- go.mod | 4 +--- go.sum | 2 ++ 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index a416bb4..3c5634e 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -478,11 +478,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-shipper-client -Version: v0.2.0 +Version: v0.3.0 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.3.0/LICENSE.txt: Elastic License 2.0 diff --git a/go.mod b/go.mod index 2e50a22..5ac3d44 100644 --- a/go.mod +++ b/go.mod @@ -10,12 +10,10 @@ require ( google.golang.org/protobuf v1.28.0 // indirect ) -replace github.com/elastic/elastic-agent-shipper-client => /Users/rdner/Projects/elastic-agent-shipper-client - require ( github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220603155004-ac7e079a9403 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220607160924-1a71765a8bbe - github.com/elastic/elastic-agent-shipper-client v0.2.0 + github.com/elastic/elastic-agent-shipper-client v0.3.0 github.com/elastic/go-ucfg v0.8.5 github.com/gofrs/uuid v4.2.0+incompatible github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 36706b0..d9af9df 100644 --- a/go.sum +++ b/go.sum @@ -465,6 +465,8 @@ github.com/elastic/elastic-agent-libs v0.2.2/go.mod h1:1xDLBhIqBIjhJ7lr2s+xRFFkQ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.7 h1:JvjEJVyN7ERr2uDVs2jV+tXJaJUgFAjiqUN15EBWMJI= github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= +github.com/elastic/elastic-agent-shipper-client v0.3.0 h1:Ec/r08WLB6tfR95lW9JUTCUYD+HcqQdK7MR7n+0ax4s= +github.com/elastic/elastic-agent-shipper-client v0.3.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.3.1/go.mod h1:RIYhJOS7mUeyIthfOSqmmbEILYSzaDWLi5zQ70bQo+o= github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= github.com/elastic/go-libaudit/v2 v2.3.1-0.20220523121157-87f0a814a1c0/go.mod h1:GOkMRbzKV7ePrMOy+k6gGF0QvulQ16Cr38b60oirv8U= From ad10b7ffa685f55c91fc8a358111ec87e70983cc Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 16:25:44 +0200 Subject: [PATCH 06/18] Adapt to the new queue API --- queue/queue.go | 8 -------- server/server.go | 8 ++++---- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index ffe9429..78c79ec 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -75,14 +75,6 @@ func (queue *Queue) Get(eventCount int) (beatsqueue.Batch, error) { return queue.eventQueue.Get(eventCount) } -func (queue *Queue) AcceptedIndex() uint64 { - return 0 -} - -func (queue *Queue) PersistedIndex() uint64 { - return 0 -} - func (queue *Queue) Close() { queue.eventQueue.Close() } diff --git a/server/server.go b/server/server.go index 007ac3d..ccaf41e 100644 --- a/server/server.go +++ b/server/server.go @@ -88,7 +88,7 @@ func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, e // GetAcceptedIndex atomically reads the accepted index func (serv *shipperServer) GetAcceptedIndex() uint64 { - return serv.queue.AcceptedIndex() + return uint64(serv.queue.AcceptedIndex()) } // GetPersistedIndex atomically reads the persisted index @@ -117,9 +117,9 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis } for _, e := range req.Events { - err := serv.queue.Publish(e) + _, err := serv.queue.Publish(e) if err == nil { - resp.AcceptedIndex++ + resp.AcceptedCount++ continue } @@ -253,7 +253,7 @@ func (serv *shipperServer) updateIndices(ctx context.Context) error { c := change{} oldPersistedIndex := serv.GetPersistedIndex() - persistedIndex := serv.queue.PersistedIndex() + persistedIndex := uint64(serv.queue.PersistedIndex()) if persistedIndex != oldPersistedIndex { atomic.StoreUint64(&serv.persistedIndex, persistedIndex) From 69945174ccf71f0c874fdb559d64cd43e1388f72 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 17:19:19 +0200 Subject: [PATCH 07/18] Add tests --- queue/queue.go | 4 +- server/server.go | 78 +++++----- server/server_test.go | 352 +++++++++++++++++++++++++++++++----------- 3 files changed, 304 insertions(+), 130 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 78c79ec..680df67 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -75,8 +75,8 @@ func (queue *Queue) Get(eventCount int) (beatsqueue.Batch, error) { return queue.eventQueue.Get(eventCount) } -func (queue *Queue) Close() { - queue.eventQueue.Close() +func (queue *Queue) Close() error { + return queue.eventQueue.Close() } func (queue *Queue) AcceptedIndex() EntryID { diff --git a/server/server.go b/server/server.go index ccaf41e..80fab54 100644 --- a/server/server.go +++ b/server/server.go @@ -21,6 +21,17 @@ import ( "github.com/gofrs/uuid" ) +type Publisher interface { + io.Closer + + // AcceptedIndex returns the current sequential index of the accepted events + AcceptedIndex() queue.EntryID + // AcceptedIndex returns the current sequential index of the persisted events + PersistedIndex() queue.EntryID + // Publish publishes the given event and returns the current accepted index (after this event) + Publish(*messages.Event) (queue.EntryID, error) +} + // ShipperServer contains all the gRPC operations for the shipper endpoints. type ShipperServer interface { pb.ProducerServer @@ -28,9 +39,9 @@ type ShipperServer interface { } type shipperServer struct { - logger *logp.Logger - queue *queue.Queue - cfg ShipperServerConfig + logger *logp.Logger + publisher Publisher + cfg ShipperServerConfig uuid string persistedIndex uint64 @@ -50,9 +61,9 @@ type polling struct { } // NewShipperServer creates a new server instance for handling gRPC endpoints. -func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, error) { - if q == nil { - return nil, errors.New("queue cannot be nil") +func NewShipperServer(publisher Publisher, cfg ShipperServerConfig) (ShipperServer, error) { + if publisher == nil { + return nil, errors.New("publisher cannot be nil") } err := cfg.Validate() @@ -66,10 +77,10 @@ func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, e } s := shipperServer{ - uuid: id.String(), - logger: logp.NewLogger("shipper-server"), - queue: q, - cfg: cfg, + uuid: id.String(), + logger: logp.NewLogger("shipper-server"), + publisher: publisher, + cfg: cfg, polling: polling{ stopped: make(chan struct{}), }, @@ -88,7 +99,7 @@ func NewShipperServer(q *queue.Queue, cfg ShipperServerConfig) (ShipperServer, e // GetAcceptedIndex atomically reads the accepted index func (serv *shipperServer) GetAcceptedIndex() uint64 { - return uint64(serv.queue.AcceptedIndex()) + return uint64(serv.publisher.AcceptedIndex()) } // GetPersistedIndex atomically reads the persisted index @@ -117,7 +128,7 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis } for _, e := range req.Events { - _, err := serv.queue.Publish(e) + _, err := serv.publisher.Publish(e) if err == nil { resp.AcceptedCount++ continue @@ -156,15 +167,6 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis // PublishEvents is the server implementation of the gRPC PersistedIndex call. func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - // first we reply with current values and then subscribe to notifications and start streaming - err := producer.Send(&messages.PersistedIndexReply{ - Uuid: serv.uuid, - PersistedIndex: serv.GetPersistedIndex(), - }) - if err != nil { - return err - } - serv.logger.Debug("subscribing client for persisted index change notification...") ch, stop, err := serv.notifications.subscribe() if err != nil { @@ -176,11 +178,21 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p defer serv.logger.Debug("client unsubscribed from persisted index change notifications") defer stop() + // before reading notification we send the current values + // in case the notification would not come in a long time + err = producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: serv.GetPersistedIndex(), + }) + if err != nil { + return err + } + for { select { - case change, closed := <-ch: - if closed || change.persistedIndex == nil { + case change, open := <-ch: + if !open || change.persistedIndex == nil { continue } @@ -196,7 +208,7 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p return fmt.Errorf("producer context: %w", producer.Context().Err()) case <-serv.polling.ctx.Done(): - return fmt.Errorf("server context: %w", serv.polling.ctx.Err()) + return fmt.Errorf("server is stopped: %w", serv.polling.ctx.Err()) } } } @@ -253,20 +265,19 @@ func (serv *shipperServer) updateIndices(ctx context.Context) error { c := change{} oldPersistedIndex := serv.GetPersistedIndex() - persistedIndex := uint64(serv.queue.PersistedIndex()) + persistedIndex := uint64(serv.publisher.PersistedIndex()) if persistedIndex != oldPersistedIndex { atomic.StoreUint64(&serv.persistedIndex, persistedIndex) // register the change c.persistedIndex = &persistedIndex + log = log.With("persisted_index", persistedIndex) } - log = log.With("persisted_index", persistedIndex) - - log.Debug("indices have been updated") if c.Any() { - log := serv.logger.With( - "persisted_index", puint64String(c.persistedIndex), + log.Debug("indices have been updated") + + log := log.With( "subscribers_count", len(serv.notifications.subscribers), ) @@ -280,10 +291,3 @@ func (serv *shipperServer) updateIndices(ctx context.Context) error { return nil } - -func puint64String(i *uint64) string { - if i == nil { - return "nil" - } - return fmt.Sprintf("%d", *i) -} diff --git a/server/server_test.go b/server/server_test.go index 93c613e..feea422 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6,107 +6,277 @@ package server import ( "context" - "fmt" - "sync" + "net" + "strings" "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" + "github.com/elastic/elastic-agent-shipper/queue" "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "google.golang.org/grpc/test/bufconn" + "google.golang.org/protobuf/types/known/timestamppb" ) -func TestAgentControl(t *testing.T) { - unitOneID := mock.NewID() - - token := mock.NewID() - var gotConfig, gotHealthy, gotStopped bool - - var mut sync.Mutex - - t.Logf("Creating mock server") - srv := mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - mut.Lock() - defer mut.Unlock() - if observed.Token == token { - if len(observed.Units) > 0 { - t.Logf("Current unit state is: %v", observed.Units[0].State) - } - - // initial checkin - if len(observed.Units) == 0 || observed.Units[0].State == proto.State_STARTING { - gotConfig = true - return &proto.CheckinExpected{ - Units: []*proto.UnitExpected{ - { - Id: unitOneID, - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - Config: `{"logging": {"level": "debug"}}`, // hack to make my life easier - State: proto.State_HEALTHY, - }, - }, - } - } else if observed.Units[0].State == proto.State_HEALTHY { - gotHealthy = true - //shutdown - return &proto.CheckinExpected{ - Units: []*proto.UnitExpected{ - { - Id: unitOneID, - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - Config: "{}", - State: proto.State_STOPPED, - }, - }, - } - } else if observed.Units[0].State == proto.State_STOPPED { - gotStopped = true - // remove the unit? I think? - return &proto.CheckinExpected{ - Units: nil, - } - } - - } - - //gotInvalid = true - return nil - }, - ActionImpl: func(response *proto.ActionResponse) error { +const bufSize = 1024 * 1024 // 1MB + +func TestPublish(t *testing.T) { + ctx := context.Background() + + sampleValues, err := helpers.NewStruct(map[string]interface{}{ + "string": "value", + "number": 42, + }) - return nil + require.NoError(t, err) + + e := &messages.Event{ + Timestamp: timestamppb.Now(), + Source: &messages.Source{ + InputId: "input", + StreamId: "stream", }, - ActionsChan: make(chan *mock.PerformAction, 100), - } // end of srv declaration - - require.NoError(t, srv.Start()) - defer srv.Stop() - - t.Logf("creating client") - // connect with client - validClient := client.NewV2(fmt.Sprintf(":%d", srv.Port), token, client.VersionInfo{ - Name: "program", - Version: "v1.0.0", - Meta: map[string]string{ - "key": "value", + DataStream: &messages.DataStream{ + Type: "log", + Dataset: "default", + Namespace: "default", }, - }, grpc.WithTransportCredentials(insecure.NewCredentials())) + Metadata: sampleValues, + Fields: sampleValues, + } + + publisher := &publisherMock{} + shipper, err := NewShipperServer(publisher, ShipperServerConfig{ + PollingInterval: time.Second, // we don't use the polled values in this test + }) + defer shipper.Close() + require.NoError(t, err) + client, stop := startServer(t, ctx, shipper) + defer stop() + + // get the current UUID + pirCtx, cancel := context.WithCancel(ctx) + consumer, err := client.PersistedIndex(pirCtx, &messages.PersistedIndexRequest{}) + require.NoError(t, err) + pir, err := consumer.Recv() + require.NoError(t, err) + cancel() // close the stream + + t.Run("should successfully publish a batch", func(t *testing.T) { + publisher.q = make([]*messages.Event, 0, 3) + events := []*messages.Event{e, e, e} + reply, err := client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: pir.Uuid, + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(len(events)), reply.AcceptedCount) + require.Equal(t, uint64(len(events)), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + }) + + t.Run("should grow accepted index", func(t *testing.T) { + publisher.q = make([]*messages.Event, 0, 3) + events := []*messages.Event{e} + reply, err := client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: pir.Uuid, + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(len(events)), reply.AcceptedCount) + require.Equal(t, uint64(1), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + reply, err = client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: pir.Uuid, + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(len(events)), reply.AcceptedCount) + require.Equal(t, uint64(2), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + reply, err = client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: pir.Uuid, + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(len(events)), reply.AcceptedCount) + require.Equal(t, uint64(3), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + }) + + t.Run("should return different count when queue is full", func(t *testing.T) { + publisher.q = make([]*messages.Event, 0, 1) + events := []*messages.Event{e, e, e} // 3 should not fit into the queue size 1 + reply, err := client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: pir.Uuid, + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(1), reply.AcceptedCount) + require.Equal(t, uint64(1), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + }) + + t.Run("should return count = 0 when uuid does not match", func(t *testing.T) { + publisher.q = make([]*messages.Event, 0, 3) + events := []*messages.Event{e, e, e} + reply, err := client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: "wrong", + Events: events, + }) + require.NoError(t, err) + require.Equal(t, uint32(0), reply.AcceptedCount) + require.Equal(t, uint64(0), reply.AcceptedIndex) + require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + }) +} + +func TestPersistedIndex(t *testing.T) { + ctx := context.Background() + + publisher := &publisherMock{persistedIndex: 42} + + t.Run("server should send updates to the clients", func(t *testing.T) { + shipper, err := NewShipperServer(publisher, ShipperServerConfig{ + PollingInterval: 5 * time.Millisecond, + }) + defer shipper.Close() + require.NoError(t, err) + client, stop := startServer(t, ctx, shipper) + defer stop() + + // first delivery can happen before the first index update + require.Eventually(t, func() bool { + cl := createConsumers(t, ctx, client, 5) + defer cl.stop() + return cl.assertConsumed(t, 42) // initial value in the publisher + }, 100*time.Millisecond, time.Millisecond, "clients are supposed to get the update") + + cl := createConsumers(t, ctx, client, 50) + publisher.persistedIndex = 64 + + cl.assertConsumed(t, 64) + + publisher.persistedIndex = 128 + + cl.assertConsumed(t, 128) + + cl.stop() + }) + + t.Run("server should properly shutdown", func(t *testing.T) { + shipper, err := NewShipperServer(publisher, ShipperServerConfig{ + PollingInterval: 5 * time.Millisecond, // we don't use the polled values in this test + }) + require.NoError(t, err) + client, stop := startServer(t, ctx, shipper) + defer stop() + + cl := createConsumers(t, ctx, client, 50) + publisher.persistedIndex = 64 + shipper.Close() // stopping the server + require.Eventually(t, func() bool { + return cl.assertClosedServer(t) // initial value in the publisher + }, 100*time.Millisecond, time.Millisecond, "server was supposed to shutdown") + }) +} + +func startServer(t *testing.T, ctx context.Context, shipperServer ShipperServer) (client pb.ProducerClient, stop func()) { + lis := bufconn.Listen(bufSize) + grpcServer := grpc.NewServer() - t.Logf("starting shipper controller") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - err := runController(ctx, validClient) - assert.NoError(t, err) + pb.RegisterProducerServer(grpcServer, shipperServer) + go func() { + _ = grpcServer.Serve(lis) + }() + + bufDialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + require.NoError(t, err) + } + + stop = func() { + shipperServer.Close() + conn.Close() + grpcServer.Stop() + } + + return pb.NewProducerClient(conn), stop +} + +func createConsumers(t *testing.T, ctx context.Context, client pb.ProducerClient, count int) consumerList { + ctx, cancel := context.WithCancel(ctx) + + cl := consumerList{ + stop: cancel, + consumers: make([]pb.Producer_PersistedIndexClient, 0, count), + } + for i := 0; i < 50; i++ { + consumer, err := client.PersistedIndex(ctx, &messages.PersistedIndexRequest{}) + require.NoError(t, err) + cl.consumers = append(cl.consumers, consumer) + } + + return cl +} + +type consumerList struct { + consumers []pb.Producer_PersistedIndexClient + stop func() +} + +func (l consumerList) assertConsumed(t *testing.T, value uint64) bool { + for _, c := range l.consumers { + pir, err := c.Recv() + require.NoError(t, err) + if pir.PersistedIndex != value { + return false + } + } + return true +} + +func (l consumerList) assertClosedServer(t *testing.T) bool { + for _, c := range l.consumers { + _, err := c.Recv() + if err == nil { + return false + } + + if !strings.Contains(err.Error(), "server is stopped: context canceled") { + return false + } + } + + return true +} + +type publisherMock struct { + Publisher + q []*messages.Event + persistedIndex queue.EntryID +} + +func (p *publisherMock) Publish(event *messages.Event) (queue.EntryID, error) { + if len(p.q) == cap(p.q) { + return queue.EntryID(0), queue.ErrQueueIsFull + } + + p.q = append(p.q, event) + return queue.EntryID(len(p.q)), nil +} + +func (p *publisherMock) AcceptedIndex() queue.EntryID { + return queue.EntryID(len(p.q)) +} - assert.True(t, gotConfig, "config state") - assert.True(t, gotHealthy, "healthy state") - assert.True(t, gotStopped, "stopped state") +func (p *publisherMock) PersistedIndex() queue.EntryID { + return p.persistedIndex } From 2535c38529f63c399aa927318f24423feaed8fa6 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 19:35:33 +0200 Subject: [PATCH 08/18] Fix linter issues --- server/server_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/server_test.go b/server/server_test.go index feea422..33fe0f7 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -15,8 +15,10 @@ import ( pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -52,7 +54,7 @@ func TestPublish(t *testing.T) { shipper, err := NewShipperServer(publisher, ShipperServerConfig{ PollingInterval: time.Second, // we don't use the polled values in this test }) - defer shipper.Close() + defer func() { _ = shipper.Close() }() require.NoError(t, err) client, stop := startServer(t, ctx, shipper) defer stop() @@ -143,7 +145,7 @@ func TestPersistedIndex(t *testing.T) { shipper, err := NewShipperServer(publisher, ShipperServerConfig{ PollingInterval: 5 * time.Millisecond, }) - defer shipper.Close() + defer func() { _ = shipper.Close() }() require.NoError(t, err) client, stop := startServer(t, ctx, shipper) defer stop() @@ -197,7 +199,12 @@ func startServer(t *testing.T, ctx context.Context, shipperServer ShipperServer) return lis.Dial() } - conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(bufDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) if err != nil { require.NoError(t, err) } From 98630ed30fe41b2c8f4b77ee08d35d9cbb6a6655 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 19:54:37 +0200 Subject: [PATCH 09/18] Remove outdated comment --- server/server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/server.go b/server/server.go index 80fab54..2d31b05 100644 --- a/server/server.go +++ b/server/server.go @@ -259,7 +259,6 @@ func (serv *shipperServer) startPolling() { } // updateIndices updates in-memory indices and notifies subscribers if necessary. -// TODO: this is a temporary implementation until the queue supports the `persisted_index`. func (serv *shipperServer) updateIndices(ctx context.Context) error { log := serv.logger c := change{} From a5cbf257d67d77025541c50b4bf1121154fd8209 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 19:55:46 +0200 Subject: [PATCH 10/18] Add moved tests --- server/controller_client_test.go | 112 +++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 server/controller_client_test.go diff --git a/server/controller_client_test.go b/server/controller_client_test.go new file mode 100644 index 0000000..93c613e --- /dev/null +++ b/server/controller_client_test.go @@ -0,0 +1,112 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestAgentControl(t *testing.T) { + unitOneID := mock.NewID() + + token := mock.NewID() + var gotConfig, gotHealthy, gotStopped bool + + var mut sync.Mutex + + t.Logf("Creating mock server") + srv := mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + mut.Lock() + defer mut.Unlock() + if observed.Token == token { + if len(observed.Units) > 0 { + t.Logf("Current unit state is: %v", observed.Units[0].State) + } + + // initial checkin + if len(observed.Units) == 0 || observed.Units[0].State == proto.State_STARTING { + gotConfig = true + return &proto.CheckinExpected{ + Units: []*proto.UnitExpected{ + { + Id: unitOneID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: `{"logging": {"level": "debug"}}`, // hack to make my life easier + State: proto.State_HEALTHY, + }, + }, + } + } else if observed.Units[0].State == proto.State_HEALTHY { + gotHealthy = true + //shutdown + return &proto.CheckinExpected{ + Units: []*proto.UnitExpected{ + { + Id: unitOneID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: "{}", + State: proto.State_STOPPED, + }, + }, + } + } else if observed.Units[0].State == proto.State_STOPPED { + gotStopped = true + // remove the unit? I think? + return &proto.CheckinExpected{ + Units: nil, + } + } + + } + + //gotInvalid = true + return nil + }, + ActionImpl: func(response *proto.ActionResponse) error { + + return nil + }, + ActionsChan: make(chan *mock.PerformAction, 100), + } // end of srv declaration + + require.NoError(t, srv.Start()) + defer srv.Stop() + + t.Logf("creating client") + // connect with client + validClient := client.NewV2(fmt.Sprintf(":%d", srv.Port), token, client.VersionInfo{ + Name: "program", + Version: "v1.0.0", + Meta: map[string]string{ + "key": "value", + }, + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + + t.Logf("starting shipper controller") + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + err := runController(ctx, validClient) + assert.NoError(t, err) + + assert.True(t, gotConfig, "config state") + assert.True(t, gotHealthy, "healthy state") + assert.True(t, gotStopped, "stopped state") +} From d4239c1f02abeeb46402c26226477c6b215c2372 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 14 Jul 2022 19:59:52 +0200 Subject: [PATCH 11/18] Update dependencies --- NOTICE.txt | 74 +++++++++++++++++++++++++++--------------------------- go.mod | 2 +- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 3c5634e..f9f0487 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1707,6 +1707,43 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc@v1.47.0/LIC limitations under the License. +-------------------------------------------------------------------------------- +Dependency : google.golang.org/protobuf +Version: v1.28.0 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.28.0/LICENSE: + +Copyright (c) 2018 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + ================================================================================ @@ -42598,43 +42635,6 @@ Contents of probable licence file $GOMODCACHE/google.golang.org/grpc/cmd/protoc- limitations under the License. --------------------------------------------------------------------------------- -Dependency : google.golang.org/protobuf -Version: v1.28.0 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/google.golang.org/protobuf@v1.28.0/LICENSE: - -Copyright (c) 2018 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : gopkg.in/alecthomas/kingpin.v2 Version: v2.2.6 diff --git a/go.mod b/go.mod index 5ac3d44..42ee396 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/spf13/cobra v1.3.0 google.golang.org/genproto v0.0.0-20220602131408-e326c6e8e9c8 // indirect google.golang.org/grpc v1.47.0 - google.golang.org/protobuf v1.28.0 // indirect + google.golang.org/protobuf v1.28.0 ) require ( From 3444327845dc2c8425cc7452e30d90309173a125 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 18 Jul 2022 11:45:23 +0200 Subject: [PATCH 12/18] Remove using logger.With --- server/server.go | 43 +++++++++++++------------------------------ 1 file changed, 13 insertions(+), 30 deletions(-) diff --git a/server/server.go b/server/server.go index 2d31b05..914ab7c 100644 --- a/server/server.go +++ b/server/server.go @@ -117,12 +117,7 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis if req.Uuid != "" && req.Uuid != serv.uuid { resp.AcceptedIndex = serv.GetAcceptedIndex() resp.PersistedIndex = serv.GetPersistedIndex() - serv.logger. - With( - "expected", serv.uuid, - "actual", req.Uuid, - ). - Debugf("shipper UUID does not match, all events rejected") + serv.logger.Debugf("shipper UUID does not match, all events rejected. Expected = %s, actual = %s", serv.uuid, req.Uuid) return resp, nil } @@ -134,16 +129,10 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis continue } - log := serv.logger. - With( - "event_count", len(req.Events), - "accepted_count", resp.AcceptedCount, - ) - if errors.Is(err, queue.ErrQueueIsFull) { - log.Debugf("queue is full, not all events accepted") + serv.logger.Debugf("queue is full, not all events accepted. Events = %d, accepted = %d", len(req.Events), resp.AcceptedCount) } else { - err = fmt.Errorf("failed to enqueue an event: %w", err) + err = fmt.Errorf("failed to enqueue an event. Events = %d, accepted = %d: %w", len(req.Events), resp.AcceptedCount, err) serv.logger.Error(err) } @@ -154,13 +143,12 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis resp.PersistedIndex = serv.GetPersistedIndex() serv.logger. - With( - "event_count", len(req.Events), - "accepted_count", resp.AcceptedCount, - "accepted_index", resp.AcceptedIndex, - "persisted_index", resp.PersistedIndex, - ). - Debugf("finished publishing a batch") + Debugf("finished publishing a batch. Events = %d, accepted = %d, accepted index = %d, persisted index = %d", + len(req.Events), + resp.AcceptedCount, + resp.AcceptedIndex, + resp.PersistedIndex, + ) return resp, nil } @@ -260,7 +248,6 @@ func (serv *shipperServer) startPolling() { // updateIndices updates in-memory indices and notifies subscribers if necessary. func (serv *shipperServer) updateIndices(ctx context.Context) error { - log := serv.logger c := change{} oldPersistedIndex := serv.GetPersistedIndex() @@ -270,21 +257,17 @@ func (serv *shipperServer) updateIndices(ctx context.Context) error { atomic.StoreUint64(&serv.persistedIndex, persistedIndex) // register the change c.persistedIndex = &persistedIndex - log = log.With("persisted_index", persistedIndex) + serv.logger.Debugf("new persisted index %d received", persistedIndex) } if c.Any() { - log.Debug("indices have been updated") - - log := log.With( - "subscribers_count", len(serv.notifications.subscribers), - ) + serv.logger.Debug("indices have been updated") // this must be async because the loop in `notifyChange` can block on a receiver channel go func() { - log.Debug("notifying about change...") + serv.logger.Debugf("notifying %d subscribers about change...", len(serv.notifications.subscribers)) serv.notifications.notify(ctx, c) - log.Debug("finished notifying about change") + serv.logger.Debug("finished notifying about change") }() } From 48d073c16d6b41c77cc2dc527b49c39217224769 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 19 Jul 2022 11:47:02 +0200 Subject: [PATCH 13/18] Switch from channels to sync.Cond --- server/notifications.go | 126 +++++++++++++++++++------------------- server/server.go | 132 +++++++++++++++++++--------------------- 2 files changed, 125 insertions(+), 133 deletions(-) diff --git a/server/notifications.go b/server/notifications.go index cd370f9..95f9e8a 100644 --- a/server/notifications.go +++ b/server/notifications.go @@ -5,91 +5,87 @@ package server import ( - "context" "sync" - - "github.com/gofrs/uuid" -) - -const ( - // how many notifications can be in a notification buffer. - notificationBufferSize = 16 + "sync/atomic" ) -type change struct { - // persistedIndex is a changed persisted index or `nil` if the value has not changed. - persistedIndex *uint64 +// state is an extendable struct that stores +// the information subscribers need to be notified about +type state struct { + persistedIndex uint64 } -// Any returns true if there is a change. -func (c change) Any() bool { - return c.persistedIndex != nil +// Equals returns true if 2 instances of the state are equal +func (s *state) Equals(v *state) bool { + return (s == nil && v == nil) || (s != nil && v != nil && s.persistedIndex == v.persistedIndex) } type notifications struct { - subscribers map[uuid.UUID]chan change - mutex *sync.Mutex + mutex *sync.RWMutex + cond *sync.Cond + + // the whole state must be always replaced with `notify` + // the struct fields are not thread safe on their own + state *state + + // the notifications are stopped when this flag > 0 + stopped uint32 } -func (n *notifications) subscribe() (out <-chan change, stop func(), err error) { - ch := make(chan change, notificationBufferSize) - id, err := uuid.NewV4() - if err != nil { - return nil, nil, err - } +// getState return the current state or `nil` if the initial state +// is not set or if the notifications are stopped. +// This is thread-safe. +func (n *notifications) getState() *state { + n.mutex.RLock() + defer n.mutex.RUnlock() + return n.state +} - n.mutex.Lock() - n.subscribers[id] = ch - n.mutex.Unlock() +// wait returns a pointer to the current state or blocks until the +// first state change or when notifications are stopped. +func (n *notifications) wait() *state { + // before trying to lock we check the flag + // it should be less expensive + if atomic.LoadUint32(&n.stopped) != 0 { + return nil + } - stop = func() { - n.mutex.Lock() - // we must block till the end of the function so `shutdown` - // does not try to close closed channels and panics - defer n.mutex.Unlock() + n.cond.L.Lock() + defer n.cond.L.Unlock() - // `shutdown` could shut down the notifications before this is executed - _, ok := n.subscribers[id] - if ok { - delete(n.subscribers, id) - close(ch) - // drain the channel's buffer - for range ch { - } - } + // waiting until there is an initial state or everything is stopped + for n.state == nil && atomic.LoadUint32(&n.stopped) == 0 { + n.cond.Wait() } - return ch, stop, nil + return n.state } -// notify notifies all the subscribers. -// This function should be used asynchronously. -// If one of subscriber channel's buffer is exhausted it might block -// until the subscriber reads the previous notification or unsubscribes. -func (n *notifications) notify(ctx context.Context, value change) { - n.mutex.Lock() - defer n.mutex.Unlock() - - for _, ch := range n.subscribers { - select { - case <-ctx.Done(): - return - case ch <- value: - continue - } +// notify notifies all the subscribers and returns `true` +// or does not notify and returns `false` in case there is no update in the state +func (n *notifications) notify(value state) bool { + if atomic.LoadUint32(&n.stopped) != 0 { + return false } + n.cond.L.Lock() + defer n.cond.L.Unlock() + if value.Equals(n.state) { + return false + } + n.state = &value + n.cond.Broadcast() + + return true } -// shutdown closes all the subscriber channels and drains them. +// shutdown stops waiting for all the subscribers and sets the current state to `nil`. func (n *notifications) shutdown() { - n.mutex.Lock() - defer n.mutex.Unlock() - - for _, ch := range n.subscribers { - close(ch) - // drain the channel buffer - for range ch { - } + if !atomic.CompareAndSwapUint32(&n.stopped, 0, 1) { + return } - n.subscribers = make(map[uuid.UUID]chan change) + n.cond.L.Lock() + defer n.cond.L.Unlock() + n.state = nil + // trigger unblock on each `Wait` so the subscribers could check the `stopped` flag and stop waiting + n.cond.Broadcast() } diff --git a/server/server.go b/server/server.go index 914ab7c..d2e31c7 100644 --- a/server/server.go +++ b/server/server.go @@ -18,9 +18,12 @@ import ( "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + "google.golang.org/grpc/peer" + "github.com/gofrs/uuid" ) +// Publisher contains all operations required for the shipper server to publish incoming events. type Publisher interface { io.Closer @@ -43,11 +46,11 @@ type shipperServer struct { publisher Publisher cfg ShipperServerConfig - uuid string - persistedIndex uint64 + uuid string - polling polling - notifications notifications + polling polling + notifications notifications + indexSubscribers int64 close *sync.Once @@ -76,6 +79,8 @@ func NewShipperServer(publisher Publisher, cfg ShipperServerConfig) (ShipperServ return nil, err } + notificationMutex := &sync.RWMutex{} + s := shipperServer{ uuid: id.String(), logger: logp.NewLogger("shipper-server"), @@ -85,8 +90,8 @@ func NewShipperServer(publisher Publisher, cfg ShipperServerConfig) (ShipperServ stopped: make(chan struct{}), }, notifications: notifications{ - subscribers: make(map[uuid.UUID]chan change), - mutex: &sync.Mutex{}, + mutex: notificationMutex, + cond: sync.NewCond(notificationMutex), }, close: &sync.Once{}, } @@ -97,14 +102,18 @@ func NewShipperServer(publisher Publisher, cfg ShipperServerConfig) (ShipperServ return &s, nil } -// GetAcceptedIndex atomically reads the accepted index +// GetAcceptedIndex returns the accepted index func (serv *shipperServer) GetAcceptedIndex() uint64 { return uint64(serv.publisher.AcceptedIndex()) } -// GetPersistedIndex atomically reads the persisted index +// GetPersistedIndex returns the persisted index func (serv *shipperServer) GetPersistedIndex() uint64 { - return atomic.LoadUint64(&serv.persistedIndex) + s := serv.notifications.getState() + if s == nil { + return 0 + } + return s.persistedIndex } // PublishEvents is the server implementation of the gRPC PublishEvents call. @@ -155,48 +164,49 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis // PublishEvents is the server implementation of the gRPC PersistedIndex call. func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - serv.logger.Debug("subscribing client for persisted index change notification...") - ch, stop, err := serv.notifications.subscribe() - if err != nil { - return err - } - - serv.logger.Debug("client subscribed for persisted index change notifications") - // defer works in a LIFO order - defer serv.logger.Debug("client unsubscribed from persisted index change notifications") - defer stop() - - // before reading notification we send the current values - // in case the notification would not come in a long time - err = producer.Send(&messages.PersistedIndexReply{ - Uuid: serv.uuid, - PersistedIndex: serv.GetPersistedIndex(), - }) - if err != nil { - return err + atomic.AddInt64(&serv.indexSubscribers, 1) + defer atomic.AddInt64(&serv.indexSubscribers, -1) + + p, _ := peer.FromContext(producer.Context()) + addr := addressString(p) + serv.logger.Debugf("%s subscribed for persisted index change", addr) + defer serv.logger.Debugf("%s unsubscribed from persisted index change", addr) + + // before reading notification we send the current value + // in case the first notification does not come in a long time + persistedIndex := serv.GetPersistedIndex() + if persistedIndex != 0 { + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: persistedIndex, + }) + if err != nil { + return err + } } for { select { + case <-producer.Context().Done(): + return fmt.Errorf("producer context: %w", producer.Context().Err()) - case change, open := <-ch: - if !open || change.persistedIndex == nil { + case <-serv.polling.ctx.Done(): + return fmt.Errorf("server is stopped: %w", serv.polling.ctx.Err()) + default: + state := serv.notifications.wait() + // state can be nil if the notifications are shutting down + // we don't send notifications about the same index twice + if state == nil || state.persistedIndex == persistedIndex { continue } - + persistedIndex = state.persistedIndex // store for future comparison err := producer.Send(&messages.PersistedIndexReply{ Uuid: serv.uuid, - PersistedIndex: *change.persistedIndex, + PersistedIndex: persistedIndex, }) if err != nil { return fmt.Errorf("failed to send the update: %w", err) } - - case <-producer.Context().Done(): - return fmt.Errorf("producer context: %w", producer.Context().Err()) - - case <-serv.polling.ctx.Done(): - return fmt.Errorf("server is stopped: %w", serv.polling.ctx.Err()) } } } @@ -204,8 +214,7 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p // Close implements the Closer interface func (serv *shipperServer) Close() error { serv.close.Do(func() { - // polling must be stopped first, otherwise it would try to write - // a notification to a closed channel and this would cause a panic + // we should stop polling first to cut off the source of notifications serv.polling.stop() <-serv.polling.stopped @@ -234,42 +243,29 @@ func (serv *shipperServer) startPolling() { return case <-ticker.C: - serv.logger.Debug("updating indices...") - err := serv.updateIndices(serv.polling.ctx) - if err != nil { - serv.logger.Errorf("failed to update indices: %s", err) - } else { - serv.logger.Debug("successfully updated indices.") - } + serv.trySendNewIndex() } } }() } -// updateIndices updates in-memory indices and notifies subscribers if necessary. -func (serv *shipperServer) updateIndices(ctx context.Context) error { - c := change{} - - oldPersistedIndex := serv.GetPersistedIndex() +// trySendNewIndex gets the latest indices and notifies subscribers about the new value. +func (serv *shipperServer) trySendNewIndex() { persistedIndex := uint64(serv.publisher.PersistedIndex()) - if persistedIndex != oldPersistedIndex { - atomic.StoreUint64(&serv.persistedIndex, persistedIndex) - // register the change - c.persistedIndex = &persistedIndex - serv.logger.Debugf("new persisted index %d received", persistedIndex) + s := state{ + persistedIndex: persistedIndex, } - - if c.Any() { - serv.logger.Debug("indices have been updated") - - // this must be async because the loop in `notifyChange` can block on a receiver channel - go func() { - serv.logger.Debugf("notifying %d subscribers about change...", len(serv.notifications.subscribers)) - serv.notifications.notify(ctx, c) - serv.logger.Debug("finished notifying about change") - }() + notified := serv.notifications.notify(s) + if !notified { + return } + serv.logger.Debugf("notified %d subscribers about new persisted index %d.", atomic.LoadInt64(&serv.indexSubscribers), persistedIndex) +} - return nil +func addressString(p *peer.Peer) string { + if p == nil { + return "client" + } + return p.Addr.String() } From 7c000e24d8c743dd909c6755881a78067fe011d6 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 19 Jul 2022 13:00:11 +0200 Subject: [PATCH 14/18] Fix test --- server/server_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/server_test.go b/server/server_test.go index 33fe0f7..e458816 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -50,7 +50,9 @@ func TestPublish(t *testing.T) { Fields: sampleValues, } - publisher := &publisherMock{} + publisher := &publisherMock{ + persistedIndex: 42, + } shipper, err := NewShipperServer(publisher, ShipperServerConfig{ PollingInterval: time.Second, // we don't use the polled values in this test }) From b769383007c2e8af2e2b63f4099cf75a20c87d74 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 19 Jul 2022 13:01:16 +0200 Subject: [PATCH 15/18] Fix typo --- server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index d2e31c7..6e3d0d0 100644 --- a/server/server.go +++ b/server/server.go @@ -29,7 +29,7 @@ type Publisher interface { // AcceptedIndex returns the current sequential index of the accepted events AcceptedIndex() queue.EntryID - // AcceptedIndex returns the current sequential index of the persisted events + // PersistedIndex returns the current sequential index of the persisted events PersistedIndex() queue.EntryID // Publish publishes the given event and returns the current accepted index (after this event) Publish(*messages.Event) (queue.EntryID, error) From 8f8c7b82f4f2fbda871a864443f1fd03cfc29bc3 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 8 Aug 2022 14:55:31 +0200 Subject: [PATCH 16/18] Switch from centralised subscriptions to tickers --- go.mod | 2 +- go.sum | 4 +- server/config.go | 25 -------- server/notifications.go | 91 ---------------------------- server/run.go | 5 +- server/server.go | 131 ++++++++-------------------------------- server/server_test.go | 25 ++++---- 7 files changed, 40 insertions(+), 243 deletions(-) delete mode 100644 server/config.go delete mode 100644 server/notifications.go diff --git a/go.mod b/go.mod index 42ee396..738489a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( require ( github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220603155004-ac7e079a9403 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220607160924-1a71765a8bbe - github.com/elastic/elastic-agent-shipper-client v0.3.0 + github.com/elastic/elastic-agent-shipper-client v0.4.0 github.com/elastic/go-ucfg v0.8.5 github.com/gofrs/uuid v4.2.0+incompatible github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index d9af9df..ad1fbe8 100644 --- a/go.sum +++ b/go.sum @@ -465,8 +465,8 @@ github.com/elastic/elastic-agent-libs v0.2.2/go.mod h1:1xDLBhIqBIjhJ7lr2s+xRFFkQ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.7 h1:JvjEJVyN7ERr2uDVs2jV+tXJaJUgFAjiqUN15EBWMJI= github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= -github.com/elastic/elastic-agent-shipper-client v0.3.0 h1:Ec/r08WLB6tfR95lW9JUTCUYD+HcqQdK7MR7n+0ax4s= -github.com/elastic/elastic-agent-shipper-client v0.3.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= +github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk= +github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.3.1/go.mod h1:RIYhJOS7mUeyIthfOSqmmbEILYSzaDWLi5zQ70bQo+o= github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= github.com/elastic/go-libaudit/v2 v2.3.1-0.20220523121157-87f0a814a1c0/go.mod h1:GOkMRbzKV7ePrMOy+k6gGF0QvulQ16Cr38b60oirv8U= diff --git a/server/config.go b/server/config.go deleted file mode 100644 index 8febec1..0000000 --- a/server/config.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package server - -import ( - "errors" - "time" -) - -type ShipperServerConfig struct { - // PollingInterval is an interval for polling queue metrics and updating the indices. - // Must be greater than 0 - PollingInterval time.Duration -} - -// Validate validates the server configuration. -func (c ShipperServerConfig) Validate() error { - if c.PollingInterval <= 0 { - return errors.New("polling interval must be greater than 0") - } - - return nil -} diff --git a/server/notifications.go b/server/notifications.go deleted file mode 100644 index 95f9e8a..0000000 --- a/server/notifications.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package server - -import ( - "sync" - "sync/atomic" -) - -// state is an extendable struct that stores -// the information subscribers need to be notified about -type state struct { - persistedIndex uint64 -} - -// Equals returns true if 2 instances of the state are equal -func (s *state) Equals(v *state) bool { - return (s == nil && v == nil) || (s != nil && v != nil && s.persistedIndex == v.persistedIndex) -} - -type notifications struct { - mutex *sync.RWMutex - cond *sync.Cond - - // the whole state must be always replaced with `notify` - // the struct fields are not thread safe on their own - state *state - - // the notifications are stopped when this flag > 0 - stopped uint32 -} - -// getState return the current state or `nil` if the initial state -// is not set or if the notifications are stopped. -// This is thread-safe. -func (n *notifications) getState() *state { - n.mutex.RLock() - defer n.mutex.RUnlock() - return n.state -} - -// wait returns a pointer to the current state or blocks until the -// first state change or when notifications are stopped. -func (n *notifications) wait() *state { - // before trying to lock we check the flag - // it should be less expensive - if atomic.LoadUint32(&n.stopped) != 0 { - return nil - } - - n.cond.L.Lock() - defer n.cond.L.Unlock() - - // waiting until there is an initial state or everything is stopped - for n.state == nil && atomic.LoadUint32(&n.stopped) == 0 { - n.cond.Wait() - } - - return n.state -} - -// notify notifies all the subscribers and returns `true` -// or does not notify and returns `false` in case there is no update in the state -func (n *notifications) notify(value state) bool { - if atomic.LoadUint32(&n.stopped) != 0 { - return false - } - n.cond.L.Lock() - defer n.cond.L.Unlock() - if value.Equals(n.state) { - return false - } - n.state = &value - n.cond.Broadcast() - - return true -} - -// shutdown stops waiting for all the subscribers and sets the current state to `nil`. -func (n *notifications) shutdown() { - if !atomic.CompareAndSwapUint32(&n.stopped, 0, 1) { - return - } - n.cond.L.Lock() - defer n.cond.L.Unlock() - n.state = nil - // trigger unblock on each `Wait` so the subscribers could check the `stopped` flag and stop waiting - n.cond.Broadcast() -} diff --git a/server/run.go b/server/run.go index 69fdc93..6f37c65 100644 --- a/server/run.go +++ b/server/run.go @@ -12,7 +12,6 @@ import ( "os/signal" "sync" "syscall" - "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -105,9 +104,7 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error { opts = []grpc.ServerOption{grpc.Creds(creds)} } grpcServer := grpc.NewServer(opts...) - shipperServer, err := NewShipperServer(queue, ShipperServerConfig{ - PollingInterval: 100 * time.Millisecond, // TODO make proper configuration - }) + shipperServer, err := NewShipperServer(queue) if err != nil { return fmt.Errorf("failed to initialise the server: %w", err) } diff --git a/server/server.go b/server/server.go index 6e3d0d0..fe20f24 100644 --- a/server/server.go +++ b/server/server.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "sync" - "sync/atomic" "time" "github.com/elastic/elastic-agent-libs/logp" @@ -18,8 +17,6 @@ import ( "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" - "google.golang.org/grpc/peer" - "github.com/gofrs/uuid" ) @@ -44,60 +41,35 @@ type ShipperServer interface { type shipperServer struct { logger *logp.Logger publisher Publisher - cfg ShipperServerConfig uuid string - polling polling - notifications notifications - indexSubscribers int64 - close *sync.Once + ctx context.Context + stop func() pb.UnimplementedProducerServer } -type polling struct { - ctx context.Context - stop func() - stopped chan struct{} -} - // NewShipperServer creates a new server instance for handling gRPC endpoints. -func NewShipperServer(publisher Publisher, cfg ShipperServerConfig) (ShipperServer, error) { +func NewShipperServer(publisher Publisher) (ShipperServer, error) { if publisher == nil { return nil, errors.New("publisher cannot be nil") } - err := cfg.Validate() - if err != nil { - return nil, fmt.Errorf("invalid configuration: %w", err) - } - id, err := uuid.NewV4() if err != nil { return nil, err } - notificationMutex := &sync.RWMutex{} - s := shipperServer{ uuid: id.String(), logger: logp.NewLogger("shipper-server"), publisher: publisher, - cfg: cfg, - polling: polling{ - stopped: make(chan struct{}), - }, - notifications: notifications{ - mutex: notificationMutex, - cond: sync.NewCond(notificationMutex), - }, - close: &sync.Once{}, + close: &sync.Once{}, } - s.polling.ctx, s.polling.stop = context.WithCancel(context.Background()) - s.startPolling() + s.ctx, s.stop = context.WithCancel(context.Background()) return &s, nil } @@ -109,11 +81,7 @@ func (serv *shipperServer) GetAcceptedIndex() uint64 { // GetPersistedIndex returns the persisted index func (serv *shipperServer) GetPersistedIndex() uint64 { - s := serv.notifications.getState() - if s == nil { - return 0 - } - return s.persistedIndex + return uint64(serv.publisher.PersistedIndex()) } // PublishEvents is the server implementation of the gRPC PublishEvents call. @@ -164,16 +132,9 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis // PublishEvents is the server implementation of the gRPC PersistedIndex call. func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - atomic.AddInt64(&serv.indexSubscribers, 1) - defer atomic.AddInt64(&serv.indexSubscribers, -1) - - p, _ := peer.FromContext(producer.Context()) - addr := addressString(p) - serv.logger.Debugf("%s subscribed for persisted index change", addr) - defer serv.logger.Debugf("%s unsubscribed from persisted index change", addr) + serv.logger.Debug("new subscriber for persisted index change") + defer serv.logger.Debug("unsubscribed from persisted index change") - // before reading notification we send the current value - // in case the first notification does not come in a long time persistedIndex := serv.GetPersistedIndex() if persistedIndex != 0 { err := producer.Send(&messages.PersistedIndexReply{ @@ -185,21 +146,29 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p } } + pollingIntervalDur := req.PollingInterval.AsDuration() + + if pollingIntervalDur == 0 { + return nil + } + + ticker := time.NewTicker(pollingIntervalDur) + defer ticker.Stop() + for { select { case <-producer.Context().Done(): return fmt.Errorf("producer context: %w", producer.Context().Err()) - case <-serv.polling.ctx.Done(): - return fmt.Errorf("server is stopped: %w", serv.polling.ctx.Err()) - default: - state := serv.notifications.wait() - // state can be nil if the notifications are shutting down - // we don't send notifications about the same index twice - if state == nil || state.persistedIndex == persistedIndex { + case <-serv.ctx.Done(): + return fmt.Errorf("server is stopped: %w", serv.ctx.Err()) + + case <-ticker.C: + newPersistedIndex := serv.GetPersistedIndex() + if newPersistedIndex == persistedIndex { continue } - persistedIndex = state.persistedIndex // store for future comparison + persistedIndex = newPersistedIndex err := producer.Send(&messages.PersistedIndexReply{ Uuid: serv.uuid, PersistedIndex: persistedIndex, @@ -214,58 +183,8 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p // Close implements the Closer interface func (serv *shipperServer) Close() error { serv.close.Do(func() { - // we should stop polling first to cut off the source of notifications - serv.polling.stop() - <-serv.polling.stopped - - serv.logger.Debug("shutting down all notifications...") - serv.notifications.shutdown() - serv.logger.Debug("all notifications have been shut down") + serv.stop() }) return nil } - -func (serv *shipperServer) startPolling() { - go func() { - ticker := time.NewTicker(serv.cfg.PollingInterval) - defer ticker.Stop() - - for { - select { - - case <-serv.polling.ctx.Done(): - err := serv.polling.ctx.Err() - if err != nil && errors.Is(err, context.Canceled) { - serv.logger.Error(err) - } - close(serv.polling.stopped) // signaling back to `Close` - return - - case <-ticker.C: - serv.trySendNewIndex() - } - } - }() -} - -// trySendNewIndex gets the latest indices and notifies subscribers about the new value. -func (serv *shipperServer) trySendNewIndex() { - persistedIndex := uint64(serv.publisher.PersistedIndex()) - - s := state{ - persistedIndex: persistedIndex, - } - notified := serv.notifications.notify(s) - if !notified { - return - } - serv.logger.Debugf("notified %d subscribers about new persisted index %d.", atomic.LoadInt64(&serv.indexSubscribers), persistedIndex) -} - -func addressString(p *peer.Peer) string { - if p == nil { - return "client" - } - return p.Addr.String() -} diff --git a/server/server_test.go b/server/server_test.go index e458816..9229232 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -53,9 +54,7 @@ func TestPublish(t *testing.T) { publisher := &publisherMock{ persistedIndex: 42, } - shipper, err := NewShipperServer(publisher, ShipperServerConfig{ - PollingInterval: time.Second, // we don't use the polled values in this test - }) + shipper, err := NewShipperServer(publisher) defer func() { _ = shipper.Close() }() require.NoError(t, err) client, stop := startServer(t, ctx, shipper) @@ -144,9 +143,7 @@ func TestPersistedIndex(t *testing.T) { publisher := &publisherMock{persistedIndex: 42} t.Run("server should send updates to the clients", func(t *testing.T) { - shipper, err := NewShipperServer(publisher, ShipperServerConfig{ - PollingInterval: 5 * time.Millisecond, - }) + shipper, err := NewShipperServer(publisher) defer func() { _ = shipper.Close() }() require.NoError(t, err) client, stop := startServer(t, ctx, shipper) @@ -154,12 +151,12 @@ func TestPersistedIndex(t *testing.T) { // first delivery can happen before the first index update require.Eventually(t, func() bool { - cl := createConsumers(t, ctx, client, 5) + cl := createConsumers(t, ctx, client, 5, 5*time.Millisecond) defer cl.stop() return cl.assertConsumed(t, 42) // initial value in the publisher }, 100*time.Millisecond, time.Millisecond, "clients are supposed to get the update") - cl := createConsumers(t, ctx, client, 50) + cl := createConsumers(t, ctx, client, 50, 5*time.Millisecond) publisher.persistedIndex = 64 cl.assertConsumed(t, 64) @@ -172,14 +169,12 @@ func TestPersistedIndex(t *testing.T) { }) t.Run("server should properly shutdown", func(t *testing.T) { - shipper, err := NewShipperServer(publisher, ShipperServerConfig{ - PollingInterval: 5 * time.Millisecond, // we don't use the polled values in this test - }) + shipper, err := NewShipperServer(publisher) require.NoError(t, err) client, stop := startServer(t, ctx, shipper) defer stop() - cl := createConsumers(t, ctx, client, 50) + cl := createConsumers(t, ctx, client, 50, 5*time.Millisecond) publisher.persistedIndex = 64 shipper.Close() // stopping the server require.Eventually(t, func() bool { @@ -220,7 +215,7 @@ func startServer(t *testing.T, ctx context.Context, shipperServer ShipperServer) return pb.NewProducerClient(conn), stop } -func createConsumers(t *testing.T, ctx context.Context, client pb.ProducerClient, count int) consumerList { +func createConsumers(t *testing.T, ctx context.Context, client pb.ProducerClient, count int, pollingInterval time.Duration) consumerList { ctx, cancel := context.WithCancel(ctx) cl := consumerList{ @@ -228,7 +223,9 @@ func createConsumers(t *testing.T, ctx context.Context, client pb.ProducerClient consumers: make([]pb.Producer_PersistedIndexClient, 0, count), } for i := 0; i < 50; i++ { - consumer, err := client.PersistedIndex(ctx, &messages.PersistedIndexRequest{}) + consumer, err := client.PersistedIndex(ctx, &messages.PersistedIndexRequest{ + PollingInterval: durationpb.New(pollingInterval), + }) require.NoError(t, err) cl.consumers = append(cl.consumers, consumer) } From 4d65122671fec178e72a72204fe6a8d0f28d835e Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 9 Aug 2022 09:04:44 +0200 Subject: [PATCH 17/18] Address comments * Return error when UUID does not match * Send even 0 value for the persisted index --- server/server.go | 18 +++++++++--------- server/server_test.go | 9 ++++----- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/server/server.go b/server/server.go index fe20f24..5f36826 100644 --- a/server/server.go +++ b/server/server.go @@ -16,6 +16,8 @@ import ( pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/gofrs/uuid" ) @@ -96,7 +98,7 @@ func (serv *shipperServer) PublishEvents(_ context.Context, req *messages.Publis resp.PersistedIndex = serv.GetPersistedIndex() serv.logger.Debugf("shipper UUID does not match, all events rejected. Expected = %s, actual = %s", serv.uuid, req.Uuid) - return resp, nil + return resp, status.Error(codes.FailedPrecondition, fmt.Sprintf("UUID does not match. Expected = %s, actual = %s", serv.uuid, req.Uuid)) } for _, e := range req.Events { @@ -136,14 +138,12 @@ func (serv *shipperServer) PersistedIndex(req *messages.PersistedIndexRequest, p defer serv.logger.Debug("unsubscribed from persisted index change") persistedIndex := serv.GetPersistedIndex() - if persistedIndex != 0 { - err := producer.Send(&messages.PersistedIndexReply{ - Uuid: serv.uuid, - PersistedIndex: persistedIndex, - }) - if err != nil { - return err - } + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: serv.uuid, + PersistedIndex: persistedIndex, + }) + if err != nil { + return err } pollingIntervalDur := req.PollingInterval.AsDuration() diff --git a/server/server_test.go b/server/server_test.go index 9229232..a21458b 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -123,17 +123,16 @@ func TestPublish(t *testing.T) { require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) }) - t.Run("should return count = 0 when uuid does not match", func(t *testing.T) { + t.Run("should return an error when uuid does not match", func(t *testing.T) { publisher.q = make([]*messages.Event, 0, 3) events := []*messages.Event{e, e, e} reply, err := client.PublishEvents(ctx, &messages.PublishRequest{ Uuid: "wrong", Events: events, }) - require.NoError(t, err) - require.Equal(t, uint32(0), reply.AcceptedCount) - require.Equal(t, uint64(0), reply.AcceptedIndex) - require.Equal(t, uint64(publisher.persistedIndex), pir.PersistedIndex) + require.Error(t, err) + require.Contains(t, err.Error(), "UUID does not match") + require.Nil(t, reply) }) } From 56e96891ff068fde8ff6dfe62ba10b2dd91544ff Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 9 Aug 2022 09:08:21 +0200 Subject: [PATCH 18/18] Fix linter goimports issue --- server/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/server.go b/server/server.go index 5f36826..7f508bd 100644 --- a/server/server.go +++ b/server/server.go @@ -13,9 +13,11 @@ import ( "time" "github.com/elastic/elastic-agent-libs/logp" + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status"