Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Commit

Permalink
Add package and API skeleton for internal queue (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored May 26, 2022
1 parent bfd5836 commit ca42ed1
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 36 deletions.
16 changes: 8 additions & 8 deletions monitoring/queuemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"fmt"
"time"

outqueue "github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/opt"

"github.com/elastic/elastic-agent-shipper/monitoring/reporter"
"github.com/elastic/elastic-agent-shipper/queue"

"github.com/elastic/elastic-agent-shipper/monitoring/reporter/expvar"
"github.com/elastic/elastic-agent-shipper/monitoring/reporter/log"
Expand All @@ -26,8 +26,8 @@ type QueueMonitor struct {
interval time.Duration
done chan struct{}
// handler for the event queue
queue outqueue.Queue
log *logp.Logger
target queue.MetricsSource
log *logp.Logger
// enabled is a awkward no-op if a user has disabled monitoring
enabled bool

Expand Down Expand Up @@ -59,16 +59,16 @@ func DefaultConfig() Config {
}

// NewFromConfig creates a new queue monitor from a pre-filled config struct.
func NewFromConfig(cfg Config, queue outqueue.Queue) (*QueueMonitor, error) {
func NewFromConfig(cfg Config, target queue.MetricsSource) (*QueueMonitor, error) {
// the queue == nil is largely a shim to make things not panic while we wait for the queues to get hooked up.
if !cfg.Enabled || queue == nil {
if !cfg.Enabled || target == nil {
return &QueueMonitor{enabled: true}, nil
}
//init reporters
reporters := initReporters(cfg)
return &QueueMonitor{
interval: cfg.Interval,
queue: queue,
target: target,
done: make(chan struct{}),
log: logp.L(),
reporters: reporters,
Expand Down Expand Up @@ -112,7 +112,7 @@ func (mon QueueMonitor) End() {

// updateMetrics is responsible for fetching the metrics from the queue, calculating whatever it needs to, and sending the complete events to the output
func (mon *QueueMonitor) updateMetrics() error {
raw, err := mon.queue.Metrics()
raw, err := mon.target.Metrics()
if err != nil {
return fmt.Errorf("error fetching queue Metrics: %w", err)
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func initReporters(cfg Config) []reporter.Reporter {
// This is a wrapper to deal with the multiple queue metric "types",
// as we could either be dealing with event counts, or bytes.
// The reporting interfaces assumes we only want one.
func getLimits(raw outqueue.Metrics) (uint64, uint64, bool, error) {
func getLimits(raw queue.Metrics) (uint64, uint64, bool, error) {

//bias towards byte count, as it's a little more granular.
if raw.ByteCount.Exists() && raw.ByteLimit.Exists() {
Expand Down
22 changes: 1 addition & 21 deletions monitoring/queuemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
expvarReport "github.com/elastic/elastic-agent-shipper/monitoring/reporter/expvar"
"github.com/elastic/elastic-agent-shipper/queue"
)

func init() {
Expand Down Expand Up @@ -56,26 +56,6 @@ func NewTestQueue(limit uint64) *TestMetricsQueue {
}
}

// BufferConfig doesn't do anything
func (tq TestMetricsQueue) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{}
}

// Producer doesn't do anything
func (tq TestMetricsQueue) Producer(_ queue.ProducerConfig) queue.Producer {
return nil
}

// Consumer doesn't do anything
func (tq TestMetricsQueue) Consumer() queue.Consumer {
return nil
}

// Close Doesn't do anything
func (tq TestMetricsQueue) Close() error {
return nil
}

// Metrics spoofs the metrics output
func (tq *TestMetricsQueue) Metrics() (queue.Metrics, error) {
tq.metricState.EventCount = opt.UintWith(tq.metricState.EventCount.ValueOr(0) + 1)
Expand Down
49 changes: 49 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package queue

import (
"fmt"

beatsqueue "github.com/elastic/beats/v7/libbeat/publisher/queue"

"github.com/elastic/elastic-agent-shipper/api"
)

// Queue is a shipper-specific wrapper around the bare libbeat queue.
// It accepts api.Event instead of bare interface pointers like the
// libbeat queue, and it sets opinionated defaults for the queue
// semantics. The intention is to keep the shipper from becoming too
// entangled with the legacy queue api, and to gradually expose more
// features as the libbeat queue evolves and we decide what we want
// to support in the shipper.
type Queue struct {
eventQueue beatsqueue.Queue

//producer beatsqueue.Producer
}

type Metrics beatsqueue.Metrics

// metricsSource is a wrapper around the libbeat queue interface, exposing only
// the callback to query the current metrics. It is used to pass queue metrics
// to the monitoring package.
type MetricsSource interface {
Metrics() (Metrics, error)
}

func New() (*Queue, error) {
return &Queue{}, nil
}

func (queue *Queue) Publish(event *api.Event) error {
return fmt.Errorf("couldn't publish: Queue.Publish is not implemented")
}

func (queue *Queue) Metrics() (Metrics, error) {
metrics, err := queue.eventQueue.Metrics()
// We need to do the explicit cast, otherwise this isn't recognized as the same type
return Metrics(metrics), err
}

func (queue *Queue) Close() {

}
20 changes: 13 additions & 7 deletions server/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-shipper/config"
"github.com/elastic/elastic-agent-shipper/monitoring"
"github.com/elastic/elastic-agent-shipper/queue"

pb "github.com/elastic/elastic-agent-shipper/api"
)
Expand Down Expand Up @@ -62,14 +63,22 @@ func handleShutdown(stopFunc func(), log *logp.Logger) {
// Run starts the gRPC server
func Run(cfg config.ShipperConfig) error {
log := logp.L()

// When there is queue-specific configuration in ShipperConfig, it should
// be passed in here.
queue, err := queue.New()
if err != nil {
return fmt.Errorf("couldn't create queue: %w", err)
}

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}

// Beats won't call the "New*" functions of a queue directly, but instead fetch a queueFactory from the global registers.
//However, that requires the publisher/pipeline code as well, and I'm not sure we want that.
monHandler, err := loadMonitoring(cfg)
monHandler, err := loadMonitoring(cfg, queue)
if err != nil {
return fmt.Errorf("error loading outputs: %w", err)
}
Expand All @@ -91,6 +100,7 @@ func Run(cfg config.ShipperConfig) error {
shutdownFunc := func() {
grpcServer.GracefulStop()
monHandler.End()
queue.Close()
}
handleShutdown(shutdownFunc, log)
log.Debugf("gRPC server is listening on port %d", cfg.Port)
Expand All @@ -99,13 +109,9 @@ func Run(cfg config.ShipperConfig) error {
}

// Initialize metrics and outputs
func loadMonitoring(cfg config.ShipperConfig) (*monitoring.QueueMonitor, error) {
//If we had an actual queue hooked up, that would go here
//queue := NewTestQueue()

func loadMonitoring(cfg config.ShipperConfig, queue *queue.Queue) (*monitoring.QueueMonitor, error) {
//startup monitor
//remove the nil in the second argument here when we have an actual queue.
mon, err := monitoring.NewFromConfig(cfg.Monitor, nil)
mon, err := monitoring.NewFromConfig(cfg.Monitor, queue)
if err != nil {
return nil, fmt.Errorf("error initializing output monitor: %w", err)
}
Expand Down
13 changes: 13 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (

"github.com/elastic/elastic-agent-libs/logp"
pb "github.com/elastic/elastic-agent-shipper/api"
"github.com/elastic/elastic-agent-shipper/queue"
)

type shipperServer struct {
logger *logp.Logger

queue *queue.Queue

pb.UnimplementedProducerServer
}

Expand All @@ -24,6 +28,15 @@ func (serv shipperServer) PublishEvents(_ context.Context, req *pb.PublishReques
results := []*pb.EventResult{}
for _, evt := range req.Events {
serv.logger.Infof("Got event %s: %#v", evt.EventId, evt.Fields.AsMap())
err := serv.queue.Publish(evt)
if err != nil {
// If we couldn't accept any events, return the error directly. Otherwise,
// just return success on however many events we were able to handle.
if len(results) == 0 {
return nil, err
}
break
}
res := pb.EventResult{EventId: evt.EventId, Timestamp: pbts.Now()}
results = append(results, &res)
}
Expand Down

0 comments on commit ca42ed1

Please sign in to comment.