Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefetch not assigned to first channel #502

Open
osok opened this issue Apr 10, 2021 · 0 comments
Open

Prefetch not assigned to first channel #502

osok opened this issue Apr 10, 2021 · 0 comments

Comments

@osok
Copy link

osok commented Apr 10, 2021

I have an application that consumes messages from a channel. The execution can be lengthy like a few minutes to run. When the application picks up a message it builds one of four configurations. In some cases it can be short, so I want it to pick up the next message.

Server --> (writes four messages) ----> I have four processes consuming [Builder]

The builder is the application. I have it set to prefetch just one message from the channel.

	// Connect to the rabbitMQ instance
	connection, err := amqp.Dial(url)
	defer connection.Close()

	if err != nil {
		util.FailOnError(err, "could not establish connection with RabbitMQ in main")
		return
	}

	channel, err := connection.Channel()
	if err != nil {
		util.FailOnError(err, "could not open first RabbitMQ channel in main")
		return
	}

	// Only pick up one message at a time.  Since some of the workers will take a long time to compile all the
	// executables, this helps to allow other workers to pick up work.
	channel.Qos(1,0,true)
	// We create an exchange that will bind to the queue to send and receive messages
	err = channel.ExchangeDeclare(builder2.BUILD_EXCHANGE, "topic", true, false, false, false, nil)

	if err != nil {
		util.FailOnError(err, "error declaring exchange in main")
		return
	}

	// We bind the queue to the exchange to send and receive data from the queue
	err = channel.QueueBind(builder2.BUILD_QUEUE, "#", builder2.BUILD_EXCHANGE, false, nil)
	if err != nil {
		// we need to reopen the channel
		channel, err = connection.Channel()
		if err != nil {
			util.FailOnError(err, "could not open RabbitMQ channel in main")
			return
		}
		err = declareQueue(channel)
		if err != nil {
			util.FailOnError(err, "error binding to the queue in main")
			return
		}
	}

	// We consume data from the queue named Test using the channel we created in go.
	msgs, err := channel.Consume(builder2.BUILD_QUEUE, "", false, false, false, false, nil)

So when I run the Builder four times, I get get four channels. The prefetch is properly set for channel 2,3 &4, but channel 1 is not set.

So when the server fires off the first batch, and then a second, I see five messages in the first channel and one in each of the other three.

go version go1.14.2 linux/amd64
I pulled amqp as late as today 4/10/2021

2021-04-10_16-47-15

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant