Skip to content

Commit

Permalink
Liveliness token drop/redeclaration on connection drop/restore (#857)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Jan 22, 2025
1 parent d744178 commit d085699
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 15 deletions.
4 changes: 4 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
extern "C" {
#endif

/*------------- Declaration Helpers --------------*/
z_result_t _z_send_declare(_z_session_t *zn, const _z_network_message_t *n_msg);
z_result_t _z_send_undeclare(_z_session_t *zn, const _z_network_message_t *n_msg);

/*------------------ Discovery ------------------*/

/**
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id);
z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t *keyexpr,
const _z_timestamp_t *timestamp);
z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, const _z_timestamp_t *timestamp);
z_result_t _z_liveliness_subscription_undeclare_all(_z_session_t *zn);
z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, const _z_keyexpr_t *keyexpr);
#endif

Expand Down
11 changes: 5 additions & 6 deletions src/net/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_

_z_declaration_t declaration = _z_make_decl_token(keyexpr, id);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
ret = _z_send_declare(_Z_RC_IN_VAL(zn), &n_msg);
_z_n_msg_clear(&n_msg);

_z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, keyexpr);
Expand All @@ -58,7 +58,7 @@ z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) {

_z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
ret = _z_send_undeclare(_Z_RC_IN_VAL(&token->_zn), &n_msg);
_z_n_msg_clear(&n_msg);

return ret;
Expand Down Expand Up @@ -92,7 +92,7 @@ _z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_k
keyexpr, s._id, _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED | mode);

_z_network_message_t n_msg = _z_n_msg_make_interest(interest);
if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) {
_z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s);
_z_subscriber_clear(&ret);
return ret;
Expand All @@ -117,8 +117,7 @@ z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) {

_z_interest_t interest = _z_make_interest_final(s->_val->_id);
_z_network_message_t n_msg = _z_n_msg_make_interest(interest);
if (_z_send_n_msg(_Z_RC_IN_VAL(&sub->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) !=
_Z_RES_OK) {
if (_z_send_undeclare(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
Expand Down Expand Up @@ -155,7 +154,7 @@ z_result_t _z_liveliness_query(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z
_Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT);

_z_network_message_t n_msg = _z_n_msg_make_interest(interest);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
if (_z_send_declare(zn, &n_msg) != _Z_RES_OK) {
_z_liveliness_unregister_pending_query(zn, id);
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand Down
16 changes: 8 additions & 8 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include "zenoh-pico/utils/result.h"

/*------------------ Declaration Helpers ------------------*/
static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t *n_msg) {
z_result_t _z_send_declare(_z_session_t *zn, const _z_network_message_t *n_msg) {
z_result_t ret = _Z_RES_OK;
ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);

Expand All @@ -56,7 +56,7 @@ static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t *
return ret;
}

static z_result_t _z_send_undecalre(_z_session_t *zn, const _z_network_message_t *n_msg) {
z_result_t _z_send_undeclare(_z_session_t *zn, const _z_network_message_t *n_msg) {
z_result_t ret = _Z_RES_OK;
ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);

Expand Down Expand Up @@ -98,7 +98,7 @@ uint16_t _z_declare_resource(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
_z_keyexpr_t alias = _z_keyexpr_alias(keyexpr);
_z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_decalre(zn, &n_msg) == _Z_RES_OK) {
if (_z_send_declare(zn, &n_msg) == _Z_RES_OK) {
ret = id;
// Invalidate cache
_z_subscription_cache_invalidate(zn);
Expand All @@ -120,7 +120,7 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_keyexpr(rid);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_undecalre(zn, &n_msg) == _Z_RES_OK) {
if (_z_send_undeclare(zn, &n_msg) == _Z_RES_OK) {
// Remove local resource
_z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL);
// Invalidate cache
Expand Down Expand Up @@ -273,7 +273,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) {
if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) {
_z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sp_s);
_z_subscriber_clear(&ret);
return ret;
Expand Down Expand Up @@ -305,7 +305,7 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
declaration = _z_make_undecl_subscriber(sub->_entity_id, &_Z_RC_IN_VAL(s)->_key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_undecalre(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) {
if (_z_send_undeclare(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
Expand Down Expand Up @@ -340,7 +340,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) {
if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) {
_z_unregister_session_queryable(_Z_RC_IN_VAL(zn), sp_q);
_z_queryable_clear(&ret);
return ret;
Expand Down Expand Up @@ -371,7 +371,7 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) {
declaration = _z_make_undecl_queryable(qle->_entity_id, &_Z_RC_IN_VAL(q)->_key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_undecalre(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) {
if (_z_send_undeclare(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_n_msg_clear(&n_msg);
Expand Down
22 changes: 22 additions & 0 deletions src/session/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,28 @@ z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, c
return ret;
}

z_result_t _z_liveliness_subscription_undeclare_all(_z_session_t *zn) {
z_result_t ret = _Z_RES_OK;

_z_session_mutex_lock(zn);
_z_keyexpr_intmap_t token_list = _z_keyexpr_intmap_clone(&zn->_remote_tokens);
_z_keyexpr_intmap_clear(&zn->_remote_tokens);
_z_session_mutex_unlock(zn);

_z_keyexpr_intmap_iterator_t iter = _z_keyexpr_intmap_iterator_make(&token_list);
_z_timestamp_t tm = _z_timestamp_null();
while (_z_keyexpr_intmap_iterator_next(&iter)) {
_z_keyexpr_t key = *_z_keyexpr_intmap_iterator_value(&iter);
ret = _z_trigger_liveliness_subscriptions_undeclare(zn, &key, &tm);
if (ret != _Z_RES_OK) {
break;
}
}
_z_keyexpr_intmap_clear(&token_list);

return ret;
}

z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
z_result_t ret = _Z_RES_OK;

Expand Down
5 changes: 5 additions & 0 deletions src/transport/multicast/lease.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ z_result_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) {

static void _zp_multicast_failed(_z_transport_multicast_t *ztm) {
_ZP_UNUSED(ztm);

#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_SUBSCRIPTION == 1
_z_liveliness_subscription_undeclare_all(_Z_RC_IN_VAL(ztm->_common._session));
#endif

#if Z_FEATURE_AUTO_RECONNECT == 1
_z_reopen(ztm->_common._session);
#endif
Expand Down
5 changes: 5 additions & 0 deletions src/transport/unicast/lease.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "zenoh-pico/transport/unicast/lease.h"

#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/query.h"
#include "zenoh-pico/system/common/platform.h"
#include "zenoh-pico/transport/common/tx.h"
Expand Down Expand Up @@ -45,6 +46,10 @@ static void _zp_unicast_failed(_z_transport_unicast_t *ztu) {
_z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED);
_z_unicast_transport_clear(ztu, true);

#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_SUBSCRIPTION == 1
_z_liveliness_subscription_undeclare_all(_Z_RC_IN_VAL(ztu->_common._session));
#endif

#if Z_FEATURE_AUTO_RECONNECT == 1
_z_session_rc_ref_t *zs = ztu->_common._session;
z_result_t ret = _z_reopen(zs);
Expand Down
61 changes: 60 additions & 1 deletion tests/connection_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
WAIT_MESSAGE_TIMEOUT_S = 15
DISCONNECT_MESSAGES = ["Closing session because it has expired", "Send keep alive failed"]
CONNECT_MESSAGES = ["Z_OPEN(Ack)"]
LIVELINESS_TOKEN_ALIVE_MESSAGES = ["[LivelinessSubscriber] New alive token"]
LIVELINESS_TOKEN_DROP_MESSAGES = ["[LivelinessSubscriber] Dropped token"]
ROUTER_ERROR_MESSAGE = "ERROR"
ZENOH_PORT = "7447"

Expand All @@ -18,19 +20,23 @@
ACTIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_pub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']
PASSIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']

LIVELINESS_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_liveliness', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']
LIVELINESS_SUB_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub_liveliness', '-h', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']

LIBASAN_PATH = subprocess.run(["gcc", "-print-file-name=libasan.so"], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()


def run_process(command, output_collector, process_list):
env = os.environ.copy()
env["RUST_LOG"] = "trace"
if LIBASAN_PATH:
env["LD_PRELOAD"] = LIBASAN_PATH

print(f"Run {command}")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
process_list.append(process)
for line in iter(process.stdout.readline, ''):
print("--", line.strip())
print(f"-- [{process.pid}]:", line.strip())
output_collector.append(line.strip())
process.stdout.close()
process.wait()
Expand Down Expand Up @@ -175,6 +181,57 @@ def test_router_restart(router_command, client_command, timeout):
terminate_processes(client_process_list + router_process_list)


def test_liveliness_drop(router_command, liveliness_command, liveliness_sub_command):
print(f"Liveliness drop test")
router_output = []
dummy_output = []
client_output = []
process_list = []
blocked = False
try:
run_background(router_command, router_output, process_list)
time.sleep(ROUTER_INIT_TIMEOUT_S)

run_background(liveliness_sub_command, client_output, process_list)
run_background(liveliness_command, dummy_output, process_list)

if wait_messages(client_output, LIVELINESS_TOKEN_ALIVE_MESSAGES):
print("Liveliness token alive")
else:
raise Exception("Failed to get liveliness token alive.")
client_output.clear()

print("Blocking connection...")
block_connection()
blocked = True

time.sleep(15)

if wait_messages(client_output, LIVELINESS_TOKEN_DROP_MESSAGES):
print("Liveliness token dropped")
else:
raise Exception("Failed to get liveliness token drop.")
client_output.clear()

print("Unblocking connection...")
unblock_connection()
blocked = False

if wait_messages(client_output, LIVELINESS_TOKEN_ALIVE_MESSAGES):
print("Liveliness token alive")
else:
raise Exception("Failed to get liveliness token alive.")
client_output.clear()

check_router_errors(router_output)

print(f"Liveliness drop test passed")
finally:
if blocked:
unblock_connection()
terminate_processes(process_list)


def main():
if len(sys.argv) != 2:
print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd")
Expand All @@ -198,6 +255,8 @@ def main():
test_router_restart(router_command, ACTIVE_CLIENT_COMMAND, 15)
test_router_restart(router_command, PASSIVE_CLIENT_COMMAND, 15)

test_liveliness_drop(router_command, LIVELINESS_CLIENT_COMMAND, LIVELINESS_SUB_CLIENT_COMMAND)


if __name__ == "__main__":
main()

0 comments on commit d085699

Please sign in to comment.