From 3b227bc3ea639afbfd8e4d1f274153e0329ff8a8 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Mon, 8 May 2023 10:53:27 +0200 Subject: [PATCH] Improve port API and update generic_unix socket_driver to use it Add `port_ensure_available_with_roots` after `memory_ensure_free_with_roots` Add several new `port_send_*` functions Update `port_send_reply` to ensure memory is available Document `port_send_*` functions Use these new functions in generic_unix's socket_driver Signed-off-by: Paul Guyot --- .clang-format-ignore | 1 - src/libAtomVM/port.c | 40 +- src/libAtomVM/port.h | 95 ++- .../generic_unix/lib/platform_defaultatoms.c | 2 + .../generic_unix/lib/platform_defaultatoms.h | 42 +- .../generic_unix/lib/socket_driver.c | 621 ++++++++++-------- .../generic_unix/lib/socket_driver.h | 11 - 7 files changed, 487 insertions(+), 325 deletions(-) diff --git a/.clang-format-ignore b/.clang-format-ignore index 9810d4b5d..28cb07856 100644 --- a/.clang-format-ignore +++ b/.clang-format-ignore @@ -36,7 +36,6 @@ src/platforms/esp32/build/* src/platforms/esp32/components/* src/platforms/generic_unix/lib/platform_defaultatoms.c src/platforms/generic_unix/lib/platform_nifs.c -src/platforms/generic_unix/lib/socket_driver.c src/platforms/stm32/src/lib/platform_defaultatoms.c src/platforms/stm32/src/lib/platform_nifs.c src/platforms/stm32/build/* diff --git a/src/libAtomVM/port.c b/src/libAtomVM/port.c index e5e1b06cb..7e1ed4be2 100644 --- a/src/libAtomVM/port.c +++ b/src/libAtomVM/port.c @@ -87,39 +87,37 @@ void port_send_message_nolock(GlobalContext *glb, term pid, term msg) globalcontext_send_message_nolock(glb, local_process_id, msg); } -void port_ensure_available(Context *ctx, size_t size) -{ - if (context_avail_free_memory(ctx) < size) { - switch (memory_ensure_free(ctx, size)) { - case MEMORY_GC_OK: - break; - case MEMORY_GC_ERROR_FAILED_ALLOCATION: - // TODO Improve error handling - fprintf(stderr, "Failed to allocate additional heap storage: [%s:%i]\n", __FILE__, __LINE__); - AVM_ABORT(); - case MEMORY_GC_DENIED_ALLOCATION: - // TODO Improve error handling - fprintf(stderr, "Not permitted to allocate additional heap storage: [%s:%i]\n", __FILE__, __LINE__); - AVM_ABORT(); - } +void port_ensure_available_with_roots(Context *ctx, size_t size, size_t num_roots, term *roots, enum MemoryShrinkMode shrink_mode) +{ + switch (memory_ensure_free_with_roots(ctx, size, num_roots, roots, shrink_mode)) { + case MEMORY_GC_OK: + break; + case MEMORY_GC_ERROR_FAILED_ALLOCATION: + // TODO Improve error handling + fprintf(stderr, "Failed to allocate additional heap storage: [%s:%i]\n", __FILE__, __LINE__); + AVM_ABORT(); + case MEMORY_GC_DENIED_ALLOCATION: + // TODO Improve error handling + fprintf(stderr, "Not permitted to allocate additional heap storage: [%s:%i]\n", __FILE__, __LINE__); + AVM_ABORT(); } } -int port_is_standard_port_command(term t) +bool port_is_standard_port_command(term t) { if (!term_is_tuple(t)) { - return 0; + return false; } else if (term_get_tuple_arity(t) != 3) { - return 0; + return false; } else { term pid = term_get_tuple_element(t, 0); term ref = term_get_tuple_element(t, 1); if (!term_is_pid(pid)) { - return 0; + return false; } else if (!term_is_reference(ref)) { - return 0; + return false; } else { - return 1; + return true; } } } diff --git a/src/libAtomVM/port.h b/src/libAtomVM/port.h index 3fc04d951..4d06040ab 100644 --- a/src/libAtomVM/port.h +++ b/src/libAtomVM/port.h @@ -25,11 +25,19 @@ extern "C" { #endif +#include + #include "context.h" #include "defaultatoms.h" #include "globalcontext.h" #include "term.h" +// Constants to use with port_ensure_available* +#define PORT_ERROR_TUPLE_SIZE TUPLE_SIZE(2) +#define PORT_SYS_ERROR_TUPLE_SIZE TUPLE_SIZE(2) + PORT_ERROR_TUPLE_SIZE +#define PORT_OK_TUPLE_SIZE TUPLE_SIZE(2) +#define PORT_REPLY_SIZE TUPLE_SIZE(2) + // All port_* functions with a given ctx may only be called from the // executed ctx. term port_create_tuple2(Context *ctx, term a, term b); @@ -41,8 +49,12 @@ term port_create_ok_tuple(Context *ctx, term t); term port_create_reply(Context *ctx, term ref, term payload); void port_send_message(GlobalContext *glb, term pid, term msg); void port_send_message_nolock(GlobalContext *glb, term pid, term msg); -void port_ensure_available(Context *ctx, size_t size); -int port_is_standard_port_command(term msg); +void port_ensure_available_with_roots(Context *ctx, size_t size, size_t num_roots, term *roots, enum MemoryShrinkMode shrink_mode); +static inline void port_ensure_available(Context *ctx, size_t size) +{ + port_ensure_available_with_roots(ctx, size, 0, NULL, MEMORY_NO_SHRINK); +} +bool port_is_standard_port_command(term msg); // Sometimes ports need to send messages while not executed in a given ctx // (typically event handlers). @@ -54,9 +66,86 @@ term port_heap_create_sys_error_tuple(term **heap_ptr, term syscall, int errno); term port_heap_create_ok_tuple(term **heap_ptr, term t); term port_heap_create_reply(term **heap_ptr, term ref, term payload); -// Helper to send a message from NIFs or from the native handler. +/** + * @brief Send a reply to a process, typically a port client + * @details Send a reply tuple `{Ref, Payload}` to process pid. This function + * ensures memory is available, eventually garbage collecting terms except + * `ref` and `payload`. + * @param ctx the current context (native handler for ports) + * @param pid the pid to send the reply to + * @param ref the ref that tags the reply + * @param payload reply payload + */ static inline void port_send_reply(Context *ctx, term pid, term ref, term payload) { + term roots[2]; + roots[0] = ref; + roots[1] = payload; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE, 2, roots, MEMORY_NO_SHRINK); + term reply = port_create_reply(ctx, ref, payload); + port_send_message(ctx->global, pid, reply); +} + +/** + * @brief Send an ok tuple to a process, typically a port client + * @details Send a reply tuple `{Ref, {ok, T}}` to process pid. This + * function ensures memory is available, eventually garbage collecting terms + * except `ref` and `t`. + * @param ctx the current context (native handler for ports) + * @param pid the pid to send the reply to + * @param ref the ref that tags the reply + * @param t term returned with the ok tuple (can be an atom or any term) + */ +static inline void port_send_ok_tuple(Context *ctx, term pid, term ref, term t) +{ + term roots[2]; + roots[0] = ref; + roots[1] = t; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + PORT_OK_TUPLE_SIZE, 2, roots, MEMORY_NO_SHRINK); + term payload = port_create_ok_tuple(ctx, t); + term reply = port_create_reply(ctx, ref, payload); + port_send_message(ctx->global, pid, reply); +} + +/** + * @brief Send an error tuple to a process, typically a port client + * @details Send a reply tuple `{Ref, {error, Reason}}` to process pid. This + * function ensures memory is available, eventually garbage collecting terms + * except `ref` and `reason`. + * @param ctx the current context (native handler for ports) + * @param pid the pid to send the reply to + * @param ref the ref that tags the reply + * @param reason error reason (can be an atom or any term) + */ +static inline void port_send_error_tuple(Context *ctx, term pid, term ref, term reason) +{ + term roots[2]; + roots[0] = ref; + roots[1] = reason; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + PORT_ERROR_TUPLE_SIZE, 2, roots, MEMORY_NO_SHRINK); + term payload = port_create_error_tuple(ctx, reason); + term reply = port_create_reply(ctx, ref, payload); + port_send_message(ctx->global, pid, reply); +} + +/** + * @brief Send a sys error tuple to a process, typically a port client + * @details Send a reply tuple `{Ref, {error, {Syscall, Errno}}` to process pid. + * This function ensures memory is available, eventually garbage collecting + * terms except `ref` and `syscall`. + * @param ctx the current context (native handler for ports) + * @param pid the pid to send the reply to + * @param ref the ref that tags the reply + * @param syscall error syscall (can be an atom or any term) + * @param errno error number + */ +static inline void port_send_sys_error_tuple(Context *ctx, term pid, term ref, term syscall, int errno) +{ + term roots[2]; + roots[0] = ref; + roots[1] = syscall; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + PORT_SYS_ERROR_TUPLE_SIZE, 2, roots, MEMORY_NO_SHRINK); + term payload = port_create_sys_error_tuple(ctx, syscall, errno); term reply = port_create_reply(ctx, ref, payload); port_send_message(ctx->global, pid, reply); } diff --git a/src/platforms/generic_unix/lib/platform_defaultatoms.c b/src/platforms/generic_unix/lib/platform_defaultatoms.c index 225144e73..7dc9dcec4 100644 --- a/src/platforms/generic_unix/lib/platform_defaultatoms.c +++ b/src/platforms/generic_unix/lib/platform_defaultatoms.c @@ -26,6 +26,7 @@ static const char *const tcp_atom = "\x3" "tcp"; static const char *const socket_atom = "\x6" "socket"; static const char *const fcntl_atom = "\x5" "fcntl"; static const char *const bind_atom = "\x4" "bind"; +static const char *const getpeername_atom = "\xB" "getpeername"; static const char *const getsockname_atom = "\xB" "getsockname"; static const char *const recvfrom_atom = "\x8" "recvfrom"; static const char *const recv_atom = "\x4" "recv"; @@ -61,6 +62,7 @@ void platform_defaultatoms_init(GlobalContext *glb) ok &= globalcontext_insert_atom(glb, socket_atom) == SOCKET_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, fcntl_atom) == FCNTL_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, bind_atom) == BIND_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, getpeername_atom) == GETPEERNAME_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, getsockname_atom) == GETSOCKNAME_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, recvfrom_atom) == RECVFROM_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, recv_atom) == RECV_ATOM_INDEX; diff --git a/src/platforms/generic_unix/lib/platform_defaultatoms.h b/src/platforms/generic_unix/lib/platform_defaultatoms.h index c65ae6565..e6b57179c 100644 --- a/src/platforms/generic_unix/lib/platform_defaultatoms.h +++ b/src/platforms/generic_unix/lib/platform_defaultatoms.h @@ -29,30 +29,31 @@ #define SOCKET_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 3) #define FCNTL_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 4) #define BIND_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 5) -#define GETSOCKNAME_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 6) -#define RECVFROM_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 7) -#define RECV_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 8) -#define SENDTO_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 9) -#define SEND_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 10) +#define GETPEERNAME_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 6) +#define GETSOCKNAME_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 7) +#define RECVFROM_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 8) +#define RECV_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 9) +#define SENDTO_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 10) +#define SEND_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 11) -#define STA_GOT_IP_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 11) -#define STA_CONNECTED_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 12) +#define STA_GOT_IP_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 12) +#define STA_CONNECTED_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 13) -#define ADDRESS_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 13) -#define CONTROLLING_PROCESS_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 14) -#define ACTIVE_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 15) -#define BUFFER_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 16) +#define ADDRESS_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 14) +#define CONTROLLING_PROCESS_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 15) +#define ACTIVE_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 16) +#define BUFFER_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 17) -#define GETADDRINFO_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 17) -#define NO_SUCH_HOST_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 18) -#define CONNECT_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 19) -#define TCP_CLOSED_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 20) +#define GETADDRINFO_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 18) +#define NO_SUCH_HOST_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 19) +#define CONNECT_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 20) +#define TCP_CLOSED_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 21) -#define LISTEN_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 21) -#define BACKLOG_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 22) -#define ACCEPT_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 23) -#define FD_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 24) -#define GENERIC_UNIX_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 25) +#define LISTEN_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 22) +#define BACKLOG_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 23) +#define ACCEPT_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 24) +#define FD_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 25) +#define GENERIC_UNIX_ATOM_INDEX (PLATFORM_ATOMS_BASE_INDEX + 26) #define PROTO_ATOM term_from_atom_index(PROTO_ATOM_INDEX) #define UDP_ATOM term_from_atom_index(UDP_ATOM_INDEX) @@ -60,6 +61,7 @@ #define SOCKET_ATOM term_from_atom_index(SOCKET_ATOM_INDEX) #define FCNTL_ATOM term_from_atom_index(FCNTL_ATOM_INDEX) #define BIND_ATOM term_from_atom_index(BIND_ATOM_INDEX) +#define GETPEERNAME_ATOM term_from_atom_index(GETPEERNAME_ATOM_INDEX) #define GETSOCKNAME_ATOM term_from_atom_index(GETSOCKNAME_ATOM_INDEX) #define RECVFROM_ATOM term_from_atom_index(RECVFROM_ATOM_INDEX) #define RECV_ATOM term_from_atom_index(RECV_ATOM_INDEX) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7c2f9626c..8e895dad8 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -84,27 +84,28 @@ typedef struct SocketDriverData } SocketDriverData; // TODO define in defaultatoms -const char *const send_a = "\x4" "send"; -const char *const sendto_a = "\x6" "sendto"; -const char *const init_a = "\x4" "init"; -const char *const bind_a = "\x4" "bind"; -const char *const recvfrom_a = "\x8" "recvfrom"; -const char *const recv_a = "\x4" "recv"; -const char *const close_a = "\x5" "close"; -const char *const closed_a = "\x6" "closed"; -const char *const get_port_a = "\x8" "get_port"; -const char *const accept_a = "\x6" "accept"; -const char *const sockname_a = "\x8" "sockname"; -const char *const peername_a = "\x8" "peername"; -const char *const controlling_process_a = "\x13" "controlling_process"; -const char *const not_owner_a = "\x9" "not_owner"; - -const char *const close_internal = "\x14" "$atomvm_socket_close"; +const char *const init_a = ATOM_STR("\x4", "init"); +const char *const get_port_a = ATOM_STR("\x8", "get_port"); +const char *const sockname_a = ATOM_STR("\x8", "sockname"); +const char *const peername_a = ATOM_STR("\x8", "peername"); +const char *const not_owner_a = ATOM_STR("\x9", "not_owner"); + +const char *const close_internal = ATOM_STR("\x14", "$atomvm_socket_close"); static EventListener *active_recv_callback(GlobalContext *glb, EventListener *listener); static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *listener); static EventListener *active_recvfrom_callback(GlobalContext *glb, EventListener *listener); static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListener *listener); +static NativeHandlerResult socket_driver_do_init(Context *ctx, term pid, term ref, term params); +static void socket_driver_do_send(Context *ctx, term pid, term ref, term data); +static void socket_driver_do_sendto(Context *ctx, term pid, term ref, term dest_address, term dest_port, term data); +static void socket_driver_do_recv(Context *ctx, term pid, term ref, term length, term timeout); +static void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout); +static void socket_driver_do_close(Context *ctx); +static void socket_driver_get_port(Context *ctx, term pid, term ref); +static void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout); +static void socket_driver_sockname(Context *ctx, term pid, term ref); +static void socket_driver_peername(Context *ctx, term pid, term ref); static NativeHandlerResult socket_consume_mailbox(Context *ctx); uint32_t socket_tuple_to_addr(term addr_tuple) @@ -115,6 +116,8 @@ uint32_t socket_tuple_to_addr(term addr_tuple) | (term_to_int32(term_get_tuple_element(addr_tuple, 3)) & 0xFF); } +#define SOCKET_INET_ADDR TUPLE_SIZE(4) + term socket_ctx_tuple_from_addr(Context *ctx, uint32_t addr) { term terms[4]; @@ -166,11 +169,22 @@ void socket_driver_delete_data(void *data) free(data); } -static term do_bind(Context *ctx, term address, term port) +/** + * @brief Bind a socket + * @details This function binds a socket. It sends an error reply to the caller + * if an error occurs, but doesn't send OK if it succeeds. + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param params initialization socket parameters with address and port + * @return 0 on success + */ +static int do_bind(Context *ctx, term pid, term ref, term params) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; + term address = interop_proplist_get_value_default(params, ADDRESS_ATOM, UNDEFINED_ATOM); + term port = interop_proplist_get_value(params, PORT_ATOM); struct sockaddr_in serveraddr; - UNUSED(address); memset(&serveraddr, 0, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; if (address == UNDEFINED_ATOM) { @@ -178,66 +192,105 @@ static term do_bind(Context *ctx, term address, term port) } else if (term_is_tuple(address)) { serveraddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(address)); } else { - term_display(stderr, address, ctx); - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; + } + if (!term_is_integer(port)) { + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; } avm_int_t p = term_to_int(port); serveraddr.sin_port = htons(p); socklen_t address_len = sizeof(serveraddr); if (bind(socket_data->sockfd, (struct sockaddr *) &serveraddr, address_len) == -1) { - return port_create_sys_error_tuple(ctx, BIND_ATOM, errno); - } else { - TRACE("socket_driver|do_bind: bound to %ld\n", p); - if (getsockname(socket_data->sockfd, (struct sockaddr *) &serveraddr, &address_len) == -1) { - return port_create_sys_error_tuple(ctx, GETSOCKNAME_ATOM, errno); - } else { - socket_data->port = ntohs(serveraddr.sin_port); - return OK_ATOM; - } + port_send_sys_error_tuple(ctx, pid, ref, BIND_ATOM, errno); + return -1; } + if (getsockname(socket_data->sockfd, (struct sockaddr *) &serveraddr, &address_len) == -1) { + port_send_sys_error_tuple(ctx, pid, ref, GETSOCKNAME_ATOM, errno); + return -1; + } + socket_data->port = ntohs(serveraddr.sin_port); + TRACE("socket_driver|do_bind: bound to %ld\n", socket_data->port); + return 0; } -static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term params) +/** + * @brief Initialize a UDP socket + * @details This function initializes a UDP socket and sends the reply to + * the caller + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param socket_data current socket data, updated with fd and listener + * @param params UDP initialization socket parameters with address and port + * @return NativeContinue or NativeTerminate if the driver should terminate + */ +static NativeHandlerResult init_udp_socket(Context *ctx, term pid, term ref, SocketDriverData *socket_data, term params) { GlobalContext *glb = ctx->global; int sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd == -1) { - return port_create_sys_error_tuple(ctx, SOCKET_ATOM, errno); + port_send_sys_error_tuple(ctx, pid, ref, SOCKET_ATOM, errno); + return NativeTerminate; } socket_data->sockfd = sockfd; if (fcntl(socket_data->sockfd, F_SETFL, O_NONBLOCK) == -1) { close(sockfd); - return port_create_sys_error_tuple(ctx, FCNTL_ATOM, errno); + port_send_sys_error_tuple(ctx, pid, ref, FCNTL_ATOM, errno); + return NativeTerminate; } - term address = interop_proplist_get_value_default(params, ADDRESS_ATOM, UNDEFINED_ATOM); - term port = interop_proplist_get_value(params, PORT_ATOM); - term ret = do_bind(ctx, address, port); - if (ret != OK_ATOM) { + if (do_bind(ctx, pid, ref, params)) { close(sockfd); - } else { - if (socket_data->active) { - ActiveRecvListener *listener = malloc(sizeof(ActiveRecvListener)); - if (IS_NULL_PTR(listener)) { - fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); - AVM_ABORT(); - } - listener->base.fd = socket_data->sockfd; - listener->base.handler = active_recvfrom_callback; - listener->buf_size = socket_data->buffer; - listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + return NativeTerminate; + } + if (socket_data->active) { + ActiveRecvListener *listener = malloc(sizeof(ActiveRecvListener)); + if (IS_NULL_PTR(listener)) { + fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + AVM_ABORT(); } + listener->base.fd = socket_data->sockfd; + listener->base.handler = active_recvfrom_callback; + listener->buf_size = socket_data->buffer; + listener->process_id = ctx->process_id; + sys_register_listener(glb, &listener->base); + socket_data->active_listener = listener; } - return ret; + port_send_reply(ctx, pid, ref, OK_ATOM); + return NativeContinue; } -static term do_connect(SocketDriverData *socket_data, Context *ctx, term address, term port) +/** + * @brief Connect a TCP socket + * @details This function connects a TCP socket. It sends an error reply to the + * caller if an error occurs, but doesn't send OK if it succeeds. + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param params UDP initialization socket parameters with address and port + * @return 0 on success, -1 on failure + */ +static int do_connect(Context *ctx, term pid, term ref, term params) { + SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; + int sockfd = socket_data->sockfd; + term address = interop_proplist_get_value(params, ADDRESS_ATOM); + if (!term_is_list(address)) { + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; + } + term port = interop_proplist_get_value(params, PORT_ATOM); + if (!term_is_integer(port)) { + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; + } + // TODO handle IP addresses if (!term_is_list(address)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; } struct addrinfo hints; memset(&hints, 0, sizeof(hints)); @@ -248,11 +301,12 @@ static term do_connect(SocketDriverData *socket_data, Context *ctx, term address int ok; char *addr_str = interop_term_to_string(address, &ok); if (!ok) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; } char port_str[32]; snprintf(port_str, 32, "%u", (unsigned short) term_to_int(port)); - TRACE("socket_driver:do_connect: resolving to %s:%s over socket fd %i\n", addr_str, port_str, term_to_int32(socket_data->sockfd)); + TRACE("socket_driver:do_connect: resolving to %s:%s over socket fd %i\n", addr_str, port_str, sockfd); struct addrinfo *server_info; int status = getaddrinfo(addr_str, port_str, &hints, &server_info); @@ -260,7 +314,8 @@ static term do_connect(SocketDriverData *socket_data, Context *ctx, term address free(addr_str); if (status != 0) { - return port_create_sys_error_tuple(ctx, GETADDRINFO_ATOM, status); + port_send_sys_error_tuple(ctx, pid, ref, GETADDRINFO_ATOM, status); + return -1; } struct sockaddr *addr = NULL; @@ -271,72 +326,107 @@ static term do_connect(SocketDriverData *socket_data, Context *ctx, term address break; } if (IS_NULL_PTR(addr)) { - return port_create_error_tuple(ctx, NO_SUCH_HOST_ATOM); + port_send_error_tuple(ctx, pid, ref, NO_SUCH_HOST_ATOM); + return -1; } - status = connect(socket_data->sockfd, addr, addr_len); + status = connect(sockfd, addr, addr_len); freeaddrinfo(server_info); if (status == -1) { - return port_create_sys_error_tuple(ctx, CONNECT_ATOM, errno); - } else { - TRACE("socket_driver|do_connect: connected.\n"); - return OK_ATOM; + port_send_sys_error_tuple(ctx, pid, ref, CONNECT_ATOM, errno); + return -1; } + TRACE("socket_driver|do_connect: connected.\n"); + return 0; } -static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, term params) +/** + * @brief Initialize context as a client TCP socket + * @details This function initializes and connects the TCP socket and sends the + * reply to the caller. Socket data is updated with fd and listener. + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param params UDP initialization socket parameters with address and port + * @return NativeContinue or NativeTerminate if the driver should terminate + */ +static NativeHandlerResult init_client_tcp_socket(Context *ctx, term pid, term ref, term params) { + SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; GlobalContext *glb = ctx->global; int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { - return port_create_sys_error_tuple(ctx, SOCKET_ATOM, errno); + port_send_sys_error_tuple(ctx, pid, ref, SOCKET_ATOM, errno); + return NativeTerminate; } socket_data->sockfd = sockfd; - term address = interop_proplist_get_value(params, ADDRESS_ATOM); - term port = interop_proplist_get_value(params, PORT_ATOM); - term ret = do_connect(socket_data, ctx, address, port); - if (ret != OK_ATOM) { + if (do_connect(ctx, pid, ref, params)) { close(sockfd); - } else { - if (socket_data->active) { - ActiveRecvListener *listener = malloc(sizeof(ActiveRecvListener)); - if (IS_NULL_PTR(listener)) { - fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); - AVM_ABORT(); - } - listener->base.fd = socket_data->sockfd; - listener->base.handler = active_recv_callback; - listener->buf_size = socket_data->buffer; - listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + return NativeTerminate; + } + if (socket_data->active) { + ActiveRecvListener *listener = malloc(sizeof(ActiveRecvListener)); + if (IS_NULL_PTR(listener)) { + fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + AVM_ABORT(); } + listener->base.fd = socket_data->sockfd; + listener->base.handler = active_recv_callback; + listener->buf_size = socket_data->buffer; + listener->process_id = ctx->process_id; + sys_register_listener(glb, &listener->base); + socket_data->active_listener = listener; } - return ret; + port_send_reply(ctx, pid, ref, OK_ATOM); + return NativeContinue; } -static term do_listen(SocketDriverData *socket_data, Context *ctx, term params) +/** + * @brief Configure a tcp socket for listening + * @details This function configures the TCP socket for listening. It sends an + * error reply to the caller if an error occurs, but doesn't send OK if it + * succeeds. + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param params TCP parameters with backlog + * @return 0 on success, -1 on failure + */ +static int do_listen(Context *ctx, term pid, term ref, term params) { + SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; term backlog = interop_proplist_get_value(params, BACKLOG_ATOM); if (!term_is_integer(backlog)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return -1; } - int status = listen(socket_data->sockfd, term_from_int(backlog)); - if (status == -1) { - return port_create_sys_error_tuple(ctx, LISTEN_ATOM, errno); - } else { - return OK_ATOM; + if (listen(socket_data->sockfd, term_from_int(backlog)) == -1) { + port_send_sys_error_tuple(ctx, pid, ref, LISTEN_ATOM, errno); + return -1; } + return 0; } -static term init_server_tcp_socket(Context *ctx, SocketDriverData *socket_data, term params) +/** + * @brief Initialize context as a server TCP socket + * @details This function initializes and binds the TCP socket and sends the + * reply to the caller. Socket data is updated with fd. + * @param ctx the driver context + * @param pid the caller's pid, for the reply + * @param ref the caller's message ref, to tag the reply + * @param params TCP initialization socket parameters with address and port + * @return NativeContinue or NativeTerminate if the driver should terminate + */ +static NativeHandlerResult init_server_tcp_socket(Context *ctx, term pid, term ref, term params) { + SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { - return port_create_sys_error_tuple(ctx, SOCKET_ATOM, errno); + port_send_sys_error_tuple(ctx, pid, ref, SOCKET_ATOM, errno); + return NativeTerminate; } socket_data->sockfd = sockfd; @@ -354,22 +444,20 @@ static term init_server_tcp_socket(Context *ctx, SocketDriverData *socket_data, if (fcntl(socket_data->sockfd, F_SETFL, O_NONBLOCK) == -1) { close(sockfd); - return port_create_sys_error_tuple(ctx, FCNTL_ATOM, errno); + port_send_sys_error_tuple(ctx, pid, ref, FCNTL_ATOM, errno); + return NativeTerminate; } - term address = interop_proplist_get_value_default(params, ADDRESS_ATOM, UNDEFINED_ATOM); - term port = interop_proplist_get_value(params, PORT_ATOM); - term ret = do_bind(ctx, address, port); - if (ret != OK_ATOM) { + if (do_bind(ctx, pid, ref, params)) { close(sockfd); - } else { - ret = do_listen(socket_data, ctx, params); - if (ret != OK_ATOM) { - close(sockfd); - } else { - TRACE("socket_driver|init_server_tcp_socket: listening on port %u\n", (unsigned) term_to_int(port)); - } + return NativeTerminate; } - return ret; + if (do_listen(ctx, pid, ref, params)) { + close(sockfd); + return NativeTerminate; + } + port_send_reply(ctx, pid, ref, OK_ATOM); + TRACE("socket_driver|init_server_tcp_socket: listening on port %u\n", (unsigned) term_to_int(port)); + return NativeContinue; } static Context *create_accepting_socket(GlobalContext *glb, SocketDriverData *new_socket_data) @@ -395,15 +483,17 @@ static ActiveRecvListener *create_accepting_socket_listener(Context *ctx, Socket return listener; } -term socket_driver_do_init(Context *ctx, term params) +static NativeHandlerResult socket_driver_do_init(Context *ctx, term pid, term ref, term params) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; if (!term_is_list(params)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } term proto = interop_proplist_get_value(params, PROTO_ATOM); if (term_is_nil(proto)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } socket_data->proto = proto; // @@ -411,7 +501,8 @@ term socket_driver_do_init(Context *ctx, term params) // term controlling_process = interop_proplist_get_value_default(params, CONTROLLING_PROCESS_ATOM, term_invalid_term()); if (!(term_is_invalid_term(controlling_process) || term_is_pid(controlling_process))) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } socket_data->controlling_process = controlling_process; // @@ -419,7 +510,8 @@ term socket_driver_do_init(Context *ctx, term params) // term binary = interop_proplist_get_value_default(params, BINARY_ATOM, FALSE_ATOM); if (!(binary == TRUE_ATOM || binary == FALSE_ATOM)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } socket_data->binary = binary == TRUE_ATOM; // @@ -427,12 +519,14 @@ term socket_driver_do_init(Context *ctx, term params) // term buffer = interop_proplist_get_value_default(params, BUFFER_ATOM, term_from_int(BUFSIZE)); if (!term_is_integer(buffer)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } avm_int_t buffer_val = term_to_int(buffer); #if AVM_INT_MAX > SIZE_MAX if (buffer_val > SIZE_MAX) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } #endif socket_data->buffer = (size_t) buffer_val; @@ -441,33 +535,35 @@ term socket_driver_do_init(Context *ctx, term params) // term active = interop_proplist_get_value_default(params, ACTIVE_ATOM, FALSE_ATOM); if (!(active == TRUE_ATOM || active == FALSE_ATOM)) { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } socket_data->active = active == TRUE_ATOM; // // initialize based on specified protocol and action // if (proto == UDP_ATOM) { - term ret = init_udp_socket(ctx, socket_data, params); - return ret; + return init_udp_socket(ctx, pid, ref, socket_data, params); } else if (proto == TCP_ATOM) { term connect = interop_proplist_get_value_default(params, CONNECT_ATOM, FALSE_ATOM); if (connect == TRUE_ATOM) { - return init_client_tcp_socket(ctx, socket_data, params); + return init_client_tcp_socket(ctx, pid, ref, params); } else { term listen = interop_proplist_get_value_default(params, LISTEN_ATOM, FALSE_ATOM); if (listen == TRUE_ATOM) { - return init_server_tcp_socket(ctx, socket_data, params); + return init_server_tcp_socket(ctx, pid, ref, params); } else { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } } } else { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NativeTerminate; } } -void socket_driver_do_close(Context *ctx) +static void socket_driver_do_close(Context *ctx) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; if (close(socket_data->sockfd) == -1) { @@ -477,192 +573,192 @@ void socket_driver_do_close(Context *ctx) } } -static term socket_driver_controlling_process(Context *ctx, term pid, term new_pid_term) +static void socket_driver_controlling_process(Context *ctx, term pid, term ref, term new_pid_term) { struct SocketDriverData *socket_data = ctx->platform_data; + term reply; if (UNLIKELY(!term_is_pid(new_pid_term))) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_ABORT(); - } - term error = term_alloc_tuple(2, ctx); - term_put_tuple_element(error, 0, ERROR_ATOM); - term_put_tuple_element(error, 1, BADARG_ATOM); - return error; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + PORT_ERROR_TUPLE_SIZE, 1, &ref, MEMORY_NO_SHRINK); + reply = port_create_error_tuple(ctx, BADARG_ATOM); } else if (UNLIKELY(pid != socket_data->controlling_process)) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_ABORT(); - } - term error = term_alloc_tuple(2, ctx); - term_put_tuple_element(error, 0, ERROR_ATOM); - term_put_tuple_element(error, 1, globalcontext_make_atom(ctx->global, not_owner_a)); - return error; + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + PORT_ERROR_TUPLE_SIZE, 1, &ref, MEMORY_NO_SHRINK); + reply = port_create_error_tuple(ctx, globalcontext_make_atom(ctx->global, not_owner_a)); } else { + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE, 1, &ref, MEMORY_NO_SHRINK); socket_data->controlling_process = new_pid_term; - return OK_ATOM; + reply = OK_ATOM; } + port_send_reply(ctx, pid, ref, reply); } // // INET API // -term socket_driver_get_port(Context *ctx) +void socket_driver_get_port(Context *ctx, term pid, term ref) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - port_ensure_available(ctx, 7); - return port_create_ok_tuple(ctx, term_from_int(socket_data->port)); + port_send_ok_tuple(ctx, pid, ref, term_from_int(socket_data->port)); } -term socket_driver_sockname(Context *ctx) +void socket_driver_send_name_reply(Context *ctx, term pid, term ref, struct sockaddr_in *addr) +{ + port_ensure_available_with_roots(ctx, PORT_REPLY_SIZE + SOCKET_INET_ADDR + TUPLE_SIZE(2) + PORT_OK_TUPLE_SIZE, 1, &ref, MEMORY_NO_SHRINK); + term addr_term = socket_ctx_tuple_from_addr(ctx, ntohl(addr->sin_addr.s_addr)); + term port_term = term_from_int(ntohs(addr->sin_port)); + term addr_port = port_create_tuple2(ctx, addr_term, port_term); + port_send_ok_tuple(ctx, pid, ref, addr_port); +} + +static void socket_driver_sockname(Context *ctx, term pid, term ref) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); - int result = getsockname(socket_data->sockfd, (struct sockaddr *) &addr, &addrlen); - if (result != 0) { - port_ensure_available(ctx, 3); - return port_create_error_tuple(ctx, term_from_int(errno)); - } else { - port_ensure_available(ctx, 11); - term addr_term = socket_ctx_tuple_from_addr( - ctx, ntohl(addr.sin_addr.s_addr)); - term port_term = term_from_int(ntohs(addr.sin_port)); - term addr_port = port_create_tuple2( - ctx, - addr_term, - port_term); - return port_create_tuple2( - ctx, - OK_ATOM, - addr_port); + if (getsockname(socket_data->sockfd, (struct sockaddr *) &addr, &addrlen) == -1) { + port_send_sys_error_tuple(ctx, pid, ref, GETSOCKNAME_ATOM, errno); } + socket_driver_send_name_reply(ctx, pid, ref, &addr); } -term socket_driver_peername(Context *ctx) +static void socket_driver_peername(Context *ctx, term pid, term ref) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); - int result = getpeername(socket_data->sockfd, (struct sockaddr *) &addr, &addrlen); - if (result != 0) { - port_ensure_available(ctx, 3); - return port_create_error_tuple(ctx, term_from_int(errno)); - } else { - port_ensure_available(ctx, 11); - term addr_term = socket_ctx_tuple_from_addr( - ctx, ntohl(addr.sin_addr.s_addr)); - term port_term = term_from_int(ntohs(addr.sin_port)); - term addr_port = port_create_tuple2( - ctx, - addr_term, - port_term); - return port_create_tuple2( - ctx, - OK_ATOM, - addr_port); + if (getpeername(socket_data->sockfd, (struct sockaddr *) &addr, &addrlen) == -1) { + port_send_sys_error_tuple(ctx, pid, ref, GETPEERNAME_ATOM, errno); } + socket_driver_send_name_reply(ctx, pid, ref, &addr); } // // send operations // -term socket_driver_do_send(Context *ctx, term data) +/** + * @brief get buffer data from provided term + * @details return or allocate buffer data. Sends an error message to the caller + * if it fails. The buffer may be allocated, `socket_driver_send_free_data` + * must be called to free it. + * @param ctx the current context + * @param pid the caller's pid + * @param ref the message ref + * @param data the data to get a buffer from + * @param len on output, length of data + * @returns NULL if it failed + */ +static char *socket_driver_send_get_data(Context *ctx, term pid, term ref, term data, size_t *len) { - SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - char *buf; - size_t len; if (term_is_binary(data)) { buf = (char *) term_binary_data(data); - len = term_binary_size(data); + *len = term_binary_size(data); } else if (term_is_list(data)) { - switch (interop_iolist_size(data, &len)) { + switch (interop_iolist_size(data, len)) { case InteropOk: break; case InteropMemoryAllocFail: - return port_create_error_tuple(ctx, OUT_OF_MEMORY_ATOM); + port_send_error_tuple(ctx, pid, ref, OUT_OF_MEMORY_ATOM); + return NULL; case InteropBadArg: - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NULL; } - buf = malloc(len); + buf = malloc(*len); switch (interop_write_iolist(data, buf)) { case InteropOk: break; case InteropMemoryAllocFail: free(buf); - return port_create_error_tuple(ctx, OUT_OF_MEMORY_ATOM); + port_send_error_tuple(ctx, pid, ref, OUT_OF_MEMORY_ATOM); + return NULL; case InteropBadArg: free(buf); - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NULL; } } else { - return port_create_error_tuple(ctx, BADARG_ATOM); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); + return NULL; } + return buf; +} - int sent_data = send(socket_data->sockfd, buf, len, 0); +/** + * @brief free the buffer data that was allocated by + * `socket_driver_send_get_data` + * @param data the data `socket_driver_send_get_data` got buffer from + * @param buf the result of `socket_driver_send_get_data` + */ +static void socket_driver_send_free_data(term data, char *buf) +{ if (term_is_list(data)) { free(buf); } +} - if (sent_data == -1) { - return port_create_sys_error_tuple(ctx, SEND_ATOM, errno); - } else { - TRACE("socket_driver_do_send: sent data with len %li to fd %i\n", len, socket_data->sockfd); - term sent_atom = term_from_int(sent_data); - return port_create_ok_tuple(ctx, sent_atom); +/** + * @brief send data to a TCP socket + * @details return an error tuple or `{Ref, {ok, Len}}` to the caller + * @param ctx the driver context + * @param pid caller's pid + * @param ref message ref, for the reply + * @param data iolist or binary of the data to send + */ +static void socket_driver_do_send(Context *ctx, term pid, term ref, term data) +{ + SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; + char *buf; + size_t len; + + buf = socket_driver_send_get_data(ctx, pid, ref, data, &len); + if (buf) { + int sent_data = send(socket_data->sockfd, buf, len, 0); + socket_driver_send_free_data(data, buf); + + if (sent_data < 0) { + port_send_sys_error_tuple(ctx, pid, ref, SEND_ATOM, errno); + } else { + TRACE("socket_driver_do_send: sent data with len %li to fd %i\n", len, socket_data->sockfd); + port_send_ok_tuple(ctx, pid, ref, term_from_int(sent_data)); + } } } -term socket_driver_do_sendto(Context *ctx, term dest_address, term dest_port, term data) +/** + * @brief send data to a UDP socket + * @details return an error tuple or `{Ref, {ok, Len}}` to the caller + * @param ctx the driver context + * @param pid caller's pid + * @param ref message ref, for the reply + * @param data iolist or binary of the data to send + */ +static void socket_driver_do_sendto(Context *ctx, term pid, term ref, term dest_address, term dest_port, term data) { SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(socket_tuple_to_addr(dest_address)); - addr.sin_port = htons(term_to_int32(dest_port)); char *buf; size_t len; - if (term_is_binary(data)) { - buf = (char *) term_binary_data(data); - len = term_binary_size(data); - } else if (term_is_list(data)) { - switch (interop_iolist_size(data, &len)) { - case InteropOk: - break; - case InteropMemoryAllocFail: - return port_create_error_tuple(ctx, OUT_OF_MEMORY_ATOM); - case InteropBadArg: - return port_create_error_tuple(ctx, BADARG_ATOM); - } - buf = malloc(len); - switch (interop_write_iolist(data, buf)) { - case InteropOk: - break; - case InteropMemoryAllocFail: - free(buf); - return port_create_error_tuple(ctx, OUT_OF_MEMORY_ATOM); - case InteropBadArg: - free(buf); - return port_create_error_tuple(ctx, BADARG_ATOM); + + buf = socket_driver_send_get_data(ctx, pid, ref, data, &len); + if (buf) { + struct sockaddr_in addr; + memset(&addr, 0, sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(socket_tuple_to_addr(dest_address)); + addr.sin_port = htons(term_to_int32(dest_port)); + int sent_data = sendto(socket_data->sockfd, buf, len, 0, (struct sockaddr *) &addr, sizeof(addr)); + socket_driver_send_free_data(data, buf); + + if (sent_data < 0) { + port_send_sys_error_tuple(ctx, pid, ref, SEND_ATOM, errno); + } else { + TRACE("socket_driver_do_sendto: sent data with len %li to fd %i\n", len, socket_data->sockfd); + port_send_ok_tuple(ctx, pid, ref, term_from_int(sent_data)); } - } else { - return port_create_error_tuple(ctx, BADARG_ATOM); - } - int sent_data = sendto(socket_data->sockfd, buf, len, 0, (struct sockaddr *) &addr, sizeof(addr)); - if (term_is_list(data)) { - free(buf); - } - if (sent_data == -1) { - return port_create_sys_error_tuple(ctx, SENDTO_ATOM, errno); - } else { - TRACE("socket_driver_do_sendto: sent data with len: %li, to: %i, port: %i\n", len, ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port)); - term sent_atom = term_from_int32(sent_data); - return port_create_ok_tuple(ctx, sent_atom); } } @@ -762,7 +858,7 @@ static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *b term *heap_ptr = heap; term pid = listener->pid; term ref = term_heap_from_ref_ticks(listener->ref_ticks, &heap_ptr); - term reply = port_heap_create_reply(&heap_ptr, ref, port_heap_create_error_tuple(&heap_ptr, globalcontext_make_atom(glb, closed_a))); + term reply = port_heap_create_reply(&heap_ptr, ref, port_heap_create_error_tuple(&heap_ptr, CLOSED_ATOM)); port_send_message_nolock(glb, pid, reply); mailbox_send(ctx, globalcontext_make_atom(glb, close_internal)); } else if (len < 0) { @@ -960,12 +1056,12 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, socket_data->passive_listener = listener; } -void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) +static inline void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) { do_recv(ctx, pid, ref, length, timeout, passive_recvfrom_callback); } -void socket_driver_do_recv(Context *ctx, term pid, term ref, term length, term timeout) +static inline void socket_driver_do_recv(Context *ctx, term pid, term ref, term length, term timeout) { do_recv(ctx, pid, ref, length, timeout, passive_recv_callback); } @@ -1037,7 +1133,7 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li return result; } -void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) +static void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) { UNUSED(timeout); @@ -1069,8 +1165,6 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) AVM_ABORT(); } - port_ensure_available(ctx, 16); - GlobalContext *glb = ctx->global; // Socket can be closed in another thread. @@ -1085,44 +1179,38 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) term ref = term_get_tuple_element(msg, 1); term cmd = term_get_tuple_element(msg, 2); + NativeHandlerResult result = NativeContinue; + term cmd_name = term_get_tuple_element(cmd, 0); if (cmd_name == globalcontext_make_atom(glb, init_a)) { TRACE("init\n"); term params = term_get_tuple_element(cmd, 1); - term reply = socket_driver_do_init(ctx, params); - port_send_reply(ctx, pid, ref, reply); - if (reply != OK_ATOM) { - // TODO handle shutdown - // socket_driver_delete_data(ctx->platform_data); - // context_destroy(ctx); - } - } else if (cmd_name == globalcontext_make_atom(glb, sendto_a)) { + result = socket_driver_do_init(ctx, pid, ref, params); + } else if (cmd_name == SENDTO_ATOM) { TRACE("sendto\n"); term dest_address = term_get_tuple_element(cmd, 1); term dest_port = term_get_tuple_element(cmd, 2); term buffer = term_get_tuple_element(cmd, 3); - term reply = socket_driver_do_sendto(ctx, dest_address, dest_port, buffer); - port_send_reply(ctx, pid, ref, reply); - } else if (cmd_name == globalcontext_make_atom(glb, send_a)) { + socket_driver_do_sendto(ctx, pid, ref, dest_address, dest_port, buffer); + } else if (cmd_name == SEND_ATOM) { TRACE("send\n"); term buffer = term_get_tuple_element(cmd, 1); - term reply = socket_driver_do_send(ctx, buffer); - port_send_reply(ctx, pid, ref, reply); - } else if (cmd_name == globalcontext_make_atom(glb, recvfrom_a)) { + socket_driver_do_send(ctx, pid, ref, buffer); + } else if (cmd_name == RECVFROM_ATOM) { TRACE("recvfrom\n"); term length = term_get_tuple_element(cmd, 1); term timeout = term_get_tuple_element(cmd, 2); socket_driver_do_recvfrom(ctx, pid, ref, length, timeout); - } else if (cmd_name == globalcontext_make_atom(glb, recv_a)) { + } else if (cmd_name == RECV_ATOM) { TRACE("recv\n"); term length = term_get_tuple_element(cmd, 1); term timeout = term_get_tuple_element(cmd, 2); socket_driver_do_recv(ctx, pid, ref, length, timeout); - } else if (cmd_name == globalcontext_make_atom(glb, accept_a)) { + } else if (cmd_name == ACCEPT_ATOM) { TRACE("accept\n"); term timeout = term_get_tuple_element(cmd, 1); socket_driver_do_accept(ctx, pid, ref, timeout); - } else if (cmd_name == globalcontext_make_atom(glb, close_a)) { + } else if (cmd_name == CLOSE_ATOM) { TRACE("close\n"); port_send_reply(ctx, pid, ref, OK_ATOM); SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; @@ -1135,36 +1223,31 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) free(socket_data->passive_listener); } socket_driver_do_close(ctx); - // We don't need to remove message. - return NativeTerminate; + result = NativeTerminate; } else if (cmd_name == globalcontext_make_atom(glb, sockname_a)) { TRACE("sockname\n"); - term reply = socket_driver_sockname(ctx); - port_send_reply(ctx, pid, ref, reply); + socket_driver_sockname(ctx, pid, ref); } else if (cmd_name == globalcontext_make_atom(glb, peername_a)) { TRACE("peername\n"); - term reply = socket_driver_peername(ctx); - port_send_reply(ctx, pid, ref, reply); + socket_driver_peername(ctx, pid, ref); } else if (cmd_name == globalcontext_make_atom(glb, get_port_a)) { // TODO This function is not supported in the gen_tcp or gen_udp APIs. // It should be removed. (Use inet:peername and inet:sockname instead) TRACE("get_port\n"); - term reply = socket_driver_get_port(ctx); - port_send_reply(ctx, pid, ref, reply); - } else if (cmd_name == globalcontext_make_atom(glb, controlling_process_a)) { + socket_driver_get_port(ctx, pid, ref); + } else if (cmd_name == CONTROLLING_PROCESS_ATOM) { TRACE("controlling_process\n"); term new_pid = term_get_tuple_element(cmd, 1); - term reply = socket_driver_controlling_process(ctx, pid, new_pid); - port_send_reply(ctx, pid, ref, reply); + socket_driver_controlling_process(ctx, pid, ref, new_pid); } else { TRACE("unknown cmd\n"); - port_send_reply(ctx, pid, ref, port_create_error_tuple(ctx, BADARG_ATOM)); + port_send_error_tuple(ctx, pid, ref, BADARG_ATOM); } mailbox_remove(&ctx->mailbox); TRACE("END socket_consume_mailbox\n"); - return NativeContinue; + return result; } Context *socket_init(GlobalContext *glb, term opts) diff --git a/src/platforms/generic_unix/lib/socket_driver.h b/src/platforms/generic_unix/lib/socket_driver.h index a7465966e..700c4eb91 100644 --- a/src/platforms/generic_unix/lib/socket_driver.h +++ b/src/platforms/generic_unix/lib/socket_driver.h @@ -27,15 +27,4 @@ void *socket_driver_create_data(); void socket_driver_delete_data(void *data); -term socket_driver_do_init(Context *ctx, term params); -term socket_driver_do_send(Context *ctx, term buffer); -term socket_driver_do_sendto(Context *ctx, term dest_address, term dest_port, term buffer); -void socket_driver_do_recv(Context *ctx, term pid, term ref, term length, term timeout); -void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout); -void socket_driver_do_close(Context *ctx); -term socket_driver_get_port(Context *ctx); -void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout); -term socket_driver_sockname(Context *ctx); -term socket_driver_peername(Context *ctx); - #endif