Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azure-eventhub input] Switch the EPH run mode to non-blocking #34075

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]
- Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002]
- [azure-eventhub input] Switch the EPH run mode to non-blocking {pull}34075[34075]

*Heartbeat*
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
Expand Down
29 changes: 20 additions & 9 deletions x-pack/filebeat/input/azureeventhub/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ var environments = map[string]azure.Environment{
azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud,
}

// runWithEPH will consume ingested events using the Event Processor Host (EPH) https://github.com/Azure/azure-event-hubs-go#event-processor-host, https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
// runWithEPH will consume ingested events using the Event Processor Host (EPH).
//
// To learn more, check the following resources:
// - https://github.com/Azure/azure-event-hubs-go#event-processor-host
// - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host
//
func (a *azureInput) runWithEPH() error {
// create a new Azure Storage Leaser / Checkpointer
cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey)
Expand All @@ -40,9 +45,12 @@ func (a *azureInput) runWithEPH() error {
}
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, env)
if err != nil {
a.log.Errorw("error creating storage leaser checkpointer", "error", err)
return err
}
// adding a nil EventProcessorHostOption will break the code, this is why a condition is added and a.processor is assigned

// adding a nil EventProcessorHostOption will break the code,
// this is why a condition is added and a.processor is assigned.
if a.config.ConsumerGroup != "" {
a.processor, err = eph.NewFromConnectionString(
a.workerCtx,
Expand All @@ -60,6 +68,7 @@ func (a *azureInput) runWithEPH() error {
eph.WithNoBanner())
}
if err != nil {
a.log.Errorw("error creating processor", "error", err)
return err
}

Expand All @@ -77,23 +86,25 @@ func (a *azureInput) runWithEPH() error {
return onEventErr
})
if err != nil {
a.log.Errorw("error registering handler", "error", err)
return err
}
a.log.Infof("handler id: %q is running\n", handlerID)

// unregister a handler to stop that handler from receiving events
// processor.UnregisterHandler(ctx, handleID)
a.log.Infof("handler id: %q is registered\n", handlerID)

// start handling messages from all of the partitions balancing across multiple consumers
err = a.processor.Start(a.workerCtx)
// Start handling messages from all of the partitions balancing across
// multiple consumers.
// The processor can be stopped by calling `Close()` on the processor.
err = a.processor.StartNonBlocking(a.workerCtx)
if err != nil {
a.log.Errorw("error starting the processor", "error", err)
return err
}

return nil
}

func getAzureEnvironment(overrideResManager string) (azure.Environment, error) {
// if no overrride is set then the azure public cloud is used
// if no override is set then the azure public cloud is used
if overrideResManager == "" || overrideResManager == "<no value>" {
return azure.PublicCloud, nil
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureeventhub/eph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGetAzureEnvironment(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, env, azure.GermanCloud)
resMan = "http://management.invalidhybrid.com/"
env, err = getAzureEnvironment(resMan)
_, err = getAzureEnvironment(resMan)
assert.Errorf(t, err, "invalid character 'F' looking for beginning of value")
resMan = "<no value>"
env, err = getAzureEnvironment(resMan)
Expand Down
94 changes: 28 additions & 66 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -42,9 +40,7 @@ type azureInput struct {
workerCtx context.Context // worker goroutine context. It's cancelled when the input stops or the worker exits.
workerCancel context.CancelFunc // used to signal that the worker should stop.
workerOnce sync.Once // guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // waits on worker goroutine.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
hub *eventhub.Hub // hub will be assigned
}

const (
Expand All @@ -54,7 +50,7 @@ const (
func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(errors.Wrapf(err, "failed to register %v input", inputName))
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
}

Expand All @@ -66,7 +62,7 @@ func NewInput(
) (input.Input, error) {
var config azureInputConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrapf(err, "reading %s input config", inputName)
return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
Expand Down Expand Up @@ -98,74 +94,40 @@ func NewInput(
return in, nil
}

// Run starts the input worker then returns. Only the first invocation
// will ever start the worker.
// Run starts the `azure-eventhub` input and then returns.
//
// The first invocation will start an input worker. All subsequent
// invocations will be no-ops.
//
// The input worker will continue fetching data from the event hub until
// the input Runner calls the `Stop()` method.
func (a *azureInput) Run() {
// `Run` is invoked periodically by the input Runner. The `sync.Once`
// guarantees that we only start the worker once during the first
// invocation.
a.workerOnce.Do(func() {
a.workerWg.Add(1)
go func() {
a.log.Infof("%s input worker has started.", inputName)
defer a.log.Infof("%s input worker has stopped.", inputName)
defer a.workerWg.Done()
defer a.workerCancel()
err := a.runWithEPH()
if err != nil {
a.log.Error(err)
return
}
}()
a.log.Infof("%s input worker is starting.", inputName)
err := a.runWithEPH()
if err != nil {
a.log.Errorw("error starting the input worker", "error", err)
return
}
a.log.Infof("%s input worker has started.", inputName)
})
}

// run will run the input with the non-EPH version. This option will be available once a more reliable storage is in place; it is currently using an in-memory storage.
//func (a *azureInput) run() error {
// var err error
// a.hub, err = eventhub.NewHubFromConnectionString(fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName))
// if err != nil {
// return err
// }
// // listen to each partition of the Event Hub
// runtimeInfo, err := a.hub.GetRuntimeInformation(a.workerCtx)
// if err != nil {
// return err
// }
//
// for _, partitionID := range runtimeInfo.PartitionIDs {
// // Start receiving messages
// handler := func(c context.Context, event *eventhub.Event) error {
// a.log.Info(string(event.Data))
// return a.processEvents(event, partitionID)
// }
// var err error
// // sending a nil ReceiveOption will throw an exception
// if a.config.ConsumerGroup != "" {
// _, err = a.hub.Receive(a.workerCtx, partitionID, handler, eventhub.ReceiveWithConsumerGroup(a.config.ConsumerGroup))
// } else {
// _, err = a.hub.Receive(a.workerCtx, partitionID, handler)
// }
// if err != nil {
// return err
// }
// }
// return nil
//}

// Stop stops TCP server
// Stop stops `azure-eventhub` input.
func (a *azureInput) Stop() {
if a.hub != nil {
err := a.hub.Close(a.workerCtx)
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhub"), "error", err)
}
}
if a.processor != nil {
err := a.processor.Close(a.workerCtx)
// Tells the processor to stop processing events and release all
// resources (like scheduler, leaser, checkpointer, and client).
err := a.processor.Close(context.Background())
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhostprocessor"), "error", err)
a.log.Errorw("error while closing eventhostprocessor", "error", err)
}
}

a.workerCancel()
a.workerWg.Wait()
}

// Wait stops the current server
Expand All @@ -183,9 +145,9 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
}
messages := a.parseMultipleMessages(event.Data)
for _, msg := range messages {
azure.Put("offset", event.SystemProperties.Offset)
azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
_, _ = azure.Put("offset", event.SystemProperties.Offset)
_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
ok := a.outlet.OnEvent(beat.Event{
Timestamp: timestamp,
Fields: mapstr.M{
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/azureeventhub/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func TestParseMultipleMessages(t *testing.T) {
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"
msgs := []string{
fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
}
input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
messages := input.parseMultipleMessages([]byte(msg))
Expand Down
30 changes: 12 additions & 18 deletions x-pack/libbeat/management/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ func TestBareConfig(t *testing.T) {
},
}

// First test: this doesn't panic on nil pointer dereference
reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
require.NoError(t, err, "error in generateBeatConfig")
cfgMap := mapstr.M{}
err = reloadCfg[0].Config.Unpack(&cfgMap)
require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})

// Actual checks
processorFields := map[string]interface{}{
Expand Down Expand Up @@ -84,12 +79,7 @@ func TestGlobalProcessInject(t *testing.T) {
}),
}

reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
require.NoError(t, err, "error in generateBeatConfig")
cfgMap := mapstr.M{}
err = reloadCfg[0].Config.Unpack(&cfgMap)
require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)

cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
processorFields := map[string]interface{}{
"add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system", // make sure we're not overwriting anything
"add_fields.fields.dataset": "generic",
Expand Down Expand Up @@ -138,12 +128,7 @@ func TestMBGenerate(t *testing.T) {
},
}

reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
require.NoError(t, err, "error in generateBeatConfig")
cfgMap := mapstr.M{}
err = reloadCfg[0].Config.Unpack(&cfgMap)
require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)

cfgMap := buildConfigMap(t, &rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true})
configFields := map[string]interface{}{
"drop_event": nil,
"add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system",
Expand Down Expand Up @@ -236,3 +221,12 @@ func findFieldsInProcessors(t *testing.T, configFields map[string]interface{}, c
assert.True(t, gotVal, "got incorrect key for %s, expected %s, got %s", key, val, errStr)
}
}

// buildConfigMap runs generateBeatConfig for the given unit config and agent
// info, then unpacks the first generated reload config into a mapstr.M and
// returns it.
//
// Any failure (generation error, empty result, or unpack error) is reported
// through t and stops the calling test immediately.
func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) mapstr.M {
	// Mark as a helper so failures are attributed to the calling test's line
	// instead of a line inside this function.
	t.Helper()

	reloadCfg, err := generateBeatConfig(unitRaw, agentInfo)
	require.NoError(t, err, "error in generateBeatConfig")
	// Guard the index below: without this check an empty result would panic
	// with an index-out-of-range instead of producing a readable failure.
	require.NotEmpty(t, reloadCfg, "generateBeatConfig returned no configs")

	cfgMap := mapstr.M{}
	err = reloadCfg[0].Config.Unpack(&cfgMap)
	require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config)
	return cfgMap
}