From 8805cdd55a90dc6e9c443c2f4d9a79d4aa0e82ee Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Sun, 15 Sep 2019 22:10:05 +0900 Subject: [PATCH] Updated the parameter of set_accept_handler() from endpoint_t to std::shared_ptr. For `set_*_handler()`, capture `std::weak_ptr` at test_broker and examples. I think that `weak_ptr` is redundant. Reference is good enough because `shared_ptr` lifetime is kept by `start_session(life_keeper)`. However, `wp.lock()` is easy to detect if the lifetime is over unexpectedly. --- example/no_tls_both.cpp | 58 ++++++++++----------- example/no_tls_server.cpp | 85 ++++++++++++++++--------------- example/no_tls_ws_both.cpp | 48 +++++++++--------- example/no_tls_ws_server.cpp | 85 ++++++++++++++++--------------- example/tls_both.cpp | 86 +++++++++++++++---------------- example/tls_server.cpp | 87 ++++++++++++++++---------------- example/tls_ws_both.cpp | 50 +++++++++--------- example/tls_ws_server.cpp | 87 ++++++++++++++++---------------- example/v5_no_tls_both.cpp | 52 ++++++++++--------- example/v5_no_tls_prop.cpp | 29 ++++++----- example/v5_no_tls_server.cpp | 92 +++++++++++++++++----------------- include/mqtt/server.hpp | 16 +++--- test/test_broker.hpp | 91 ++++++++++++++++----------------- test/test_server_no_tls.hpp | 4 +- test/test_server_no_tls_ws.hpp | 4 +- test/test_server_tls.hpp | 4 +- test/test_server_tls_ws.hpp | 4 +- test/underlying_timeout.cpp | 8 +-- 18 files changed, 459 insertions(+), 431 deletions(-) diff --git a/example/no_tls_both.cpp b/example/no_tls_both.cpp index 50bb47cc3..0b6ebe1b7 100644 --- a/example/no_tls_both.cpp +++ b/example/no_tls_both.cpp @@ -167,36 +167,38 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { } ); s.set_accept_handler( - [&](con_t& ep) { + [&s, &connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "[server] accept" << std::endl; - auto sp = ep.shared_from_this(); // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ std::cout << "[server] error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -209,53 +211,53 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; std::cout << "[server] keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] - (bool dup, + [&subs] + (bool is_dup, MQTT_NS::qos qos_value, - bool retain, + bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ std::cout << "[server] publish received." - << " dup: " << std::boolalpha << dup + << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << std::boolalpha << retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) std::cout << "[server] packet_id: " << *packet_id << std::endl; std::cout << "[server] topic_name: " << topic_name << std::endl; @@ -268,13 +270,13 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { boost::asio::buffer(contents), std::make_pair(topic_name, contents), std::min(r.first->qos_value, qos_value), - retain + is_retain ); } return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; @@ -285,21 +287,21 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/no_tls_server.cpp b/example/no_tls_server.cpp index 3a1b4d7d9..db72cb9b9 100644 --- a/example/no_tls_server.cpp +++ b/example/no_tls_server.cpp @@ -79,29 +79,32 @@ int main(int argc, char** argv) { mi_sub_con subs; s.set_accept_handler( - [&](con_t& ep) { + [&connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "accept" << std::endl; - auto sp = ep.shared_from_this(); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + + ep.start_session(std::move(spep)); // keeping ep's lifetime as sp until session finished // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -109,62 +112,62 @@ int main(int argc, char** argv) { bool clean_session, std::uint16_t keep_alive) { using namespace MQTT_NS::literals; - std::cout << "client_id : " << client_id << std::endl; - std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] disconnect received." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "puback received. packet_id: " << packet_id << std::endl; + std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrec received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrel received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubcomp received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ - std::cout << "publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "packet_id: " << *packet_id << std::endl; - std::cout << "topic_name: " << topic_name << std::endl; - std::cout << "contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -179,32 +182,32 @@ int main(int argc, char** argv) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { - std::cout << "subscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; std::vector res; res.reserve(entries.size()); for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { - std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/no_tls_ws_both.cpp b/example/no_tls_ws_both.cpp index 095209265..fa43c2472 100644 --- a/example/no_tls_ws_both.cpp +++ b/example/no_tls_ws_both.cpp @@ -167,36 +167,38 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { } ); s.set_accept_handler( - [&](con_t& ep) { + [&s, &connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "[server] accept" << std::endl; - auto sp = ep.shared_from_this(); // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ std::cout << "[server] error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -209,43 +211,43 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; std::cout << "[server] keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -274,7 +276,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; @@ -285,21 +287,21 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/no_tls_ws_server.cpp b/example/no_tls_ws_server.cpp index 3d7762b16..8bd552811 100644 --- a/example/no_tls_ws_server.cpp +++ b/example/no_tls_ws_server.cpp @@ -79,29 +79,32 @@ int main(int argc, char** argv) { mi_sub_con subs; s.set_accept_handler( - [&](con_t& ep) { + [&connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "accept" << std::endl; - auto sp = ep.shared_from_this(); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + + ep.start_session(std::move(spep)); // keeping ep's lifetime as sp until session finished // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -109,62 +112,62 @@ int main(int argc, char** argv) { bool clean_session, std::uint16_t keep_alive) { using namespace MQTT_NS::literals; - std::cout << "client_id : " << client_id << std::endl; - std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] disconnect received." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "puback received. packet_id: " << packet_id << std::endl; + std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrec received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrel received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubcomp received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ - std::cout << "publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "packet_id: " << *packet_id << std::endl; - std::cout << "topic_name: " << topic_name << std::endl; - std::cout << "contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -179,32 +182,32 @@ int main(int argc, char** argv) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { - std::cout << "subscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; std::vector res; res.reserve(entries.size()); for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { - std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/tls_both.cpp b/example/tls_both.cpp index 67516175a..3e1ec9f66 100644 --- a/example/tls_both.cpp +++ b/example/tls_both.cpp @@ -169,36 +169,38 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { } ); s.set_accept_handler( - [&](con_t& ep) { + [&s, &connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; - std::cout << "[server]accept" << std::endl; - auto sp = ep.shared_from_this(); + std::cout << "[server] accept" << std::endl; // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "[server]closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "[server]error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -206,62 +208,62 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { bool clean_session, std::uint16_t keep_alive) { using namespace MQTT_NS::literals; - std::cout << "[server]client_id : " << client_id << std::endl; - std::cout << "[server]username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "[server]password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "[server]clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "[server]keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "[server]disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] disconnect received." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "[server]puback received. packet_id: " << packet_id << std::endl; + std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "[server]pubrec received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "[server]pubrel received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "[server]pubcomp received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ - std::cout << "[server]publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << is_retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "[server]packet_id: " << *packet_id << std::endl; - std::cout << "[server]topic_name: " << topic_name << std::endl; - std::cout << "[server]contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -276,7 +278,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; @@ -285,23 +287,23 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "[server]topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/tls_server.cpp b/example/tls_server.cpp index 66b4e3b82..828baecb6 100644 --- a/example/tls_server.cpp +++ b/example/tls_server.cpp @@ -91,29 +91,32 @@ int main(int argc, char** argv) { mi_sub_con subs; s.set_accept_handler( - [&](con_t& ep) { + [&connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "accept" << std::endl; - auto sp = ep.shared_from_this(); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + + ep.start_session(std::move(spep)); // keeping ep's lifetime as sp until session finished // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -121,62 +124,62 @@ int main(int argc, char** argv) { bool clean_session, std::uint16_t keep_alive) { using namespace MQTT_NS::literals; - std::cout << "client_id : " << client_id << std::endl; - std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] disconnect received." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "puback received. packet_id: " << packet_id << std::endl; + std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrec received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrel received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubcomp received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ - std::cout << "publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << is_retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "packet_id: " << *packet_id << std::endl; - std::cout << "topic_name: " << topic_name << std::endl; - std::cout << "contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -191,32 +194,32 @@ int main(int argc, char** argv) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { - std::cout << "subscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; std::vector res; res.reserve(entries.size()); for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { - std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/tls_ws_both.cpp b/example/tls_ws_both.cpp index f5500737d..cbd6332fe 100644 --- a/example/tls_ws_both.cpp +++ b/example/tls_ws_both.cpp @@ -168,36 +168,38 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { } ); s.set_accept_handler( - [&](con_t& ep) { + [&s, &connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "[server] accept" << std::endl; - auto sp = ep.shared_from_this(); // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ std::cout << "[server] error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -210,43 +212,43 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; std::cout << "[server] keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ std::cout << "[server] disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -256,7 +258,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << is_retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) std::cout << "[server] packet_id: " << *packet_id << std::endl; std::cout << "[server] topic_name: " << topic_name << std::endl; @@ -275,7 +277,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; @@ -286,21 +288,21 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/tls_ws_server.cpp b/example/tls_ws_server.cpp index ed59e7c0e..b8087283a 100644 --- a/example/tls_ws_server.cpp +++ b/example/tls_ws_server.cpp @@ -91,29 +91,32 @@ int main(int argc, char** argv) { mi_sub_con subs; s.set_accept_handler( - [&](con_t& ep) { + [&connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "accept" << std::endl; - auto sp = ep.shared_from_this(); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + + ep.start_session(std::move(spep)); // keeping ep's lifetime as sp until session finished // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_error_handler( - [&] + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -121,62 +124,62 @@ int main(int argc, char** argv) { bool clean_session, std::uint16_t keep_alive) { using namespace MQTT_NS::literals; - std::cout << "client_id : " << client_id << std::endl; - std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::connect_return_code::accepted); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted); return true; } ); ep.set_disconnect_handler( - [&] + [&connections, &subs, wp] (){ - std::cout << "disconnect received." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] disconnect received." << std::endl; + close_proc(connections, subs, wp.lock()); }); ep.set_puback_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "puback received. packet_id: " << packet_id << std::endl; + std::cout << "[server] puback received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrec_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrec received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubrel_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubrel received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl; return true; }); ep.set_pubcomp_handler( - [&] + [] (packet_id_t packet_id){ - std::cout << "pubcomp received. packet_id: " << packet_id << std::endl; + std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl; return true; }); ep.set_publish_handler( - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, MQTT_NS::optional packet_id, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ - std::cout << "publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << is_retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "packet_id: " << *packet_id << std::endl; - std::cout << "topic_name: " << topic_name << std::endl; - std::cout << "contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -191,32 +194,32 @@ int main(int argc, char** argv) { return true; }); ep.set_subscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries) { - std::cout << "subscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; std::vector res; res.reserve(entries.size()); for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_unsubscribe_handler( - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics) { - std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/v5_no_tls_both.cpp b/example/v5_no_tls_both.cpp index d5927ef9a..d38c75be7 100644 --- a/example/v5_no_tls_both.cpp +++ b/example/v5_no_tls_both.cpp @@ -188,36 +188,38 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { } ); s.set_accept_handler( // this handler doesn't depend on MQTT protocol version - [&](con_t& ep) { + [&s, &connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "[server] accept" << std::endl; - auto sp = ep.shared_from_this(); // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers - ep.set_close_handler( // this handler doesn't depend on MQTT protocol version - [&] + ep.set_close_handler( + [&connections, &subs, wp] (){ std::cout << "[server] closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); - ep.set_error_handler( // this handler doesn't depend on MQTT protocol version - [&] + ep.set_error_handler( + [&connections, &subs, wp] (boost::system::error_code const& ec){ std::cout << "[server] error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_v5_connect_handler( // use v5 handler - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional const& username, MQTT_NS::optional const& password, @@ -231,21 +233,21 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; std::cout << "[server] keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::v5::connect_reason_code::success); + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::v5::connect_reason_code::success); return true; } ); ep.set_v5_disconnect_handler( // use v5 handler - [&] + [&connections, &subs, wp] (MQTT_NS::v5::disconnect_reason_code reason_code, std::vector /*props*/) { std::cout << "[server] disconnect received." << " reason_code: " << reason_code << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_v5_puback_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::puback_reason_code reason_code, std::vector /*props*/){ std::cout << "[server] puback received. packet_id: " << packet_id << @@ -253,7 +255,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_v5_pubrec_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubrec_reason_code reason_code, std::vector /*props*/){ std::cout << "[server] pubrec received. packet_id: " << packet_id << @@ -261,7 +263,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_v5_pubrel_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code reason_code, std::vector /*props*/){ std::cout << "[server] pubrel received. packet_id: " << packet_id << @@ -269,7 +271,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_v5_pubcomp_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubcomp_reason_code reason_code, std::vector /*props*/){ std::cout << "[server] pubcomp received. packet_id: " << packet_id << @@ -277,7 +279,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_v5_publish_handler( // use v5 handler - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -307,7 +309,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { return true; }); ep.set_v5_subscribe_handler( // use v5 handler - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries, std::vector /*props*/) { @@ -319,14 +321,14 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_v5_unsubscribe_handler( // use v5 handler - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics, std::vector /*props*/) { @@ -334,7 +336,7 @@ void server_proc(Server& s, std::set& connections, mi_sub_con& subs) { for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/example/v5_no_tls_prop.cpp b/example/v5_no_tls_prop.cpp index a48d2f6ba..8e95e216f 100644 --- a/example/v5_no_tls_prop.cpp +++ b/example/v5_no_tls_prop.cpp @@ -148,35 +148,38 @@ void server_proc(Server& s, std::set& connections) { } ); s.set_accept_handler( // this handler doesn't depend on MQTT protocol version - [&](con_t& ep) { + [&s, &connections](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + + using packet_id_t = typename std::remove_reference_t::packet_id_t; std::cout << "[server] accept" << std::endl; - auto sp = ep.shared_from_this(); // For server close if ep is closed. auto g = MQTT_NS::shared_scope_guard( - [&] { + [&s] { std::cout << "[server] session end" << std::endl; s.close(); } ); - ep.start_session(std::make_tuple(std::move(sp), std::move(g))); + ep.start_session(std::make_tuple(std::move(spep), std::move(g))); // set connection (lower than MQTT) level handlers ep.set_close_handler( // this handler doesn't depend on MQTT protocol version - [&] + [&connections, wp] (){ std::cout << "[server] closed." << std::endl; - connections.erase(ep.shared_from_this()); + connections.erase(wp.lock()); }); ep.set_error_handler( // this handler doesn't depend on MQTT protocol version - [&] + [&connections, wp] (boost::system::error_code const& ec){ std::cout << "[server] error: " << ec.message() << std::endl; - connections.erase(ep.shared_from_this()); + connections.erase(wp.lock()); }); // set MQTT level handlers ep.set_v5_connect_handler( // use v5 handler - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional const& username, MQTT_NS::optional const& password, @@ -229,7 +232,7 @@ void server_proc(Server& s, std::set& connections) { ); } - connections.insert(ep.shared_from_this()); + connections.insert(wp.lock()); std::vector connack_ps { MQTT_NS::v5::property::session_expiry_interval(0), @@ -251,17 +254,17 @@ void server_proc(Server& s, std::set& connections) { MQTT_NS::v5::property::authentication_method("test authentication method"_mb), MQTT_NS::v5::property::authentication_data("test authentication data"_mb) }; - ep.connack(false, MQTT_NS::v5::connect_reason_code::success, std::move(connack_ps)); + wp.lock()->connack(false, MQTT_NS::v5::connect_reason_code::success, std::move(connack_ps)); return true; } ); ep.set_v5_disconnect_handler( // use v5 handler - [&] + [&connections, wp] (MQTT_NS::v5::disconnect_reason_code reason_code, std::vector /*props*/) { std::cout << "[server] disconnect received." << " reason_code: " << reason_code << std::endl; - connections.erase(ep.shared_from_this()); + connections.erase(wp.lock()); }); } ); diff --git a/example/v5_no_tls_server.cpp b/example/v5_no_tls_server.cpp index 0ff95c86f..d5b022fe4 100644 --- a/example/v5_no_tls_server.cpp +++ b/example/v5_no_tls_server.cpp @@ -79,29 +79,31 @@ int main(int argc, char** argv) { mi_sub_con subs; s.set_accept_handler( // this handler doesn't depend on MQTT protocol version - [&](con_t& ep) { + [&connections, &subs](con_sp_t spep) { + auto& ep = *spep; + std::weak_ptr wp(spep); + using packet_id_t = typename std::remove_reference_t::packet_id_t; - std::cout << "[server]accept" << std::endl; - auto sp = ep.shared_from_this(); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + std::cout << "[server] accept" << std::endl; + ep.start_session(std::move(spep)); // set connection (lower than MQTT) level handlers - ep.set_close_handler( // this handler doesn't depend on MQTT protocol version - [&] + ep.set_close_handler( + [&connections, &subs, wp] (){ - std::cout << "[server]closed." << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] closed." << std::endl; + close_proc(connections, subs, wp.lock()); }); - ep.set_error_handler( // this handler doesn't depend on MQTT protocol version - [&] + ep.set_error_handler( + [&connections, &subs, wp] (boost::system::error_code const& ec){ - std::cout << "[server]error: " << ec.message() << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + std::cout << "[server] error: " << ec.message() << std::endl; + close_proc(connections, subs, wp.lock()); }); // set MQTT level handlers ep.set_v5_connect_handler( // use v5 handler - [&] + [&connections, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional const& username, MQTT_NS::optional const& password, @@ -110,58 +112,58 @@ int main(int argc, char** argv) { std::uint16_t keep_alive, std::vector /*props*/){ using namespace MQTT_NS::literals; - std::cout << "[server]client_id : " << client_id << std::endl; - std::cout << "[server]username : " << (username ? username.value() : "none"_mb) << std::endl; - std::cout << "[server]password : " << (password ? password.value() : "none"_mb) << std::endl; - std::cout << "[server]clean_session: " << std::boolalpha << clean_session << std::endl; - std::cout << "[server]keep_alive : " << keep_alive << std::endl; - connections.insert(ep.shared_from_this()); - ep.connack(false, MQTT_NS::v5::connect_reason_code::success); + std::cout << "[server] client_id : " << client_id << std::endl; + std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl; + std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl; + std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl; + std::cout << "[server] keep_alive : " << keep_alive << std::endl; + connections.insert(wp.lock()); + wp.lock()->connack(false, MQTT_NS::v5::connect_reason_code::success); return true; } ); ep.set_v5_disconnect_handler( // use v5 handler - [&] + [&connections, &subs, wp] (MQTT_NS::v5::disconnect_reason_code reason_code, std::vector /*props*/) { std::cout << - "[server]disconnect received." << + "[server] disconnect received." << " reason_code: " << reason_code << std::endl; - close_proc(connections, subs, ep.shared_from_this()); + close_proc(connections, subs, wp.lock()); }); ep.set_v5_puback_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::puback_reason_code reason_code, std::vector /*props*/){ std::cout << - "[server]puback received. packet_id: " << packet_id << + "[server] puback received. packet_id: " << packet_id << " reason_code: " << reason_code << std::endl; return true; }); ep.set_v5_pubrec_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubrec_reason_code reason_code, std::vector /*props*/){ std::cout << - "[server]pubrec received. packet_id: " << packet_id << + "[server] pubrec received. packet_id: " << packet_id << " reason_code: " << reason_code << std::endl; return true; }); ep.set_v5_pubrel_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code reason_code, std::vector /*props*/){ std::cout << - "[server]pubrel received. packet_id: " << packet_id << + "[server] pubrel received. packet_id: " << packet_id << " reason_code: " << reason_code << std::endl; return true; }); ep.set_v5_pubcomp_handler( // use v5 handler - [&] + [] (packet_id_t packet_id, MQTT_NS::v5::pubcomp_reason_code reason_code, std::vector /*props*/){ std::cout << - "[server]pubcomp received. packet_id: " << packet_id << + "[server] pubcomp received. packet_id: " << packet_id << " reason_code: " << reason_code << std::endl; return true; }); ep.set_v5_publish_handler( // use v5 handler - [&] + [&subs] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -169,14 +171,14 @@ int main(int argc, char** argv) { MQTT_NS::buffer topic_name, MQTT_NS::buffer contents, std::vector /*props*/){ - std::cout << "[server]publish received." + std::cout << "[server] publish received." << " dup: " << std::boolalpha << is_dup << " qos: " << qos_value - << " retain: " << std::boolalpha << is_retain << std::endl; + << " retain: " << std::boolalpha << is_retain << std::endl; if (packet_id) - std::cout << "[server]packet_id: " << *packet_id << std::endl; - std::cout << "[server]topic_name: " << topic_name << std::endl; - std::cout << "[server]contents: " << contents << std::endl; + std::cout << "[server] packet_id: " << *packet_id << std::endl; + std::cout << "[server] topic_name: " << topic_name << std::endl; + std::cout << "[server] contents: " << contents << std::endl; auto const& idx = subs.get(); auto r = idx.equal_range(topic_name); for (; r.first != r.second; ++r.first) { @@ -191,34 +193,34 @@ int main(int argc, char** argv) { return true; }); ep.set_v5_subscribe_handler( // use v5 handler - [&] + [&subs, wp] (packet_id_t packet_id, std::vector> entries, std::vector /*props*/) { - std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server] subscribe received. packet_id: " << packet_id << std::endl; std::vector res; res.reserve(entries.size()); for (auto const& e : entries) { MQTT_NS::buffer topic = std::get<0>(e); MQTT_NS::qos qos_value = std::get<1>(e).get_qos(); - std::cout << "[server]topic: " << topic << " qos: " << qos_value << std::endl; + std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl; res.emplace_back(static_cast(qos_value)); - subs.emplace(std::move(topic), ep.shared_from_this(), qos_value); + subs.emplace(std::move(topic), wp.lock(), qos_value); } - ep.suback(packet_id, res); + wp.lock()->suback(packet_id, res); return true; } ); ep.set_v5_unsubscribe_handler( // use v5 handler - [&] + [&subs, wp] (packet_id_t packet_id, std::vector topics, std::vector /*props*/) { - std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl; + std::cout << "[server] unsubscribe received. packet_id: " << packet_id << std::endl; for (auto const& topic : topics) { subs.erase(topic); } - ep.unsuback(packet_id); + wp.lock()->unsuback(packet_id); return true; } ); diff --git a/include/mqtt/server.hpp b/include/mqtt/server.hpp index ac110e012..47a419520 100644 --- a/include/mqtt/server.hpp +++ b/include/mqtt/server.hpp @@ -47,7 +47,7 @@ class server { * @brief Accept handler * @param ep endpoint of the connecting client */ - using accept_handler = std::function; + using accept_handler = std::function ep)>; /** * @brief Error handler @@ -154,7 +154,7 @@ class server { return; } auto sp = std::make_shared(force_move(socket), version_); - if (h_accept_) h_accept_(*sp); + if (h_accept_) h_accept_(force_move(sp)); do_accept(); } ); @@ -189,7 +189,7 @@ class server_tls { * @brief Accept handler * @param ep endpoint of the connecting client */ - using accept_handler = std::function; + using accept_handler = std::function ep)>; /** * @brief Error handler @@ -336,7 +336,7 @@ class server_tls { return; } auto sp = std::make_shared(force_move(socket), version_); - if (h_accept_) h_accept_(*sp); + if (h_accept_) h_accept_(force_move(sp)); } ); do_accept(); @@ -392,7 +392,7 @@ class server_ws { * @brief Accept handler * @param ep endpoint of the connecting client */ - using accept_handler = std::function; + using accept_handler = std::function ep)>; /** * @brief Error handler @@ -561,7 +561,7 @@ class server_ws { return; } auto sp = std::make_shared(force_move(socket), version_); - if (h_accept_) h_accept_(*sp); + if (h_accept_) h_accept_(force_move(sp)); } ); } @@ -602,7 +602,7 @@ class server_tls_ws { * @brief Accept handler * @param ep endpoint of the connecting client */ - using accept_handler = std::function; + using accept_handler = std::function ep)>; /** * @brief Error handler @@ -784,7 +784,7 @@ class server_tls_ws { // a static assertion that socket is a const object when // TLS is enabled, and WS is enabled, with Boost 1.70, and gcc 8.3.0 auto sp = std::make_shared(socket, version_); - if (h_accept_) h_accept_(*sp); + if (h_accept_) h_accept_(force_move(sp)); } ); } diff --git a/test/test_broker.hpp b/test/test_broker.hpp index 1b7216061..91edb22df 100644 --- a/test/test_broker.hpp +++ b/test/test_broker.hpp @@ -72,27 +72,28 @@ class test_broker { * @param ep - The MQTT_NS::server (of whichever kind) to accept a connection on. */ template - void handle_accept(Endpoint& ep) { - auto sp = ep.shared_from_this(); + void handle_accept(std::shared_ptr spep) { + std::weak_ptr wp(spep); + auto& ep = *spep; ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(true)); ep.set_auto_pub_response(false); - ep.start_session(sp); // keeping ep's lifetime as sp until session finished + ep.start_session(spep); // keeping ep's lifetime as sp until session finished // set connection (lower than MQTT) level handlers ep.set_close_handler( - [&] + [this, wp] (){ - close_proc(ep, true); + close_proc(*wp.lock(), true); }); ep.set_error_handler( - [&] + [this, wp] (boost::system::error_code const& /*ec*/){ - close_proc(ep, true); + close_proc(*wp.lock(), true); }); // set MQTT level handlers ep.set_connect_handler( - [&] + [this, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -101,7 +102,7 @@ class test_broker { std::uint16_t keep_alive) { return connect_handler( - ep, + *wp.lock(), std::move(client_id), std::move(username), std::move(password), @@ -113,7 +114,7 @@ class test_broker { } ); ep.set_v5_connect_handler( - [&] + [this, wp] (MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, @@ -123,7 +124,7 @@ class test_broker { std::vector props) { return connect_handler( - ep, + *wp.lock(), std::move(client_id), std::move(username), std::move(password), @@ -135,74 +136,74 @@ class test_broker { } ); ep.set_disconnect_handler( - [&] + [this, wp] (){ return - disconnect_handler(ep); + disconnect_handler(*wp.lock()); } ); ep.set_v5_disconnect_handler( - [&] + [this, wp] (MQTT_NS::v5::disconnect_reason_code /*reason_code*/, std::vector props) { if (h_disconnect_props_) h_disconnect_props_(std::move(props)); return - disconnect_handler(ep); + disconnect_handler(*wp.lock()); } ); ep.set_puback_handler( - [&] + [] (typename Endpoint::packet_id_t /*packet_id*/){ return true; }); ep.set_v5_puback_handler( - [&] + [] (typename Endpoint::packet_id_t /*packet_id*/, MQTT_NS::v5::puback_reason_code /*reason_code*/, std::vector /*props*/){ return true; }); ep.set_pubrec_handler( - [&] + [wp] (typename Endpoint::packet_id_t packet_id){ - ep.pubrel(packet_id); + wp.lock()->pubrel(packet_id); return true; }); ep.set_v5_pubrec_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, MQTT_NS::v5::pubrec_reason_code /*reason_code*/, std::vector /*props*/){ - ep.pubrel(packet_id, MQTT_NS::v5::pubrel_reason_code::success, pubrel_props_); + wp.lock()->pubrel(packet_id, MQTT_NS::v5::pubrel_reason_code::success, pubrel_props_); return true; }); ep.set_pubrel_handler( - [&] + [wp] (typename Endpoint::packet_id_t packet_id){ - ep.pubcomp(packet_id); + wp.lock()->pubcomp(packet_id); return true; }); ep.set_v5_pubrel_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code /*reason_code*/, std::vector /*props*/){ - ep.pubcomp(packet_id, MQTT_NS::v5::pubcomp_reason_code::success, pubcomp_props_); + wp.lock()->pubcomp(packet_id, MQTT_NS::v5::pubcomp_reason_code::success, pubcomp_props_); return true; }); ep.set_pubcomp_handler( - [&] + [] (typename Endpoint::packet_id_t /*packet_id*/){ return true; }); ep.set_v5_pubcomp_handler( - [&] + [] (typename Endpoint::packet_id_t /*packet_id*/, MQTT_NS::v5::pubcomp_reason_code /*reason_code*/, std::vector /*props*/){ return true; }); ep.set_publish_handler( - [&] + [this, wp] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -210,7 +211,7 @@ class test_broker { MQTT_NS::buffer topic_name, MQTT_NS::buffer contents){ return publish_handler( - ep, + *wp.lock(), is_dup, qos_value, is_retain, @@ -221,7 +222,7 @@ class test_broker { ); }); ep.set_v5_publish_handler( - [&] + [this, wp] (bool is_dup, MQTT_NS::qos qos_value, bool is_retain, @@ -232,10 +233,10 @@ class test_broker { ) { if (h_publish_props_) h_publish_props_(props); return publish_handler( - ep, - is_dup, - qos_value, - is_retain, + *wp.lock(), + is_dup, + qos_value, + is_retain, packet_id, std::move(topic_name), std::move(contents), @@ -243,11 +244,11 @@ class test_broker { ); }); ep.set_subscribe_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, std::vector> entries) { return subscribe_handler( - ep, + *wp.lock(), packet_id, std::move(entries), {} @@ -255,13 +256,13 @@ class test_broker { } ); ep.set_v5_subscribe_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, std::vector> entries, std::vector props ) { return subscribe_handler( - ep, + *wp.lock(), packet_id, std::move(entries), std::move(props) @@ -269,11 +270,11 @@ class test_broker { } ); ep.set_unsubscribe_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, std::vector topics) { return unsubscribe_handler( - ep, + *wp.lock(), packet_id, std::move(topics), {} @@ -281,13 +282,13 @@ class test_broker { } ); ep.set_v5_unsubscribe_handler( - [&] + [this, wp] (typename Endpoint::packet_id_t packet_id, std::vector topics, std::vector props ) { return unsubscribe_handler( - ep, + *wp.lock(), packet_id, std::move(topics), std::move(props) @@ -295,13 +296,13 @@ class test_broker { } ); ep.set_pingreq_handler( - [&] { - ep.pingresp(); + [wp] { + wp.lock()->pingresp(); return true; } ); ep.set_v5_auth_handler( - [&] + [this] (MQTT_NS::v5::auth_reason_code /*reason_code*/, std::vector props ) { diff --git a/test/test_server_no_tls.hpp b/test/test_server_no_tls.hpp index 00cd5fa07..d2513e07b 100644 --- a/test/test_server_no_tls.hpp +++ b/test/test_server_no_tls.hpp @@ -34,8 +34,8 @@ class test_server_no_tls { ); server_.set_accept_handler( - [&](MQTT_NS::server<>::endpoint_t& ep) { - b_.handle_accept(ep); + [&](std::shared_ptr::endpoint_t> spep) { + b_.handle_accept(MQTT_NS::force_move(spep)); } ); diff --git a/test/test_server_no_tls_ws.hpp b/test/test_server_no_tls_ws.hpp index 7ca1a43e1..65394f5ca 100644 --- a/test/test_server_no_tls_ws.hpp +++ b/test/test_server_no_tls_ws.hpp @@ -34,8 +34,8 @@ class test_server_no_tls_ws { ); server_.set_accept_handler( - [&](MQTT_NS::server_ws<>::endpoint_t& ep) { - b_.handle_accept(ep); + [&](std::shared_ptr::endpoint_t> spep) { + b_.handle_accept(MQTT_NS::force_move(spep)); } ); diff --git a/test/test_server_tls.hpp b/test/test_server_tls.hpp index df64e284e..4032af77f 100644 --- a/test/test_server_tls.hpp +++ b/test/test_server_tls.hpp @@ -43,8 +43,8 @@ class test_server_tls : ctx_init { ); server_.set_accept_handler( - [&](MQTT_NS::server_tls<>::endpoint_t& ep) { - b_.handle_accept(ep); + [&](std::shared_ptr::endpoint_t> spep) { + b_.handle_accept(MQTT_NS::force_move(spep)); } ); diff --git a/test/test_server_tls_ws.hpp b/test/test_server_tls_ws.hpp index f11076655..9b7da20b5 100644 --- a/test/test_server_tls_ws.hpp +++ b/test/test_server_tls_ws.hpp @@ -44,8 +44,8 @@ class test_server_tls_ws : ctx_init { ); server_.set_accept_handler( - [&](MQTT_NS::server_tls_ws<>::endpoint_t& ep) { - b_.handle_accept(ep); + [&](std::shared_ptr::endpoint_t> spep) { + b_.handle_accept(MQTT_NS::force_move(spep)); } ); diff --git a/test/underlying_timeout.cpp b/test/underlying_timeout.cpp index 80694ae77..beb5a1fd1 100644 --- a/test/underlying_timeout.cpp +++ b/test/underlying_timeout.cpp @@ -30,7 +30,7 @@ BOOST_AUTO_TEST_CASE( connect_ws_upg ) { ioc); server.set_accept_handler( - [&](MQTT_NS::server_ws<>::endpoint_t& /*ep*/) { + [&](std::shared_ptr::endpoint_t> /*spep*/) { BOOST_TEST(false); } ); @@ -103,7 +103,7 @@ BOOST_AUTO_TEST_CASE( connect_tls_ws_ashs ) { ioc); server.set_accept_handler( - [&](MQTT_NS::server_tls_ws<>::endpoint_t& /*ep*/) { + [&](std::shared_ptr::endpoint_t> /*spep*/) { BOOST_TEST(false); } ); @@ -177,7 +177,7 @@ BOOST_AUTO_TEST_CASE( connect_tls_ws_upg ) { ioc); server.set_accept_handler( - [&](MQTT_NS::server_tls_ws<>::endpoint_t& /*ep*/) { + [&](std::shared_ptr::endpoint_t> /*spep*/) { BOOST_TEST(false); } ); @@ -268,7 +268,7 @@ BOOST_AUTO_TEST_CASE( connect_tls_ashs ) { ioc); server.set_accept_handler( - [&](MQTT_NS::server_tls<>::endpoint_t& /*ep*/) { + [&](std::shared_ptr::endpoint_t> /*spep*/) { BOOST_TEST(false); } );