From 5a8f9cba63044b1b14ed35a3c6216d67b838ca91 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 7 Jun 2021 14:52:38 -0600 Subject: [PATCH 1/4] Add log_group_name_prefix config option for aws-cloudwatch input --- x-pack/filebeat/input/awscloudwatch/config.go | 15 ++-- x-pack/filebeat/input/awscloudwatch/input.go | 71 ++++++++++++++++--- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index 7f1e4ca3ccdc..210e7bb9a319 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -16,6 +16,7 @@ type config struct { harvester.ForwarderConfig `config:",inline"` LogGroupARN string `config:"log_group_arn"` LogGroupName string `config:"log_group_name"` + LogGroupNamePrefix string `config:"log_group_name_prefix"` RegionName string `config:"region_name"` LogStreams []string `config:"log_streams"` LogStreamPrefix string `config:"log_stream_prefix"` @@ -44,13 +45,17 @@ func (c *config) Validate() error { "either 'beginning' or 'end'") } - if c.LogGroupARN == "" && c.LogGroupName == "" { - return errors.New("log_group_arn and log_group_name config parameter" + - "cannot be both empty") + if c.LogGroupARN == "" && c.LogGroupName == "" && c.LogGroupNamePrefix == "" { + return errors.New("log_group_arn, log_group_name and log_group_name_prefix config parameter" + + "cannot all be empty") } - if c.LogGroupName != "" && c.RegionName == "" { - return errors.New("region_name is required when log_group_name " + + if c.LogGroupName != "" && c.LogGroupNamePrefix != "" { + return errors.New("log_group_name and log_group_name_prefix cannot be given at the same time") + } + + if (c.LogGroupName != "" || c.LogGroupNamePrefix != "") && c.RegionName == "" { + return errors.New("region_name is required when log_group_name or log_group_name_prefix " + "config parameter is given") } return nil diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 2f3563a369e8..7752e7dee6c4 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -131,15 +131,28 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con // Run runs the input func (in *awsCloudWatchInput) Run() { - in.workerOnce.Do(func() { - in.workerWg.Add(1) - go func() { - in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) - defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) - defer in.workerWg.Done() - in.run() - }() - }) + cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, "cloudwatchlogs", in.config.RegionName, in.awsConfig) + svc := cloudwatchlogs.New(cwConfig) + if in.config.LogGroupNamePrefix != "" { + logGroupNames, err := in.getLogGroupNames(svc) + if err != nil { + in.logger.Error("getLogGroupNames failed: ", err) + return + } + + for _, logGroup := range logGroupNames { + in.config.LogGroupName = logGroup + in.workerOnce.Do(func() { + in.workerWg.Add(1) + go func() { + in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) + defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) + defer in.workerWg.Done() + in.run() + }() + }) + } + } } func (in *awsCloudWatchInput) run() { @@ -176,6 +189,46 @@ func parseARN(logGroupARN string) (string, string, error) { return "", "", errors.Errorf("cannot get log group name from log group ARN: %s", logGroupARN) } +// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names +func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) { + ctx, cancelFn := context.WithTimeout(in.inputCtx, in.config.APITimeout) + defer cancelFn() + + init := true + nextToken := "" + var logGroupNames []string + for nextToken != "" || init { + // construct DescribeLogGroupsInput + filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ + LogGroupNamePrefix: awssdk.String(in.config.LogGroupName), + } + + // make API request + req := svc.DescribeLogGroupsRequest(filterLogEventsInput) + resp, err := req.Send(ctx) + if err != nil { + in.logger.Error("failed DescribeLogGroupsRequest", err) + return logGroupNames, err + } + + // get token for next API call, if resp.NextToken is nil, nextToken set to "" + nextToken = "" + if resp.NextToken != nil { + nextToken = *resp.NextToken + } + + logGroups := resp.LogGroups + in.logger.Debugf("Collecting from #%v log groups", len(logGroups)) + + for _, logGroup := range logGroups { + logGroupNames = append(logGroupNames, *logGroup.LogGroupName) + } + init = false + } + + return logGroupNames, nil +} + // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error { ctx, cancelFn := context.WithTimeout(in.inputCtx, in.config.APITimeout) From 89dd7f4867361a1d0a154c8032ba402432b2b7dc Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 23 Jun 2021 05:45:30 -0600 Subject: [PATCH 2/4] add changelog --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-cloudwatch.asciidoc | 9 +++- x-pack/filebeat/input/awscloudwatch/input.go | 46 +++++++------------ 3 files changed, 25 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 40506f5bbbf2..eaeb96806c78 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -808,6 +808,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add fingerprint processor to generate fixed ids for `google_workspace` events. {pull}25841[25841] - Update PanOS module to parse HIP Match logs. {issue}24350[24350] {pull}25686[25686] - Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368] +- Add log_group_name_prefix config into aws-cloudwatch input. {pull}26187[26187] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index d39ec5508687..50208c643c57 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -45,9 +45,15 @@ ARN of the log group to collect logs from. [float] ==== `log_group_name` -Name of the log group to collect logs from. Note: region_name is required when +Name of the log group to collect logs from. Note: `region_name` is required when log_group_name is given. +[float] +==== `log_group_name_prefix` +The prefix for a group of log group names. Note: `region_name` is required when +log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix` +cannot be given at the same time. + [float] ==== `region_name` Region that the specified log group belongs to. @@ -109,6 +115,7 @@ Please see <> for more details. === AWS Permissions Specific AWS permissions are required for IAM user to access aws-cloudwatch: ---- +cloudwatchlogs:DescribeLogGroups logs:FilterLogEvents ---- diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 7752e7dee6c4..29b937d6fb35 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -191,41 +191,27 @@ func parseARN(logGroupARN string) (string, string, error) { // getLogGroupNames uses DescribeLogGroups API to retrieve all log group names func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) { - ctx, cancelFn := context.WithTimeout(in.inputCtx, in.config.APITimeout) - defer cancelFn() + // construct DescribeLogGroupsInput + filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ + LogGroupNamePrefix: awssdk.String(in.config.LogGroupNamePrefix), + } - init := true - nextToken := "" + // make API request + req := svc.DescribeLogGroupsRequest(filterLogEventsInput) + p := cloudwatchlogs.NewDescribeLogGroupsPaginator(req) var logGroupNames []string - for nextToken != "" || init { - // construct DescribeLogGroupsInput - filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ - LogGroupNamePrefix: awssdk.String(in.config.LogGroupName), - } - - // make API request - req := svc.DescribeLogGroupsRequest(filterLogEventsInput) - resp, err := req.Send(ctx) - if err != nil { - in.logger.Error("failed DescribeLogGroupsRequest", err) - return logGroupNames, err + for p.Next(context.TODO()) { + page := p.CurrentPage() + in.logger.Debugf("Collecting #%v log group names", len(page.LogGroups)) + for _, lg := range page.LogGroups { + logGroupNames = append(logGroupNames, *lg.LogGroupName) } - - // get token for next API call, if resp.NextToken is nil, nextToken set to "" - nextToken = "" - if resp.NextToken != nil { - nextToken = *resp.NextToken - } - - logGroups := resp.LogGroups - in.logger.Debugf("Collecting from #%v log groups", len(logGroups)) - - for _, logGroup := range logGroups { - logGroupNames = append(logGroupNames, *logGroup.LogGroupName) - } - init = false } + if err := p.Err(); err != nil { + in.logger.Error("failed DescribeLogGroupsRequest: ", err) + return logGroupNames, err + } return logGroupNames, nil } From bfc2817552866aa75f95cff56f32738dd2547fb5 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 23 Jun 2021 06:05:02 -0600 Subject: [PATCH 3/4] update documentation and default config --- .../filebeat.inputs.reference.xpack.yml.tmpl | 57 +++++++++++++++++++ .../docs/inputs/input-aws-cloudwatch.asciidoc | 2 +- x-pack/filebeat/filebeat.reference.yml | 57 +++++++++++++++++++ 3 files changed, 115 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 8b2b53fa8c74..13cb37c0f9cc 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -76,3 +76,60 @@ # List of S3 object metadata keys to include in events. #include_s3_metadata: [] + +#------------------------------ AWS CloudWatch input -------------------------------- +# Beta: Config options for AWS CloudWatch input +#- type: aws-cloudwatch + #enabled: false + + # AWS Credentials + # If access_key_id and secret_access_key are configured, then use them to make api calls. + # If not, aws-cloudwatch input will load default AWS config or load with given profile name. + #access_key_id: '${AWS_ACCESS_KEY_ID:""}' + #secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}' + #session_token: '${AWS_SESSION_TOKEN:"”}' + #credential_profile_name: test-aws-s3-input + + # ARN of the log group to collect logs from + #log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*" + + # Name of the log group to collect logs from. + # Note: region_name is required when log_group_name is given. + #log_group_name: test + + # The prefix for a group of log group names. + # Note: `region_name` is required when `log_group_name_prefix` is given. + # `log_group_name` and `log_group_name_prefix` cannot be given at the same time. + #log_group_name_prefix: /aws/ + + # Region that the specified log group or log group prefix belongs to. + #region_name: us-east-1 + + # A list of strings of log streams names that Filebeat collect log events from. + #log_streams: + # - log_stream_name + + # A string to filter the results to include only log events from log streams + # that have names starting with this prefix. + #log_stream_prefix: test + + # `start_position` allows user to specify if this input should read log files + # from the `beginning` or from the `end`. + # `beginning`: reads from the beginning of the log group (default). + # `end`: read only new messages from current time minus `scan_frequency` going forward. + #start_position: beginning + + # This config parameter sets how often Filebeat checks for new log events from the + # specified log group. Default `scan_frequency` is 1 minute, which means Filebeat + # will sleep for 1 minute before querying for new logs again. + #scan_frequency: 1m + + # The maximum duration of AWS API can take. If it exceeds the timeout, AWS API + # will be interrupted. + # The default AWS API timeout for a message is 120 seconds. + # The minimum is 0 seconds. + #api_timeout: 120s + + # This is used to sleep between AWS `FilterLogEvents` API calls inside the same + # collection period. + #api_sleep: 200ms diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index 50208c643c57..e04eaa8b9e21 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -56,7 +56,7 @@ cannot be given at the same time. [float] ==== `region_name` -Region that the specified log group belongs to. +Region that the specified log group or log group prefix belongs to. [float] ==== `log_streams` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index eb54d2f62b90..b0f2f2db4b37 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3060,6 +3060,63 @@ filebeat.inputs: # List of S3 object metadata keys to include in events. #include_s3_metadata: [] +#------------------------------ AWS CloudWatch input -------------------------------- +# Beta: Config options for AWS CloudWatch input +#- type: aws-cloudwatch + #enabled: false + + # AWS Credentials + # If access_key_id and secret_access_key are configured, then use them to make api calls. + # If not, aws-cloudwatch input will load default AWS config or load with given profile name. + #access_key_id: '${AWS_ACCESS_KEY_ID:""}' + #secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}' + #session_token: '${AWS_SESSION_TOKEN:"”}' + #credential_profile_name: test-aws-s3-input + + # ARN of the log group to collect logs from + #log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*" + + # Name of the log group to collect logs from. + # Note: region_name is required when log_group_name is given. + #log_group_name: test + + # The prefix for a group of log group names. + # Note: `region_name` is required when `log_group_name_prefix` is given. + # `log_group_name` and `log_group_name_prefix` cannot be given at the same time. + #log_group_name_prefix: /aws/ + + # Region that the specified log group or log group prefix belongs to. + #region_name: us-east-1 + + # A list of strings of log streams names that Filebeat collect log events from. + #log_streams: + # - log_stream_name + + # A string to filter the results to include only log events from log streams + # that have names starting with this prefix. + #log_stream_prefix: test + + # `start_position` allows user to specify if this input should read log files + # from the `beginning` or from the `end`. + # `beginning`: reads from the beginning of the log group (default). + # `end`: read only new messages from current time minus `scan_frequency` going forward. + #start_position: beginning + + # This config parameter sets how often Filebeat checks for new log events from the + # specified log group. Default `scan_frequency` is 1 minute, which means Filebeat + # will sleep for 1 minute before querying for new logs again. + #scan_frequency: 1m + + # The maximum duration of AWS API can take. If it exceeds the timeout, AWS API + # will be interrupted. + # The default AWS API timeout for a message is 120 seconds. + # The minimum is 0 seconds. + #api_timeout: 120s + + # This is used to sleep between AWS `FilterLogEvents` API calls inside the same + # collection period. + #api_sleep: 200ms + # =========================== Filebeat autodiscover ============================ # Autodiscover allows you to detect changes in the system and spawn new modules From f2a401917cc008bda80dd1c1d40bf620d724012c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 24 Jun 2021 08:32:26 -0600 Subject: [PATCH 4/4] add condition when log_group_name_prefix is not given --- x-pack/filebeat/input/awscloudwatch/input.go | 31 ++++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 29b937d6fb35..7eb14147d313 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -133,25 +133,30 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (in *awsCloudWatchInput) Run() { cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, "cloudwatchlogs", in.config.RegionName, in.awsConfig) svc := cloudwatchlogs.New(cwConfig) + + var logGroupNames []string + var err error if in.config.LogGroupNamePrefix != "" { - logGroupNames, err := in.getLogGroupNames(svc) + logGroupNames, err = in.getLogGroupNames(svc) if err != nil { in.logger.Error("getLogGroupNames failed: ", err) return } + } else { + logGroupNames = []string{in.config.LogGroupName} + } - for _, logGroup := range logGroupNames { - in.config.LogGroupName = logGroup - in.workerOnce.Do(func() { - in.workerWg.Add(1) - go func() { - in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) - defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) - defer in.workerWg.Done() - in.run() - }() - }) - } + for _, logGroup := range logGroupNames { + in.config.LogGroupName = logGroup + in.workerOnce.Do(func() { + in.workerWg.Add(1) + go func() { + in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) + defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) + defer in.workerWg.Done() + in.run() + }() + }) } }