Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/streaming: add falcon hose stream follower #40838

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ CHANGELOG*
/x-pack/filebeat/input/netflow/ @elastic/sec-deployment-and-devices
/x-pack/filebeat/input/o365audit/ @elastic/security-service-integrations
/x-pack/filebeat/input/salesforce @elastic/obs-infraobs-integrations
/x-pack/filebeat/input/websocket/ @elastic/security-service-integrations
/x-pack/filebeat/input/streaming/ @elastic/security-service-integrations
/x-pack/filebeat/module/activemq @elastic/obs-infraobs-integrations
/x-pack/filebeat/module/aws @elastic/obs-cloud-monitoring
/x-pack/filebeat/module/awsfargate @elastic/obs-cloud-monitoring
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
- Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]

*Auditbeat*

Expand Down
54 changes: 50 additions & 4 deletions x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ experimental[]
<titleabbrev>Streaming</titleabbrev>
++++

The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. Currently only websocket server or API endpoints are supported.
The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library internally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but differs in the way the messages are read and processed. Currently websocket server or API endpoints, and the Crowdstrike Falcon streaming API are supported.

This input supports:
The websocket streaming input supports:

* Auth
** Basic
Expand All @@ -23,6 +23,10 @@ This input supports:

NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

The Crowdstrike streaming input requires OAuth2.0 as described in the Crowdstrike documentation for the API. When using the Crowdstrike streaming type, the `crowdstrike_app_id` configuration field must be set. This field specifies the `appId` parameter sent to the Crowdstrike API. See the Crowdstrike documentation for details.

The `stream_type` configuration field specifies which type of streaming input to use, "websocket" or "crowdstrike". If it is not set, the input defaults to websocket streaming .

==== Execution

The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library.
Expand Down Expand Up @@ -64,7 +68,7 @@ The field could be an array or a single object that will be treated as an array
<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.


Example configuration:
Example configurations:

["source","yaml",subs="attributes"]
----
Expand All @@ -80,13 +84,36 @@ filebeat.inputs:
})
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
# Read and process events from the Crowdstrike Falcon Hose API
- type: streaming
stream_type: crowdstrike
url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
auth:
client_id: a23fcea2643868ef1a41565a1a8a1c7c
client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
token_url: https://api.crowdstrike.com/oauth2/token
crowdstrike_app_id: my_app_id
program: |
state.response.decode_json().as(body,{
"events": [body],
?"cursor": has(body.?metadata.offset) ?
optional.of({"offset": body.metadata.offset})
:
optional.none(),
})
----

==== Debug state logging

The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<streaming-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs.

==== Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

Expand Down Expand Up @@ -126,6 +153,19 @@ filebeat.inputs:
url: wss://localhost:443/_stream
----

The crowdstrike streaming input requires OAuth2.0 authentication using a client ID, client secret and a token URL. These values are not exposed to the `state` object. OAuth2.0 scopes and endpoint parameters are available via the `auth.scopes` and `auth.endpoint_params` config parameters.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
stream_type: crowdstrike
auth:
client_id: a23fcea2643868ef1a41565a1a8a1c7c
client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
token_url: https://api.crowdstrike.com/oauth2/token
----

[[input-state-streaming]]
==== Input state

Expand All @@ -137,6 +177,12 @@ The state must contain a `response` map and may contain any object the user wish
The `streaming` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[[stream_type-streaming]]
[float]
==== `stream_type`

The flavor of streaming to use. This may be either "websocket", "crowdstrike", or unset. If the field is unset, websocket streaming is used.

[[program-streaming]]
[float]
==== `program`
Expand Down
54 changes: 48 additions & 6 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

type config struct {
// Type is the type of the stream being followed. The
// zero value indicates websocket.
Type string `config:"stream_type"`

// URLProgram is the CEL program to be run once before to prep the url.
URLProgram string `config:"url_program"`
// Program is the CEL program to be run for each polling.
Expand All @@ -36,6 +41,13 @@ type config struct {
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

// CrowdstrikeAppID is the value used to set the
// appId request parameter in the FalconHose stream
// discovery request.
CrowdstrikeAppID string `config:"crowdstrike_app_id"`
}

type redact struct {
Expand All @@ -60,13 +72,25 @@ type authConfig struct {
BearerToken string `config:"bearer_token"`
// Basic auth token to use for authentication.
BasicToken string `config:"basic_token"`

OAuth2 oAuth2Config `config:",inline"`
}

type customAuthConfig struct {
// Custom auth config to use for authentication.
Header string `config:"header"`
Value string `config:"value"`
}

type oAuth2Config struct {
// common oauth fields
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
EndpointParams map[string][]string `config:"endpoint_params"`
Scopes []string `config:"scopes"`
TokenURL string `config:"token_url"`
}

type urlConfig struct {
*url.URL
}
Expand All @@ -81,6 +105,12 @@ func (u *urlConfig) Unpack(in string) error {
}

func (c config) Validate() error {
switch c.Type {
case "", "websocket", "crowdstrike":
default:
return fmt.Errorf("unknown stream type: %s", c.Type)
}

if c.Redact == nil {
logp.L().Named("input.websocket").Warn("missing recommended 'redact' configuration: " +
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#_redact")
Expand All @@ -100,7 +130,7 @@ func (c config) Validate() error {
return fmt.Errorf("failed to check program: %w", err)
}
}
err = checkURLScheme(c.URL)
err = checkURLScheme(c)
if err != nil {
return err
}
Expand All @@ -116,11 +146,23 @@ func (c config) Validate() error {
return nil
}

func checkURLScheme(url *urlConfig) error {
switch url.Scheme {
case "ws", "wss":
return nil
func checkURLScheme(c config) error {
switch c.Type {
case "", "websocket":
switch c.URL.Scheme {
case "ws", "wss":
return nil
default:
return fmt.Errorf("unsupported scheme: %s", c.URL.Scheme)
}
case "crowdstrike":
switch c.URL.Scheme {
case "http", "https":
return nil
default:
return fmt.Errorf("unsupported scheme: %s", c.URL.Scheme)
}
default:
return fmt.Errorf("unsupported scheme: %s", url.Scheme)
return fmt.Errorf("unknown stream type: %s", c.Type)
}
}
Loading
Loading