Skip to content

Commit

Permalink
Cancellable accept
Browse files Browse the repository at this point in the history
  • Loading branch information
rs22 committed Jan 3, 2024
1 parent f301654 commit 9e3a175
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 47 deletions.
9 changes: 6 additions & 3 deletions examples/examples_localhost/c/event_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ uint64_t test_get_nanotime() {
return t.tv_sec * 1000000000 + t.tv_nsec;
}

int send_heartbeat_event(void *carry_data) {
int send_heartbeat_event(void *carry_data, int fd) {
(void)carry_data;
(void)fd;
uint64_t n_time = test_get_nanotime();
printf("time since last call: %8lu us - expected:%8lu us\n", (unsigned long)((n_time - last_time) / 1000), (unsigned long)(heartbeat_interval / 1000));
last_time = n_time;
return 0;
}

int disconnect_event(void *carry_data) {
int disconnect_event(void *carry_data, int fd) {
(void)carry_data;
(void)fd;
printf("disconnecting due to inactivity\n");
return 1;
}

int event_read(void *carry_data) {
int event_read(void *carry_data, int fd) {
(void)fd;
char buffer[128];
ssize_t len = read(STDIN_FILENO, buffer, 127);
if (buffer[0] != '\n') {
Expand Down
3 changes: 2 additions & 1 deletion examples/rcat/c/rcat.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ struct connect_event_data {
struct rasta_connection *connection;
};

int send_input_data(void *carry_data) {
int send_input_data(void *carry_data, int fd) {
(void)fd;
struct connect_event_data *data = carry_data;
char buf[BUF_SIZE];
int c;
Expand Down
3 changes: 2 additions & 1 deletion examples/rcat/c/rcat_dtls.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ struct connect_event_data {
struct rasta_connection *connection;
};

int send_input_data(void *carry_data) {
int send_input_data(void *carry_data, int fd) {
(void)fd;
struct connect_event_data *data = carry_data;
char buf[BUF_SIZE];
int c;
Expand Down
3 changes: 2 additions & 1 deletion examples/rcat/c/rcat_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ struct connect_event_data {
struct rasta_connection *connection;
};

int send_input_data(void *carry_data) {
int send_input_data(void *carry_data, int fd) {
(void)fd;
struct connect_event_data *data = carry_data;
char buf[BUF_SIZE];
int c;
Expand Down
77 changes: 72 additions & 5 deletions src/c/rasta.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void rasta_listen(rasta *user_configuration) {
sr_listen(&user_configuration->h);
}

struct rasta_connection *rasta_accept(rasta *user_configuration) {
rasta_connection *rasta_accept(rasta *user_configuration) {
struct rasta_handle *h = &user_configuration->h;
event_system *event_system = &user_configuration->rasta_lib_event_system;

Expand All @@ -61,11 +61,78 @@ struct rasta_connection *rasta_accept(rasta *user_configuration) {
return NULL;
}

struct rasta_connection *rasta_connect(rasta *user_configuration, unsigned long id) {
int terminator_callback(void *carry, int fd) {
rasta *r = carry;

logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Executing cancel handler...");

// Invalidate the event (read from the pipe)
uint64_t u;
ssize_t ignored = read(fd, &u, sizeof(u));
(void)ignored;

// Close the pipe
close(fd);

// Exit the event loop
return 1;
}

typedef struct rasta_cancellation {
int fd[2];
} rasta_cancellation;

rasta_cancellation *rasta_prepare_cancellation(rasta *r) {
logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Allocating cancellation...");
rasta_cancellation *result = rmalloc(sizeof(rasta_cancellation));

// Cancel event
if (pipe(result->fd) < 0) {
perror("Failed to create pipe");
rfree(result);
return NULL;
}

return result;
}

rasta_connection *rasta_accept_with_cancel(rasta *r, rasta_cancellation *cancellation) {
logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Accept", "Registering cancel event...");

fd_event terminator_event;
memset(&terminator_event, 0, sizeof(fd_event));
terminator_event.callback = terminator_callback;
terminator_event.carry_data = r;
terminator_event.fd = cancellation->fd[0];
enable_fd_event(&terminator_event);
rasta_add_fd_event(r, &terminator_event, EV_READABLE);

rasta_connection *result = rasta_accept(r);

logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Accept", "Unregistering cancel event...");

rasta_remove_fd_event(r, &terminator_event);
close(cancellation->fd[1]);

logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Freeing cancellation...");
rfree(cancellation);

return result;
}

void rasta_cancel_operation(rasta *r, rasta_cancellation *cancel) {
logger_log(&r->logger, LOG_LEVEL_DEBUG, "RaSTA Cancel", "Canceling operation...");

uint64_t terminate = 1;
uint64_t ignore = write(cancel->fd[1], &terminate, sizeof(uint64_t));
(void)ignore;
}

rasta_connection *rasta_connect(rasta *user_configuration, unsigned long id) {
return sr_connect(&user_configuration->h, id);
}

int rasta_recv(rasta *user_configuration, struct rasta_connection *connection, void *buf, size_t len) {
int rasta_recv(rasta *user_configuration, rasta_connection *connection, void *buf, size_t len) {
struct rasta_handle *h = &user_configuration->h;
event_system *event_system = &user_configuration->rasta_lib_event_system;

Expand Down Expand Up @@ -95,7 +162,7 @@ int rasta_recv(rasta *user_configuration, struct rasta_connection *connection, v
return received_len;
}

int rasta_send(rasta *user_configuration, struct rasta_connection *connection, void *buf, size_t len) {
int rasta_send(rasta *user_configuration, rasta_connection *connection, void *buf, size_t len) {
struct RastaMessageData messageData1;
allocateRastaMessageData(&messageData1, 1);
messageData1.data_array[0].bytes = buf;
Expand All @@ -106,7 +173,7 @@ int rasta_send(rasta *user_configuration, struct rasta_connection *connection, v
return return_val;
}

void rasta_disconnect(struct rasta_connection *connection) {
void rasta_disconnect(rasta_connection *connection) {
sr_disconnect(connection);
}

Expand Down
3 changes: 2 additions & 1 deletion src/c/redundancy/rasta_red_multiplexer.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ void handle_received_data(redundancy_mux *mux, unsigned char *buffer, ssize_t le
freeRastaByteArray(&test.key);
}

int channel_timeout_event(void *carry_data) {
int channel_timeout_event(void *carry_data, int fd) {
UNUSED(carry_data);
UNUSED(fd);
// Escape the event loop
return 1;
}
Expand Down
4 changes: 2 additions & 2 deletions src/c/retransmission/safety_retransmission.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void sr_remove_confirmed_messages(struct rasta_connection *con) {

// sending is now possible again (space in the retransmission queue is available), so we should trigger it
if (fifo_full(con->fifo_send)) {
data_send_event(&con->send_handle);
data_send_event(&con->send_handle, -1);
}
}

Expand Down Expand Up @@ -397,7 +397,7 @@ int sr_send(struct rasta_handle *h, struct rasta_connection *con, struct RastaMe

if (fifo_full(con->fifo_send)) {
// Flush, send queued messages now
data_send_event(&con->send_handle);
data_send_event(&con->send_handle, -1);
}

if (!fifo_push(con->fifo_send, to_fifo)) {
Expand Down
24 changes: 18 additions & 6 deletions src/c/transport/events.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include "diagnostics.h"
#include "transport.h"

int channel_accept_event(void *carry_data) {
int channel_accept_event(void *carry_data, int _fd) {
UNUSED(_fd);

struct accept_event_data *data = carry_data;

logger_log(data->h->mux.logger, LOG_LEVEL_DEBUG, "RaSTA RedMux accept", "Socket ready to accept");
Expand Down Expand Up @@ -60,7 +62,9 @@ int channel_accept_event(void *carry_data) {
return 0;
}

int channel_receive_event(void *carry_data) {
int channel_receive_event(void *carry_data, int fd) {
UNUSED(fd);

struct receive_event_data *data = carry_data;
rasta_connection *connection = data->connection;

Expand Down Expand Up @@ -135,7 +139,9 @@ int channel_receive_event(void *carry_data) {
return 0;
}

int event_connection_expired(void *carry_data) {
int event_connection_expired(void *carry_data, int fd) {
UNUSED(fd);

struct timed_event_data *data = carry_data;
struct rasta_heartbeat_handle *h = (struct rasta_heartbeat_handle *)data->handle;
logger_log(h->logger, LOG_LEVEL_DEBUG, "RaSTA HEARTBEAT", "T_i timer expired");
Expand Down Expand Up @@ -169,7 +175,9 @@ int event_connection_expired(void *carry_data) {
return 1;
}

int heartbeat_send_event(void *carry_data) {
int heartbeat_send_event(void *carry_data, int fd) {
UNUSED(fd);

struct timed_event_data *data = carry_data;
struct rasta_heartbeat_handle *h = (struct rasta_heartbeat_handle *)data->handle;

Expand All @@ -191,7 +199,9 @@ int heartbeat_send_event(void *carry_data) {
return 0;
}

int data_send_event(void *carry_data) {
int data_send_event(void *carry_data, int fd) {
UNUSED(fd);

rasta_sending_handle *h = carry_data;

logger_log(h->logger, LOG_LEVEL_DEBUG, "RaSTA send handler", "send data");
Expand Down Expand Up @@ -276,7 +286,9 @@ int data_send_event(void *carry_data) {
return 0;
}

int send_timed_key_exchange(void *arg) {
int send_timed_key_exchange(void *arg, int fd) {
UNUSED(fd);

#ifdef ENABLE_OPAQUE
struct timed_event_data *event_data = (struct timed_event_data *)arg;
// rasta_receive_handle *handle = (rasta_receive_handle *)event_data->handle;
Expand Down
12 changes: 6 additions & 6 deletions src/c/transport/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ struct receive_event_data {
rasta_transport_channel *channel;
};

int channel_accept_event(void *carry_data);
int channel_receive_event(void *carry_data);
int channel_accept_event(void *carry_data, int fd);
int channel_receive_event(void *carry_data, int fd);

int data_send_event(void *carry_data);
int heartbeat_send_event(void *carry_data);
int event_connection_expired(void *carry_data);
int data_send_event(void *carry_data, int fd);
int heartbeat_send_event(void *carry_data, int fd);
int event_connection_expired(void *carry_data, int fd);

int send_timed_key_exchange(void *arg);
int send_timed_key_exchange(void *arg, int fd);
8 changes: 4 additions & 4 deletions src/c/util/event_system.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ int event_system_sleep(uint64_t time_to_wait, struct fd_event_linked_list_s *fd_
}
for (fd_event *current = fd_events->first; current; current = current->next) {
if (current->enabled && FD_ISSET(current->fd, &on_readable)) {
if (current->callback(current->carry_data)) return -1;
if (current->callback(current->carry_data, current->fd)) return -1;
}
if (current->enabled && FD_ISSET(current->fd, &on_writable)) {
if (current->callback(current->carry_data)) return -1;
if (current->callback(current->carry_data, current->fd)) return -1;
}
if (current->enabled && FD_ISSET(current->fd, &on_exceptional)) {
if (current->callback(current->carry_data)) return -1;
if (current->callback(current->carry_data, current->fd)) return -1;
}
}
return result;
Expand Down Expand Up @@ -156,7 +156,7 @@ void event_system_start(event_system *ev_sys) {
}
}
// fire event and exit in case it returns something else than 0
if (next_event->callback(next_event->carry_data)) {
if (next_event->callback(next_event->carry_data, -1)) {
break;
}
// update timed_event::last_call
Expand Down
2 changes: 1 addition & 1 deletion src/include/rasta/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ typedef struct event_system event_system;
typedef struct rasta rasta;

// event callback pointer, return 0 to keep the loop running, everything else stops the loop
typedef int (*event_ptr)(void *h);
typedef int (*event_ptr)(void *h, int fd);

#define EV_READABLE (1 << 0)
#define EV_WRITABLE (1 << 1)
Expand Down
Loading

0 comments on commit 9e3a175

Please sign in to comment.