From 9e3a175df2ed5ce4559586eeec5e894e073814ba Mon Sep 17 00:00:00 2001 From: Robert Schmid Date: Wed, 3 Jan 2024 13:43:29 +0000 Subject: [PATCH] Cancellable accept --- examples/examples_localhost/c/event_test.c | 9 ++- examples/rcat/c/rcat.c | 3 +- examples/rcat/c/rcat_dtls.c | 3 +- examples/rcat/c/rcat_tls.c | 3 +- src/c/rasta.c | 77 ++++++++++++++++++-- src/c/redundancy/rasta_red_multiplexer.c | 3 +- src/c/retransmission/safety_retransmission.c | 4 +- src/c/transport/events.c | 24 ++++-- src/c/transport/events.h | 12 +-- src/c/util/event_system.c | 8 +- src/include/rasta/events.h | 2 +- src/include/rasta/rasta.h | 48 ++++++++---- 12 files changed, 149 insertions(+), 47 deletions(-) diff --git a/examples/examples_localhost/c/event_test.c b/examples/examples_localhost/c/event_test.c index 2f8f3e9..a4d5aca 100644 --- a/examples/examples_localhost/c/event_test.c +++ b/examples/examples_localhost/c/event_test.c @@ -14,21 +14,24 @@ uint64_t test_get_nanotime() { return t.tv_sec * 1000000000 + t.tv_nsec; } -int send_heartbeat_event(void *carry_data) { +int send_heartbeat_event(void *carry_data, int fd) { (void)carry_data; + (void)fd; uint64_t n_time = test_get_nanotime(); printf("time since last call: %8lu us - expected:%8lu us\n", (unsigned long)((n_time - last_time) / 1000), (unsigned long)(heartbeat_interval / 1000)); last_time = n_time; return 0; } -int disconnect_event(void *carry_data) { +int disconnect_event(void *carry_data, int fd) { (void)carry_data; + (void)fd; printf("disconnecting due to inactivity\n"); return 1; } -int event_read(void *carry_data) { +int event_read(void *carry_data, int fd) { + (void)fd; char buffer[128]; ssize_t len = read(STDIN_FILENO, buffer, 127); if (buffer[0] != '\n') { diff --git a/examples/rcat/c/rcat.c b/examples/rcat/c/rcat.c index 9941116..a67981c 100644 --- a/examples/rcat/c/rcat.c +++ b/examples/rcat/c/rcat.c @@ -26,7 +26,8 @@ struct connect_event_data { struct rasta_connection *connection; }; -int send_input_data(void *carry_data) { +int send_input_data(void *carry_data, int fd) { + (void)fd; struct connect_event_data *data = carry_data; char buf[BUF_SIZE]; int c; diff --git a/examples/rcat/c/rcat_dtls.c b/examples/rcat/c/rcat_dtls.c index 961b0d5..d0519ea 100644 --- a/examples/rcat/c/rcat_dtls.c +++ b/examples/rcat/c/rcat_dtls.c @@ -42,7 +42,8 @@ struct connect_event_data { struct rasta_connection *connection; }; -int send_input_data(void *carry_data) { +int send_input_data(void *carry_data, int fd) { + (void)fd; struct connect_event_data *data = carry_data; char buf[BUF_SIZE]; int c; diff --git a/examples/rcat/c/rcat_tls.c b/examples/rcat/c/rcat_tls.c index c8b799c..caca31c 100644 --- a/examples/rcat/c/rcat_tls.c +++ b/examples/rcat/c/rcat_tls.c @@ -42,7 +42,8 @@ struct connect_event_data { struct rasta_connection *connection; }; -int send_input_data(void *carry_data) { +int send_input_data(void *carry_data, int fd) { + (void)fd; struct connect_event_data *data = carry_data; char buf[BUF_SIZE]; int c; diff --git a/src/c/rasta.c b/src/c/rasta.c index 107160f..d0f002d 100644 --- a/src/c/rasta.c +++ b/src/c/rasta.c @@ -39,7 +39,7 @@ void rasta_listen(rasta *user_configuration) { sr_listen(&user_configuration->h); } -struct rasta_connection *rasta_accept(rasta *user_configuration) { +rasta_connection *rasta_accept(rasta *user_configuration) { struct rasta_handle *h = &user_configuration->h; event_system *event_system = &user_configuration->rasta_lib_event_system; @@ -61,11 +61,78 @@ struct rasta_connection *rasta_accept(rasta *user_configuration) { return NULL; } -struct rasta_connection *rasta_connect(rasta *user_configuration, unsigned long id) { +int terminator_callback(void *carry, int fd) { + rasta *r = carry; + + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Executing cancel handler..."); + + // Invalidate the event (read from the pipe) + uint64_t u; + ssize_t ignored = read(fd, &u, sizeof(u)); + (void)ignored; + + // Close the pipe + close(fd); + + // Exit the event loop + return 1; +} + +typedef struct rasta_cancellation { + int fd[2]; +} rasta_cancellation; + +rasta_cancellation *rasta_prepare_cancellation(rasta *r) { + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Allocating cancellation..."); + rasta_cancellation *result = rmalloc(sizeof(rasta_cancellation)); + + // Cancel event + if (pipe(result->fd) < 0) { + perror("Failed to create pipe"); + rfree(result); + return NULL; + } + + return result; +} + +rasta_connection *rasta_accept_with_cancel(rasta *r, rasta_cancellation *cancellation) { + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Accept", "Registering cancel event..."); + + fd_event terminator_event; + memset(&terminator_event, 0, sizeof(fd_event)); + terminator_event.callback = terminator_callback; + terminator_event.carry_data = r; + terminator_event.fd = cancellation->fd[0]; + enable_fd_event(&terminator_event); + rasta_add_fd_event(r, &terminator_event, EV_READABLE); + + rasta_connection *result = rasta_accept(r); + + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Accept", "Unregistering cancel event..."); + + rasta_remove_fd_event(r, &terminator_event); + close(cancellation->fd[1]); + + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Freeing cancellation..."); + rfree(cancellation); + + return result; +} + +void rasta_cancel_operation(rasta *r, rasta_cancellation *cancel) { + logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Canceling operation..."); + + uint64_t terminate = 1; + uint64_t ignore = write(cancel->fd[1], &terminate, sizeof(uint64_t)); + (void)ignore; +} + +rasta_connection *rasta_connect(rasta *user_configuration, unsigned long id) { return sr_connect(&user_configuration->h, id); } -int rasta_recv(rasta *user_configuration, struct rasta_connection *connection, void *buf, size_t len) { +int rasta_recv(rasta *user_configuration, rasta_connection *connection, void *buf, size_t len) { struct rasta_handle *h = &user_configuration->h; event_system *event_system = &user_configuration->rasta_lib_event_system; @@ -95,7 +162,7 @@ int rasta_recv(rasta *user_configuration, struct rasta_connection *connection, v return received_len; } -int rasta_send(rasta *user_configuration, struct rasta_connection *connection, void *buf, size_t len) { +int rasta_send(rasta *user_configuration, rasta_connection *connection, void *buf, size_t len) { struct RastaMessageData messageData1; allocateRastaMessageData(&messageData1, 1); messageData1.data_array[0].bytes = buf; @@ -106,7 +173,7 @@ int rasta_send(rasta *user_configuration, struct rasta_connection *connection, v return return_val; } -void rasta_disconnect(struct rasta_connection *connection) { +void rasta_disconnect(rasta_connection *connection) { sr_disconnect(connection); } diff --git a/src/c/redundancy/rasta_red_multiplexer.c b/src/c/redundancy/rasta_red_multiplexer.c index ad1d27d..dfc8c7c 100644 --- a/src/c/redundancy/rasta_red_multiplexer.c +++ b/src/c/redundancy/rasta_red_multiplexer.c @@ -137,8 +137,9 @@ void handle_received_data(redundancy_mux *mux, unsigned char *buffer, ssize_t le freeRastaByteArray(&test.key); } -int channel_timeout_event(void *carry_data) { +int channel_timeout_event(void *carry_data, int fd) { UNUSED(carry_data); + UNUSED(fd); // Escape the event loop return 1; } diff --git a/src/c/retransmission/safety_retransmission.c b/src/c/retransmission/safety_retransmission.c index bf4d871..2d3d965 100644 --- a/src/c/retransmission/safety_retransmission.c +++ b/src/c/retransmission/safety_retransmission.c @@ -113,7 +113,7 @@ void sr_remove_confirmed_messages(struct rasta_connection *con) { // sending is now possible again (space in the retransmission queue is available), so we should trigger it if (fifo_full(con->fifo_send)) { - data_send_event(&con->send_handle); + data_send_event(&con->send_handle, -1); } } @@ -397,7 +397,7 @@ int sr_send(struct rasta_handle *h, struct rasta_connection *con, struct RastaMe if (fifo_full(con->fifo_send)) { // Flush, send queued messages now - data_send_event(&con->send_handle); + data_send_event(&con->send_handle, -1); } if (!fifo_push(con->fifo_send, to_fifo)) { diff --git a/src/c/transport/events.c b/src/c/transport/events.c index 5871ddf..8f789ac 100644 --- a/src/c/transport/events.c +++ b/src/c/transport/events.c @@ -17,7 +17,9 @@ #include "diagnostics.h" #include "transport.h" -int channel_accept_event(void *carry_data) { +int channel_accept_event(void *carry_data, int _fd) { + UNUSED(_fd); + struct accept_event_data *data = carry_data; logger_log(data->h->mux.logger, LOG_LEVEL_DEBUG, "RaSTA RedMux accept", "Socket ready to accept"); @@ -60,7 +62,9 @@ int channel_accept_event(void *carry_data) { return 0; } -int channel_receive_event(void *carry_data) { +int channel_receive_event(void *carry_data, int fd) { + UNUSED(fd); + struct receive_event_data *data = carry_data; rasta_connection *connection = data->connection; @@ -135,7 +139,9 @@ int channel_receive_event(void *carry_data) { return 0; } -int event_connection_expired(void *carry_data) { +int event_connection_expired(void *carry_data, int fd) { + UNUSED(fd); + struct timed_event_data *data = carry_data; struct rasta_heartbeat_handle *h = (struct rasta_heartbeat_handle *)data->handle; logger_log(h->logger, LOG_LEVEL_DEBUG, "RaSTA HEARTBEAT", "T_i timer expired"); @@ -169,7 +175,9 @@ int event_connection_expired(void *carry_data) { return 1; } -int heartbeat_send_event(void *carry_data) { +int heartbeat_send_event(void *carry_data, int fd) { + UNUSED(fd); + struct timed_event_data *data = carry_data; struct rasta_heartbeat_handle *h = (struct rasta_heartbeat_handle *)data->handle; @@ -191,7 +199,9 @@ int heartbeat_send_event(void *carry_data) { return 0; } -int data_send_event(void *carry_data) { +int data_send_event(void *carry_data, int fd) { + UNUSED(fd); + rasta_sending_handle *h = carry_data; logger_log(h->logger, LOG_LEVEL_DEBUG, "RaSTA send handler", "send data"); @@ -276,7 +286,9 @@ int data_send_event(void *carry_data) { return 0; } -int send_timed_key_exchange(void *arg) { +int send_timed_key_exchange(void *arg, int fd) { + UNUSED(fd); + #ifdef ENABLE_OPAQUE struct timed_event_data *event_data = (struct timed_event_data *)arg; // rasta_receive_handle *handle = (rasta_receive_handle *)event_data->handle; diff --git a/src/c/transport/events.h b/src/c/transport/events.h index 7702bdc..72ffd24 100644 --- a/src/c/transport/events.h +++ b/src/c/transport/events.h @@ -20,11 +20,11 @@ struct receive_event_data { rasta_transport_channel *channel; }; -int channel_accept_event(void *carry_data); -int channel_receive_event(void *carry_data); +int channel_accept_event(void *carry_data, int fd); +int channel_receive_event(void *carry_data, int fd); -int data_send_event(void *carry_data); -int heartbeat_send_event(void *carry_data); -int event_connection_expired(void *carry_data); +int data_send_event(void *carry_data, int fd); +int heartbeat_send_event(void *carry_data, int fd); +int event_connection_expired(void *carry_data, int fd); -int send_timed_key_exchange(void *arg); +int send_timed_key_exchange(void *arg, int fd); diff --git a/src/c/util/event_system.c b/src/c/util/event_system.c index ffa5931..0ef6279 100644 --- a/src/c/util/event_system.c +++ b/src/c/util/event_system.c @@ -69,13 +69,13 @@ int event_system_sleep(uint64_t time_to_wait, struct fd_event_linked_list_s *fd_ } for (fd_event *current = fd_events->first; current; current = current->next) { if (current->enabled && FD_ISSET(current->fd, &on_readable)) { - if (current->callback(current->carry_data)) return -1; + if (current->callback(current->carry_data, current->fd)) return -1; } if (current->enabled && FD_ISSET(current->fd, &on_writable)) { - if (current->callback(current->carry_data)) return -1; + if (current->callback(current->carry_data, current->fd)) return -1; } if (current->enabled && FD_ISSET(current->fd, &on_exceptional)) { - if (current->callback(current->carry_data)) return -1; + if (current->callback(current->carry_data, current->fd)) return -1; } } return result; @@ -156,7 +156,7 @@ void event_system_start(event_system *ev_sys) { } } // fire event and exit in case it returns something else than 0 - if (next_event->callback(next_event->carry_data)) { + if (next_event->callback(next_event->carry_data, -1)) { break; } // update timed_event::last_call diff --git a/src/include/rasta/events.h b/src/include/rasta/events.h index 16b316b..8cf68fe 100644 --- a/src/include/rasta/events.h +++ b/src/include/rasta/events.h @@ -11,7 +11,7 @@ typedef struct event_system event_system; typedef struct rasta rasta; // event callback pointer, return 0 to keep the loop running, everything else stops the loop -typedef int (*event_ptr)(void *h); +typedef int (*event_ptr)(void *h, int fd); #define EV_READABLE (1 << 0) #define EV_WRITABLE (1 << 1) diff --git a/src/include/rasta/rasta.h b/src/include/rasta/rasta.h index f743cd7..381aa19 100644 --- a/src/include/rasta/rasta.h +++ b/src/include/rasta/rasta.h @@ -14,11 +14,12 @@ extern "C" { // only need to export C interface if #include "rastarole.h" typedef struct rasta rasta; -struct rasta_connection; +typedef struct rasta_connection rasta_connection; +typedef struct rasta_cancellation rasta_cancellation; /** * initializes the RaSTA handle and all configured connections - * @param user_configuration the user configuration containing the handle to initialize + * @param rasta the user configuration containing the handle to initialize * @param config the configuration to initialize the handle with * @param logger the logger to use * @param connections the connections to initialize @@ -26,60 +27,75 @@ struct rasta_connection; */ rasta *rasta_lib_init_configuration(rasta_config_info *config, rasta_connection_config *connections, size_t connections_length, log_level log_level, logger_type logger_type); - /** * binds a RaSTA instance to the configured IP addresses and ports for the transport channels - * @param user_configuration the user configuration to be used + * @param rasta the user configuration to be used */ bool rasta_bind(rasta *r); /** * Listen on all sockets specified by the given RaSTA handle. - * @param user_configuration the user configuration containing the socket information + * @param rasta the user configuration containing the socket information */ void rasta_listen(rasta *r); /** - * Wait for connections on all sockets specified in the user_configuration. - * @param user_configuration the user configuration containing the socket information + * Wait for incoming connections. + * @param rasta the user configuration containing the socket information + */ +rasta_connection *rasta_accept(rasta *r); + +/** + * Prepares cancellation of a blocking operation. + */ +rasta_cancellation *rasta_prepare_cancellation(rasta *r); + +/** + * Wait for incoming connections. + * @param rasta the user configuration containing the socket information + */ +rasta_connection *rasta_accept_with_cancel(rasta *r, rasta_cancellation *cancel); + +/** + * Performs cancellation of a blocking operation. */ -struct rasta_connection *rasta_accept(rasta *r); +void rasta_cancel_operation(rasta *r, rasta_cancellation *cancel); /** * Connect to another rasta instance - * @param user_configuration the user configuration of the local RaSTA instance + * @param rasta the user configuration of the local RaSTA instance * @param id the ID of the remote RaSTA instance to connect to */ -struct rasta_connection *rasta_connect(rasta *r, unsigned long id); +rasta_connection *rasta_connect(rasta *r, unsigned long id); /** * Receive data on a given RaSTA connection - * @param user_configuration the user configuration of the local RaSTA instance + * @param rasta the user configuration of the local RaSTA instance * @param connection the connection from which to receive the data * @param buf the buffer into which to save the received data * @param len the size of buf in bytes */ -int rasta_recv(rasta *r, struct rasta_connection *connection, void *buf, size_t len); +int rasta_recv(rasta *r, rasta_connection *connection, void *buf, size_t len); /** * Send data on a given RaSTA connection - * @param user_configuration the user configuration of the local RaSTA instance + * @param rasta the user configuration of the local RaSTA instance * @param connection the connection on which to send the data * @param buf the buffer from which to read the data to be sent * @param len the size of buf in bytes */ -int rasta_send(rasta *r, struct rasta_connection *connection, void *buf, size_t len); +int rasta_send(rasta *r, rasta_connection *connection, void *buf, size_t len); /** * disconnect a connection on request by the user * @param connection the connection that should be disconnected */ -void rasta_disconnect(struct rasta_connection *connection); +void rasta_disconnect(rasta_connection *connection); /** * Cleanup a connection after a disconnect and free assigned ressources. * Always use this when a programm terminates, otherwise it may not start again. - * @param user_configuration the RaSTA lib configuration + * @param rasta the RaSTA lib configuration */ void rasta_cleanup(rasta *r);