This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

AMQP session timeout #109

Open
lawrencegripper opened this issue Jul 2, 2018 · 14 comments

Comments

@lawrencegripper
Contributor

lawrencegripper commented Jul 2, 2018

Hi,

I'm seeing the following when using the library with Azure Service Bus. The program creates a session and a sender and then holds them open, sometimes for more than 10 minutes, without sending anything (it's a web app which sends a message on receiving a request). After 10 minutes idle, sending a message fails because the link has been detached for being idle.

I want to add a check which allows me to reconnect when a link is detached but I can't find a channel I can watch for this event on either the sender or session. @devigned I wondered if you saw something similar in your work?

Would this be a useful thing to add? I'd also like to reconnect on any link-detached event. I could take a look at making a PR which adds a channel to the session. Does that sound like a good idea?

P.S. I'm still relatively new to AMQP, so hopefully I've got the terminology right, but apologies if I've misunderstood something and this can be worked around with the existing library.

Error message:

time="2018-07-02T18:19:09Z" level=error msg="link detached, reason: *Error{Condition: amqp:link:detach-forced, Description: The link 'G25:1219031:9qnlIgUj4ClqjdQfzAfLSgmOXRsGQLJyhVerotCDtCO5g-0pXxBHGg' is force detached by the broker due to errors occurred in publisher(link12380570). Detach origin: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. TrackingId:97c6d7740000002700bce99a5b3a55f0_G25_B17, SystemTracker:ionsb-ccbbjpvf:Topic:frontapi.new_link, Timestamp:7/2/2018 4:52:24 PM, Info: map[]}"
@devigned
Contributor

devigned commented Jul 2, 2018

Here is the list of behaviors enforced on the service side related to connection management for Azure Service Bus (afaik).

  • Connection is created, but there are no senders/receivers created on the connection:
    • connection will be closed by the service after 1 minute.
  • Connection is created and a sender or a receiver is created on the connection:
    • connection will be maintained as long as there is at least one sender or receiver.
    • If there are no senders/receivers on the connection, connection will be closed after 5 minutes.
  • Senders/receivers will be closed by the service if there are no activities for 30 minutes (i.e. no send or pending receive calls)

To handle this, in https://github.com/Azure/azure-service-bus-go we recover when sending and receiving if we run into an error we'd classify as recoverable.

There are some event types which cause a link detach in Service Bus and Event Hubs and which are not recoverable. I believe this is specific to the broker implementation, which is probably why it has not been implemented here.

Just a thought, but if there were to be a "recovery policy" that could be provided to a connection or some other entity, it might be generic enough to be useful across broker implementations.

@amarzavery I know you've been digging into recovery details lately. Please correct me if the details I've provided above are incorrect.

@amarzavery

The details you provided are correct. It would be nice if the protocol library can provide a hook/policy that can help users do some custom things for recovery.

For example, in Azure Event Hubs, if a receiver link goes down for some reason and we decide to recover, we would like to re-establish the link by setting the offset of the last received message as a filter on the link. This lets us avoid receiving messages again from the start, or from whatever offset/time was provided when the link was initially created.

Having a callback/hook to do something custom like above during recovery would be extremely beneficial.
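To make the offset idea concrete, here is a minimal sketch of the state such a hook would need. Everything in it is an assumption for illustration: `offsetTracker`, `observe` and `recoveryFilter` are hypothetical names, and the filter string follows the `amqp.annotation.x-opt-offset` selector form that Event Hubs clients commonly use, which is not confirmed anywhere in this thread.

```go
package main

import "fmt"

// offsetTracker remembers the offset of the last message handed to the
// application. Hypothetical helper, not part of any library.
type offsetTracker struct {
	last    string
	hasLast bool
}

// observe records the offset of a message that was received successfully.
func (t *offsetTracker) observe(offset string) {
	t.last = offset
	t.hasLast = true
}

// recoveryFilter returns the filter expression to use when the link is
// re-established: resume after the last seen offset, or fall back to the
// filter the link was originally created with if nothing was received yet.
func (t *offsetTracker) recoveryFilter(original string) string {
	if !t.hasLast {
		return original
	}
	// Assumed selector syntax; adjust to whatever the broker expects.
	return fmt.Sprintf("amqp.annotation.x-opt-offset > '%s'", t.last)
}

func main() {
	original := "amqp.annotation.x-opt-offset > '-1'"
	t := &offsetTracker{}
	fmt.Println(t.recoveryFilter(original)) // nothing received yet
	t.observe("4096")
	fmt.Println(t.recoveryFilter(original)) // resumes after offset 4096
}
```

A recovery callback would call `recoveryFilter` when it rebuilds the receiver link, so the library itself never needs to know about offsets.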

@vcabbage
Owner

vcabbage commented Jul 3, 2018

Providing a way to be notified on link detach/allow recovery seems like a good idea.

Does someone want to propose an API? I would prefer it to use a callback approach rather than channels.

@lawrencegripper
Contributor Author

Sure, I'll take a shot at an API. My thinking is very rough at the moment, but as you already have LinkOption in the API, my preference would be to use it, as that will prevent a breaking change and keep things simple.

Here are some very rough (not compiled) thoughts as a first draft. Do you think this is along the right lines? Not set on this approach just getting the ball rolling.

// LinkRecoveryFunc is invoked when a link error occurs and
// allows you to create a new link using the newLink func or return an error
// which will be propagated to the sender/receiver next time they are used
type LinkRecoveryFunc func(linkError error, newLink func() (*link, error)) (*link, error)

func LinkRecoveryOption(recoveryFunc LinkRecoveryFunc) LinkOption {
	return func(l *link) error {
		l.recoveryFunc = func(linkError error) (*link, error) {
			return recoveryFunc(linkError, func() (*link, error) { return newLink(l.session, l.receiver, l.options) })
		}
		return nil
	}
}

When the link experiences an error then it can invoke l.recoveryFunc with that error and either start using the new link returned or propagate the error.

This could then be used like this when creating a sender:

// CreateAmqpSender makes a sender which reconnects when a link detaches
func (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) {
	if l.Session == nil {
		log.WithField("currentListener", l).Panic("Cannot create amqp sender without a session already configured")
	}

	return l.Session.NewSender(
		amqp.LinkTargetAddress("/" + topic),
		amqp.LinkRecoveryOption(func(linkError error, newLink func() (*link, error)) (*link, error) {
			// Only rebuild the link when the failure was a detach.
			if _, ok := linkError.(amqp.DetachError); ok {
				return newLink()
			}

			return nil, linkError
		}),
	)
}

@alanconway
Contributor

alanconway commented Jul 3, 2018 via email

@alanconway
Contributor

alanconway commented Jul 3, 2018 via email

@devigned
Contributor

devigned commented Jul 3, 2018

What @alanconway suggested is eerily similar to what we've implemented in the Azure Service Bus and Event Hubs libraries for message receivers.

// ListenerHandle provides the ability to close or listen to the close of a Receiver
type ListenerHandle struct {
	r   *receiver
	ctx context.Context
}

// Close will close the listener
func (lc *ListenerHandle) Close(ctx context.Context) error {
	return lc.r.Close(ctx)
}

// Done will close the channel when the listener has stopped
func (lc *ListenerHandle) Done() <-chan struct{} {
	return lc.ctx.Done()
}

// Err will return the last error encountered
func (lc *ListenerHandle) Err() error {
	if lc.r.lastError != nil {
		return lc.r.lastError
	}
	return lc.ctx.Err()
}

https://github.com/Azure/azure-service-bus-go/blob/607999369044f648929f37d3c925c086b21a5a06/receiver.go#L339-L355

This pattern is helpful for consumers to understand when something has failed, but I don't know that this pattern deals with recovery. This would still leave recovery in the hands of the consumer of the API.

If recovery is to be handled effectively, I'd imagine the consumer of the API would need to provide at least two things.

  1. a filter criteria to identify the errors that are recoverable
  2. a backoff policy for recovery (linear, exponential...)
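The two inputs listed above could be bundled roughly as follows. This is only a sketch under stated assumptions: `RecoveryPolicy`, `ExponentialBackoff`, `demoPolicy` and the two sentinel errors are hypothetical names, and no such type exists in this library or in the Azure ones as far as this thread shows.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors standing in for broker-specific conditions.
var (
	errDetached     = errors.New("link detached")
	errUnauthorized = errors.New("unauthorized")
)

// RecoveryPolicy bundles an error filter with a backoff schedule.
type RecoveryPolicy struct {
	IsRecoverable func(error) bool                // which errors are worth retrying
	Backoff       func(attempt int) time.Duration // delay after the given attempt (0-based)
	MaxAttempts   int
}

// ExponentialBackoff doubles the delay on every attempt, capped at max.
func ExponentialBackoff(base, max time.Duration) func(int) time.Duration {
	return func(attempt int) time.Duration {
		d := base
		for i := 0; i < attempt && d < max; i++ {
			d *= 2
		}
		if d > max {
			d = max
		}
		return d
	}
}

// Do runs op until it succeeds, fails with a non-recoverable error,
// or the attempt budget is used up.
func (p RecoveryPolicy) Do(op func() error) error {
	for attempt := 0; ; attempt++ {
		err := op()
		if err == nil || !p.IsRecoverable(err) {
			return err
		}
		if attempt+1 >= p.MaxAttempts {
			return fmt.Errorf("giving up after %d attempts: %w", attempt+1, err)
		}
		time.Sleep(p.Backoff(attempt))
	}
}

// demoPolicy retries only detach errors, with short delays for the demo.
var demoPolicy = RecoveryPolicy{
	IsRecoverable: func(err error) bool { return errors.Is(err, errDetached) },
	Backoff:       ExponentialBackoff(time.Millisecond, 8*time.Millisecond),
	MaxAttempts:   4,
}

func main() {
	calls := 0
	err := demoPolicy.Do(func() error {
		calls++
		if calls < 3 {
			return errDetached // fails twice, then succeeds
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

A linear schedule would be another `Backoff` function; the filter stays broker-specific, which matches the earlier point that recoverability depends on the broker implementation.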

@alanconway
Contributor

alanconway commented Jul 3, 2018 via email

@vcabbage
Owner

vcabbage commented Jul 4, 2018

Exposing channels in a public API can get rather complex. Not so much for a simple "done" channel, since it can be closed and that's it. For other uses it's not so straightforward. (There's a post detailing some examples and guidelines.)

Referring to @alanconway's example, what is expected of the consumer when both channelWithData and sender.Done() can be received from? When multiple select cases can proceed one is chosen at random (uniformly pseudo-random, to be precise). If channelWithData is buffered, the client may still want to process any pending messages, making correct usage more involved. In fact, the internal implementation of Receiver.Receive has exactly this problem, which needs to be dealt with.
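One way a consumer might cope with that ambiguity is to give pending messages priority over the done signal, sketched below. The names `receive`, `messages` and `done` are hypothetical, and the sketch assumes the producer stops sending before it closes `done`; it is not how Receiver.Receive is implemented.

```go
package main

import "fmt"

// receive returns the next message, preferring buffered messages over the
// done signal. It reports false once done is closed and nothing is pending.
func receive(messages <-chan string, done <-chan struct{}) (string, bool) {
	// First pass: take a pending message if one is ready, ignoring done.
	select {
	case m := <-messages:
		return m, true
	default:
	}
	// Second pass: block until a message arrives or done is closed.
	select {
	case m := <-messages:
		return m, true
	case <-done:
		// done may have been chosen even though a message arrived in
		// the meantime, so check once more before giving up.
		select {
		case m := <-messages:
			return m, true
		default:
			return "", false
		}
	}
}

func main() {
	messages := make(chan string, 3)
	done := make(chan struct{})
	messages <- "a"
	messages <- "b"
	close(done) // both channels are now ready at the same time

	for {
		m, ok := receive(messages, done)
		if !ok {
			break
		}
		fmt.Println("got", m)
	}
	fmt.Println("link done, buffer drained")
}
```

With a single `select` over both channels, the loop could exit while "a" and "b" are still buffered; the extra non-blocking passes are the "more involved" usage the paragraph above refers to.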

In summary, I prefer to keep these details internal to the package, exposing a synchronous API where possible. In this case a synchronous API doesn't make much sense, so a callback is the next best option in my opinion. Neither of these approaches prevents the user from creating and using channels themselves, if that's more convenient for them. I'm not saying that this lib can never expose channels, but I'd want to see some reasons as to why a channel is significantly better than the alternatives.
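As a rough illustration of a user building a channel on top of a callback, the adapter below turns a detach callback into a channel. The `onDetach` signature and `detachChannel` are hypothetical; the thread has not settled on a callback shape.

```go
package main

import (
	"errors"
	"fmt"
)

// errDetached is a hypothetical stand-in for a link error.
var errDetached = errors.New("link detached")

// onDetach is an assumed callback signature: the library would call it
// with the error that detached the link.
type onDetach func(err error)

// detachChannel returns a callback to register with the library and a
// channel that receives the first error passed to that callback.
func detachChannel() (onDetach, <-chan error) {
	ch := make(chan error, 1) // buffered so the callback never blocks its caller
	cb := func(err error) {
		select {
		case ch <- err:
		default: // a notification is already pending; drop later ones
		}
	}
	return cb, ch
}

func main() {
	cb, detached := detachChannel()
	cb(errDetached) // in real use the library would make this call
	fmt.Println("detached:", <-detached)
}
```

The buffering and drop policy are decisions the user makes here, which is the kind of detail a channel in the public API would otherwise have to fix for everyone.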

@vcabbage
Owner

vcabbage commented Jul 4, 2018

@lawrencegripper I think creating a new LinkOption makes sense. To @devigned's point, we probably want it to allow more customization.

Perhaps modifying the newLink function to accept LinkOptions would be sufficient.

type LinkRecoveryFunc func(linkError error, newLink func(opts ...LinkOption) (*link, error)) (*link, error)

When newLink is called, it would use the original options by default. Most, if not all, of the existing LinkOptions will overwrite previous values if applied multiple times.
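The "original options by default, later options overwrite" behaviour can be sketched with a toy version of the functional-options pattern. The `link` fields and the `withAddress`, `withCredit` and `newLinkFunc` helpers are made up for the example and only mirror the shape of the library's LinkOption.

```go
package main

import "fmt"

// link is a toy stand-in with two illustrative fields.
type link struct {
	address string
	credit  uint32
}

// LinkOption mirrors the functional-option shape discussed in the thread.
type LinkOption func(*link) error

func withAddress(a string) LinkOption {
	return func(l *link) error { l.address = a; return nil }
}

func withCredit(c uint32) LinkOption {
	return func(l *link) error { l.credit = c; return nil }
}

// newLinkFunc captures the options a link was created with and returns a
// function that rebuilds the link from them, followed by any overrides.
func newLinkFunc(original ...LinkOption) func(opts ...LinkOption) (*link, error) {
	return func(opts ...LinkOption) (*link, error) {
		l := &link{}
		all := append(append([]LinkOption{}, original...), opts...)
		for _, opt := range all {
			if err := opt(l); err != nil {
				return nil, err
			}
		}
		return l, nil
	}
}

func main() {
	rebuild := newLinkFunc(withAddress("/topic"), withCredit(10))

	same, _ := rebuild()                 // original options only
	tweaked, _ := rebuild(withCredit(1)) // override applied last, so it wins
	fmt.Println(*same, *tweaked)
}
```

Because overrides are applied after the originals, a recovery callback only has to pass the options it wants to change, for example a new offset filter.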

@alanconway
Contributor

alanconway commented Jul 4, 2018 via email

@lawrencegripper
Contributor Author

lawrencegripper commented Jul 4, 2018

@vcabbage Ok makes sense to me, I'll take a stab at building a PR out of the proposal, hopefully get some time tomorrow if things go well.

@lawrencegripper
Contributor Author

So it's taken me longer than I'd have liked to wrap my head around the changes needed. Before I dive any further into this I wanted to come up for air and sanity check some stuff @vcabbage.

Here are the cases where I think the library should handle a failure gracefully using this method:

  1. TCP Connection lost: https://github.com/vcabbage/amqp/blob/master/conn.go#L518 (other things to consider: MaxSizeError at line 497 and multiple parsing errors, but I'm unsure if these should be handled in this change)
  2. Link Error returned when invoking Send on a Sender: https://github.com/vcabbage/amqp/blob/master/client.go#L350
  3. Link Error returned when calling Receive on a Receiver: https://github.com/vcabbage/amqp/blob/master/client.go#L1569

The first feels like it should be handled with a separate ConnectionRecoveryOptions added to the connection details.

For the rest, I think the LinkRecoveryOption proposed above still makes sense.

In terms of where the recoveryFunc gets called I think that inside the body of send is the best fit as this is already locking so I can ensure it's safe. I'm thinking something like this:


// send is separated from Send so that the mutex unlock can be deferred without
// locking the transfer confirmation that happens in Send.
func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.link.err != nil {
		err := s.link.Close(ctx)
		if err != nil {
			return nil, errorErrorf("link in error state: %+v, attempted to close and received error: %+v", s.link.err, err)
		}

		newLink, err := s.link.recoveryFunc(s.link.err)
		if err != nil {
			return nil, errorErrorf("link in error state: %+v, attempted to recover but received error: %+v", s.link.err, err)
		}
		s.link = newLink
	}

	// ... existing code here
}

I'm a bit unsure how to handle the link change within the Receive function, as it doesn't use locking. I need to write a bit of a test harness to prove this all out, as I currently don't have a reasonable way of testing all the scenarios.

Sorry if I'm way off track with this. I initially looked at adding the recovery into the mux functions, but it looks like putting it on the higher-level session object will be cleaner. Any advice or thoughts are very welcome.

@vcabbage
Owner

vcabbage commented Aug 1, 2018

@lawrencegripper There are at least a few places this lib has grown to be a bit unwieldy, sorry about that. I need to do another round of cleanup at some point.

I'm not sure I'm willing to introduce a connection recovery mechanism. That seems much more involved than recovering individual links. Though I'm always willing to be convinced that it's the right thing to do if there's a real need.


That looks roughly correct for the Sender. A couple notes:

  • Only the mux can access link.err before link.done is closed. You'll need to do a non-blocking receive to check if link.done has been closed.
  • By that point the link should already be closed, so there's no need to call s.link.Close(ctx).
  • There will need to be a way to determine if the error originated at the link level since recovery can't be performed if it came from the session or conn.
  • Some thought will need to be put into errors that happen in the middle of transferring a message or while waiting for a disposition. Off the top of my head, I'm not certain of the correct way to handle them.
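The first note, a non-blocking receive on link.done before touching link.err, could look roughly like this. The `link` struct here is a two-field stand-in and `detachedErr` is a hypothetical helper; the real type has more state.

```go
package main

import (
	"errors"
	"fmt"
)

// errDetached is a hypothetical stand-in for a link error.
var errDetached = errors.New("link detached")

// link is a stand-in: err is written by the mux goroutine before it
// closes done, and must not be read by anyone else until then.
type link struct {
	done chan struct{}
	err  error
}

// detachedErr reports, without blocking, whether the link has shut down.
// It reads l.err only after observing that done is closed; closing a
// channel is synchronized before a receive that sees it closed, so the
// read does not race with the mux's earlier write.
func (l *link) detachedErr() (closed bool, err error) {
	select {
	case <-l.done:
		return true, l.err
	default:
		return false, nil
	}
}

func main() {
	l := &link{done: make(chan struct{})}
	closed, _ := l.detachedErr()
	fmt.Println("closed before detach:", closed)

	// What the mux would do on detach: record the error, then close done.
	l.err = errDetached
	close(l.done)

	closed, err := l.detachedErr()
	fmt.Println("closed after detach:", closed, err)
}
```

The `default` case is what makes the check non-blocking; without it, every send on a healthy link would wait for a detach.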

For the Receiver, I think the recovery should happen if r.link.done is closed. Since Receiver.Receive can be called concurrently, a new mutex will be needed. The first to acquire the mutex will do the recovery and there will need to be a check to determine if recovery has already been attempted.
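A minimal sketch of that receiver-side idea, with hypothetical names throughout (`receiver`, `current`, `relink`, `recoverConcurrently`): the "already attempted" check is done by re-inspecting the current link's done channel after the mutex is acquired. Unlike a strict one-shot check, this version lets a later caller retry if an earlier recovery failed, which is a design assumption rather than something the thread decided.

```go
package main

import (
	"fmt"
	"sync"
)

// link is a stand-in with just enough state for the example.
type link struct {
	id   int
	done chan struct{}
}

// receiver guards its link with a mutex so concurrent callers agree on
// which link is current.
type receiver struct {
	mu   sync.Mutex
	link *link
}

// current returns a usable link. If the current link is done, the first
// caller to take the mutex replaces it; callers that arrive afterwards
// see the fresh link and skip recovery.
func (r *receiver) current(relink func(old *link) (*link, error)) (*link, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	select {
	case <-r.link.done:
		// Still the failed link: this caller performs the recovery.
	default:
		// Healthy, or already replaced by another caller.
		return r.link, nil
	}

	nl, err := relink(r.link)
	if err != nil {
		return nil, err
	}
	r.link = nl
	return nl, nil
}

// recoverConcurrently starts n goroutines against a failed link and
// returns how many times relink actually ran.
func recoverConcurrently(n int) (calls int) {
	failed := &link{id: 1, done: make(chan struct{})}
	close(failed.done)
	r := &receiver{link: failed}

	relink := func(old *link) (*link, error) {
		calls++ // safe: only ever runs while r.mu is held
		return &link{id: old.id + 1, done: make(chan struct{})}, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.current(relink)
		}()
	}
	wg.Wait()
	return calls
}

func main() {
	fmt.Println("recoveries:", recoverConcurrently(8)) // prints "recoveries: 1"
}
```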
