Skip to content

Commit

Permalink
fix: handle all curl return codes
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez committed Jul 8, 2024
1 parent 39f7bbf commit d522b2c
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ header_cb(void *contents, size_t size, size_t nmemb, void *userp)
return realsize;
}

static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_milliseconds, struct curl_slist *request_headers, char *reqBody, int64 id, CurlData *cdata)
static void init(CURLM *cm, char *method, char *url, int timeout_milliseconds, struct curl_slist *request_headers, char *reqBody, int64 id, CurlData *cdata)
{
CURL *eh = curl_easy_init();

Expand All @@ -132,10 +132,11 @@ static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_millisecon
(void)pushJsonbValue(&response_headers, WJB_BEGIN_OBJECT, NULL);
cdata->response_headers = response_headers;
cdata->id = id;
cdata->request_headers = request_headers;

request_headers = curl_slist_append(request_headers, "User-Agent: pg_net/" EXTVERSION);

cdata->request_headers = request_headers;

if (strcasecmp(method, "GET") == 0) {
if (reqBody) {
CURL_EZ_SETOPT(eh, CURLOPT_POSTFIELDS, reqBody);
Expand Down Expand Up @@ -174,7 +175,10 @@ static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_millisecon
#else
CURL_EZ_SETOPT(eh, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);
#endif
return curl_multi_add_handle(cm, eh);

CURLMcode code = curl_multi_add_handle(cm, eh);
if(code != CURLM_OK)
ereport(ERROR, errmsg("curl_multi_add_handle returned %s", curl_multi_strerror(code)));
}

bool is_extension_loaded(){
Expand All @@ -190,7 +194,7 @@ bool is_extension_loaded(){
void
worker_main(Datum main_arg)
{
CURLM *cm=NULL;
CURLM *curl_mhandle=NULL;
CURL *eh=NULL;
CURLMsg *msg=NULL;
int still_running=0, msgs_left=0;
Expand Down Expand Up @@ -280,10 +284,14 @@ worker_main(Datum main_arg)
res = curl_global_init(CURL_GLOBAL_ALL);

if(res) {
elog(ERROR, "error: curl_global_init() returned %d\n", res);
ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(res)));
}

cm = curl_multi_init();
curl_mhandle = curl_multi_init();

if(!curl_mhandle) {
ereport(ERROR, errmsg("curl_multi_init()"));
}

for (int j = 0; j < SPI_processed; j++)
{
Expand All @@ -308,7 +316,7 @@ worker_main(Datum main_arg)
CurlData *cdata;

if (strcasecmp(method, "GET") != 0 && strcasecmp(method, "POST") != 0 && strcasecmp(method, "DELETE") != 0) {
elog(ERROR, "error: Unsupported request method %s\n", method);
ereport(ERROR, errmsg("Unsupported request method %s", method));
}

headersBin = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 5, &tupIsNull);
Expand All @@ -321,11 +329,7 @@ worker_main(Datum main_arg)

cdata = palloc(sizeof(CurlData));

res = init(cm, method, url, timeout_milliseconds, request_headers, body, id, cdata);

if(res) {
elog(ERROR, "error: init() returned %d\n", res);
}
init(curl_mhandle, method, url, timeout_milliseconds, request_headers, body, id, cdata);
}
}
else
Expand All @@ -337,22 +341,22 @@ worker_main(Datum main_arg)
do {
int numfds=0;

res = curl_multi_perform(cm, &still_running);
res = curl_multi_perform(curl_mhandle, &still_running);

if(res != CURLM_OK) {
elog(ERROR, "error: curl_multi_perform() returned %d\n", res);
ereport(ERROR, errmsg("error: curl_multi_perform() returned %d", res));
}

/*wait at least 1 second(1000 ms) in case all responses are slow*/
/*this avoids busy waiting and higher CPU usage*/
res = curl_multi_wait(cm, NULL, 0, 1000, &numfds);
res = curl_multi_wait(curl_mhandle, NULL, 0, 1000, &numfds);

if(res != CURLM_OK) {
elog(ERROR, "error: curl_multi_wait() returned %d\n", res);
ereport(ERROR, errmsg("error: curl_multi_wait() returned %d", res));
}
} while(still_running);

while ((msg = curl_multi_info_read(cm, &msgs_left))) {
while ((msg = curl_multi_info_read(curl_mhandle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
CURLcode return_code = msg->data.result;
eh = msg->easy_handle;
Expand Down Expand Up @@ -414,14 +418,19 @@ worker_main(Datum main_arg)
pfree(cdata);
}

curl_multi_remove_handle(cm, eh);
res = curl_multi_remove_handle(curl_mhandle, eh);
if(res != CURLM_OK)
ereport(ERROR, errmsg("curl_multi_remove_handle: %s", curl_multi_strerror(res)));

curl_easy_cleanup(eh);
} else {
elog(ERROR, "error: after curl_multi_info_read(), CURLMsg=%d\n", msg->msg);
ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
}
}

curl_multi_cleanup(cm);
res = curl_multi_cleanup(curl_mhandle);
if(res != CURLM_OK)
ereport(ERROR, errmsg("curl_multi_cleanup: %s", curl_multi_strerror(res)));

SPI_finish();
PopActiveSnapshot();
Expand Down

0 comments on commit d522b2c

Please sign in to comment.