diff --git a/modules/rest_client/rest_cb.c b/modules/rest_client/rest_cb.c index fb390ffdecc..1a8c0d97c01 100644 --- a/modules/rest_client/rest_cb.c +++ b/modules/rest_client/rest_cb.c @@ -109,3 +109,12 @@ size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata) return len; } +int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp) +{ + LM_DBG("multi_handle timer called %ld\n", timeout_ms); + long *p = (long*) cbp; + + *p = timeout_ms; + + return 0; +} \ No newline at end of file diff --git a/modules/rest_client/rest_cb.h b/modules/rest_client/rest_cb.h index 2f40349351f..da2126bf51e 100644 --- a/modules/rest_client/rest_cb.h +++ b/modules/rest_client/rest_cb.h @@ -25,6 +25,8 @@ #ifndef _REST_CB_H_ #define _REST_CB_H_ +#include + #include "rest_client.h" #include "../../str.h" @@ -41,6 +43,7 @@ size_t write_func(char *ptr, size_t size, size_t nmemb, void *userdata); size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata); +int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp); #endif /* _REST_CB_H_ */ diff --git a/modules/rest_client/rest_client.c b/modules/rest_client/rest_client.c index 1bb37a2a31c..17a6fa3bb4f 100644 --- a/modules/rest_client/rest_client.c +++ b/modules/rest_client/rest_client.c @@ -28,6 +28,8 @@ #include #include +#include + #include "../../async.h" #include "../../sr_module.h" #include "../../dprint.h" @@ -39,9 +41,11 @@ #include "../tls_mgm/api.h" #include "rest_client.h" #include "rest_methods.h" +#include "rest_sockets.h" #include "../../ssl_init_tweaks.h" #include "../../pt.h" #include "../../redact_pii.h" +#include "../../globals.h" /* * Module parameters @@ -50,9 +54,12 @@ long connection_timeout = 20; /* s */ long connect_poll_interval = 20; /* ms */ long connection_timeout_ms; int max_async_transfers = 100; +long max_connections = 100; +long max_host_connections = 0; long curl_timeout = 20; char *ssl_capath; unsigned int max_transfer_size = 10240; /* KB (10MB) */ +int share_connections = 0; /* * curl_multi_perform() may indicate a "try again" response even @@ -72,11 +79,17 @@ int enable_expect_100; struct tls_mgm_binds tls_api; +static preconnect_urls *precon_urls = 0; +static int total_cons = 0; + /* trace parameters for this module */ int rest_proto_id; trace_proto_t tprot; char* rest_id_s = "rest"; +/* file descriptor limits */ +struct rlimit lim; + /* * Module initialization and cleanup */ @@ -104,6 +117,16 @@ static int w_async_rest_put(struct sip_msg *msg, async_ctx *ctx, str *url, str *body, str *_ctype, pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv); +// Temporary to expose in script +static int w_async_rest_get_v2(struct sip_msg *msg, async_ctx *ctx, str *url, + pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv); +static int w_async_rest_post_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv); +static int w_async_rest_put_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv); + static int w_rest_append_hf(struct sip_msg *msg, str *hfv); static int w_rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom); int validate_curl_http_version(const int *http_version); @@ -139,6 +162,25 @@ static const acmd_export_t acmds[] = { {CMD_PARAM_VAR,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_get_v2",(acmd_function)w_async_rest_get_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_post_v2",(acmd_function)w_async_rest_post_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_put_v2",(acmd_function)w_async_rest_put_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, {0,0,{{0,0,0}}} }; @@ -169,6 +211,28 @@ static const cmd_export_t cmds[] = { {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, ALL_ROUTES}, + {"rest_get_v2",(cmd_function)w_rest_get, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"rest_post_v2",(cmd_function)w_rest_post, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"rest_put_v2",(cmd_function)w_rest_put, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, {"rest_append_hf",(cmd_function)w_rest_append_hf, { {CMD_PARAM_STR,0,0}, {0,0,0}}, ALL_ROUTES}, @@ -190,6 +254,76 @@ static const trans_export_t trans[] = { {{0,0},0,0} }; +static int warm_pool_urls(modparam_t type, void *val) { + unsigned int num_conns; + char *mod_param, *delim, *host = NULL; + size_t delim_index, string_end; + preconnect_urls *tmp = NULL; + str num_conns_s; + + if (!share_connections) { + goto done; + } + + mod_param = (char*) val; + + if ((delim = strchr(mod_param, ',')) == NULL) { + goto error; + } + + delim_index = (size_t)(delim - mod_param); + if (delim_index == 0) { + goto error; + } + + host = (char*) pkg_malloc(delim_index + 1); + if (host == NULL) { + goto error; + } + + strncpy(host, mod_param, delim_index); + host[delim_index + 1] = '\0'; + + string_end = strlen(mod_param + delim_index + 1); + if (string_end == 0) { + goto error; + } + + num_conns_s.s = mod_param + delim_index + 1; + num_conns_s.len = string_end; + if (str2int(&num_conns_s, &num_conns) != 0) { + goto error; + } + + tmp = (preconnect_urls*) pkg_malloc(sizeof(preconnect_urls)); + if (tmp == NULL) { + goto error; + } + + tmp->url = host; + tmp->connections = (long) num_conns; + tmp->next = 0; + + if (precon_urls != NULL) { + tmp->next = precon_urls; + } + + precon_urls = tmp; + total_cons += num_conns; +done: + return 0; +error: + if (host != NULL) { + pkg_free(host); + } + + if (tmp != NULL) { + pkg_free(tmp); + } + + return -1; +} + /* * Exported parameters */ @@ -206,6 +340,12 @@ static const param_export_t params[] = { { "enable_expect_100", INT_PARAM, &enable_expect_100 }, { "no_concurrent_connects", INT_PARAM, &no_concurrent_connects }, { "curl_conn_lifetime", INT_PARAM, &curl_conn_lifetime }, + { "use_multi_socket_api", INT_PARAM, &use_multi_socket_api }, + { "share_connections", INT_PARAM, &share_connections }, + { "max_connections", INT_PARAM, &max_connections }, + { "max_host_connections", INT_PARAM, &max_host_connections }, + { "warm_pool_urls", STR_PARAM|USE_FUNC_PARAM, + (void*)&warm_pool_urls }, { 0, 0, 0 } }; @@ -277,6 +417,18 @@ static int mod_init(void) return -1; } + if (max_connections <= 0) { + LM_WARN("Bad max_connections value (%ld), setting to default of 100\n", max_connections); + max_connections = 100; + } + + if (max_host_connections < 0) { + LM_WARN("Bad max_host_connections value (%ld), setting to max_connections value (%ld)\n", + max_host_connections, max_connections); + + max_host_connections = max_connections; + } + LM_INFO("Module initialized!\n"); return 0; @@ -298,6 +450,16 @@ static int cfg_validate(void) return 1; } +static int get_fd_limit(void) { + if (getrlimit(RLIMIT_NOFILE, &lim) < 0) { + LM_ERR("cannot get the maximum number of file descriptors: %s\n", + strerror(errno)); + return -1; + } + + return 0; +} + static int child_init(int rank) { @@ -306,6 +468,28 @@ static int child_init(int rank) return -1; } + if (get_fd_limit() != 0) { + LM_WARN("Could not get file descriptor limits\n"); + return 0; + } + + if (init_process_limits(lim.rlim_cur) != 0) { + LM_WARN("Could not set file descriptor limits\n"); + return 0; + } + + if (pt[process_no].type != TYPE_UDP && pt[process_no].type != TYPE_TCP) { + return 0; + } + + if (precon_urls == NULL) { + return 0; + } + + if (connect_only(precon_urls, total_cons) != 0) { + LM_WARN("Could not create warm pool\n"); + } + return 0; } @@ -774,3 +958,173 @@ static int w_rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom) { return rest_init_client_tls(msg, tls_client_dom); } + +// Temporary duplication for feature toggle in script +int async_rest_method_v2(enum rest_client_method method, struct sip_msg *msg, + char *url, str *body, str *ctype, async_ctx *ctx, + pv_spec_p body_pv, pv_spec_p ctype_pv, pv_spec_p code_pv) +{ + rest_async_param *param; + pv_value_t val; + long http_rc; + char *host; + int read_fd, rc, lrc = RCL_OK; + + param = pkg_malloc(sizeof *param); + if (!param) { + LM_ERR("no more shm\n"); + return RCL_INTERNAL_ERR; + } + memset(param, '\0', sizeof *param); + + if (no_concurrent_connects && (lrc=rcl_acquire_url(url, &host)) < RCL_OK) + return lrc; + + rc = start_async_http_req_v2(msg, method, url, body, ctype, + param, ¶m->body, ctype_pv ? ¶m->ctype : NULL, &read_fd); + + /* error occurred; no transfer done */ + if (read_fd == ASYNC_NO_IO) { + ctx->resume_param = NULL; + ctx->resume_f = NULL; + if (code_pv) { + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = 0; + if (pv_set_value(msg, (pv_spec_p)code_pv, 0, &val) != 0) + LM_ERR("failed to set output code pv\n"); + } + + /* keep default async status of NO_IO */ + pkg_free(param); + return rc; + + /* no need for async - transfer already completed! */ + } else if (read_fd == ASYNC_SYNC) { + if (code_pv) { + curl_easy_getinfo(param->handle, CURLINFO_RESPONSE_CODE, &http_rc); + LM_DBG("HTTP response code: %ld\n", http_rc); + + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = (int)http_rc; + if (pv_set_value(msg, (pv_spec_p)code_pv, 0, &val) != 0) { + LM_ERR("failed to set output code pv\n"); + return RCL_INTERNAL_ERR; + } + } + + val.flags = PV_VAL_STR; + val.rs = param->body; + if (pv_set_value(msg, (pv_spec_p)body_pv, 0, &val) != 0) { + LM_ERR("failed to set output body pv\n"); + return RCL_INTERNAL_ERR; + } + + if (ctype_pv) { + val.rs = param->ctype; + if (pv_set_value(msg, (pv_spec_p)ctype_pv, 0, &val) != 0) { + LM_ERR("failed to set output ctype pv\n"); + return RCL_INTERNAL_ERR; + } + } + + pkg_free(param->body.s); + if (ctype_pv && param->ctype.s) + pkg_free(param->ctype.s); + curl_easy_cleanup(param->handle); + pkg_free(param); + + async_status = ASYNC_SYNC; + return rc; + } + + /* the TCP connection is established, async started with success */ + + if (lrc == RCL_OK_LOCKED) + rcl_release_url(host, rc == RCL_OK); + + ctx->resume_f = resume_async_http_req_v2; + ctx->timeout_s = curl_timeout; + ctx->timeout_f = time_out_async_http_req_v2; + + param->method = method; + param->body_pv = (pv_spec_p)body_pv; + param->ctype_pv = (pv_spec_p)ctype_pv; + param->code_pv = (pv_spec_p)code_pv; + ctx->resume_param = param; + + async_status = read_fd; + return 1; +} + +static int w_async_rest_get_v2(struct sip_msg *msg, async_ctx *ctx, str *url, + pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + LM_DBG("async rest get %.*s %p %p %p\n", url->len, url->s, + body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_GET, msg, url_nt.s, NULL, NULL, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} + +static int w_async_rest_post_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str ctype = { NULL, 0 }; + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + if (_ctype) + ctype = *_ctype; + + LM_DBG("async rest post '%.*s' %p %p %p\n", url->len, url->s, + body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_POST, msg, url_nt.s, body, &ctype, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} + +static int w_async_rest_put_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str ctype = { NULL, 0 }; + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + if (_ctype) + ctype = *_ctype; + + LM_DBG("async rest put '%.*s' %p %p %p\n", + url->len, url->s, body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_PUT, msg, url_nt.s, body, &ctype, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} diff --git a/modules/rest_client/rest_client.h b/modules/rest_client/rest_client.h index 6ae9e2e79d0..8a6b7079b42 100644 --- a/modules/rest_client/rest_client.h +++ b/modules/rest_client/rest_client.h @@ -30,6 +30,12 @@ enum tr_rest_subtype { TR_REST_ESCAPE, TR_REST_UNESCAPE }; +typedef struct _preconnect_urls { + char *url; + long connections; + struct _preconnect_urls *next; +} preconnect_urls; + extern int enable_expect_100; extern unsigned int max_transfer_size; diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index 9bdf800c35e..67ebc0a1465 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -26,6 +26,7 @@ #include #include #include +#include #include "../../mem/shm_mem.h" #include "../../async.h" @@ -34,12 +35,27 @@ #include "../../trace_api.h" #include "../../resolve.h" #include "../../timer.h" +#include "../../lock_ops.h" #include "../tls_mgm/api.h" #include "rest_client.h" #include "rest_methods.h" #include "rest_cb.h" +#include "rest_sockets.h" + +/* + * So the connections never timeout + * version 8.15 allows 0 to disable the check + * Need to use limits.h to get LONG_MAX otherwise + */ + +#if (LIBCURL_VERSION_NUM >= 0x080f00) +long socket_keep_alive = 0; +#else +#include +long socket_keep_alive = LONG_MAX; +#endif #define REST_CORRELATION_COOKIE "RESTCORR" @@ -73,6 +89,11 @@ extern int rest_proto_id; extern trace_proto_t tprot; extern char *rest_id_s; +static CURLSH *curl_share = NULL; +static gen_lock_t *curl_socket_lock = NULL; + +static long timer; + /** * We cannot use the "parallel transfers" feature of libcurl's multi interface * because that would consume read events from some of its file descriptors that @@ -92,7 +113,8 @@ static int multi_pool_sz; static map_t rcl_connections; static gen_hash_t *rcl_parallel_connects; int no_concurrent_connects; -int curl_conn_lifetime; +int use_multi_socket_api; +unsigned int curl_conn_lifetime; static inline int rest_trace_enabled(void); static int trace_rest_message( rest_trace_param_t* tparam ); @@ -159,6 +181,24 @@ int rcl_init_internals(void) } \ } while (0) +#define w_curl_multi_setopt(mh, opt, value) \ + do { \ + mrc = curl_multi_setopt(mh, opt, value); \ + if (mrc != CURLM_OK) { \ + LM_ERR("curl_multi_setopt(%d): (%s)\n", opt, curl_multi_strerror(mrc)); \ + goto cleanup; \ + } \ + } while (0) + +#define w_curl_share_setopt(cs, opt, value) \ + do { \ + src = curl_share_setopt(cs, opt, value); \ + if (src != CURLSHE_OK) { \ + LM_ERR("curl_share_setopt: %s\n", curl_share_strerror(src)); \ + goto cleanup; \ + } \ + } while (0) + int trace_rest_request_cb(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr) { int is_req; @@ -399,7 +439,54 @@ static inline int get_easy_status(CURL *handle, CURLM *multi, CURLcode *code) return -1; } -static int init_transfer(CURL *handle, char *url) +static void libcurl_share_lock(CURL *handle, curl_lock_data data, curl_lock_access access, void *clientp) { + if (data == CURL_LOCK_DATA_CONNECT) { + LM_DBG("Locking libcurl share %d\n", data); + lock_get(curl_socket_lock); + } +} + +static void libcurl_share_unlock(CURL *handle, curl_lock_data data, void *clientp) { + if (data == CURL_LOCK_DATA_CONNECT) { + LM_DBG("Unlocking libcurl share %d\n", data); + lock_release(curl_socket_lock); + } +} + +static CURLSH *get_curl_share(void) { + CURLSHcode src; + + if (!curl_share) { + curl_socket_lock = lock_alloc(); + + if (!curl_socket_lock) { + goto done; + } + + lock_init(curl_socket_lock); + + curl_share = curl_share_init(); + + w_curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); + w_curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, libcurl_share_lock); + w_curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, libcurl_share_unlock); + } + + return curl_share; +cleanup: + if (curl_socket_lock) { + lock_destroy(curl_socket_lock); + lock_dealloc(curl_socket_lock); + curl_socket_lock = NULL; + } + + curl_share_cleanup(curl_share); // Passing NULL returns early if init failed + curl_share = NULL; +done: + return NULL; +} + +static int init_transfer(CURL *handle, char *url, unsigned long connect_timeout_s, unsigned long timeout_s) { CURLcode rc; @@ -414,13 +501,14 @@ static int init_transfer(CURL *handle, char *url) tls_dom = NULL; } - w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, connection_timeout); - w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, curl_timeout); + w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, connect_timeout_s); + w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, timeout_s); w_curl_easy_setopt(handle, CURLOPT_VERBOSE, 1); - w_curl_easy_setopt(handle, CURLOPT_STDERR, stdout); w_curl_easy_setopt(handle, CURLOPT_FAILONERROR, 0); + w_curl_easy_setopt(handle, CURLOPT_STDERR, stdout); + if (ssl_capath) w_curl_easy_setopt(handle, CURLOPT_CAPATH, ssl_capath); @@ -436,6 +524,26 @@ static int init_transfer(CURL *handle, char *url) return -1; } +static int init_socket_keepalive(CURL *handle) { + CURLcode rc; + + if (share_connections) { + // If the share cannot be created then log warn but keep going + curl_share = get_curl_share(); + w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); + } + + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, 180L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, 180L); + w_curl_easy_setopt(handle, CURLOPT_MAXAGE_CONN, socket_keep_alive); + + return 0; +cleanup: + LM_WARN("Error creating keep alive sockets\n"); + return -1; +} + #define init_rest_trace(handle, msg, trace_data) \ do { \ memset(trace_data, 0, sizeof *(trace_data)); \ @@ -579,7 +687,7 @@ int rcl_acquire_url(const char *url, char **url_host) } if (*connected_ts != 0 && (get_ticks() - - (unsigned int)*(unsigned long *)(*connected_ts) < curl_conn_lifetime)) { + (unsigned int)*(unsigned long *)connected_ts < curl_conn_lifetime)) { new_connection = 0; } else { new_connection = 1; @@ -702,7 +810,7 @@ int rest_sync_transfer(enum rest_client_method method, struct sip_msg *msg, str st = STR_NULL, res_body = STR_NULL, tbody, ttype; curl_easy_reset(sync_handle); - if (init_transfer(sync_handle, url) != 0) { + if (init_transfer(sync_handle, url, connection_timeout, curl_timeout) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -792,27 +900,10 @@ int rest_sync_transfer(enum rest_client_method method, struct sip_msg *msg, return RCL_INTERNAL_ERR; } -/** - * start_async_http_req - launch an async HTTP request - * - TCP connect phase is synchronous, due to libcurl limitations - * - TCP read phase is asynchronous, thanks to the libcurl multi interface - * - * @msg: sip message struct - * @method: HTTP verb - * @url: HTTP URL to be queried - * @req_body: Body of the request (NULL if not needed) - * @req_ctype: Value for the "Content-Type: " header of the request (same as ^) - * @async_parm: output param, will contain async handles - * @body: reply body; gradually reallocated as data arrives - * @ctype: will eventually hold the last "Content-Type" header of the reply - * @out_fd: the fd to poll on, or a negative error code - * - * @return: 1 on success, negative on failure - */ -int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, - char *url, str *req_body, str *req_ctype, - rest_async_param *async_parm, str *body, str *ctype, - enum async_ret_code *out_fd) +static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) { CURL *handle; CURLcode rc; @@ -835,7 +926,7 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, goto cleanup; } - if (init_transfer(handle, url) != 0) { + if (init_transfer(handle, url, connection_timeout, curl_timeout) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -962,7 +1053,6 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, if (max_fd != -1) { for (fd = 0; fd <= max_fd; fd++) { if (FD_ISSET(fd, &rset)) { - LM_DBG("ongoing transfer on fd %d\n", fd); if (is_new_transfer(fd)) { LM_DBG(">>> add fd %d to ongoing transfers\n", fd); @@ -1056,7 +1146,7 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, char *url = NULL; curl_easy_getinfo(param->handle, CURLINFO_EFFECTIVE_URL, &url); LM_ERR("async %s timed out, URL: %s\n", - rest_client_method_str(param->method), url); + rest_client_method_str(param->method), url); goto cleanup; } @@ -1072,7 +1162,6 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, if (mrc == CURLM_OK && running) { async_status = ASYNC_CONTINUE; return 1; - /* this rc has been removed since cURL 7.20.0 (Feb 2010), but it's not * yet marked as deprecated, so let's keep the do/while loop */ } else if (mrc != CURLM_CALL_MULTI_PERFORM) { @@ -1209,18 +1298,157 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, return ret; } - -enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param) +static enum async_ret_code _resume_async_http_req_v2(int fd, struct sip_msg *msg, + rest_async_param *param, int timed_out) { - return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 0); -} + CURLcode rc; + CURLMcode mrc; + int running = 0; + long http_rc = 0; + pv_value_t val; + int ret = RCL_INTERNAL_ERR, retr; + CURLM *multi_handle; + LM_DBG("resume async processing...\n"); -enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param) -{ - return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 1); -} + multi_handle = param->multi_list->multi_handle; + + if (timed_out) { + char *url = NULL; + curl_easy_getinfo(param->handle, CURLINFO_EFFECTIVE_URL, &url); + LM_ERR("async %s timed out, URL: %s (timeout: %lds)\n", + rest_client_method_str(param->method), url, param->timeout_s); + goto cleanup; + } + + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + + retr = 0; + do { + /* When @enable_expect_100 is on, both the client body upload and the + * server body download will be performed within this loop, blocking */ + + mrc = curl_multi_socket_action(multi_handle, fd, 0, &running); + LM_DBG("perform result: %d, running: %d (break: %d)\n", mrc, running, + mrc != CURLM_CALL_MULTI_PERFORM && (mrc != CURLM_OK || !running)); + + if (mrc == CURLM_OK) { + if (!running) + break; + + async_status = ASYNC_CONTINUE; + return 1; + /* this rc has been removed since cURL 7.20.0 (Feb 2010), but it's not + * yet marked as deprecated, so let's keep the do/while loop */ + } else if (mrc != CURLM_CALL_MULTI_PERFORM) { + break; + } + + usleep(_async_resume_retr_itv); + retr += _async_resume_retr_itv; + } while (retr < _async_resume_retr_timeout); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_perform: %s\n", curl_multi_strerror(mrc)); + goto out; + } + + if (!timed_out) { + if (running == 1) { + LM_DBG("transfer in progress...\n"); + async_status = ASYNC_CONTINUE; + return 1; + } + + if (running != 0) { + LM_BUG("non-zero running handles!! (%d)", running); + goto out; + } + } + +cleanup: + curl_slist_free_all(param->header_list); + rc = curl_easy_getinfo(param->handle, CURLINFO_RESPONSE_CODE, &http_rc); + if (rc != CURLE_OK) { + LM_ERR("curl_easy_getinfo: %d, %s\n", rc, curl_easy_strerror(rc)); + http_rc = 0; + } + if (get_easy_status(param->handle, multi_handle, &rc) < 0) + LM_DBG("download finished, but an HTTP status is not available " + "(timed_out: %d)\n", timed_out); + + if (param->code_pv) { + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = (int)http_rc; + if (pv_set_value(msg, param->code_pv, 0, &val) != 0) { + LM_ERR("failed to set output code pv\n"); + goto out; + } + } + + switch (rc) { + case CURLE_OK: + ret = RCL_OK; + break; + + case CURLE_COULDNT_CONNECT: + LM_ERR("connect refused\n"); + ret = RCL_CONNECT_REFUSED; + goto out; + + case CURLE_OPERATION_TIMEDOUT: + LM_ERR("connected, but transfer timed out (%lds)\n", curl_timeout); + ret = RCL_TRANSFER_TIMEOUT; + break; + + default: + LM_ERR("curl_easy_perform error %d, %s\n", + rc, curl_easy_strerror(rc)); + goto out; + } + + val.flags = PV_VAL_STR; + val.rs = param->body; + if (pv_set_value(msg, param->body_pv, 0, &val) != 0) { + LM_ERR("failed to set output body pv\n"); + goto out; + } + + if (param->ctype_pv) { + val.rs = param->ctype; + if (pv_set_value(msg, param->ctype_pv, 0, &val) != 0) { + LM_ERR("failed to set output ctype pv\n"); + goto out; + } + } + + LM_DBG("HTTP response code: %ld\n", http_rc); + +out: + mrc = curl_multi_remove_handle(multi_handle, param->handle); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + ret = RCL_INTERNAL_ERR; + } + put_multi(param->multi_list); + + pkg_free(param->body.s); + if (param->ctype_pv && param->ctype.s) + pkg_free(param->ctype.s); + curl_easy_cleanup(param->handle); + if ( param->tparam ) { + pkg_free( param->tparam ); + } + pkg_free(param); + + if (timed_out) + ret = RCL_TRANSFER_TIMEOUT; + + /* default async status is ASYNC_DONE */ + return ret; +} /** * rest_append_hf - add a custom HTTP header before a rest call @@ -1368,3 +1596,347 @@ static int trace_rest_message( rest_trace_param_t* tparam ) return 0; } + +int connect_only(preconnect_urls *precon_urls, int total_cons) { + CURLcode rc; + CURLMcode mrc; + CURL *handle; + CURLM *multi_handle; + curl_socket_t sockfd; + OSS_CURLM *multi_list; + CURLEasyHandles easy_handles = { 0, 0 }; + preconnect_urls *start, *next; + char *url; + long busy_wait, timer, local_timer; + int num_of_connections = 0, exit_code = 0, open_sockets = 0; + + if (!share_connections) { + exit_code = -1; + goto done; + } + + multi_list = get_multi(); + if (!multi_list) { + goto cleanup; + } + + multi_handle = multi_list->multi_handle; + + start = precon_urls; + + while (start != NULL) { + num_of_connections = start->connections; + url = start->url; + + LM_DBG("connect to %s, num of connects %d\n", url, num_of_connections); + + for (int i = 0; i < num_of_connections; i++) { + handle = curl_easy_init(); + curl_multi_add_handle(multi_handle, handle); + + if (init_transfer(handle, url, curl_timeout, curl_timeout) != 0) { + exit_code = -1; + goto cleanup; + } + + if (init_socket_keepalive(handle) != 0) { + exit_code = -1; + goto cleanup; + } + + w_curl_easy_setopt(handle, CURLOPT_NOBODY, 1L); + } + + start = start->next; + } + + if (num_of_connections == 0) { + goto error; + } + + easy_handles.size = 0; + easy_handles.handles = pkg_malloc(sizeof(CURL*) * num_of_connections); + + if (easy_handles.handles == NULL) { + goto error; + } + + busy_wait = 100; + + if (setsocket_callback_connect(multi_handle, &easy_handles) != 0) { + goto error; + } + + w_curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, (long) num_of_connections); + w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, (long) num_of_connections); + + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + + if ((running_handles = start_multi_socket(multi_handle)) < 0) { + exit_code = -1; + goto cleanup; + } + + LM_INFO("Creating warm pool connection, running handles %d\n", running_handles); + + local_timer = 0; + + do { + running_handles = run_multi_socket(multi_handle); + + if (timer < 0 || local_timer >= curl_timeout) { + break; + } + + local_timer += busy_wait; + usleep(1000UL * busy_wait); + } while (running_sockets()); +cleanup: + LM_INFO("Finishing warm pool connection, open sockets %d\n", easy_handles.size); + + if (running_sockets()) { + running_handles = end_multi_socket(multi_handle); + } + + for (int i = 0; i < easy_handles.size; i++) { + curl_easy_getinfo(easy_handles.handles[i], CURLINFO_ACTIVESOCKET, &sockfd); + + if (sockfd != CURL_SOCKET_BAD) { + open_sockets += 1; + } + + curl_multi_remove_handle(multi_handle, easy_handles.handles[i]); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + } + + curl_easy_cleanup(easy_handles.handles[i]); + } + + LM_INFO("Finishing warm pool connection, open sockets %d\n", open_sockets); +error: + start = precon_urls; + + while (start != NULL) { + next = start->next; + + pkg_free(start->url); + pkg_free(start); + + start = next; + } + + if (easy_handles.handles != NULL) { + pkg_free(easy_handles.handles); + } + + put_multi(multi_list); + +done: + return exit_code; +} + +int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) +{ + CURL *handle; + CURLcode rc; + CURLMcode mrc; + OSS_CURLM *multi_list; + CURLM *multi_handle; + long busy_wait, req_sz; + + handle = curl_easy_init(); + + if (!handle) { + LM_ERR("Init curl handle failed!\n"); + goto cleanup; + } + + if (init_transfer(handle, url, connection_timeout, async_parm->timeout_s) != 0) { + LM_ERR("failed to init transfer to %s\n", url); + goto cleanup; + } + + switch (method) { + case REST_CLIENT_POST: + set_post_opts(handle, req_ctype, req_body); + break; + + case REST_CLIENT_GET: + if (header_list) + w_curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header_list); + break; + + case REST_CLIENT_PUT: + set_put_opts(handle, req_ctype, req_body); + break; + + default: + LM_ERR("unsupported method: %d, defaulting to GET\n", method); + } + + w_curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_func); + w_curl_easy_setopt(handle, CURLOPT_WRITEDATA, body); + + init_socket_keepalive(handle); + + if (ctype) { + w_curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, header_func); + w_curl_easy_setopt(handle, CURLOPT_HEADERDATA, ctype); + } + + if (rest_trace_enabled()) { + async_parm->tparam = pkg_malloc(sizeof(rest_trace_param_t)); + if (!async_parm->tparam) { + LM_ERR("oom\n"); + goto cleanup; + } + + init_rest_trace(handle, msg, async_parm->tparam); + } + + multi_list = get_multi(); + if (!multi_list) { + LM_WARN("failed to get a multi handle, doing a blocking transfer\n"); + rc = rest_easy_perform(handle, url, NULL); + clean_header_list; + async_parm->handle = handle; + *out_fd = ASYNC_SYNC; + return rc; + } + + multi_handle = multi_list->multi_handle; + curl_multi_add_handle(multi_handle, handle); + + if (setsocket_callback_request(multi_handle) != 0) { + goto cleanup; + } + + w_curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, max_host_connections); + w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, max_connections); + + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + + if ((running_handles = start_multi_socket(multi_handle)) < 0) { + goto cleanup; + } + + busy_wait = connect_poll_interval; + + do { + running_handles = run_multi_socket(multi_handle); + + if (running_handles < 0) { + goto error; + } + + curl_easy_getinfo(handle, CURLINFO_REQUEST_SIZE, &req_sz); + + if (req_sz > 0) { + goto success; + } + + if (timer < 0) { + break; + } + + usleep(1000UL * busy_wait); + LM_DBG("Connecting or sending, running handles %d\n", running_handles); + } while (running_handles != 0); + + LM_ERR("connect timeout on %s (%lds)\n", url, connection_timeout); + goto error; + +success: + async_parm->header_list = header_list; + async_parm->handle = handle; + header_list = NULL; + *out_fd = get_max_fd(ASYNC_SYNC); // Running only one socket at a time so it's always the max + + if (*out_fd >= 0) { + async_parm->multi_list = multi_list; + } else { + put_multi(multi_list); + } + return RCL_OK; + +error: + mrc = curl_multi_remove_handle(multi_handle, handle); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + } + put_multi(multi_list); + + curl_easy_cleanup(handle); + +cleanup: + clean_header_list; + if (tls_dom) { + tls_api.release_domain(tls_dom); + tls_dom = NULL; + } + if (rest_trace_enabled() && async_parm->tparam) + pkg_free(async_parm->tparam); + + *out_fd = ASYNC_NO_IO; + return RCL_INTERNAL_ERR; +} + +/** + * start_async_http_req - launch an async HTTP request + * - TCP connect phase is synchronous, due to libcurl limitations + * - TCP read phase is asynchronous, thanks to the libcurl multi interface + * + * @msg: sip message struct + * @method: HTTP verb + * @url: HTTP URL to be queried + * @req_body: Body of the request (NULL if not needed) + * @req_ctype: Value for the "Content-Type: " header of the request (same as ^) + * @async_parm: in/out param, will contain async handles + * @body: reply body; gradually reallocated as data arrives + * @ctype: will eventually hold the last "Content-Type" header of the reply + * @out_fd: the fd to poll on, or a negative error code + * + * @return: 1 on success, negative on failure + */ +int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) +{ + if (!use_multi_socket_api) { + return start_async_http_req_v1(msg, method, url, req_body, req_ctype, async_parm, body, ctype, out_fd); + } else { + return start_async_http_req_v2(msg, method, url, req_body, req_ctype, async_parm, body, ctype, out_fd); + } +} + +enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param) { + if (!use_multi_socket_api) { + return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 0); + } else { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 0); + } +} + +enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param) { + if (!use_multi_socket_api) { + return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 1); + } else { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 1); + } +} + +enum async_ret_code resume_async_http_req_v2(int fd, struct sip_msg *msg, void *_param) { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 0); +} + +enum async_ret_code time_out_async_http_req_v2(int fd, struct sip_msg *msg, void *_param) { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 1); +} diff --git a/modules/rest_client/rest_methods.h b/modules/rest_client/rest_methods.h index 3e2774d1ff3..c1ee2d7b17b 100644 --- a/modules/rest_client/rest_methods.h +++ b/modules/rest_client/rest_methods.h @@ -39,6 +39,8 @@ extern long connection_timeout; extern long connect_poll_interval; extern long connection_timeout_ms; extern int max_async_transfers; +extern long max_connections; +extern long max_host_connections; extern long curl_timeout; extern char *ssl_capath; @@ -47,7 +49,9 @@ extern int ssl_verifyhost; extern int curl_http_version; extern int no_concurrent_connects; -extern int curl_conn_lifetime; +extern int use_multi_socket_api; +extern int share_connections; +extern unsigned int curl_conn_lifetime; /* handle for use with synchronous reqs */ extern CURL *sync_handle; @@ -121,6 +125,7 @@ typedef struct rest_async_param_ { pv_spec_p body_pv; pv_spec_p ctype_pv; pv_spec_p code_pv; + unsigned int timeout_s; } rest_async_param; int init_sync_handle(void); @@ -139,9 +144,18 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param); enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param); +// Temporary expose these +int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd); + +enum async_ret_code resume_async_http_req_v2(int fd, struct sip_msg *msg, void *_param); +enum async_ret_code time_out_async_http_req_v2(int fd, struct sip_msg *msg, void *_param); int rest_append_hf_method(struct sip_msg *msg, str *hfv); int rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom); +int connect_only(preconnect_urls *precon_urls, int total_cons); #endif /* _REST_METHODS_ */ diff --git a/modules/rest_client/rest_sockets.c b/modules/rest_client/rest_sockets.c new file mode 100644 index 00000000000..e5be50662ea --- /dev/null +++ b/modules/rest_client/rest_sockets.c @@ -0,0 +1,229 @@ +/* + * Copyright (C) 2025 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +#include "../../dprint.h" +#include "../../mem/mem.h" +#include "rest_sockets.h" + +#if defined(__CPU_aarch64) || defined(__CPU_x86_64) || defined(__CPU_mips64) +#define WORD_SIZE_BITS 64 +#define WORD_SIZE_BYTES 8 +#define COUNT_LEADING_ZEROS(s) __builtin_clzll(s) +#define COUNT_TRAILING_ZEROS(s) __builtin_ctzll(s) +typedef uint64_t cpuword_t; +#else +#define WORD_SIZE_BITS 32 +#define WORD_SIZE_BYTES 4 +#define COUNT_LEADING_ZEROS(s) __builtin_clz(s) +#define COUNT_TRAILING_ZEROS(s) __builtin_ctz(s) +typedef uint32_t cpuword_t; +#endif + +#define BYTE_LEN 8 + +typedef struct _file_descriptors { + unsigned char *tracked_socks; + int max_fd_index; +} file_descriptors; + +static file_descriptors fds; +size_t aligned_bitset_len; + +int init_process_limits(rlim_t rlim_cur) { + aligned_bitset_len = (((rlim_cur + BYTE_LEN - 1) / BYTE_LEN) * BYTE_LEN) / BYTE_LEN; + + fds.tracked_socks = (unsigned char*) pkg_malloc(aligned_bitset_len); + fds.max_fd_index = 0; + + if (fds.tracked_socks == NULL) { + return -1; + } + + return 0; +} + +int get_max_fd(int no_max_default) { + cpuword_t sockets; + + if (fds.max_fd_index < 0) { + return -2; + } + + memcpy(&sockets, fds.tracked_socks + fds.max_fd_index, sizeof(cpuword_t)); + + if (!sockets) { + fds.max_fd_index -= WORD_SIZE_BYTES; + return no_max_default; + } + + return ((fds.max_fd_index << 3) + WORD_SIZE_BITS - 1) - COUNT_LEADING_ZEROS(sockets); +} + +int running_sockets(void) { + cpuword_t sockets; + int running, curl_fd; + + for (int i = 0; i <= fds.max_fd_index; i += WORD_SIZE_BYTES) { + memcpy(&sockets, fds.tracked_socks + i, sizeof(cpuword_t)); + + if (sockets) { + return 1; + } + } + + return 0; +} + +static void add_sock(int s) { + int sock_index = s >> 3; + + if (sock_index > fds.max_fd_index) { + fds.max_fd_index = sock_index; + } + + fds.tracked_socks[s / BYTE_LEN] |= (1 << (s % BYTE_LEN)); +} + +static void remove_sock(int s) { + cpuword_t sockets; + + fds.tracked_socks[s / BYTE_LEN] &= ~(1 << (s % BYTE_LEN)); + + if (fds.max_fd_index >= 0) { + memcpy(&sockets, fds.tracked_socks + fds.max_fd_index, sizeof(cpuword_t)); + + if (!sockets) { + fds.max_fd_index -= WORD_SIZE_BYTES; + } + } +} + +static int socket_action_connect(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) { + LM_DBG("called for socket %d status %d\n", s, event); + + CURLEasyHandles *easy_handles = (CURLEasyHandles*) cbp; + if (event != CURL_POLL_REMOVE) { + add_sock(s); + } else if (event == CURL_POLL_REMOVE) { + remove_sock(s); + + LM_DBG("Adding handle for socket %d current size %d\n", s, easy_handles->size); + easy_handles->handles[easy_handles->size] = e; + easy_handles->size += 1; + } + + return 0; +} + +static int socket_action_http(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) { + LM_DBG("called for socket %d status %d\n", s, event); + + if (event != CURL_POLL_REMOVE) { + add_sock(s); + } else if (event == CURL_POLL_REMOVE) { + remove_sock(s); + } + + return 0; +} + +int setsocket_callback_connect(CURLM *multi_handle, CURLEasyHandles *easy_handles) { + CURLMcode mrc; + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_connect); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, easy_handles); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + return 0; +} + +int setsocket_callback_request(CURLM *multi_handle) { + CURLMcode mrc; + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_http); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + return 0; +} + +static int run_all_multi_socket(CURLM *multi_handle, int ev_bitmask) { + CURLMcode mrc; + int running; + + memset(fds.tracked_socks, 0, aligned_bitset_len); + fds.max_fd_index = 0; + mrc = curl_multi_socket_action(multi_handle, CURL_SOCKET_TIMEOUT, 0, &running); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_socket_action: %s\n", curl_multi_strerror(mrc)); + return -1; + } + + return running; +} + +int start_multi_socket(CURLM *multi_handle) { + return run_all_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT); +} + +int end_multi_socket(CURLM *multi_handle) { + return run_all_multi_socket(multi_handle, CURL_POLL_REMOVE); +} + +int run_multi_socket(CURLM *multi_handle) { + CURLMcode mrc; + cpuword_t sockets; + int running, curl_fd; + + for (int i = 0; i <= fds.max_fd_index; i += WORD_SIZE_BYTES) { + memcpy(&sockets, fds.tracked_socks + i, sizeof(cpuword_t)); + + while (sockets) { + curl_fd = (i * BYTE_LEN) + COUNT_TRAILING_ZEROS(sockets); + LM_DBG("Action on socket %d\n", curl_fd); + + mrc = curl_multi_socket_action(multi_handle, curl_fd, 0, &running); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_socket_action: %s\n", curl_multi_strerror(mrc)); + return -1; + } + + sockets &= sockets - 1; + } + } + + return running; +} diff --git a/modules/rest_client/rest_sockets.h b/modules/rest_client/rest_sockets.h new file mode 100644 index 00000000000..0af32663652 --- /dev/null +++ b/modules/rest_client/rest_sockets.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2025 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _REST_SOCKET_H_ +#define _REST_SOCKET_H_ + +#include +#include + +#include + +typedef struct _curl_easy_handles { + int size; + CURL **handles; +} CURLEasyHandles; + +int init_process_limits(rlim_t rlim_cur); +int get_max_fd(int no_max_default); +int running_sockets(void); +int start_multi_socket(CURLM *multi_handle); +int end_multi_socket(CURLM *multi_handle); +int run_multi_socket(CURLM *multi_handle); +int setsocket_callback_request(CURLM *multi_handle); +int setsocket_callback_connect(CURLM *multi_handle, CURLEasyHandles *easy_handles); + +#endif /* _REST_SOCKET_H_ */ \ No newline at end of file