
token->wait() hangs when publishing from 2 threads #499

Closed
elzbieta-nowak-kubat opened this issue Jun 13, 2024 · 5 comments
Labels: bug (Confirmed bug), fix added (A fix has been pushed to the repo and is being tested), upstream (Problem with upstream library)

Comments

@elzbieta-nowak-kubat

Hello

Consider the following example

#include <iostream>
#include <cstdlib>
#include <string>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstring>
#include "mqtt/async_client.h"

using namespace std;

const string DFLT_SERVER_ADDRESS	{ "mqtt://localhost:1883" };
const string CLIENT_ID				{ "paho_cpp_publish" };

const string TOPIC { "hello" };

const char* PAYLOAD1 = "Hello World!";

const int  QOS = 0;

/////////////////////////////////////////////////////////////////////////////

/**
 * A callback class for use with the main MQTT client.
 */
class callback : public virtual mqtt::callback
{
public:
	void connection_lost(const string& cause) override {
		cout << "\nConnection lost" << endl;
		if (!cause.empty())
			cout << "\tcause: " << cause << endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr tok) override {
		cout << "\tDelivery complete for token: "
			<< (tok ? tok->get_message_id() : -1) << endl;
	}
};



/////////////////////////////////////////////////////////////////////////////

int main(int argc, char* argv[])
{
	string	address  = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS,
			clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID;

	cout << "Initializing for server '" << address << "'..." << endl;
	auto createOptions = mqtt::create_options_builder()
		.mqtt_version(MQTTVERSION_5)
		.send_while_disconnected(false, false)
		.max_buffered_messages(1)
		.delete_oldest_messages(false)
		.restore_messages(false)
		.persist_qos0(false)
		.finalize();

	mqtt::async_client client(address, clientID, createOptions);

	callback cb;
	client.set_callback(cb);

	auto connOpts = mqtt::connect_options::v5();
	connOpts.set_keep_alive_interval(60);
	connOpts.set_connect_timeout(30);
	connOpts.set_clean_start(true);
	connOpts.set_automatic_reconnect(false);

	cout << "  ...OK" << endl;

	try {
		cout << "\nConnecting..." << endl;
		mqtt::token_ptr conntok = client.connect(connOpts);
		cout << "Waiting for the connection..." << endl;
		conntok->wait();
		cout << "  ...OK" << endl;
	}
	catch (const mqtt::exception& exc) {
		cerr << exc.what() << endl;
		return 1;
	}

	auto publish = [&client](int id) {
		atomic_int counter = 0;

		for (auto i = 0; i < 100; i++) {
			try {
				cout << "\nSending message, thread id : " << id << endl;
				mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC + std::to_string(id), PAYLOAD1 + std::to_string(counter));
				pubmsg->set_qos(QOS);
				pubmsg->set_retained(false);
				auto token = client.publish(pubmsg);
				token->wait();
				cout << "  ...OK, thread id : "<< id << endl;
				counter++;
			}
			catch (const mqtt::exception& exc) {
				cerr << "Publish error : " << exc.what() << endl;
			}
		}
	};

	auto t1 = std::thread(publish, 111);
	auto t2 = std::thread(publish, 222);

	t1.join();
	t2.join();

	try {
		cout << "\nDisconnecting..." << endl;
		client.disconnect()->wait();
		cout << "  ...OK" << endl;
	}
	catch (const mqtt::exception& exc) {
		cerr << exc.what() << endl;
		return 1;
	}

	return 0;
}

We publish messages in a loop from two threads. Publishing is synchronous: each thread waits on its token before sending the next message. Almost every time, we observe that one of the threads hangs in token->wait() while the other keeps publishing. From time to time we also see the error "MQTT error [-12]: No more messages can be buffered" for some publishes. Our observation is that this is related to setting max_buffered_messages to 1 (increasing this number helps).

Is it a bug that setting max_buffered_messages to 1 causes a hang, or have we just misconfigured something? If the latter, please explain how it should be set (e.g. in relation to the number of threads).
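On a library version without an upstream fix, one possible application-side workaround is to let only one thread at a time run publish() followed by wait(), so the two threads never pass through the client's buffer check concurrently. The sketch below is a hypothetical helper, `publish_serialized`, written as a template so it can wrap `mqtt::async_client` (whose `publish()` returns a token with `wait()`, as in the example above) as well as the mock client used here to check it. It assumes that serializing the calls is enough to avoid the race in the C library, and it gives up publishing concurrency between the threads.

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper: holds one shared mutex across publish() and wait(), so
// at most one publish from the participating threads is inside the client.
template <class Client, class Msg>
auto publish_serialized(std::mutex& m, Client& client, Msg&& msg) {
    std::lock_guard<std::mutex> lock(m);
    auto tok = client.publish(std::forward<Msg>(msg));
    tok->wait();  // the lock is released only after delivery completes
    return tok;
}

// Mock client used only to check the helper: counts publishes in flight
// (published but not yet waited on) and records the peak.
struct MockClient {
    struct Token {
        std::atomic<int>* inflight;
        void wait() { --*inflight; }
    };
    std::atomic<int> inflight{0};
    std::atomic<int> peak{0};

    std::shared_ptr<Token> publish(int /*payload*/) {
        int now = ++inflight;
        int prev = peak.load();
        // Raise peak to `now` if it is higher than the recorded value.
        while (now > prev && !peak.compare_exchange_weak(prev, now)) {}
        return std::make_shared<Token>(Token{&inflight});
    }
};

// Two threads publishing 100 messages each, as in the report.
int peak_inflight_with_two_threads() {
    MockClient client;
    std::mutex m;
    auto worker = [&] {
        for (int i = 0; i < 100; ++i) publish_serialized(m, client, i);
    };
    std::thread t1(worker), t2(worker);
    t1.join();
    t2.join();
    return client.peak.load();
}
```

With the real client, the `publish` lambda would call `publish_serialized(mtx, client, pubmsg)` instead of `client.publish(pubmsg)` plus `token->wait()`, with a single `std::mutex mtx` shared by both threads.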

@fpagliughi
Contributor

I'll have a look, and maybe put up a new example with two threads publishing.

@fpagliughi fpagliughi added this to the v1.4 milestone Jun 16, 2024
@elzbieta-nowak-kubat
Author

That's great.

In the meantime I have investigated a bit, and it seems the problem is caused by incorrect synchronization of the noBufferedMessages member of MQTTAsync on the Paho C side. Two threads can pass the check on this member simultaneously; both commands are then added to the list and both increment the counter, so processing in the MQTTAsync_add function enters the condition

/* delete oldest message if buffer is full.  We wouldn't be here if delete newest was in operation */
if (command->client->createOptions && (command->client->noBufferedMessages >= command->client->createOptions->maxBufferedMessages))

which it should never enter with our create options (delete_oldest_messages(false)). ListDetach is then called, which removes the first publish command from processing, so on the Paho C++ side the wait corresponding to that publish is never released.
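To make the interleaving described above concrete, here is a minimal single-threaded C++ model that replays it step by step. It assumes a simplified command list in which the send path checks the buffered count without a lock and the add path evicts the oldest command when it sees the list full. The names (`CommandList`, `send_has_room`, `add_command`, `replay_race`) are hypothetical and only mimic the logic discussed in this thread; this is not Paho C code.

```cpp
#include <deque>
#include <vector>

// Hypothetical stand-in for the client's command list and its
// noBufferedMessages counter (not actual Paho C code).
struct CommandList {
    std::deque<int> commands;     // queued publish commands, by message id
    int noBufferedMessages = 0;   // mirrors the MQTTAsync counter
    int maxBufferedMessages = 1;  // as set by max_buffered_messages(1)
};

// Mimics the unlocked check on the send path: true means "there is room".
bool send_has_room(const CommandList& l) {
    return l.noBufferedMessages < l.maxBufferedMessages;
}

// Mimics the add path: if the buffer already looks full, the oldest command
// is detached (the ListDetach path), even though delete_oldest_messages is
// false. Returns the id of the dropped command, or -1 if none was dropped.
int add_command(CommandList& l, int id) {
    int dropped = -1;
    if (l.noBufferedMessages >= l.maxBufferedMessages) {
        dropped = l.commands.front();
        l.commands.pop_front();
        --l.noBufferedMessages;
    }
    l.commands.push_back(id);
    ++l.noBufferedMessages;
    return dropped;
}

struct RaceResult {
    std::vector<int> queued;  // commands still waiting to be sent
    int dropped;              // command whose token will never complete
};

// Replays the bad interleaving: both threads pass the check before either adds.
RaceResult replay_race() {
    CommandList l;
    bool t1_ok = send_has_room(l);  // first thread sees 0 < 1
    bool t2_ok = send_has_room(l);  // second thread also sees 0 < 1
    int dropped = -1;
    if (t1_ok) add_command(l, 1);            // count becomes 1
    if (t2_ok) dropped = add_command(l, 2);  // sees 1 >= 1, evicts command 1
    return {std::vector<int>(l.commands.begin(), l.commands.end()), dropped};
}
```

In this model command 1 is gone from the list, so nothing ever completes its token, and the thread blocked in `token->wait()` on it never returns, matching the hang in the report.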

@fpagliughi
Contributor

OK. That's what I was afraid of. This should be an issue in the Paho C repo. I will copy and paste the issue there.

@fpagliughi fpagliughi modified the milestones: v1.4, v1.5 Jun 17, 2024
@fpagliughi fpagliughi added upstream Problem with upstream library bug Confirmed bug labels Jun 17, 2024
@icraggs
Collaborator

icraggs commented Sep 3, 2024

I think this is caused by the lack of synchronization in MQTTAsync_send, with the conflict arising here:

else if (m->createOptions &&
			(m->createOptions->struct_version < 2 || m->createOptions->deleteOldestMessages == 0) &&
			(MQTTAsync_getNoBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
		rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;

I've taken a C library trace of the occurrence:

lll.zip

and tried adding a lock/unlock of mqttasync_mutex in MQTTAsync_send, which appears to fix this case. I'll need to check that it doesn't affect anything else.
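The idea behind that change is to make the capacity check and the enqueue a single atomic step. Below is a minimal C++ sketch of the idea, assuming one mutex plays the role of `mqttasync_mutex` and guards both the count check and the insertion. `LockedCommandList`, `try_send` and `run_contention` are hypothetical names, and `kMaxBufferedMessagesRc` only mirrors the -12 error code seen in the report; the actual patch in the C library may differ.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of the fix: the capacity check and the insertion happen
// under the same lock, so two threads cannot both see a free slot.
class LockedCommandList {
public:
    static constexpr int kOk = 0;
    static constexpr int kMaxBufferedMessagesRc = -12;  // as seen in the report

    explicit LockedCommandList(int maxBuffered) : max_(maxBuffered) {}

    int try_send(int id) {
        std::lock_guard<std::mutex> lock(mutex_);  // stands in for mqttasync_mutex
        if (static_cast<int>(commands_.size()) >= max_)
            return kMaxBufferedMessagesRc;         // reject instead of evicting
        commands_.push_back(id);
        return kOk;
    }

    int size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<int>(commands_.size());
    }

private:
    std::mutex mutex_;
    std::deque<int> commands_;
    const int max_;
};

// Several threads hammer a list with room for one command (nothing drains it).
// Returns how many sends were accepted.
int run_contention(int nthreads, int attempts) {
    LockedCommandList list(1);
    std::atomic<int> accepted{0};
    std::vector<std::thread> threads;
    for (int t = 0; t < nthreads; ++t)
        threads.emplace_back([&list, &accepted, t, attempts] {
            for (int i = 0; i < attempts; ++i)
                if (list.try_send(t * attempts + i) == LockedCommandList::kOk)
                    ++accepted;
        });
    for (auto& th : threads) th.join();
    return accepted.load();
}
```

Exactly one send is ever accepted, however the threads interleave. With room for only one command, a concurrent second publish is rejected with -12 rather than silently evicting the first, which would be consistent with the occasional -12 errors in the report; raising max_buffered_messages to at least the number of concurrently publishing threads may avoid them, assuming each thread has at most one publish outstanding.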

@fpagliughi fpagliughi added the fix added A fix has been pushed to the repo and is being tested label Jan 4, 2025
@fpagliughi
Copy link
Contributor

It looks like the upstream fix has corrected this.
