diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 9cc993ece533..29779e28e1da 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
 - Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
 - Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]
 - Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002]
+- [azure-eventhub input] Switch the EPH run mode to non-blocking {pull}34075[34075]
 
 *Heartbeat*
 
 - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
diff --git a/x-pack/filebeat/input/azureeventhub/eph.go b/x-pack/filebeat/input/azureeventhub/eph.go
index d9584952dbe1..e80b01559cc1 100644
--- a/x-pack/filebeat/input/azureeventhub/eph.go
+++ b/x-pack/filebeat/input/azureeventhub/eph.go
@@ -27,7 +27,12 @@ var environments = map[string]azure.Environment{
 	azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud,
 }
 
-// runWithEPH will consume ingested events using the Event Processor Host (EPH) https://github.com/Azure/azure-event-hubs-go#event-processor-host, https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
+// runWithEPH will consume ingested events using the Event Processor Host (EPH).
+//
+// To learn more, check the following resources:
+// - https://github.com/Azure/azure-event-hubs-go#event-processor-host
+// - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
+//
 func (a *azureInput) runWithEPH() error {
 	// create a new Azure Storage Leaser / Checkpointer
 	cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey)
@@ -40,9 +45,12 @@ func (a *azureInput) runWithEPH() error {
 	}
 	leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, env)
 	if err != nil {
+		a.log.Errorw("error creating storage leaser checkpointer", "error", err)
 		return err
 	}
-	// adding a nil EventProcessorHostOption will break the code, this is why a condition is added and a.processor is assigned
+
+	// adding a nil EventProcessorHostOption will break the code,
+	// this is why a condition is added and a.processor is assigned.
 	if a.config.ConsumerGroup != "" {
 		a.processor, err = eph.NewFromConnectionString(
 			a.workerCtx,
@@ -60,6 +68,7 @@ func (a *azureInput) runWithEPH() error {
 		eph.WithNoBanner())
 	}
 	if err != nil {
+		a.log.Errorw("error creating processor", "error", err)
 		return err
 	}
 
@@ -77,23 +86,25 @@ func (a *azureInput) runWithEPH() error {
 		return onEventErr
 	})
 	if err != nil {
+		a.log.Errorw("error registering handler", "error", err)
 		return err
 	}
-	a.log.Infof("handler id: %q is running\n", handlerID)
-
-	// unregister a handler to stop that handler from receiving events
-	// processor.UnregisterHandler(ctx, handleID)
+	a.log.Infof("handler id: %q is registered\n", handlerID)
 
-	// start handling messages from all of the partitions balancing across multiple consumers
-	err = a.processor.Start(a.workerCtx)
+	// Start handling messages from all of the partitions, balancing across
+	// multiple consumers.
+	// The processor can be stopped by calling `Close()` on the processor.
+	err = a.processor.StartNonBlocking(a.workerCtx)
 	if err != nil {
+		a.log.Errorw("error starting the processor", "error", err)
 		return err
 	}
+
 	return nil
 }
 
 func getAzureEnvironment(overrideResManager string) (azure.Environment, error) {
-	// if no overrride is set then the azure public cloud is used
+	// if no override is set then the azure public cloud is used
 	if overrideResManager == "" || overrideResManager == "" {
 		return azure.PublicCloud, nil
 	}
diff --git a/x-pack/filebeat/input/azureeventhub/eph_test.go b/x-pack/filebeat/input/azureeventhub/eph_test.go
index 609e3a1f94a9..c3fb64b21d36 100644
--- a/x-pack/filebeat/input/azureeventhub/eph_test.go
+++ b/x-pack/filebeat/input/azureeventhub/eph_test.go
@@ -40,7 +40,7 @@ func TestGetAzureEnvironment(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, env, azure.GermanCloud)
 	resMan = "http://management.invalidhybrid.com/"
-	env, err = getAzureEnvironment(resMan)
+	_, err = getAzureEnvironment(resMan)
 	assert.Errorf(t, err, "invalid character 'F' looking for beginning of value")
 	resMan = ""
 	env, err = getAzureEnvironment(resMan)
diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go
index 7e7a1fe604ca..9c3a0d5980aa 100644
--- a/x-pack/filebeat/input/azureeventhub/input.go
+++ b/x-pack/filebeat/input/azureeventhub/input.go
@@ -15,8 +15,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pkg/errors"
-
 	"github.com/elastic/beats/v7/filebeat/channel"
 	"github.com/elastic/beats/v7/filebeat/input"
 	"github.com/elastic/beats/v7/libbeat/beat"
@@ -42,9 +40,7 @@ type azureInput struct {
 	workerCtx    context.Context         // worker goroutine context. It's cancelled when the input stops or the worker exits.
 	workerCancel context.CancelFunc      // used to signal that the worker should stop.
 	workerOnce   sync.Once               // guarantees that the worker goroutine is only started once.
-	workerWg     sync.WaitGroup          // waits on worker goroutine.
 	processor    *eph.EventProcessorHost // eph will be assigned if users have enabled the option
-	hub          *eventhub.Hub           // hub will be assigned
 }
 
 const (
@@ -54,7 +50,7 @@
 func init() {
 	err := input.Register(inputName, NewInput)
 	if err != nil {
-		panic(errors.Wrapf(err, "failed to register %v input", inputName))
+		panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
 	}
 }
 
@@ -66,7 +62,7 @@ func NewInput(
 ) (input.Input, error) {
 	var config azureInputConfig
 	if err := cfg.Unpack(&config); err != nil {
-		return nil, errors.Wrapf(err, "reading %s input config", inputName)
+		return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
 	}
 
 	inputCtx, cancelInputCtx := context.WithCancel(context.Background())
@@ -98,74 +94,40 @@ func NewInput(
 	return in, nil
 }
 
-// Run starts the input worker then returns. Only the first invocation
-// will ever start the worker.
+// Run starts the `azure-eventhub` input and then returns.
+//
+// The first invocation will start an input worker. All subsequent
+// invocations will be no-ops.
+//
+// The input worker will continue fetching data from the event hub until
+// the input Runner calls the `Stop()` method.
 func (a *azureInput) Run() {
+	// `Run` is invoked periodically by the input Runner. The `sync.Once`
+	// guarantees that we only start the worker once during the first
+	// invocation.
 	a.workerOnce.Do(func() {
-		a.workerWg.Add(1)
-		go func() {
-			a.log.Infof("%s input worker has started.", inputName)
-			defer a.log.Infof("%s input worker has stopped.", inputName)
-			defer a.workerWg.Done()
-			defer a.workerCancel()
-			err := a.runWithEPH()
-			if err != nil {
-				a.log.Error(err)
-				return
-			}
-		}()
+		a.log.Infof("%s input worker is starting.", inputName)
+		err := a.runWithEPH()
+		if err != nil {
+			a.log.Errorw("error starting the input worker", "error", err)
+			return
+		}
+		a.log.Infof("%s input worker has started.", inputName)
 	})
 }
 
-// run will run the input with the non-eph version, this option will be available once a more reliable storage is in place, it is curently using an in-memory storage
-//func (a *azureInput) run() error {
-//	var err error
-//	a.hub, err = eventhub.NewHubFromConnectionString(fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName))
-//	if err != nil {
-//		return err
-//	}
-//	// listen to each partition of the Event Hub
-//	runtimeInfo, err := a.hub.GetRuntimeInformation(a.workerCtx)
-//	if err != nil {
-//		return err
-//	}
-//
-//	for _, partitionID := range runtimeInfo.PartitionIDs {
-//		// Start receiving messages
-//		handler := func(c context.Context, event *eventhub.Event) error {
-//			a.log.Info(string(event.Data))
-//			return a.processEvents(event, partitionID)
-//		}
-//		var err error
-//		// sending a nill ReceiveOption will throw an exception
-//		if a.config.ConsumerGroup != "" {
-//			_, err = a.hub.Receive(a.workerCtx, partitionID, handler, eventhub.ReceiveWithConsumerGroup(a.config.ConsumerGroup))
-//		} else {
-//			_, err = a.hub.Receive(a.workerCtx, partitionID, handler)
-//		}
-//		if err != nil {
-//			return err
-//		}
-//	}
-//	return nil
-//}
-
-// Stop stops TCP server
+// Stop stops the `azure-eventhub` input.
 func (a *azureInput) Stop() {
-	if a.hub != nil {
-		err := a.hub.Close(a.workerCtx)
-		if err != nil {
-			a.log.Errorw(fmt.Sprintf("error while closing eventhub"), "error", err)
-		}
-	}
 	if a.processor != nil {
-		err := a.processor.Close(a.workerCtx)
+		// Tells the processor to stop processing events and release all
+		// resources (like scheduler, leaser, checkpointer, and client).
+		err := a.processor.Close(context.Background())
 		if err != nil {
-			a.log.Errorw(fmt.Sprintf("error while closing eventhostprocessor"), "error", err)
+			a.log.Errorw("error while closing eventhostprocessor", "error", err)
 		}
 	}
+
 	a.workerCancel()
-	a.workerWg.Wait()
 }
 
 // Wait stop the current server
@@ -183,9 +145,9 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
 	}
 	messages := a.parseMultipleMessages(event.Data)
 	for _, msg := range messages {
-		azure.Put("offset", event.SystemProperties.Offset)
-		azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
-		azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
+		_, _ = azure.Put("offset", event.SystemProperties.Offset)
+		_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
+		_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
 		ok := a.outlet.OnEvent(beat.Event{
 			Timestamp: timestamp,
 			Fields: mapstr.M{
diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go
index 680b3b37cb25..d13b2889de9f 100644
--- a/x-pack/filebeat/input/azureeventhub/input_test.go
+++ b/x-pack/filebeat/input/azureeventhub/input_test.go
@@ -80,9 +80,9 @@ func TestParseMultipleMessages(t *testing.T) {
 		"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
 		"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"
 	msgs := []string{
-		fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
-		fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
-		fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
+		"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
+		"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
+		"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
 	}
 	input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
 	messages := input.parseMultipleMessages([]byte(msg))
diff --git a/x-pack/libbeat/management/generate_test.go b/x-pack/libbeat/management/generate_test.go
index e9acffb4a8b7..fb7f88ff7759 100644
--- a/x-pack/libbeat/management/generate_test.go
+++ b/x-pack/libbeat/management/generate_test.go
@@ -33,12 +33,7 @@ func TestBareConfig(t *testing.T) {
 		},
 	}
 
-	// First test: this doesn't panic on nil pointer dereference
-	reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
-	require.NoError(t, err, "error in generateBeatConfig")
-	cfgMap := mapstr.M{}
-	err = reloadCfg[0].Config.Unpack(&cfgMap)
-	require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
+	cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
 
 	// Actual checks
 	processorFields := map[string]interface{}{
@@ -84,12 +79,7 @@ func TestGlobalProcessInject(t *testing.T) {
 		}),
 	}
 
-	reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
-	require.NoError(t, err, "error in generateBeatConfig")
-	cfgMap := mapstr.M{}
-	err = reloadCfg[0].Config.Unpack(&cfgMap)
-	require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
-
+	cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
 	processorFields := map[string]interface{}{
"add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system", // make sure we're not overwiting anything "add_fields.fields.dataset": "generic", @@ -138,12 +128,7 @@ func TestMBGenerate(t *testing.T) { }, } - reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true}) - require.NoError(t, err, "error in generateBeatConfig") - cfgMap := mapstr.M{} - err = reloadCfg[0].Config.Unpack(&cfgMap) - require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) - + cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true}) configFields := map[string]interface{}{ "drop_event": nil, "add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system", @@ -236,3 +221,12 @@ func findFieldsInProcessors(t *testing.T, configFields map[string]interface{}, c assert.True(t, gotVal, "got incorrect key for %s, expected %s, got %s", key, val, errStr) } } + +func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) mapstr.M { + reloadCfg, err := generateBeatConfig(unitRaw, agentInfo) + require.NoError(t, err, "error in generateBeatConfig") + cfgMap := mapstr.M{} + err = reloadCfg[0].Config.Unpack(&cfgMap) + require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) + return cfgMap +}