Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove queue.Consumer #31502

Merged
merged 18 commits into from
May 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Removed deprecated disk spool from Beats. Use disk queue instead. {pull}28869[28869]
- Wildcard fields no longer have a default ignore_above setting of 1024. {issue}30096[30096] {pull}30668[30668]
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]

==== Bugfixes

Expand Down
5 changes: 2 additions & 3 deletions libbeat/logp/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,12 @@ func DevelopmentSetup(options ...Option) error {

// TestingSetup configures logging by calling DevelopmentSetup if and only if
// verbose testing is enabled (as in 'go test -v').
func TestingSetup(options ...Option) error {
func TestingSetup(options ...Option) {
// Use the flag to avoid a dependency on the testing package.
f := flag.Lookup("test.v")
if f != nil && f.Value.String() == "true" {
return DevelopmentSetup(options...)
_ = DevelopmentSetup(options...)
}
return nil
}

// ObserverLogs provides the list of logs generated during the observation
Expand Down
10 changes: 3 additions & 7 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,14 @@ func TestClient(t *testing.T) {
},
}

if testing.Verbose() {
logp.TestingSetup()
}
logp.TestingSetup()

for name, test := range cases {
t.Run(name, func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

pipeline := makePipeline(Settings{}, makeBlockingQueue())
pipeline := makePipeline(Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
Expand Down Expand Up @@ -140,9 +138,7 @@ func TestClientWaitClose(t *testing.T) {

return p
}
if testing.Verbose() {
logp.TestingSetup()
}
logp.TestingSetup()

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
Expand Down
13 changes: 2 additions & 11 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ func (c *eventConsumer) run() {
// The output channel (and associated parameters) that will receive
// the batches we're loading.
target consumerTarget

// The queue.Consumer we get the raw batches from. Reset whenever
// the target changes.
consumer queue.Consumer = c.queue.Consumer()
)

outerLoop:
Expand All @@ -130,7 +126,7 @@ outerLoop:
if queueBatch == nil && !pendingRead {
pendingRead = true
queueReader.req <- queueReaderRequest{
consumer: consumer,
queue: c.queue,
retryer: c,
batchSize: target.batchSize,
timeToLive: target.timeToLive,
Expand Down Expand Up @@ -175,11 +171,10 @@ outerLoop:
pendingRead = false

case req := <-c.retryChan:
alive := true
if req.decreaseTTL {
countFailed := len(req.batch.Events())

alive = req.batch.reduceTTL()
alive := req.batch.reduceTTL()

countDropped := countFailed - len(req.batch.Events())
c.observer.eventsDropped(countDropped)
Expand All @@ -197,10 +192,6 @@ outerLoop:
}
}

// Close the queue.Consumer, otherwise queueReader can get blocked
// waiting on a read.
consumer.Close()

// Close the queueReader request channel so it knows to shutdown.
close(queueReader.req)

Expand Down
16 changes: 9 additions & 7 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ func loadOutput(
monitors Monitors,
makeOutput OutputFactory,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}
Expand All @@ -138,7 +133,11 @@ func loadOutput(
if monitors.Metrics != nil {
metrics = monitors.Metrics.GetRegistry("output")
if metrics != nil {
metrics.Clear()
err := metrics.Clear()
if err != nil {
return outputs.Group{}, err
}

} else {
metrics = monitors.Metrics.NewRegistry("output")
}
Expand All @@ -156,7 +155,10 @@ func loadOutput(
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.GetRegistry("output")
if telemetry != nil {
telemetry.Clear()
err := telemetry.Clear()
if err != nil {
return outputs.Group{}, err
}
} else {
telemetry = monitors.Telemetry.NewRegistry("output")
}
Expand Down
84 changes: 10 additions & 74 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,14 @@ type testQueue struct {
close func() error
bufferConfig func() queue.BufferConfig
producer func(queue.ProducerConfig) queue.Producer
consumer func() queue.Consumer
get func(sz int) (queue.Batch, error)
}

type testProducer struct {
publish func(try bool, event publisher.Event) bool
cancel func() int
}

type testConsumer struct {
get func(sz int) (queue.Batch, error)
close func() error
}

func (q *testQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, nil
}
Expand All @@ -67,11 +62,11 @@ func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
return nil
}

func (q *testQueue) Consumer() queue.Consumer {
if q.consumer != nil {
return q.consumer()
func (q *testQueue) Get(sz int) (queue.Batch, error) {
if q.get != nil {
return q.get(sz)
}
return nil
return nil, nil
}

func (p *testProducer) Publish(event publisher.Event) bool {
Expand All @@ -95,39 +90,14 @@ func (p *testProducer) Cancel() int {
return 0
}

func (p *testConsumer) Get(sz int) (queue.Batch, error) {
if p.get != nil {
return p.get(sz)
}
return nil, nil
}

func (p *testConsumer) Close() error {
if p.close() != nil {
return p.close()
}
return nil
}

func makeBlockingQueue() queue.Queue {
return makeTestQueue(emptyConsumer, blockingProducer)
}

func makeTestQueue(
makeConsumer func() queue.Consumer,
makeProducer func(queue.ProducerConfig) queue.Producer,
) queue.Queue {
func makeTestQueue() queue.Queue {
var mux sync.Mutex
var wg sync.WaitGroup
consumers := map[*testConsumer]struct{}{}
producers := map[queue.Producer]struct{}{}

return &testQueue{
close: func() error {
mux.Lock()
for consumer := range consumers {
consumer.Close()
}
for producer := range producers {
producer.Cancel()
}
Expand All @@ -136,34 +106,14 @@ func makeTestQueue(
wg.Wait()
return nil
},

consumer: func() queue.Consumer {
var consumer *testConsumer
c := makeConsumer()
consumer = &testConsumer{
get: func(sz int) (queue.Batch, error) { return c.Get(sz) },
close: func() error {
err := c.Close()

mux.Lock()
defer mux.Unlock()
delete(consumers, consumer)
wg.Done()

return err
},
}

mux.Lock()
defer mux.Unlock()
consumers[consumer] = struct{}{}
wg.Add(1)
return consumer
get: func(count int) (queue.Batch, error) {
//<-done
return nil, nil
},

producer: func(cfg queue.ProducerConfig) queue.Producer {
var producer *testProducer
p := makeProducer(cfg)
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event publisher.Event) bool {
if try {
Expand Down Expand Up @@ -192,20 +142,6 @@ func makeTestQueue(
}
}

func emptyConsumer() queue.Consumer {
done := make(chan struct{})
return &testConsumer{
get: func(sz int) (queue.Batch, error) {
<-done
return nil, nil
},
close: func() error {
close(done)
return nil
},
}
}

func blockingProducer(_ queue.ProducerConfig) queue.Producer {
sig := make(chan struct{})
waiting := atomic.MakeInt(0)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/queue_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type queueReader struct {
}

type queueReaderRequest struct {
consumer queue.Consumer
queue queue.Queue
retryer retryer
batchSize int
timeToLive int
Expand All @@ -53,7 +53,7 @@ func (qr *queueReader) run(logger *logp.Logger) {
logger.Debug("pipeline event consumer queue reader: stop")
return
}
queueBatch, _ := req.consumer.Get(req.batchSize)
queueBatch, _ := req.queue.Get(req.batchSize)
var batch *ttlBatch
if queueBatch != nil {
batch = newBatch(req.retryer, queueBatch, req.timeToLive)
Expand Down
5 changes: 0 additions & 5 deletions libbeat/publisher/pipeline/sync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ type ISyncClient interface {

// SyncClient wraps an existing beat.Client and provide a sync interface.
type SyncClient struct {
// Chain callbacks already defined in the original ClientConfig
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})

client beat.Client
wg sync.WaitGroup
log *logp.Logger
Expand Down
20 changes: 7 additions & 13 deletions libbeat/publisher/pipeline/sync_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error)

func TestSyncClient(t *testing.T) {
receiver := func(c *dummyClient, sc *SyncClient) {
select {
case i := <-c.Received:
sc.onACK(i)
return
}
i := <-c.Received
sc.onACK(i)
}

t.Run("Publish", func(t *testing.T) {
Expand Down Expand Up @@ -109,7 +106,7 @@ func TestSyncClient(t *testing.T) {
sc.Wait()
})

t.Run("PublishAll multiple independant ACKs", func(t *testing.T) {
t.Run("PublishAll multiple independent ACKs", func(t *testing.T) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
Expand All @@ -120,13 +117,10 @@ func TestSyncClient(t *testing.T) {
defer sc.Close()

go func(c *dummyClient, sc *SyncClient) {
select {
case <-c.Received:
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
return
}
<-c.Received
// simulate multiple acks
sc.onACK(5)
sc.onACK(5)
}(c, sc)

err = sc.PublishAll(make([]beat.Event, 10))
Expand Down
Loading