Skip to content

Commit

Permalink
Implement connection restoring (#799)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Dec 19, 2024
1 parent d7fda4e commit 31aaa7a
Show file tree
Hide file tree
Showing 34 changed files with 791 additions and 272 deletions.
25 changes: 24 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,29 @@ jobs:
CMAKE_GENERATOR=Ninja ASAN=ON make
python3 ./build/tests/no_router.py
timeout-minutes: 5

connection_restore_test:
needs: zenoh_build
name: Connection restore test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Download Zenoh artifacts
uses: actions/download-artifact@v4
with:
name: ${{ needs.zenoh_build.outputs.artifact-name }}

- name: Unzip Zenoh artifacts
run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone

- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja ASAN=ON CMAKE_BUILD_TYPE=Debug ZENOH_DEBUG=3 make
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd
timeout-minutes: 15

markdown_lint:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -331,7 +354,7 @@ jobs:
ci:
name: CI status checks
runs-on: ubuntu-latest
needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, markdown_lint, build_shared, build_static, integration, multicast]
needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, connection_restore_test, markdown_lint, build_shared, build_static, integration, multicast]
if: always()
steps:
- name: Check whether all jobs pass
Expand Down
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")

# Add a warning message if someone tries to enable Z_FEATURE_LINK_SERIAL_USB directly
if(Z_FEATURE_LINK_SERIAL_USB AND NOT Z_FEATURE_UNSTABLE_API)
Expand All @@ -261,6 +262,7 @@ message(STATUS "Building with feature confing:\n\
* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\
* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* AUTO_RECONNECT: ${Z_FEATURE_AUTO_RECONNECT}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

configure_file(
Expand Down Expand Up @@ -527,6 +529,7 @@ if(UNIX OR MSVC)
configure_file(${PROJECT_SOURCE_DIR}/tests/attachment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/attachment.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/no_router.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/no_router.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/memory_leak.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/memory_leak.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/connection_restore.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/connection_restore.py COPYONLY)

enable_testing()
add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test)
Expand Down
41 changes: 21 additions & 20 deletions include/zenoh-pico/collections/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,32 @@ _z_list_t *_z_list_push(_z_list_t *xs, void *x);
_z_list_t *_z_list_push_back(_z_list_t *xs, void *x);
_z_list_t *_z_list_pop(_z_list_t *xs, z_element_free_f f_f, void **x);

_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, void *e);
_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, const void *e);

_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, void *left);
_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, const void *left);

_z_list_t *_z_list_clone(const _z_list_t *xs, z_element_clone_f d_f);
void _z_list_free(_z_list_t **xs, z_element_free_f f_f);

#define _Z_LIST_DEFINE(name, type) \
typedef _z_list_t name##_list_t; \
static inline name##_list_t *name##_list_new(void) { return NULL; } \
static inline size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \
static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \
static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \
static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \
static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \
static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \
return _z_list_pop(l, name##_elem_free, (void **)x); \
} \
static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, type *e) { \
return _z_list_find(l, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, type *e) { \
return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \
#define _Z_LIST_DEFINE(name, type) \
typedef _z_list_t name##_list_t; \
static inline name##_list_t *name##_list_new(void) { return NULL; } \
static inline size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \
static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \
static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \
static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \
static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \
static inline name##_list_t *name##_list_push_back(name##_list_t *l, type *e) { return _z_list_push_back(l, e); } \
static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \
return _z_list_pop(l, name##_elem_free, (void **)x); \
} \
static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, const type *e) { \
return _z_list_find(l, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, const type *e) { \
return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \
static inline void name##_list_free(name##_list_t **l) { _z_list_free(l, name##_elem_free); }

#ifdef __cplusplus
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define Z_FEATURE_SUBSCRIPTION 1
#define Z_FEATURE_QUERY 1
#define Z_FEATURE_QUERYABLE 1
#define Z_FEATURE_LIVELINESS 0
#define Z_FEATURE_LIVELINESS 1
#define Z_FEATURE_RAWETH_TRANSPORT 0
#define Z_FEATURE_INTEREST 1
#define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0
Expand All @@ -48,6 +48,7 @@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
#define Z_FEATURE_BATCHING 1
#define Z_FEATURE_RX_CACHE 0
#define Z_FEATURE_AUTO_RECONNECT 1
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
#define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@
#define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right);

void _z_locator_init(_z_locator_t *locator);
_z_string_t _z_locator_to_string(const _z_locator_t *loc);
z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s);
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s);

size_t _z_locator_size(_z_locator_t *lc);
void _z_locator_clear(_z_locator_t *lc);
Expand All @@ -72,7 +72,7 @@ typedef struct {
} _z_endpoint_t;

_z_string_t _z_endpoint_to_string(const _z_endpoint_t *e);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s);
void _z_endpoint_clear(_z_endpoint_t *ep);
void _z_endpoint_free(_z_endpoint_t **ep);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ typedef struct _z_link_t {

void _z_link_clear(_z_link_t *zl);
void _z_link_free(_z_link_t **zl);
z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator);

z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf);
size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr);
Expand Down
42 changes: 40 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/list.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -55,6 +56,14 @@ typedef struct _z_session_t {
_z_resource_list_t *_local_resources;
_z_resource_list_t *_remote_resources;

#if Z_FEATURE_AUTO_RECONNECT == 1
// Information for session restoring
_z_config_t _config;
_z_network_message_list_t *_decalaration_cache;
z_task_attr_t *_lease_task_attr;
z_task_attr_t *_read_task_attr;
#endif

// Session subscriptions
#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_list_t *_subscriptions;
Expand Down Expand Up @@ -99,14 +108,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session)
* Open a zenoh-net session
*
* Parameters:
* zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value.
* config: A set of properties. The caller keeps its ownership.
* zn: A pointer of A :c:type:`_z_session_t` used as a return value.
* zid: A pointer to Zenoh ID.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid);

/**
* Reopen a disconnected zenoh-net session
*
* Parameters:
* zn: Existing zenoh-net session.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_reopen(_z_session_rc_t *zn);

/**
* Store declaration network message to cache for resend it after session restore
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with declaration
*/
void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Remove corresponding declaration from the cache
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with undeclaration
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config);
void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Close a zenoh-net session.
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg);
inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); }
_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move)
_Z_SVEC_DEFINE(_z_network_message, _z_network_message_t)
_Z_LIST_DEFINE(_z_network_message, _z_network_message_t)

void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping);
_z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid,
Expand All @@ -315,6 +316,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);
z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src);
_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src);

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
_z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout,
const bool exit_on_first);

z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid);
z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid);
void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

Expand Down
30 changes: 30 additions & 0 deletions include/zenoh-pico/transport/common/transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

#ifndef ZENOH_PICO_COMMON_TRANSPORT_H
#define ZENOH_PICO_COMMON_TRANSPORT_H

#include "zenoh-pico/transport/transport.h"

#ifdef __cplusplus
extern "C" {
#endif

void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks);

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ enum _z_peer_op_e {
_Z_PEER_OP_LISTEN = 1,
};

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op);
z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode,
int peer_op);
void _z_free_transport(_z_transport_t **zt);

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa
const _z_id_t *local_zid);
z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only);
z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason);
void _z_multicast_transport_clear(_z_transport_t *zt);
void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ typedef struct {
uint8_t _seq_num_res;
} _z_transport_multicast_establish_param_t;

_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt);
z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason);
void _z_transport_clear(_z_transport_t *zt);
void _z_transport_free(_z_transport_t **zt);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
const _z_id_t *local_zid, int peer_op);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks);

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 31aaa7a

Please sign in to comment.