Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] - Removed bucket_timeout config option and replaced bucket context with parent program context #41970

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Filebeat*

- Removed `bucket_timeout` config option for GCS input and replaced bucket context with parent program context. {issue}41107[41107] {pull}41970[41970]

*Heartbeat*

Expand Down
51 changes: 16 additions & 35 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ The input can be configured to work with and without polling, though if polling
3. If any major error occurs which stops the main thread, the logs will be appropriately generated,
describing said error.

**Config Option Removal Notice** : The `bucket_timeout` config option has been removed from the google cloud storage input. The intention behind this removal is to simplify the configuration and to make it more user friendly. The `bucket_timeout` option was confusing and had the potential to let users malconfigure the input, which could lead to unexpected behavior. The input now uses a more robust and efficient way to handle the bucket timeout internally.

[id="supported-types-gcs"]
NOTE: Currently only `JSON` and `NDJSON` are supported object/file formats. Objects/files may be also be gzip compressed.
"JSON credential keys" and "credential files" are supported authentication types.
Expand All @@ -46,32 +48,30 @@ filebeat.inputs:
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
- name: gcs-test-old
max_workers: 3
poll: true
poll_interval: 10s
bucket_timeout: 30s
----

*Explanation :*
This `configuration` given above describes a basic gcs config having two buckets named `gcs-test-new` and `gcs-test-old`.
Each of these buckets have their own attributes such as `name`, `max_workers`, `poll`, `poll_interval` and `bucket_timeout`. These attributes have detailed explanations
Each of these buckets have their own attributes such as `name`, `max_workers`, `poll` and `poll_interval`. These attributes have detailed explanations
given <<supported-attributes-gcs,below>>. For now lets try to understand how this config works.

For google cloud storage input to identify the files it needs to read and process, it will require the bucket names to be specified. We can have as
many buckets as we deem fit. We are also able to configure the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` at the root level, which will
many buckets as we deem fit. We are also able to configure the attributes `max_workers`, `poll` and `poll_interval` at the root level, which will
then be applied to all buckets which do not specify any of these attributes explicitly.

NOTE: If the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` are specified at the root level, these can still be overridden at the bucket level with
NOTE: If the attributes `max_workers`, `poll` and `poll_interval` are specified at the root level, these can still be overridden at the bucket level with
different values, thus offering extensive flexibility and customization. Examples <<bucket-overrides,below>> show this behavior.

On receiving this config the google cloud storage input will connect to the service and retrieve a `Storage Client` using the given `bucket_name` and
`auth.credentials_file`, then it will spawn two main go-routines, one for each bucket. After this each of these routines (threads) will initialize a scheduler
which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).

NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and
process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.
process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.

*A Sample Response :-*
["source","json"]
Expand Down Expand Up @@ -156,15 +156,14 @@ Now let's explore the configuration attributes a bit more elaborately.
3. <<attrib-auth-credentials-file,auth.credentials_file.path>>
4. <<attrib-buckets,buckets>>
5. <<attrib-bucket-name,name>>
6. <<attrib-bucket-timeout,bucket_timeout>>
7. <<attrib-max_workers-gcs,max_workers>>
8. <<attrib-poll-gcs,poll>>
9. <<attrib-poll_interval-gcs,poll_interval>>
10. <<attrib-parse_json,parse_json>>
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
14. <<attrib-retry-gcs,retry>>
6. <<attrib-max_workers-gcs,max_workers>>
7. <<attrib-poll-gcs,poll>>
8. <<attrib-poll_interval-gcs,poll_interval>>
9. <<attrib-parse_json,parse_json>>
10. <<attrib-file_selectors-gcs,file_selectors>>
11. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
12. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
13. <<attrib-retry-gcs,retry>>


[id="attrib-project-id"]
Expand Down Expand Up @@ -195,24 +194,14 @@ specified, then the one that occurs first in the configuration will be used.
[float]
==== `buckets`

This attribute contains the details about a specific bucket like `name`, `max_workers`, `poll`, `poll_interval` and `bucket_timeout`. The attribute `name` is specific to a
bucket as it describes the bucket name, while the fields `max_workers`, `poll`, `poll_interval` and `bucket_timeout` can exist both at the bucket level and the root level.
This attribute is internally represented as an array, so we can add as many buckets as we require.
This attribute contains the details about a specific bucket like `name`, `max_workers`, `poll` and `poll_interval`. The attribute `name` is specific to a bucket as it describes the bucket name, while the fields `max_workers`, `poll` and `poll_interval` can exist both at the bucket level and the root level. This attribute is internally represented as an array, so we can add as many buckets as we require.

[id="attrib-bucket-name"]
[float]
==== `name`

This is a specific subfield of a bucket. It specifies the bucket name.

[id="attrib-bucket-timeout"]
[float]
==== `bucket_timeout`

This attribute defines the maximum amount of time after which a bucket operation will give and stop if no response is recieved (example: reading a file / listing a file).
It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish.
If no value is specified for this, by default its initialized to `120 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.

[id="attrib-max_workers-gcs"]
[float]
==== `max_workers`
Expand All @@ -237,9 +226,7 @@ This attribute defines the maximum amount of time after which the internal sched
defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish.
Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `5 minutes`.
This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority
and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization.

NOTE: Some edge case scenarios could require different values for `poll_interval` and `bucket_timeout`. For example, if the files are very large and the network speed is slow, then the `bucket_timeout` value should be set to a higher value than the `poll_interval`. This would ensure that polling operation does not wait too long for the files to be read and moves to the next iteration while the current one is still being processed. This would ensure a higher throughput and better resource utilization.
and override the root level values if both are specified. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization.

[id="attrib-parse_json"]
[float]
Expand Down Expand Up @@ -322,7 +309,6 @@ filebeat.inputs:
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
file_selectors:
- regex: '/Monitoring/'
- regex: 'docs/'
Expand Down Expand Up @@ -368,7 +354,6 @@ filebeat.inputs:
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
expand_event_list_from_field: Records
----

Expand All @@ -392,7 +377,6 @@ filebeat.inputs:
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
timestamp_epoch: 1630444800
----

Expand Down Expand Up @@ -432,11 +416,8 @@ filebeat.inputs:
max_workers: 3
poll: true
poll_interval: 11m
bucket_timeout: 10m
----

When configuring the `retry` attribute, the user should consider the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time.

[id="bucket-overrides"]
*The sample configs below will explain the bucket level overriding of attributes a bit further :-*

Expand Down
12 changes: 4 additions & 8 deletions x-pack/filebeat/input/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type config struct {
// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
// false, since it can get expensive dealing with highly nested json data.
ParseJSON bool `config:"parse_json"`
// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
BucketTimeOut time.Duration `config:"bucket_timeout"`
// Buckets - Defines a list of buckets that will be polled for objects.
Buckets []bucket `config:"buckets" validate:"required"`
// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.
Expand All @@ -58,7 +56,6 @@ type config struct {
type bucket struct {
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers" validate:"max=5000"`
BucketTimeOut *time.Duration `config:"bucket_timeout"`
Poll *bool `config:"poll"`
PollInterval *time.Duration `config:"poll_interval"`
ParseJSON *bool `config:"parse_json"`
Expand Down Expand Up @@ -136,11 +133,10 @@ func (c authConfig) Validate() error {
// defaultConfig returns the default configuration for the input
func defaultConfig() config {
return config{
MaxWorkers: 1,
Poll: true,
PollInterval: 5 * time.Minute,
BucketTimeOut: 120 * time.Second,
ParseJSON: false,
MaxWorkers: 1,
Poll: true,
PollInterval: 5 * time.Minute,
ParseJSON: false,
Retry: retryConfig{
MaxAttempts: 3,
InitialBackOffDuration: time.Second,
Expand Down
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/gcs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
sources = append(sources, &Source{
ProjectId: config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
Expand Down Expand Up @@ -93,9 +92,6 @@
if b.ParseJSON == nil {
b.ParseJSON = &cfg.ParseJSON
}
if b.BucketTimeOut == nil {
b.BucketTimeOut = &cfg.BucketTimeOut
}
if b.TimeStampEpoch == nil {
b.TimeStampEpoch = cfg.TimeStampEpoch
}
Expand Down Expand Up @@ -127,7 +123,7 @@
func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
cursor cursor.Cursor, publisher cursor.Publisher) error {
st := newState()
currentSource := src.(*Source)

Check failure on line 126 in x-pack/filebeat/input/gcs/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
Expand Down
1 change: 0 additions & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
source = &Source{
ProjectId: in.config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
Expand All @@ -67,7 +66,7 @@
}

st := newState()
currentSource := source.(*Source)

Check failure on line 69 in x-pack/filebeat/input/gcs/input_stateless.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)
log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
defer metrics.Close()
Expand Down
3 changes: 0 additions & 3 deletions x-pack/filebeat/input/gcs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ func Test_StorageClient(t *testing.T) {
"max_workers": 1,
"poll": true,
"poll_interval": "1m",
"bucket_timeout": "1m",
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
Expand All @@ -550,7 +549,6 @@ func Test_StorageClient(t *testing.T) {
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
Expand Down Expand Up @@ -578,7 +576,6 @@ func Test_StorageClient(t *testing.T) {
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
return
}
j.metrics.gcsObjectsPublishedTotal.Inc()
//nolint:gosec // object size cannot be negative hence this conversion is safe

Check failure on line 113 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

directive `//nolint:gosec // object size cannot be negative hence this conversion is safe` is unused for linter "gosec" (nolintlint)
j.metrics.gcsBytesProcessedTotal.Add(uint64(j.object.Size))

} else {
Expand Down Expand Up @@ -147,10 +147,8 @@
}

func (j *job) processAndPublishData(ctx context.Context, id string) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, j.src.BucketTimeOut)
defer cancel()
obj := j.bucket.Object(j.object.Name)
reader, err := obj.NewReader(ctxWithTimeout)
reader, err := obj.NewReader(ctx)
if err != nil {
return fmt.Errorf("failed to open reader for object: %s, with error: %w", j.object.Name, err)
}
Expand Down Expand Up @@ -197,7 +195,7 @@
var v mapstr.M
msg, v, err = dec.decodeValue()
if err != nil {
if err == io.EOF {

Check failure on line 198 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand All @@ -206,7 +204,7 @@
} else {
msg, err = dec.decode()
if err != nil {
if err == io.EOF {

Check failure on line 207 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand All @@ -222,7 +220,7 @@
for dec.next() {
msg, err := dec.decode()
if err != nil {
if err == io.EOF {

Check failure on line 223 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
break
Expand Down Expand Up @@ -385,7 +383,7 @@
// so the function can peek into the byte stream without consuming it. This makes it convenient for
// code executed after this function call to consume the stream if it wants.
func (j *job) addGzipDecoderIfNeeded(reader *bufio.Reader) (io.Reader, error) {
isStreamGzipped := false

Check failure on line 386 in x-pack/filebeat/input/gcs/job.go

View workflow job for this annotation

GitHub Actions / lint (linux)

assigned to isStreamGzipped, but reassigned without using the value (wastedassign)
// check if stream is gziped or not
buf, err := reader.Peek(3)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion x-pack/filebeat/input/gcs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
// Source, it is the cursor source
type Source struct {
BucketName string
BucketTimeOut time.Duration
ProjectId string
MaxWorkers int
Poll bool
Expand Down
Loading