Skip to content

Commit

Permalink
Updated the parameter of set_accept_handler() from endpoint_t to std:…
Browse files Browse the repository at this point in the history
…:shared_ptr<endpoint_t>.

For `set_*_handler()`, capture `std::weak_ptr<endpoint_t>` at
test_broker and examples.

I think that `weak_ptr` is redundant. Reference is good enough because
`shared_ptr` lifetime is kept by `start_session(life_keeper)`.

However, `wp.lock()` is easy to detect if the lifetime is over unexpectedly.
  • Loading branch information
redboltz committed Sep 15, 2019
1 parent 10830cd commit 8805cdd
Show file tree
Hide file tree
Showing 18 changed files with 459 additions and 431 deletions.
58 changes: 30 additions & 28 deletions example/no_tls_both.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,36 +167,38 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
}
);
s.set_accept_handler(
[&](con_t& ep) {
[&s, &connections, &subs](con_sp_t spep) {
auto& ep = *spep;
std::weak_ptr<con_t> wp(spep);

using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
std::cout << "[server] accept" << std::endl;
auto sp = ep.shared_from_this();
// For server close if ep is closed.
auto g = MQTT_NS::shared_scope_guard(
[&] {
[&s] {
std::cout << "[server] session end" << std::endl;
s.close();
}
);
ep.start_session(std::make_tuple(std::move(sp), std::move(g)));
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));

// set connection (lower than MQTT) level handlers
ep.set_close_handler(
[&]
[&connections, &subs, wp]
(){
std::cout << "[server] closed." << std::endl;
close_proc(connections, subs, ep.shared_from_this());
close_proc(connections, subs, wp.lock());
});
ep.set_error_handler(
[&]
[&connections, &subs, wp]
(boost::system::error_code const& ec){
std::cout << "[server] error: " << ec.message() << std::endl;
close_proc(connections, subs, ep.shared_from_this());
close_proc(connections, subs, wp.lock());
});

// set MQTT level handlers
ep.set_connect_handler(
[&]
[&connections, wp]
(MQTT_NS::buffer client_id,
MQTT_NS::optional<MQTT_NS::buffer> username,
MQTT_NS::optional<MQTT_NS::buffer> password,
Expand All @@ -209,53 +211,53 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
std::cout << "[server] keep_alive : " << keep_alive << std::endl;
connections.insert(ep.shared_from_this());
ep.connack(false, MQTT_NS::connect_return_code::accepted);
connections.insert(wp.lock());
wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted);
return true;
}
);
ep.set_disconnect_handler(
[&]
[&connections, &subs, wp]
(){
std::cout << "[server] disconnect received." << std::endl;
close_proc(connections, subs, ep.shared_from_this());
close_proc(connections, subs, wp.lock());
});
ep.set_puback_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "[server] puback received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubrec_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubrel_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubcomp_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_publish_handler(
[&]
(bool dup,
[&subs]
(bool is_dup,
MQTT_NS::qos qos_value,
bool retain,
bool is_retain,
MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents){
std::cout << "[server] publish received."
<< " dup: " << std::boolalpha << dup
<< " dup: " << std::boolalpha << is_dup
<< " qos: " << qos_value
<< " retain: " << std::boolalpha << retain << std::endl;
<< " retain: " << std::boolalpha << is_retain << std::endl;
if (packet_id)
std::cout << "[server] packet_id: " << *packet_id << std::endl;
std::cout << "[server] topic_name: " << topic_name << std::endl;
Expand All @@ -268,13 +270,13 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
boost::asio::buffer(contents),
std::make_pair(topic_name, contents),
std::min(r.first->qos_value, qos_value),
retain
is_retain
);
}
return true;
});
ep.set_subscribe_handler(
[&]
[&subs, wp]
(packet_id_t packet_id,
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl;
Expand All @@ -285,21 +287,21 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl;
res.emplace_back(static_cast<MQTT_NS::suback_reason_code>(qos_value));
subs.emplace(std::move(topic), ep.shared_from_this(), qos_value);
subs.emplace(std::move(topic), wp.lock(), qos_value);
}
ep.suback(packet_id, res);
wp.lock()->suback(packet_id, res);
return true;
}
);
ep.set_unsubscribe_handler(
[&]
[&subs, wp]
(packet_id_t packet_id,
std::vector<MQTT_NS::buffer> topics) {
std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl;
for (auto const& topic : topics) {
subs.erase(topic);
}
ep.unsuback(packet_id);
wp.lock()->unsuback(packet_id);
return true;
}
);
Expand Down
85 changes: 44 additions & 41 deletions example/no_tls_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,92 +79,95 @@ int main(int argc, char** argv) {
mi_sub_con subs;

s.set_accept_handler(
[&](con_t& ep) {
[&connections, &subs](con_sp_t spep) {
auto& ep = *spep;
std::weak_ptr<con_t> wp(spep);

using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
std::cout << "accept" << std::endl;
auto sp = ep.shared_from_this();
ep.start_session(sp); // keeping ep's lifetime as sp until session finished

ep.start_session(std::move(spep)); // keeping ep's lifetime as sp until session finished

// set connection (lower than MQTT) level handlers
ep.set_close_handler(
[&]
[&connections, &subs, wp]
(){
std::cout << "closed." << std::endl;
close_proc(connections, subs, ep.shared_from_this());
std::cout << "[server] closed." << std::endl;
close_proc(connections, subs, wp.lock());
});
ep.set_error_handler(
[&]
[&connections, &subs, wp]
(boost::system::error_code const& ec){
std::cout << "error: " << ec.message() << std::endl;
close_proc(connections, subs, ep.shared_from_this());
std::cout << "[server] error: " << ec.message() << std::endl;
close_proc(connections, subs, wp.lock());
});

// set MQTT level handlers
ep.set_connect_handler(
[&]
[&connections, wp]
(MQTT_NS::buffer client_id,
MQTT_NS::optional<MQTT_NS::buffer> username,
MQTT_NS::optional<MQTT_NS::buffer> password,
MQTT_NS::optional<MQTT_NS::will>,
bool clean_session,
std::uint16_t keep_alive) {
using namespace MQTT_NS::literals;
std::cout << "client_id : " << client_id << std::endl;
std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl;
std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl;
std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl;
std::cout << "keep_alive : " << keep_alive << std::endl;
connections.insert(ep.shared_from_this());
ep.connack(false, MQTT_NS::connect_return_code::accepted);
std::cout << "[server] client_id : " << client_id << std::endl;
std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl;
std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
std::cout << "[server] keep_alive : " << keep_alive << std::endl;
connections.insert(wp.lock());
wp.lock()->connack(false, MQTT_NS::connect_return_code::accepted);
return true;
}
);
ep.set_disconnect_handler(
[&]
[&connections, &subs, wp]
(){
std::cout << "disconnect received." << std::endl;
close_proc(connections, subs, ep.shared_from_this());
std::cout << "[server] disconnect received." << std::endl;
close_proc(connections, subs, wp.lock());
});
ep.set_puback_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "puback received. packet_id: " << packet_id << std::endl;
std::cout << "[server] puback received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubrec_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "pubrec received. packet_id: " << packet_id << std::endl;
std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubrel_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "pubrel received. packet_id: " << packet_id << std::endl;
std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_pubcomp_handler(
[&]
[]
(packet_id_t packet_id){
std::cout << "pubcomp received. packet_id: " << packet_id << std::endl;
std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl;
return true;
});
ep.set_publish_handler(
[&]
[&subs]
(bool is_dup,
MQTT_NS::qos qos_value,
bool is_retain,
MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents){
std::cout << "publish received."
std::cout << "[server] publish received."
<< " dup: " << std::boolalpha << is_dup
<< " qos: " << qos_value
<< " retain: " << std::boolalpha << is_retain << std::endl;
if (packet_id)
std::cout << "packet_id: " << *packet_id << std::endl;
std::cout << "topic_name: " << topic_name << std::endl;
std::cout << "contents: " << contents << std::endl;
std::cout << "[server] packet_id: " << *packet_id << std::endl;
std::cout << "[server] topic_name: " << topic_name << std::endl;
std::cout << "[server] contents: " << contents << std::endl;
auto const& idx = subs.get<tag_topic>();
auto r = idx.equal_range(topic_name);
for (; r.first != r.second; ++r.first) {
Expand All @@ -179,32 +182,32 @@ int main(int argc, char** argv) {
return true;
});
ep.set_subscribe_handler(
[&]
[&subs, wp]
(packet_id_t packet_id,
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
std::cout << "subscribe received. packet_id: " << packet_id << std::endl;
std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl;
std::vector<MQTT_NS::suback_reason_code> res;
res.reserve(entries.size());
for (auto const& e : entries) {
MQTT_NS::buffer topic = std::get<0>(e);
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
std::cout << "topic: " << topic << " qos: " << qos_value << std::endl;
std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl;
res.emplace_back(static_cast<MQTT_NS::suback_reason_code>(qos_value));
subs.emplace(std::move(topic), ep.shared_from_this(), qos_value);
subs.emplace(std::move(topic), wp.lock(), qos_value);
}
ep.suback(packet_id, res);
wp.lock()->suback(packet_id, res);
return true;
}
);
ep.set_unsubscribe_handler(
[&]
[&subs, wp]
(packet_id_t packet_id,
std::vector<MQTT_NS::buffer> topics) {
std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl;
std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl;
for (auto const& topic : topics) {
subs.erase(topic);
}
ep.unsuback(packet_id);
wp.lock()->unsuback(packet_id);
return true;
}
);
Expand Down
Loading

0 comments on commit 8805cdd

Please sign in to comment.