Skip to content

Commit

Permalink
[8.5](backport #34075) [azure-eventhub input] Switch the run EPH run …
Browse files Browse the repository at this point in the history
…mode to non-blocking (#34100)

* [azure-eventhub input] Switch the run EPH run mode to non-blocking (#34075)

* Start EPH as non-blocking

With this change, the first call to the input Start() method will set up and start the EPH worker in a non-blocking fashion. The EPH worker will continue until the Filebeat calls the Stop() method to tear it down for live reloading.

(cherry picked from commit 8f0db84)

Co-authored-by: Maurizio Branca <maurizio.branca@gmail.com>
  • Loading branch information
mergify[bot] and zmoog authored Dec 23, 2022
1 parent b3f7316 commit 072859c
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- [azure-eventhub input] Switch the run EPH run mode to non-blocking {pull}34075[34075]

*Heartbeat*

Expand Down
29 changes: 20 additions & 9 deletions x-pack/filebeat/input/azureeventhub/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ var environments = map[string]azure.Environment{
azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud,
}

// runWithEPH will consume ingested events using the Event Processor Host (EPH) https://github.com/Azure/azure-event-hubs-go#event-processor-host, https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
// runWithEPH will consume ingested events using the Event Processor Host (EPH).
//
// To learn more, check the following resources:
// - https://github.com/Azure/azure-event-hubs-go#event-processor-host
// - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
//
func (a *azureInput) runWithEPH() error {
// create a new Azure Storage Leaser / Checkpointer
cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey)
Expand All @@ -40,9 +45,12 @@ func (a *azureInput) runWithEPH() error {
}
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, env)
if err != nil {
a.log.Errorw("error creating storage leaser checkpointer", "error", err)
return err
}
// adding a nil EventProcessorHostOption will break the code, this is why a condition is added and a.processor is assigned

// adding a nil EventProcessorHostOption will break the code,
// this is why a condition is added and a.processor is assigned.
if a.config.ConsumerGroup != "" {
a.processor, err = eph.NewFromConnectionString(
a.workerCtx,
Expand All @@ -60,6 +68,7 @@ func (a *azureInput) runWithEPH() error {
eph.WithNoBanner())
}
if err != nil {
a.log.Errorw("error creating processor", "error", err)
return err
}

Expand All @@ -77,23 +86,25 @@ func (a *azureInput) runWithEPH() error {
return onEventErr
})
if err != nil {
a.log.Errorw("error registering handler", "error", err)
return err
}
a.log.Infof("handler id: %q is running\n", handlerID)

// unregister a handler to stop that handler from receiving events
// processor.UnregisterHandler(ctx, handleID)
a.log.Infof("handler id: %q is registered\n", handlerID)

// start handling messages from all of the partitions balancing across multiple consumers
err = a.processor.Start(a.workerCtx)
// Start handling messages from all of the partitions balancing across
// multiple consumers.
// The processor can be stopped by calling `Close()` on the processor.
err = a.processor.StartNonBlocking(a.workerCtx)
if err != nil {
a.log.Errorw("error starting the processor", "error", err)
return err
}

return nil
}

func getAzureEnvironment(overrideResManager string) (azure.Environment, error) {
// if no overrride is set then the azure public cloud is used
// if no override is set then the azure public cloud is used
if overrideResManager == "" || overrideResManager == "<no value>" {
return azure.PublicCloud, nil
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureeventhub/eph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGetAzureEnvironment(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, env, azure.GermanCloud)
resMan = "http://management.invalidhybrid.com/"
env, err = getAzureEnvironment(resMan)
_, err = getAzureEnvironment(resMan)
assert.Errorf(t, err, "invalid character 'F' looking for beginning of value")
resMan = "<no value>"
env, err = getAzureEnvironment(resMan)
Expand Down
94 changes: 28 additions & 66 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -42,9 +40,7 @@ type azureInput struct {
workerCtx context.Context // worker goroutine context. It's cancelled when the input stops or the worker exits.
workerCancel context.CancelFunc // used to signal that the worker should stop.
workerOnce sync.Once // guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // waits on worker goroutine.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
hub *eventhub.Hub // hub will be assigned
}

const (
Expand All @@ -54,7 +50,7 @@ const (
func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(errors.Wrapf(err, "failed to register %v input", inputName))
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
}

Expand All @@ -66,7 +62,7 @@ func NewInput(
) (input.Input, error) {
var config azureInputConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrapf(err, "reading %s input config", inputName)
return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
Expand Down Expand Up @@ -98,74 +94,40 @@ func NewInput(
return in, nil
}

// Run starts the input worker then returns. Only the first invocation
// will ever start the worker.
// Run starts the `azure-eventhub` input and then returns.
//
// The first invocation will start an input worker. All subsequent
// invocations will be no-ops.
//
// The input worker will continue fetching data from the event hub until
// the input Runner calls the `Stop()` method.
func (a *azureInput) Run() {
// `Run` is invoked periodically by the input Runner. The `sync.Once`
// guarantees that we only start the worker once during the first
// invocation.
a.workerOnce.Do(func() {
a.workerWg.Add(1)
go func() {
a.log.Infof("%s input worker has started.", inputName)
defer a.log.Infof("%s input worker has stopped.", inputName)
defer a.workerWg.Done()
defer a.workerCancel()
err := a.runWithEPH()
if err != nil {
a.log.Error(err)
return
}
}()
a.log.Infof("%s input worker is starting.", inputName)
err := a.runWithEPH()
if err != nil {
a.log.Errorw("error starting the input worker", "error", err)
return
}
a.log.Infof("%s input worker has started.", inputName)
})
}

// run will run the input with the non-eph version, this option will be available once a more reliable storage is in place, it is curently using an in-memory storage
//func (a *azureInput) run() error {
// var err error
// a.hub, err = eventhub.NewHubFromConnectionString(fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName))
// if err != nil {
// return err
// }
// // listen to each partition of the Event Hub
// runtimeInfo, err := a.hub.GetRuntimeInformation(a.workerCtx)
// if err != nil {
// return err
// }
//
// for _, partitionID := range runtimeInfo.PartitionIDs {
// // Start receiving messages
// handler := func(c context.Context, event *eventhub.Event) error {
// a.log.Info(string(event.Data))
// return a.processEvents(event, partitionID)
// }
// var err error
// // sending a nill ReceiveOption will throw an exception
// if a.config.ConsumerGroup != "" {
// _, err = a.hub.Receive(a.workerCtx, partitionID, handler, eventhub.ReceiveWithConsumerGroup(a.config.ConsumerGroup))
// } else {
// _, err = a.hub.Receive(a.workerCtx, partitionID, handler)
// }
// if err != nil {
// return err
// }
// }
// return nil
//}

// Stop stops TCP server
// Stop stops `azure-eventhub` input.
func (a *azureInput) Stop() {
if a.hub != nil {
err := a.hub.Close(a.workerCtx)
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhub"), "error", err)
}
}
if a.processor != nil {
err := a.processor.Close(a.workerCtx)
// Tells the processor to stop processing events and release all
// resources (like scheduler, leaser, checkpointer, and client).
err := a.processor.Close(context.Background())
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhostprocessor"), "error", err)
a.log.Errorw("error while closing eventhostprocessor", "error", err)
}
}

a.workerCancel()
a.workerWg.Wait()
}

// Wait stop the current server
Expand All @@ -183,9 +145,9 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
}
messages := a.parseMultipleMessages(event.Data)
for _, msg := range messages {
azure.Put("offset", event.SystemProperties.Offset)
azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
_, _ = azure.Put("offset", event.SystemProperties.Offset)
_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
ok := a.outlet.OnEvent(beat.Event{
Timestamp: timestamp,
Fields: mapstr.M{
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/azureeventhub/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func TestParseMultipleMessages(t *testing.T) {
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"
msgs := []string{
fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
}
input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
messages := input.parseMultipleMessages([]byte(msg))
Expand Down

0 comments on commit 072859c

Please sign in to comment.