
Potential blocking on unsubscribe with message in flight #25

Open

flowchartsman opened this issue Apr 27, 2015 · 27 comments

Comments

@flowchartsman

Per our discussion in stompngo_examples issue #2, I'm opening an issue in the main project. The concern is what to do with in-flight messages received after or during an unsubscribe. The current use case I've been working with involves ActiveMQ with prefetch set, but it could apply to any similar interaction with a STOMP server that sends more than one MESSAGE without requiring an ACK, using either ack mode client or client-individual (a rough code sketch of this loop follows the list below):

  1. connect
  2. sub to queue q.1 with ackmode client and prefetch X where X is how many messages I need
  3. process X messages
  4. ACK the last message (which should be cumulative with ackmode client)
  5. unsubscribe
  6. repeat 2-5 with queues q.2, q.3, etc.
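
Roughly, that loop in code. This is a sketch only, assuming STOMP 1.2 and the current stompngo API surface; the activemq.prefetchSize header and the ack-id handling are my assumptions here, so treat the details as approximate:

//sketch.go (illustrative only)
package main

import (
	"fmt"
	"log"
	"net"

	"github.com/gmallard/stompngo"
)

// drainQueue subscribes with ack mode "client" and a prefetch hint, reads
// `want` messages, cumulatively ACKs the last one, and unsubscribes.
func drainQueue(c *stompngo.Connection, queue, subID string, want int) error {
	sh := stompngo.Headers{}.
		Add(stompngo.HK_DESTINATION, queue).
		Add(stompngo.HK_ID, subID).
		Add(stompngo.HK_ACK, "client").
		Add("activemq.prefetchSize", fmt.Sprintf("%d", want)) // ActiveMQ-specific hint (assumption)
	md, err := c.Subscribe(sh)
	if err != nil {
		return err
	}
	var last stompngo.MessageData
	for i := 0; i < want; i++ {
		last = <-md // process each message here
	}
	// Ack mode "client" is cumulative: ACKing the last message covers the earlier ones.
	// In STOMP 1.2 the ACK "id" is the value of the message's "ack" header.
	if ackID, ok := last.Message.Headers.Contains("ack"); ok {
		if err := c.Ack(stompngo.Headers{}.Add(stompngo.HK_ID, ackID)); err != nil {
			return err
		}
	}
	return c.Unsubscribe(stompngo.Headers{}.Add(stompngo.HK_ID, subID))
}

func main() {
	n, err := net.Dial("tcp", "localhost:61613")
	if err != nil {
		log.Fatal(err)
	}
	ch := stompngo.Headers{}.
		Add(stompngo.HK_ACCEPT_VERSION, stompngo.SPL_12).
		Add(stompngo.HK_HOST, "localhost")
	c, err := stompngo.Connect(n, ch)
	if err != nil {
		log.Fatal(err)
	}
	for i := 1; i <= 5; i++ {
		if err := drainQueue(c, fmt.Sprintf("/queue/q.%d", i), fmt.Sprintf("sub-%d", i), i); err != nil {
			log.Fatal(err)
		}
	}
	_ = c.Disconnect(stompngo.Headers{})
}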

The problem I'm running into is that the UNSUBSCRIBE right after the ACK seems to be causing messages to not get ACKed sometimes, and some of the later subscriptions are getting nothing at all. In examining this problem (which may or may not be related to the issue I'm submitting), I ran across the following scenario which I think needs to be at least mentioned in the documentation, if not fixed in the code somehow:

  1. Client subscribes to a destination, which creates a chan MessageData with a buffer size of c.scc (default: 1)
//subscription.go
if hid { // Client specified id
     c.subs[id] = make(chan MessageData, c.scc) // Assign subscription
}
  2. Message(s) arrive with that destination specified in the subscription header and are picked up in the read loop here:
//reader.go
if sid, ok := f.Headers.Contains("subscription"); ok {
     c.subsLock.Lock()
     c.subs[sid] <- d
     c.subsLock.Unlock()
} else {
     c.input <- d
}
  3. Client processes some messages or not, but there is still a message in the channel for this subscription.
  4. Client unsubscribes, which causes the following:
//unsubscribe.go
c.subsLock.Lock()
defer c.subsLock.Unlock()
// ...
close(c.subs[sid])
delete(c.subs, sid)
  5. Before c.subsLock.Lock() is called in the unsubscribe, another message arrives and is picked up by the reader goroutine.

It seems that this would deadlock: the unsubscribe wouldn't be able to acquire subsLock, and the reader goroutine would be blocked trying to send to the subscription channel.
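
Stripped of the STOMP details, the interleaving I'm worried about is just a lock held across a channel send. A toy illustration of the same pattern (not stompngo code; names are made up):

package main

import (
	"sync"
	"time"
)

func main() {
	var subsLock sync.Mutex
	subs := map[string]chan int{"sub-1": make(chan int, 1)} // buffer of 1, like c.scc

	// Reader goroutine: holds the lock while sending to the subscription channel.
	go func() {
		for m := 0; ; m++ {
			subsLock.Lock()
			subs["sub-1"] <- m // blocks once the buffer is full and nobody is reading
			subsLock.Unlock()
		}
	}()

	time.Sleep(100 * time.Millisecond) // the client has stopped reading; the buffer fills

	// Unsubscribe path: needs the same lock to close/delete the channel, but the
	// reader is parked on the send while holding it. The Go runtime should abort
	// here with "all goroutines are asleep - deadlock!".
	subsLock.Lock()
	close(subs["sub-1"])
	delete(subs, "sub-1")
	subsLock.Unlock()
}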

@gmallard
Owner

First, I have been unable to recreate the main scenario described, i.e. apparently corrupted queue state. Why? Because I consistently get the hang described above (or different hangs/panics; read on). Queue state examined from the admin consoles seems OK here after killing a blocked client.

Second, I have recreated the hang described, as well as other hangs and outright panics. All of this work is published in the https://github.com/gmallard/stompngo_examples project; the branch name is sngissue25.

Included there is:

  • An example of how to create the problem(s)
  • Sample stack traces using ActiveMQ, Apollo, and RabbitMQ. Note that different brokers can behave differently. Or not.

You are invited to review those stack traces in detail.

All of those examples were run with stompngo at: 048c068.

Seems like a fairly major difficulty with this package at present.

@flowchartsman
Author

I may have a couple of ideas on how to address the issue, but it will take some thought to figure out how to do it without breaking the API. I will try to find some time later in the week.

@gmallard
Owner

Thanks. I will experiment here as well.

Given the different scenarios in the stack traces from above, at this time I feel there is no 'simple' fix.

A nasty little problem .......

@gmallard
Owner

gmallard commented May 3, 2015

For the record, I can also create hangs when unsubscribe is bypassed, and disconnect is invoked directly.

What I cannot recreate is the ...... queue corruption that you seem to describe (broker does not correctly handle the ACK, missed ACKS, apparently empty queues .....). Do you have an example that reliably recreates that situation?

I am working on a very experimental idea to eliminate the hangs (which includes a draining concept), but have not published any of it as yet. It is somewhat ugly, and I want to think about the approach some more, as well as perform additional validation / tests.

@gmallard
Owner

gmallard commented May 3, 2015

Can you post your activemq.xml configuration file? Are you sure "missing" messages are not on a DLQ?

@gmallard
Owner

gmallard commented May 4, 2015

Try branch blockwork at 0c372bb.

Let me know if that helps.

@flowchartsman
Author

This is a lot to respond to. I'll test out the new branch and see if I can get hold of the activemq.xml. But no, they were not on dead-letter queues; they were just not getting ACKed. My test case is to put 10 messages in each of five queues and then take 1 from queue 1, 2 from queue 2, and so on. But I would end up getting only one or two from a queue I'd tried to get multiples from, IIRC. I will try the new branch and post what happens.

@gmallard
Owner

gmallard commented May 5, 2015

Do I read that last post correctly? As:

  • from queue 1 receive 1 message only
  • from queue 2 receive 2 messages only
  • from queue 3 receive 3 messages only
  • ..... and so on

???

Some day ..... you should explain to me what real world scenario would require something like that .....
seems somewhat strange to me.

Edited: to make it more clear what I am asking (I hope).

@gmallard
Owner

gmallard commented May 6, 2015

I really think that you should think about posting some code that shows that behavior.

@gmallard
Owner

gmallard commented May 7, 2015

I am changing my thoughts on some of this. I will detect the situation in Unsubscribe and handle it. And with 1.2, NACK any latent messages.

But there is something you need to be aware of:

  • With ActiveMQ 5.11.1
  • STOMP Protocol 1.2
  • NACK'd messages are sent automatically to ActiveMQ.DLQ

That would seem to invalidate your approach.

@gmallard
Owner

A quick comment.

I will eventually add code which attempts to address this kind of client design. And will also add notes to the wiki describing this in some detail.

Given the totally asynchronous nature of STOMP and typical broker behavior, this application design is certainly less than ideal, and frankly I discourage it.

gmallard added a commit that referenced this issue May 21, 2015
actual work on stompngo issue #25.
@gmallard
Owner

gmallard commented Jul 4, 2016

I have not stopped thinking about this issue, despite the lack of recent progress.

I believe at least one, perhaps several partial solutions are possible. Any of these solutions would require this package to 'extend' the base STOMP protocol. The protocol extensions would allow package clients to optionally refine current package behavior.

I believe these solutions can be implemented by merely extending the current API, not modifying currently available functionality.

I also believe that any possible solutions would still leave a client with the ability to see odd / erroneous behavior depending on the client's design / implementation.

The STOMP level 1.2 specification was released on 10/22/2012. Prior to that time the specification committee had discussed a number of protocol additions that relate to the flow of messages, and possible control of that flow. These protocol additions were postponed to await an effort to develop a version 2.0 of the protocol. For a high level overview of these discussions see:

Stomp Specification Issues

Concerned readers should peruse the link above.

@gmallard
Owner

gmallard commented Aug 7, 2016

Please upgrade your local environments and any clones to:

stompngo: Version v1.0.4

stompngo_examples: Version v1.0.5

This adds an 'extension' to the package that should allow you to implement the design you have described.

You will need changes to your current code.

The examples project has ..... well, examples of how to use this extension.

@gmallard
Owner

@flowchartsman Do you have any updates on this issue?

@itomsawyer
Contributor

@gmallard

We have also encountered this case.
As described by @flowchartsman, while the reader tries to dispatch a message to subscribers, the subscriber goroutine may have already stopped reading from the subscription channel. This can make the reader hang on c.subs[sid] <- d.

We solved this problem in our app as shown below:

func (b *WSBroker) unsubscribe(uuid string, r <-chan stompngo.MessageData, dest string) {
    done := make(chan struct{})
    go func() {
        sbh := stompngo.Headers{}
        switch b.connection.Protocol() {
        case stompngo.SPL_12:
            sbh = sbh.Add("id", uuid)
        case stompngo.SPL_11:
            sbh = sbh.Add("id", uuid)
        case stompngo.SPL_10:
            sbh = sbh.Add("destination", dest)
        default:
            b.logger.Printf("consume protocol error, should not happen")
        }
        err := b.connection.Unsubscribe(sbh)
        if err != nil {
            b.logger.Printf("unsubscribe error %v", err)
        }

        //XXX stompngo Close() with a message in flight may hang because the reader
        //    is sending a message to a subscriber that has already unsubscribed,
        //    so the in-flight Close() cannot get the subscribers' RWLock
        close(done)
    }()

    for {
        select {
        // drain all incoming messages since we have unsubscribed
        case _, ok := <-r:
            if !ok {
                return
            }
        case _, ok := <-b.connection.MessageData:
            if !ok {
                return
            }
        // ensure the connection Unsubscribe is done
        case <-done:
            return
        }
    }
}

WSBroker is our wrapper around stompngo.Connection; b.connection is of type *stompngo.Connection.

After WSBroker unsubscribes via b.connection, we continue to drain messages from r, the channel created by Subscribe.

This ensures the reader is not blocked and continues reading until it reads the response to the UNSUBSCRIBE frame. Meanwhile, the in-flight messages are not ACKed and can be processed later.
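
For context, a hypothetical call site for the helper above (the takeN name, the want count, and the ack mode are illustrative only, not our real code):

// Hypothetical call site (inside a WSBroker method): subscribe, take a fixed
// number of messages, then hand the channel back to unsubscribe() so it can
// drain whatever is still in flight.
func (b *WSBroker) takeN(uuid, dest string, want int) {
    sh := stompngo.Headers{}.
        Add(stompngo.HK_DESTINATION, dest).
        Add(stompngo.HK_ID, uuid).
        Add(stompngo.HK_ACK, "client-individual")
    r, err := b.connection.Subscribe(sh)
    if err != nil {
        b.logger.Printf("subscribe error: %v", err)
        return
    }
    for i := 0; i < want; i++ {
        md := <-r
        _ = md // process and ACK the message here
    }
    b.unsubscribe(uuid, r, dest) // drains until the UNSUBSCRIBE completes
}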

@flowchartsman
Author

A couple of months after last working on this issue, I ended up moving to another company. Most of the work we do here is with Kafka and NSQ, so my interaction with STOMP has been pretty much zero for over two years now, but I'm happy to help test if needed.

@gmallard
Owner

gmallard commented Aug 5, 2018

@flowchartsman - Thanks for the offer.

All - At this point, what is really needed is a small, self-contained example that reliably demonstrates the issue using the current HEAD of the project.

@gmallard
Owner

I have added a wiki page that describes the workaround that was implemented for this issue. That page is here:

https://github.com/gmallard/stompngo/wiki/Issue-25

@gmallard
Owner

@itomsawyer - your solution to this issue is quite interesting.

I am considering how that solution might (optionally) be implemented as a second workaround in the current stompngo environment.

gmallard added a commit that referenced this issue Aug 14, 2018
This allows a client to implement a fix for the hangs described in
issue #25.  This is a second approach to solving that problem.
@gmallard
Owner

I have committed 512e8d8.

This implements a second extension that provides an alternative way to address blocks/hangs with this application design. This second extension is based very roughly on the idea @itomsawyer described above. Many thanks for the ideas.

Both stompngo extensions are described at:

https://github.com/gmallard/stompngo/wiki/Issue-25

The stompngo_examples project also has examples for both extensions that support message draining. Those examples are at these locations:

https://github.com/gmallard/stompngo_examples/tree/dev/adhoc/varmGetter

https://github.com/gmallard/stompngo_examples/tree/dev/adhoc/varmGetter2
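
As a rough sketch only (the exact semantics are described on the wiki page above), enabling the drain behavior at unsubscribe time looks roughly like this; the helper name is illustrative, only the StompPlusDrainNow header is part of the package:

package main

import (
	"log"

	"github.com/gmallard/stompngo"
)

// unsubscribeAndDrain is a sketch, not part of the package API: it adds the
// StompPlusDrainNow extension header so the package drains in-flight MESSAGE
// frames for this subscription while processing the UNSUBSCRIBE.
func unsubscribeAndDrain(c *stompngo.Connection, subID string) {
	h := stompngo.Headers{}.
		Add(stompngo.HK_ID, subID).
		Add(stompngo.StompPlusDrainNow, "")
	if err := c.Unsubscribe(h); err != nil {
		log.Printf("unsubscribe error: %v", err)
	}
}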

@MichaHoffmann

MichaHoffmann commented Jul 23, 2020

I have hit this issue today. In my use case I have no idea how many messages we are going to consume (it's a long-running application), so I tried the "StompDrainNow" extension.

forsel:
	for {
		ticker := time.NewTicker(ival)
		select {
		case mi, ok := <-usesp.md:
			if !ok {
				break forsel
			}
			dmc++
			c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
		case _ = <-ticker.C:
			c.log("sngdrnow extension BREAK")
			break forsel
		}
	}

Since a new ticker is created on every loop iteration, the timeout is effectively reset each time a message arrives, so the loop just keeps consuming messages and refreshing the ticker.
Shouldn't the Ticker be constructed above the loop rather than inside it?
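
What I had in mind is something like the following, so the deadline covers the whole drain instead of being reset for every message (a sketch against the snippet above, not tested; the names usesp, dmc, c, and ival are taken from that snippet):

	// Sketch: create the timer once, before the loop, so a steady stream of
	// messages cannot keep postponing the BREAK case.
	deadline := time.NewTimer(ival)
	defer deadline.Stop()
forsel:
	for {
		select {
		case mi, ok := <-usesp.md:
			if !ok {
				break forsel
			}
			dmc++
			c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
		case <-deadline.C:
			c.log("sngdrnow extension BREAK")
			break forsel
		}
	}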

@gmallard
Owner

What is the ACK mode in use please?

@MichaHoffmann

MichaHoffmann commented Jul 24, 2020

What is the ACK mode in use please?

client-individual in my use case. Right now, I'm draining the subscription by hand when unsubscribing. I have not hit the issue when draining manually.

@gmallard
Owner

Hmmm. Please describe in detail:

a) What you are seeing (sequence of events, ....)
b) What you expect to see

Sorry, I am not really understanding the problem .... yet.

@gmallard
Owner

Whatever you are seeing (hang, loop, something else?), I cannot recreate it here.

Regardless, I recently became aware that I often use Ticker when the time package provides more appropriate tools.

That loop code has been changed to use time.After, and the code is on the dev branch: 8eccd03

Please give that a try and let me know the results.

@MichaHoffmann

Whatever you are seeing (hang, loop, something else?), I cannot recreate it here.

Regardless, I recently became aware that I often use Ticker when the time package provides more appropriate tools.

That loop code has been changed to use time.After, and the code is on the dev branch: 8eccd03

Please give that a try and let me know the results.

Sorry, I've been busy; I'll try to create a test case for what I'm seeing this week.

@MichaHoffmann

MichaHoffmann commented Jul 27, 2020

OK, I have tried to write a test using ginkgo/gomega.

The idea is that the server just keeps spamming messages after the unsubscribe call (I think I have seen it with the RabbitMQ STOMP plugin, but maybe I was just too impatient and it took too long).

package test

import (
	"fmt"
        "io"
        "strings"
	"net"
	"time"

	"github.com/gmallard/stompngo"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	. "github.com/onsi/gomega/gbytes"
)

func stompFrame(frame string) string {
	return fmt.Sprintf("%s\x00", frame)
}

func write(writer io.Writer, cont string) error {
	_, err := io.Copy(writer, strings.NewReader(cont))
	return err
}

var _ = Describe("issue #25", func() {
	When("the server keeps sending messages during unsubscribe indefinitly", func() {
		It("should not hang", func() {
			done := make(chan struct{})
			server, client := net.Pipe()
			receiver := BufferReader(server)
			go func() {
				defer GinkgoRecover()
				defer close(done)

				By("connecting")
				connectHeaders := stompngo.Headers{}.
					Add(stompngo.HK_ACCEPT_VERSION, stompngo.SPL_12).
					Add(stompngo.HK_HOST, "/")
				stomper, err := stompngo.Connect(client, connectHeaders)
				Expect(err).ToNot(HaveOccurred())

				By("subscribing")
				subscribeHeaders := stompngo.Headers{}.
					Add(stompngo.HK_DESTINATION, "testDestination").
					Add(stompngo.HK_ID, "testId").
					Add(stompngo.HK_ACK, stompngo.AckModeAuto)
				md, err := stomper.Subscribe(subscribeHeaders)

				Expect(err).ToNot(HaveOccurred())

				By("received first message")
				Eventually(md).Should(Receive())

				By("started unsubscribing")
				unsubscribeHeaders := stompngo.Headers{}.
					Add(stompngo.HK_ID, "testId").
					Add(stompngo.StompPlusDrainNow, "")
				Expect(stomper.Unsubscribe(unsubscribeHeaders)).To(Succeed())
			}()

			Eventually(receiver).Should(Say(stompFrame(`CONNECT
accept-version:1.2
host:/
content-type:text/plain; charset=UTF-8
content-length:0

`)))
			Expect(write(server, stompFrame(`CONNECTED
version:1.2

`))).To(Succeed())

			Eventually(receiver).Should(Say(stompFrame(`SUBSCRIBE
destination:testDestination
id:testId
ack:auto
content-type:text/plain; charset=UTF-8
content-length:0

`)))

			By("spamming messages")
			go func() {
				defer GinkgoRecover()

				msg := "test"
				msgId := 0
				for ; ; time.Sleep(10 * time.Millisecond) {
					msgId++
					By("writing a message")
					Expect(write(server, stompFrame(fmt.Sprintf(`MESSAGE
subscription:testId
message-id:%d
ack:%d
destination:testDestination
content-type:text/plain

%s`, msgId, msgId, msg)))).To(Succeed())
				}
			}()

			Eventually(done).Should(Receive())
		})
	})
})

This test fails on my machine with

STEP: connecting
STEP: subscribing
STEP: received first message
STEP: spamming messages
STEP: writing a message
STEP: started unsubscribing
STEP: writing a message
STEP: writing a message
... more

• Failure [1.012 seconds]
issue #25

  when the server keeps sending messages during unsubscribe indefinitely

    should not hang [It]


    Timed out after 1.000s.
    Expected
        <chan struct {} | len:0, cap:0>: 0xc0003ae000
    to receive something.

Possibly the server is to blame because it just ignores the unsubscribe.
