Skip to content

Commit

Permalink
Recreate AEH Processor in the event of an error before retrying the p…
Browse files Browse the repository at this point in the history
…rocessing operation (#3614)

Signed-off-by: MattCosturos <48531957+MattCosturos@users.noreply.github.com>
Signed-off-by: Matt Costuros <mcosturos@moog.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
  • Loading branch information
3 people authored Dec 17, 2024
1 parent aca5116 commit 26808c9
Showing 1 changed file with 47 additions and 46 deletions.
93 changes: 47 additions & 46 deletions common/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,6 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
}
topic := config.Topic

// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)
}

// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, events []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)
Expand Down Expand Up @@ -282,51 +276,58 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr

subscriptionLoopFinished := make(chan bool, 1)

// Process all partition clients as they come in
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

// Start the processor
// Start the subscribe + processor loop
go func() {
for {
go subscriberLoop()
// This is a blocking call that runs until the context is canceled
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
return
}
// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
aeh.logger.Errorf("Error from event processor: %v", err)
aeh.logger.Errorf("error trying to establish a connection: %w", err)
} else {
aeh.logger.Debugf("Event processor terminated without error")
}
// wait for subscription loop finished signal
select {
case <-subscribeCtx.Done():
return
case <-subscriptionLoopFinished:
// noop
// Process all partition clients as they come in
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

go subscriberLoop()
// This is a blocking call that runs until the context is canceled or a non-recoverable error is returned.
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
return
}
if err != nil {
aeh.logger.Errorf("Error from event processor: %v", err)
} else {
aeh.logger.Debugf("Event processor terminated without error")
}
// wait for subscription loop finished signal
select {
case <-subscribeCtx.Done():
return
case <-subscriptionLoopFinished:
// noop
}
}

// Waiting here is not strictly necessary, however, we will wait for a short time to increase the likelihood of transient errors having disappeared
select {
case <-subscribeCtx.Done():
Expand Down

0 comments on commit 26808c9

Please sign in to comment.