diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 23af7ec2eb2..08fa63e685d 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -296,6 +296,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Disable event normalization for netflow input {pull}40635[40635]
- Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662]
- Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
+- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
*Auditbeat*
diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc
index 4dc4e426dd3..b80deda9c77 100644
--- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc
@@ -12,9 +12,9 @@ experimental[]
Streaming
++++
-The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. Currently only websocket server or API endpoints are supported.
+The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library internally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but differs in the way the messages are read and processed. Currently websocket server or API endpoints, and the Crowdstrike Falcon streaming API are supported.
-This input supports:
+The websocket streaming input supports:
* Auth
** Basic
@@ -23,6 +23,10 @@ This input supports:
NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.
+The Crowdstrike streaming input requires OAuth2.0 as described in the Crowdstrike documentation for the API. When using the Crowdstrike streaming type, the `crowdstrike_app_id` configuration field must be set. This field specifies the `appId` parameter sent to the Crowdstrike API. See the Crowdstrike documentation for details.
+
+The `stream_type` configuration field specifies which type of streaming input to use, "websocket" or "crowdstrike". If it is not set, the input defaults to websocket streaming .
+
==== Execution
The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library.
@@ -64,7 +68,7 @@ The field could be an array or a single object that will be treated as an array
<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.
-Example configuration:
+Example configurations:
["source","yaml",subs="attributes"]
----
@@ -80,13 +84,36 @@ filebeat.inputs:
})
----
+["source","yaml",subs="attributes"]
+----
+filebeat.inputs:
+# Read and process events from the Crowdstrike Falcon Hose API
+- type: streaming
+ stream_type: crowdstrike
+ url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
+ auth:
+ client_id: a23fcea2643868ef1a41565a1a8a1c7c
+ client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
+ token_url: https://api.crowdstrike.com/oauth2/token
+ crowdstrike_app_id: my_app_id
+ program: |
+ state.response.decode_json().as(body,{
+ "events": [body],
+ ?"cursor": has(body.?metadata.offset) ?
+ optional.of({"offset": body.metadata.offset})
+ :
+ optional.none(),
+ })
+----
+
==== Debug state logging
The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <> configuration parameters for settings to exclude sensitive fields from DEBUG logs.
==== Authentication
-The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
+
+The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
Example configurations with authentication:
@@ -126,6 +153,19 @@ filebeat.inputs:
url: wss://localhost:443/_stream
----
+The crowdstrike streaming input requires OAuth2.0 authentication using a client ID, client secret and a token URL. These values are not exposed to the `state` object. OAuth2.0 scopes and endpoint parameters are available via the `auth.scopes` and `auth.endpoint_params` config parameters.
+
+["source","yaml",subs="attributes"]
+----
+filebeat.inputs:
+- type: streaming
+ stream_type: crowdstrike
+ auth:
+ client_id: a23fcea2643868ef1a41565a1a8a1c7c
+ client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
+ token_url: https://api.crowdstrike.com/oauth2/token
+----
+
[[input-state-streaming]]
==== Input state
@@ -137,6 +177,12 @@ The state must contain a `response` map and may contain any object the user wish
The `streaming` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.
+[[stream_type-streaming]]
+[float]
+==== `stream_type`
+
+The flavor of streaming to use. This may be either "websocket", "crowdstrike", or unset. If the field is unset, websocket streaming is used.
+
[[program-streaming]]
[float]
==== `program`
diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go
index bcec48a9568..67ee6e1eb31 100644
--- a/x-pack/filebeat/input/streaming/config.go
+++ b/x-pack/filebeat/input/streaming/config.go
@@ -13,9 +13,14 @@ import (
"time"
"github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/transport/httpcommon"
)
type config struct {
+ // Type is the type of the stream being followed. The
+ // zero value indicates websocket.
+ Type string `config:"stream_type"`
+
// URLProgram is the CEL program to be run once before to prep the url.
URLProgram string `config:"url_program"`
// Program is the CEL program to be run for each polling.
@@ -36,6 +41,13 @@ type config struct {
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`
+
+ Transport httpcommon.HTTPTransportSettings `config:",inline"`
+
+ // CrowdstrikeAppID is the value used to set the
+ // appId request parameter in the FalconHose stream
+ // discovery request.
+ CrowdstrikeAppID string `config:"crowdstrike_app_id"`
}
type redact struct {
@@ -60,6 +72,8 @@ type authConfig struct {
BearerToken string `config:"bearer_token"`
// Basic auth token to use for authentication.
BasicToken string `config:"basic_token"`
+
+ OAuth2 oAuth2Config `config:",inline"`
}
type customAuthConfig struct {
@@ -67,6 +81,16 @@ type customAuthConfig struct {
Header string `config:"header"`
Value string `config:"value"`
}
+
+type oAuth2Config struct {
+ // common oauth fields
+ ClientID string `config:"client_id"`
+ ClientSecret string `config:"client_secret"`
+ EndpointParams map[string][]string `config:"endpoint_params"`
+ Scopes []string `config:"scopes"`
+ TokenURL string `config:"token_url"`
+}
+
type urlConfig struct {
*url.URL
}
@@ -81,6 +105,12 @@ func (u *urlConfig) Unpack(in string) error {
}
func (c config) Validate() error {
+ switch c.Type {
+ case "", "websocket", "crowdstrike":
+ default:
+ return fmt.Errorf("unknown stream type: %s", c.Type)
+ }
+
if c.Redact == nil {
logp.L().Named("input.websocket").Warn("missing recommended 'redact' configuration: " +
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#_redact")
@@ -100,7 +130,7 @@ func (c config) Validate() error {
return fmt.Errorf("failed to check program: %w", err)
}
}
- err = checkURLScheme(c.URL)
+ err = checkURLScheme(c)
if err != nil {
return err
}
@@ -116,11 +146,23 @@ func (c config) Validate() error {
return nil
}
-func checkURLScheme(url *urlConfig) error {
- switch url.Scheme {
- case "ws", "wss":
- return nil
+func checkURLScheme(c config) error {
+ switch c.Type {
+ case "", "websocket":
+ switch c.URL.Scheme {
+ case "ws", "wss":
+ return nil
+ default:
+ return fmt.Errorf("unsupported scheme: %s", c.URL.Scheme)
+ }
+ case "crowdstrike":
+ switch c.URL.Scheme {
+ case "http", "https":
+ return nil
+ default:
+ return fmt.Errorf("unsupported scheme: %s", c.URL.Scheme)
+ }
default:
- return fmt.Errorf("unsupported scheme: %s", url.Scheme)
+ return fmt.Errorf("unknown stream type: %s", c.Type)
}
}
diff --git a/x-pack/filebeat/input/streaming/crowdstrike.go b/x-pack/filebeat/input/streaming/crowdstrike.go
new file mode 100644
index 00000000000..699b99b3d8d
--- /dev/null
+++ b/x-pack/filebeat/input/streaming/crowdstrike.go
@@ -0,0 +1,288 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package streaming
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+ "strconv"
+ "time"
+
+ "golang.org/x/oauth2/clientcredentials"
+
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+ "github.com/elastic/elastic-agent-libs/transport/httpcommon"
+)
+
+type falconHoseStream struct {
+ processor
+
+ id string
+ cfg config
+ cursor map[string]any
+
+ creds *clientcredentials.Config
+ discoverURL string
+ plainClient *http.Client
+
+ time func() time.Time
+}
+
+// NewFalconHoseFollower performs environment construction including CEL
+// program and regexp compilation, and input metrics set-up for a Crowdstrike
+// FalconHose stream follower.
+func NewFalconHoseFollower(ctx context.Context, id string, cfg config, cursor map[string]any, pub inputcursor.Publisher, log *logp.Logger, now func() time.Time) (StreamFollower, error) {
+ s := falconHoseStream{
+ id: id,
+ cfg: cfg,
+ cursor: cursor,
+ processor: processor{
+ ns: "falcon_hose",
+ pub: pub,
+ log: log,
+ redact: cfg.Redact,
+ metrics: newInputMetrics(id),
+ },
+ creds: &clientcredentials.Config{
+ ClientID: cfg.Auth.OAuth2.ClientID,
+ ClientSecret: cfg.Auth.OAuth2.ClientSecret,
+ TokenURL: cfg.Auth.OAuth2.TokenURL,
+ Scopes: cfg.Auth.OAuth2.Scopes,
+ EndpointParams: cfg.Auth.OAuth2.EndpointParams,
+ },
+ }
+ s.metrics.url.Set(cfg.URL.String())
+ s.metrics.errorsTotal.Set(0)
+
+ patterns, err := regexpsFromConfig(cfg)
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ s.Close()
+ return nil, err
+ }
+
+ s.prg, s.ast, err = newProgram(ctx, cfg.Program, root, patterns, log)
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ s.Close()
+ return nil, err
+ }
+
+ u, err := url.Parse(s.cfg.URL.String())
+ if err != nil {
+ return nil, fmt.Errorf("failed parse url: %w", err)
+ }
+ query := url.Values{"appId": []string{cfg.CrowdstrikeAppID}}
+ u.RawQuery = query.Encode()
+ s.discoverURL = u.String()
+
+ s.plainClient, err = cfg.Transport.Client(httpcommon.WithAPMHTTPInstrumentation())
+ if err != nil {
+ return nil, err
+ }
+
+ return &s, nil
+}
+
+// FollowStream receives, processes and publishes events from the subscribed
+// FalconHose stream.
+func (s *falconHoseStream) FollowStream(ctx context.Context) error {
+ state := s.cfg.State
+ if state == nil {
+ state = make(map[string]any)
+ }
+ if s.cursor != nil {
+ state["cursor"] = s.cursor
+ }
+
+ cli := s.creds.Client(ctx)
+ // Normally we would not bother with this, but since connections
+ // are in keep-alive in normal operation, let's clean up.
+ defer cli.CloseIdleConnections()
+
+ var err error
+ for {
+ state, err = s.followSession(ctx, cli, state)
+ if err != nil {
+ if !errors.Is(err, Warning{}) {
+ if errors.Is(err, context.Canceled) {
+ return nil
+ }
+ s.metrics.errorsTotal.Inc()
+ return err
+ }
+ s.metrics.errorsTotal.Inc()
+ s.log.Warnw("session warning", "error", err)
+ }
+ }
+}
+
+func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, state map[string]any) (map[string]any, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.discoverURL, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to prepare discover stream request: %w", err)
+ }
+ resp, err := cli.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("failed GET to discover stream: %w", err)
+ }
+ defer resp.Body.Close()
+
+ dec := json.NewDecoder(resp.Body)
+
+ type resource struct {
+ FeedURL string `json:"dataFeedURL"`
+ Session struct {
+ Token string `json:"token"`
+ Expires time.Time `json:"expiration"`
+ } `json:"sessionToken"`
+ RefreshURL string `json:"refreshActiveSessionURL"`
+ RefreshAfter int `json:"refreshActiveSessionInterval"`
+ }
+ var body struct {
+ Resources []resource `json:"resources"`
+ Meta map[string]any `json:"meta"`
+ }
+ err = dec.Decode(&body)
+ if err != nil {
+ return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
+ }
+ s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta))
+
+ var offset int
+ if cursor, ok := state["cursor"].(map[string]any); ok {
+ switch off := cursor["offset"].(type) {
+ case int:
+ offset = off
+ case float64:
+ offset = int(off)
+ }
+ }
+
+ for _, r := range body.Resources {
+ refreshAfter := time.Duration(r.RefreshAfter) * time.Second
+ go func() {
+ const grace = 5 * time.Minute
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(refreshAfter - grace):
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.RefreshURL, nil)
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ s.log.Errorw("failed to prepare refresh stream request", "error", err)
+ return
+ }
+ req.Header.Set("Content-Type", "application/json")
+ resp, err := cli.Do(req)
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ s.log.Errorw("failed to refresh stream connection", "error", err)
+ return
+ }
+ err = resp.Body.Close()
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ s.log.Warnw("failed to close refresh response body", "error", err)
+ }
+ }
+ }
+ }()
+
+ if offset > 0 {
+ feedURL, err := url.Parse(r.FeedURL)
+ if err != nil {
+ log.Fatalf("failed to parse feed url: %v", err)
+ }
+ feedQuery, err := url.ParseQuery(feedURL.RawQuery)
+ if err != nil {
+ log.Fatalf("failed to parse feed query: %v", err)
+ }
+ feedQuery.Set("offset", strconv.Itoa(offset))
+ feedURL.RawQuery = feedQuery.Encode()
+ r.FeedURL = feedURL.String()
+ }
+
+ req, err := http.NewRequestWithContext(ctx, "GET", r.FeedURL, nil)
+ if err != nil {
+ return state, Warning{fmt.Errorf("failed to make firehose request to %s: %w", r.FeedURL, err)}
+ }
+ req.Header = make(http.Header)
+ req.Header.Add("Accept", "application/json")
+ req.Header.Add("Authorization", "Token "+r.Session.Token)
+
+ resp, err := s.plainClient.Do(req)
+ if err != nil {
+ return state, Warning{fmt.Errorf("failed to get firehose from %s: %w", r.FeedURL, err)}
+ }
+ defer resp.Body.Close()
+
+ dec := json.NewDecoder(resp.Body)
+ for {
+ var msg json.RawMessage
+ err := dec.Decode(&msg)
+ if err != nil {
+ s.metrics.errorsTotal.Inc()
+ if err == io.EOF {
+ s.log.Info("stream ended, restarting")
+ return state, nil
+ }
+ return state, Warning{fmt.Errorf("error decoding event: %w", err)}
+ }
+ s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
+ state["response"] = []byte(msg)
+ s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), debugMsg(msg))
+ err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
+ if err != nil {
+ s.log.Errorw("failed to process and publish data", "error", err)
+ return nil, err
+ }
+ }
+ }
+ return state, nil
+}
+
+// Warning is a warning-only error.
+type Warning struct {
+ error
+}
+
+// Is returns true if target is a Warning.
+func (e Warning) Is(target error) bool {
+ _, ok := target.(Warning)
+ return ok
+}
+
+func (e Warning) Unwrap() error {
+ return e.error
+}
+
+// now is time.Now with a modifiable time source.
+func (s *falconHoseStream) now() time.Time {
+ if s.time == nil {
+ return time.Now()
+ }
+ return s.time()
+}
+
+func (s *falconHoseStream) Close() error {
+ s.metrics.Close()
+ return nil
+}
+
+type debugMsg []byte
+
+func (b debugMsg) String() string {
+ return string(b)
+}
diff --git a/x-pack/filebeat/input/streaming/crowdstrike_test.go b/x-pack/filebeat/input/streaming/crowdstrike_test.go
new file mode 100644
index 00000000000..775cb4a88cd
--- /dev/null
+++ b/x-pack/filebeat/input/streaming/crowdstrike_test.go
@@ -0,0 +1,108 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package streaming
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "net/url"
+ "os"
+ "testing"
+ "time"
+
+ cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+var (
+ timeout = flag.Duration("crowdstrike_timeout", time.Minute, "time to allow Crowdstrike FalconHose test to run")
+ offset = flag.Int("crowdstrike_offset", -1, "offset into stream (negative to ignore)")
+)
+
+func TestCrowdstrikeFalconHose(t *testing.T) {
+ logp.TestingSetup()
+ logger := logp.L()
+
+ feedURL, ok := os.LookupEnv("CROWDSTRIKE_URL")
+ if !ok {
+ t.Skip("okta tests require ${CROWDSTRIKE_URL} to be set")
+ }
+ tokenURL, ok := os.LookupEnv("CROWDSTRIKE_TOKEN_URL")
+ if !ok {
+ t.Skip("okta tests require ${CROWDSTRIKE_TOKEN_URL} to be set")
+ }
+ clientID, ok := os.LookupEnv("CROWDSTRIKE_CLIENT_ID")
+ if !ok {
+ t.Skip("okta tests require ${CROWDSTRIKE_CLIENT_ID} to be set")
+ }
+ clientSecret, ok := os.LookupEnv("CROWDSTRIKE_CLIENT_SECRET")
+ if !ok {
+ t.Skip("okta tests require ${CROWDSTRIKE_CLIENT_SECRET} to be set")
+ }
+ appID, ok := os.LookupEnv("CROWDSTRIKE_APPID")
+ if !ok {
+ t.Skip("okta tests require ${CROWDSTRIKE_APPID} to be set")
+ }
+
+ u, err := url.Parse(feedURL)
+ if err != nil {
+ t.Fatalf("unexpected error parsing feed url: %v", err)
+ }
+ cfg := config{
+ Type: "crowdstrike",
+ URL: &urlConfig{u},
+ Program: `
+ state.response.decode_json().as(body,{
+ "events": [body],
+ ?"cursor": has(body.?metadata.offset) ?
+ optional.of({"offset": body.metadata.offset})
+ :
+ optional.none(),
+ })`,
+ Auth: authConfig{
+ OAuth2: oAuth2Config{
+ ClientID: clientID,
+ ClientSecret: clientSecret,
+ TokenURL: tokenURL,
+ },
+ },
+ CrowdstrikeAppID: appID,
+ }
+
+ err = cfg.Validate()
+ if err != nil {
+ t.Fatalf("unexpected error validating config: %v", err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), *timeout)
+ defer cancel()
+ var cursor map[string]any
+ if *offset >= 0 {
+ cursor = map[string]any{"offset": *offset}
+ }
+ s, err := NewFalconHoseFollower(ctx, "crowdstrike_testing", cfg, cursor, &testPublisher{logger}, logger, time.Now)
+ if err != nil {
+ t.Fatalf("unexpected error constructing follower: %v", err)
+ }
+ err = s.FollowStream(ctx)
+ if errors.Is(err, context.DeadlineExceeded) {
+ err = nil
+ }
+ if err != nil {
+ t.Errorf("unexpected error following stream: %v", err)
+ }
+}
+
+type testPublisher struct {
+ log *logp.Logger
+}
+
+var _ cursor.Publisher = testPublisher{}
+
+func (p testPublisher) Publish(e beat.Event, cursor any) error {
+ p.log.Infow("publish", "event", e.Fields, "cursor", cursor)
+ return nil
+}
diff --git a/x-pack/filebeat/input/streaming/input.go b/x-pack/filebeat/input/streaming/input.go
index 01106c94578..12a362625bf 100644
--- a/x-pack/filebeat/input/streaming/input.go
+++ b/x-pack/filebeat/input/streaming/input.go
@@ -88,7 +88,18 @@ func (i input) run(env v2.Context, src *source, cursor map[string]any, pub input
log := env.Logger.With("input_url", cfg.URL)
ctx := ctxtool.FromCanceller(env.Cancelation)
- s, err := NewWebsocketFollower(ctx, env.ID, cfg, cursor, pub, log, i.time)
+ var (
+ s StreamFollower
+ err error
+ )
+ // When and if the number of followers increases, this may
+ // want to be a registry. Until then, let's keep this simple.
+ switch cfg.Type {
+ case "", "websocket":
+ s, err = NewWebsocketFollower(ctx, env.ID, cfg, cursor, pub, log, i.time)
+ case "crowdstrike":
+ s, err = NewFalconHoseFollower(ctx, env.ID, cfg, cursor, pub, log, i.time)
+ }
if err != nil {
return err
}
@@ -110,7 +121,7 @@ func getURL(ctx context.Context, name, src, url string, state map[string]any, re
return "", err
}
- log.Debugw("cel engine state before url_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: redaction})
+ log.Debugw("cel engine state before url_eval", logp.Namespace(name), "state", redactor{state: state, cfg: redaction})
start := now().In(time.UTC)
url, err = evalURLWith(ctx, url_prg, ast, state, start)
log.Debugw("url_eval result", logp.Namespace(name), "modified_url", url)