Add multiple beat.Clients #37657

Closed
wants to merge 14 commits into from
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/gcppubsub/config.go
@@ -72,7 +72,13 @@ func defaultConfig() config {
c.ForwarderConfig = harvester.ForwarderConfig{
Type: "gcp-pubsub",
}
// Since we now support multiple beat.Clients, having multiple NumGoroutines
// greatly improves ingestion performance. Hence, default values are being
// adjusted to get the most of the input.
// It is not increased too high to cause high CPU usage.
c.Subscription.NumGoroutines = 2
// The input gets blocked until flush.min_events or flush.timeout is reached.
// Hence max_outstanding_message has to be atleast flush.min_events to avoid this blockage.
Contributor

Suggested change
// Hence max_outstanding_message has to be atleast flush.min_events to avoid this blockage.
// Hence max_outstanding_message has to be at least flush.min_events to avoid this blockage.

Contributor Author

updated as suggested

c.Subscription.MaxOutstandingMessages = 1600
Contributor

It's unfortunate that there is not a globally visible variable/constant that could be used for this that would ensure that this is both explained in code and robust to source mutation.

Is there a way for the input to know flush.min_events and adjust this to be max(flush.min_events, max_outstanding_messages)?

Contributor Author

Since the queue config is defined inside publisher, I don't see any quick way to get this check without a considerable refactor of Input.

Contributor

Indeed. It was a wistful desire rather than an expectation of change.
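
To make the idea in this thread concrete, here is a minimal sketch of the clamp the reviewer describes. It assumes the input could somehow learn `flush.min_events`, which the author notes is not possible today without refactoring. The helper name `effectiveMaxOutstanding` and the sample values are hypothetical and not part of the PR.

```go
package main

import "fmt"

// effectiveMaxOutstanding is a hypothetical helper, not part of the PR.
// It returns max(flushMinEvents, maxOutstanding) so that the number of
// outstanding messages can always reach the queue's flush threshold.
func effectiveMaxOutstanding(flushMinEvents, maxOutstanding int) int {
	if maxOutstanding < flushMinEvents {
		return flushMinEvents
	}
	return maxOutstanding
}

func main() {
	// The values below are illustrative only; 1600 is used as a sample
	// flush.min_events, not as a statement about the real default.
	fmt.Println(effectiveMaxOutstanding(1600, 1000))
	fmt.Println(effectiveMaxOutstanding(1600, 4000))
}
```

With such a helper, a user setting below the flush threshold would be raised silently; whether that is preferable to a validation error is a separate design choice.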

c.Subscription.Create = true
return c
7 changes: 2 additions & 5 deletions x-pack/filebeat/input/gcppubsub/input.go
@@ -217,7 +217,6 @@ func (in *pubsubInput) run() error {
var workerWg sync.WaitGroup

for ctx.Err() == nil {

workers, err := in.workerSem.AcquireContext(numGoRoutines, ctx)
Member

I'm not sure we should proceed with the approach of adding multiple pub/sub readers (pending resolution to previous comments), but if we do, then:

I think this can be simplified to not use a semaphore. A plain for loop (e.g. for i := 0; i < number of readers; i++ { go runSinglePubSubClient() }) seems to achieve the same result given that if any one "worker" fails they all stop due to cancel().

if err != nil {
break
@@ -231,13 +230,12 @@ func (in *pubsubInput) run() error {
go func() {
client, err := in.newPubsubClient(ctx)
defer func() {
workerWg.Done()
in.workerSem.Release(1)
workerWg.Done()
}()
if err != nil {
in.log.Error("failed to create pub/sub client: ", err)
in.log.Errorw("failed to create pub/sub client: ", "error", err)
cancel()
Member

It looks like this should be calling return after cancel()?

// return err
} else {
defer client.Close()
}
@@ -247,7 +245,6 @@ func (in *pubsubInput) run() error {
if err != nil {
in.log.Error("failed to subscribe to pub/sub topic: ", err)
cancel()
// return fmt.Errorf("failed to subscribe to pub/sub topic: %w", err)
}
sub.ReceiveSettings.NumGoroutines = in.Subscription.NumGoroutines
sub.ReceiveSettings.MaxOutstandingMessages = in.Subscription.MaxOutstandingMessages
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/gcppubsub/pubsub_test.go
@@ -7,7 +7,7 @@
import (
"context"
"errors"
"io/ioutil"

Check failure on line 10 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

GitHub Actions / lint (windows)

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package [io] or package [os], and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)
"net/http"
"os"
"strconv"
@@ -340,10 +340,6 @@

runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
require.Error(t, input.run())
// err := input.run()
// if assert.Error(t, err) {
// assert.Contains(t, err.Error(), "failed to subscribe to pub/sub topic")
// }
})
}
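
Regarding the SA1019 lint failure on the `io/ioutil` import in pubsub_test.go: the diff does not show which `ioutil` functions the test calls, so the sketch below only illustrates the standard replacements in `io` and `os`. The helper `roundTrip` and the temp-file pattern are hypothetical and chosen just to exercise each call.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// roundTrip writes data to a temporary file and reads it back, using only
// the non-deprecated replacements for io/ioutil helpers.
// It is a hypothetical helper for illustration.
func roundTrip(data []byte) ([]byte, error) {
	f, err := os.CreateTemp("", "gcppubsub-example-*") // was ioutil.TempFile
	if err != nil {
		return nil, err
	}
	name := f.Name()
	defer os.Remove(name)
	if err := f.Close(); err != nil {
		return nil, err
	}
	if err := os.WriteFile(name, data, 0o600); err != nil { // was ioutil.WriteFile
		return nil, err
	}
	return os.ReadFile(name) // was ioutil.ReadFile
}

func main() {
	b, err := io.ReadAll(strings.NewReader("hello")) // was ioutil.ReadAll
	if err != nil {
		panic(err)
	}
	out, err := roundTrip(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Once every call is switched over, the `"io/ioutil"` import can be dropped, which should clear the staticcheck finding.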
