Fix panic when more than 32767 pipeline clients are active #38556

Merged · 9 commits · Apr 12, 2024
Changes from all commits
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -66,6 +66,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]
- `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078]
- Split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385]
- The Beats publishing pipeline no longer propagates the close signal to its clients. It is the responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556]

==== Bugfixes

@@ -93,6 +94,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Make winlogbeat/sys/wineventlog follow the unsafe.Pointer rules. {pull}36650[36650]
- Cleaned up documentation errors & fixed a minor bug in Filebeat Azure blob storage input. {pull}36714[36714]
- Fix copy arguments for strict aligned architectures. {pull}36976[36976]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

==== Added

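The developer changelog entry above describes the new contract: the publisher pipeline no longer closes clients on behalf of their owners, so every caller of `ConnectWith` has to close its own client. A minimal sketch of that pattern against the public `beat.Pipeline` API; the `runInput` function is illustrative only and not part of this change:

```go
package example

import "github.com/elastic/beats/v7/libbeat/beat"

// runInput is a hypothetical input body showing the caller-owned lifetime:
// connect, publish, and close the client explicitly, since the pipeline no
// longer propagates a close signal (CloseRef has been removed).
func runInput(pipeline beat.Pipeline) error {
	client, err := pipeline.ConnectWith(beat.ClientConfig{})
	if err != nil {
		return err
	}
	// The caller is responsible for closing the client, for example via
	// defer, mirroring the `defer client.Close()` added to the kafka input.
	defer client.Close()

	client.Publish(beat.Event{})
	return nil
}
```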
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -101,6 +101,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
- Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]

*Heartbeat*

@@ -219,7 +219,6 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
2 changes: 1 addition & 1 deletion filebeat/input/kafka/input.go
@@ -120,12 +120,12 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
}
}),
),
CloseRef: ctx.Cancelation,
WaitClose: input.config.WaitClose,
})
if err != nil {
return err
}
defer client.Close()

log.Info("Starting Kafka input")
defer log.Info("Kafka input stopped")
1 change: 0 additions & 1 deletion filebeat/input/v2/input-cursor/input.go
@@ -146,7 +146,6 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
3 changes: 0 additions & 3 deletions filebeat/input/v2/input-stateless/stateless.go
@@ -85,9 +85,6 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (

client, err := pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.DefaultGuarantees,

// configure pipeline to disconnect input on stop signal.
CloseRef: ctx.Cancelation,
})
if err != nil {
return err
9 changes: 5 additions & 4 deletions filebeat/input/v2/input-stateless/stateless_test.go
@@ -107,22 +107,23 @@ func TestStateless_Run(t *testing.T) {
},
}), nil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// connector creates a client that blocks forever until the shutdown signal is received
var publishCalls atomic.Int
connector := pubtest.FakeConnector{
ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) {
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
publishCalls.Inc()
<-config.CloseRef.Done()
// Unlock Publish once the input has been cancelled
<-ctx.Done()
},
}, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
var err error
wg.Add(1)
9 changes: 0 additions & 9 deletions libbeat/beat/pipeline.go
@@ -49,8 +49,6 @@ type ClientConfig struct {

Processing ProcessingConfig

CloseRef CloseRef

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
// WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents
@@ -91,13 +89,6 @@
ClientClosed()
}

// CloseRef allows users to close the client asynchronously.
// A CloseRef implements a subset of function required for context.Context.
type CloseRef interface {
Done() <-chan struct{}
Err() error
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
type ProcessingConfig struct {
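With the `CloseRef` interface removed from `beat.ClientConfig` above, inputs that used to hand their cancelation context to the pipeline now have to trigger the close themselves. One way to keep the old asynchronous-close behaviour is a small helper on the caller side; `closeOnCancel` below is an illustrative sketch, not an API added by this PR:

```go
package example

import (
	"context"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// closeOnCancel closes a pipeline client once ctx is done, approximating the
// signal propagation the pipeline previously performed for CloseRef users.
func closeOnCancel(ctx context.Context, client beat.Client) {
	go func() {
		<-ctx.Done()
		_ = client.Close() // release the pipeline resources held by this client
	}()
}
```

A caller would connect with `pipeline.ConnectWith(...)` and immediately pass the returned client to `closeOnCancel(ctx, client)` to retain the shutdown behaviour it relied on before this change.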
8 changes: 2 additions & 6 deletions libbeat/publisher/pipeline/client.go
@@ -42,10 +42,8 @@ type client struct {
eventWaitGroup *sync.WaitGroup

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once

observer observer
eventListener beat.EventListener
Expand Down Expand Up @@ -137,8 +135,6 @@ func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
close(c.done)

c.isOpen.Store(false)
c.onClosing()

Expand Down
15 changes: 4 additions & 11 deletions libbeat/publisher/pipeline/client_test.go
@@ -18,7 +18,6 @@
package pipeline

import (
"context"
"errors"
"io"
"sync"
@@ -95,15 +94,7 @@ func TestClient(t *testing.T) {
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
var cancel func()
if test.context {
ctx, cancel = context.WithCancel(context.Background())
}

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx,
})
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
@@ -116,7 +107,9 @@
client.Publish(beat.Event{})
}()

test.close(client, cancel)
test.close(client, func() {
client.Close()
})
wg.Wait()
})
}
92 changes: 2 additions & 90 deletions libbeat/publisher/pipeline/pipeline.go
@@ -22,7 +22,6 @@

import (
"fmt"
"reflect"
"sync"
"time"

@@ -73,8 +72,8 @@
eventWaitGroup *sync.WaitGroup

// closeRef signal propagation support
guardStartSigPropagation sync.Once

sigNewClient chan *client

[GitHub Actions / lint (linux, windows) check failures on pipeline.go lines 75-76: fields `guardStartSigPropagation` and `sigNewClient` are unused]

processors processing.Supporter
}
@@ -197,9 +196,6 @@
p.outputController.Close()

p.observer.cleanup()
if p.sigNewClient != nil {
close(p.sigNewClient)
}
return nil
}

@@ -212,6 +208,8 @@
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
// If not set otherwise, the default publish mode is OutputChooses.
//
// It is the responsibility of the caller to close the client.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
var (
canDrop bool
@@ -239,8 +237,6 @@

client := &client{
logger: p.monitors.Logger,
closeRef: cfg.CloseRef,
done: make(chan struct{}),
isOpen: atomic.MakeBool(true),
clientListener: cfg.ClientListener,
processors: processors,
@@ -295,93 +291,9 @@
}

p.observer.clientConnected()

if client.closeRef != nil {
p.registerSignalPropagation(client)
}

return client, nil
}

func (p *Pipeline) registerSignalPropagation(c *client) {
p.guardStartSigPropagation.Do(func() {
p.sigNewClient = make(chan *client, 1)
go p.runSignalPropagation()
})
p.sigNewClient <- c
}

func (p *Pipeline) runSignalPropagation() {
var channels []reflect.SelectCase
var clients []*client

channels = append(channels, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(p.sigNewClient),
})

for {
chosen, recv, recvOK := reflect.Select(channels)
if chosen == 0 {
if !recvOK {
// sigNewClient was closed
return
}

// new client -> register client for signal propagation.
if client := recv.Interface().(*client); client != nil {
channels = append(channels,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.closeRef.Done()),
},
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.done),
},
)
clients = append(clients, client)
}
continue
}

// find client we received a signal for. If client.done was closed, then
// we have to remove the client only. But if closeRef did trigger the signal, then
// we have to propagate the async close to the client.
// In either case, the client will be removed

i := (chosen - 1) / 2
isSig := (chosen & 1) == 1
if isSig {
client := clients[i]
client.Close()
}

// remove:
last := len(clients) - 1
ch1 := i*2 + 1
ch2 := ch1 + 1
lastCh1 := last*2 + 1
lastCh2 := lastCh1 + 1

clients[i], clients[last] = clients[last], nil
channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}

clients = clients[:last]
channels = channels[:lastCh1]
if cap(clients) > 10 && len(clients) <= cap(clients)/2 {
clientsTmp := make([]*client, len(clients))
copy(clientsTmp, clients)
clients = clientsTmp

channelsTmp := make([]reflect.SelectCase, len(channels))
copy(channelsTmp, channels)
channels = channelsTmp
}
}
}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
if p.processors == nil {
return nil, nil
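For context on the panic being fixed: the removed `runSignalPropagation` loop multiplexed every registered client over a single `reflect.Select` call, using one case for `sigNewClient` plus two cases per client (`closeRef.Done()` and `done`). In current Go versions `reflect.Select` panics when given more than 65536 cases, so the loop panicked as soon as more than 32767 clients with a `CloseRef` were connected (1 + 2*32768 = 65537 cases). A standalone sketch of that limit, independent of Beats:

```go
package main

import (
	"fmt"
	"reflect"
)

// Demonstrates the reflect.Select case limit behind the original panic:
// passing more than 65536 cases makes reflect.Select panic with
// "reflect.Select: too many cases (max 65536)".
func main() {
	const cases = 1 + 2*32768 // one control channel plus two channels per client
	sel := make([]reflect.SelectCase, 0, cases)
	for i := 0; i < cases; i++ {
		ch := make(chan struct{}, 1)
		ch <- struct{}{} // make every case ready so Select would not block
		sel = append(sel, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}
	fmt.Println("calling reflect.Select with", len(sel), "cases")
	reflect.Select(sel) // panics: too many cases
}
```

Dropping the propagation goroutine, and with it the per-client select cases, removes this limit entirely; clients are now closed directly by their owners.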