Skip to content

Commit

Permalink
Add timeout to renew operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
lawrencegripper committed Sep 17, 2018
1 parent 4a6a67b commit 4265bd8
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions internal/app/dispatcher/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,31 @@ func Run(cfg *types.Configuration) {
go func() {
defer wg.Done()
for {
// Locks are held for 1 mins, renew every 25 sec to keep locks
time.Sleep(25 * time.Second)
// Locks are held for 1 mins, renew every 20 sec to keep locks
time.Sleep(20 * time.Second)
// allow 20seconds for the renew operation, keeping a 25 second buffer
timeAllowanceForRenewalRequest := time.Second * 15

// Renew message locks with ServiceBus
//https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-renew-lock
activeMessages := provider.GetActiveMessages()
if len(activeMessages) < 1 {
log.Debug("no active messages, skipping lock renewal")
continue
}

messagesAMQP := make([]*amqp.Message, 0, len(activeMessages))
for _, m := range activeMessages {
originalMessage := m.GetAMQPMessage()
messagesAMQP = append(messagesAMQP, originalMessage)
}

err := amqpConnection.RenewLocks(ctx, messagesAMQP)
renewContextWithDeadline, cancel := context.WithTimeout(ctx, timeAllowanceForRenewalRequest)
defer cancel()
err := amqpConnection.RenewLocks(renewContextWithDeadline, messagesAMQP)
if err != nil {
log.WithError(err).Error("failed to renew locks")
// Todo: Additional could be put in here to cleanup operations.
log.WithError(err).Panic("Failed to renew locks therefor cannot continue to operation as message maybe reassigned to another dispatcher.")
}
}
}()
Expand Down

0 comments on commit 4265bd8

Please sign in to comment.