Skip to content

Commit

Permalink
Use custom GSource to handle HTTP request timeouts (see #2062 and #2066)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Apr 16, 2020
1 parent fe0d9f8 commit 5bc226c
Showing 1 changed file with 64 additions and 25 deletions.
89 changes: 64 additions & 25 deletions transports/janus_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,52 @@ static void janus_http_session_free(const janus_refcount *session_ref) {
}


/* Custom GSource for tracking request timeouts (including long polls) */
typedef struct janus_http_request_timeout {
GSource source;
janus_transport_session *ts;
janus_http_session *session;
} janus_http_request_timeout;
/* Helper to handle timeouts */
static void janus_http_timeout(janus_transport_session *ts, janus_http_session *session);
/* GSource Functions */
static gboolean janus_http_request_timeout_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) {
JANUS_LOG(LOG_WARN, "[%p] dispatch\n", source);
janus_http_request_timeout *t = (janus_http_request_timeout *)source;
/* Timeout fired, invoke the function */
janus_http_timeout(t->ts, t->session);
/* We're done */
g_source_destroy(source);
g_source_unref(source);
return G_SOURCE_REMOVE;
}
static void janus_http_request_timeout_finalize(GSource *source) {
JANUS_LOG(LOG_WARN, "[%p] finalize\n", source);
janus_http_request_timeout *timeout = (janus_http_request_timeout *)source;
if(timeout) {
if(timeout->session)
janus_refcount_decrease(&timeout->session->ref);
if(timeout->ts)
janus_refcount_decrease(&timeout->ts->ref);
}
}
static GSourceFuncs janus_http_request_timeout_funcs = {
NULL, NULL,
janus_http_request_timeout_dispatch,
janus_http_request_timeout_finalize,
NULL, NULL
};
static GSource *janus_http_request_timeout_create(janus_transport_session *ts, janus_http_session *session, gint timeout) {
GSource *source = g_source_new(&janus_http_request_timeout_funcs, sizeof(janus_http_request_timeout));
janus_http_request_timeout *t = (janus_http_request_timeout *)source;
t->ts = ts;
t->session = session;
g_source_set_ready_time(source, janus_get_monotonic_time() + timeout*G_USEC_PER_SEC);
JANUS_LOG(LOG_WARN, "[%p] create (%d)\n", source, timeout);
return source;
}


/* Callback (libmicrohttpd) invoked when a new connection is attempted on the REST API */
static int janus_http_client_connect(void *cls, const struct sockaddr *addr, socklen_t addrlen);
/* Callback (libmicrohttpd) invoked when a new connection is attempted on the admin/monitor webserver */
Expand All @@ -209,8 +255,6 @@ static int janus_http_return_success(janus_transport_session *ts, char *payload)
/* Helper to quickly send an error response */
static int janus_http_return_error(janus_transport_session *ts, uint64_t session_id,
const char *transaction, gint error, const char *format, ...) G_GNUC_PRINTF(5, 6);
/* Helper to handle timeouts */
static gboolean janus_http_timeout(gpointer user_data);


/* MHD Web Server */
Expand Down Expand Up @@ -954,8 +998,8 @@ int janus_http_send_message(janus_transport_session *transport, void *request_id
/* Send the events back */
if(msg->timeout != NULL) {
g_source_destroy(msg->timeout);
janus_refcount_decrease(&session->ref);
janus_refcount_decrease(&transport->ref);
g_source_unref(msg->timeout);
msg->timeout = NULL;
}
msg->timeout = NULL;
janus_http_notifier(msg);
Expand Down Expand Up @@ -997,7 +1041,8 @@ int janus_http_send_message(janus_transport_session *transport, void *request_id
}
if(msg->timeout != NULL) {
g_source_destroy(msg->timeout);
janus_refcount_decrease(&transport->ref);
g_source_unref(msg->timeout);
msg->timeout = NULL;
}
msg->timeout = NULL;
char *response_text = json_dumps(message, json_format);
Expand Down Expand Up @@ -1062,7 +1107,7 @@ void janus_http_session_claimed(janus_transport_session *transport, guint64 sess
janus_mutex_unlock(&sessions_mutex);
if(old_session == NULL)
return;
/* Were there a long polls waiting? */
/* Were there long polls waiting? */
janus_mutex_lock(&old_session->mutex);
janus_http_msg *msg = NULL;
while(old_session->longpolls) {
Expand All @@ -1072,15 +1117,13 @@ void janus_http_session_claimed(janus_transport_session *transport, guint64 sess
janus_refcount_increase(&msg->ref);
if(msg->timeout != NULL) {
g_source_destroy(msg->timeout);
janus_refcount_decrease(&old_session->ref);
janus_refcount_decrease(&transport->ref);
g_source_unref(msg->timeout);
msg->timeout = NULL;
}
msg->timeout = NULL;
if(g_atomic_pointer_compare_and_exchange(&msg->longpoll, session, NULL)) {
/* Add a new timeout that fires right away to return an error */
janus_refcount_increase(&transport->ref);
msg->timeout = g_timeout_source_new_seconds(0);
g_source_set_callback(msg->timeout, janus_http_timeout, transport, NULL);
msg->timeout = janus_http_request_timeout_create(transport, old_session, 0);
g_source_attach(msg->timeout, httpctx);
}
janus_refcount_decrease(&msg->ref);
Expand Down Expand Up @@ -1431,8 +1474,7 @@ static int janus_http_handler(void *cls, struct MHD_Connection *connection,
/* We won't wait forever for an answer (about 30 seconds) */
janus_refcount_increase(&ts->ref);
janus_refcount_increase(&session->ref);
msg->timeout = g_timeout_source_new_seconds(30);
g_source_set_callback(msg->timeout, janus_http_timeout, ts, NULL);
msg->timeout = janus_http_request_timeout_create(ts, session, 30);
g_source_attach(msg->timeout, httpctx);
/* Mark this connection as the long poll for this session */
msg->max_events = max_events;
Expand Down Expand Up @@ -1500,8 +1542,7 @@ static int janus_http_handler(void *cls, struct MHD_Connection *connection,
MHD_destroy_response(response);
/* We won't wait forever for an answer (about 10 seconds) */
janus_refcount_increase(&ts->ref);
msg->timeout = g_timeout_source_new_seconds(10);
g_source_set_callback(msg->timeout, janus_http_timeout, ts, NULL);
msg->timeout = janus_http_request_timeout_create(ts, NULL, 10);
g_source_attach(msg->timeout, httpctx);
/* Pass the ball to the core */
JANUS_LOG(LOG_HUGE, "Forwarding request to the core (%p)\n", msg);
Expand Down Expand Up @@ -1744,8 +1785,7 @@ static int janus_http_admin_handler(void *cls, struct MHD_Connection *connection
MHD_destroy_response(response);
/* We won't wait forever for an answer (about 10 seconds) */
janus_refcount_increase(&ts->ref);
msg->timeout = g_timeout_source_new_seconds(10);
g_source_set_callback(msg->timeout, janus_http_timeout, ts, NULL);
msg->timeout = janus_http_request_timeout_create(ts, NULL, 10);
g_source_attach(msg->timeout, httpctx);
/* Pass the ball to the core */
JANUS_LOG(LOG_HUGE, "Forwarding admin request to the core (%p)\n", msg);
Expand Down Expand Up @@ -1792,11 +1832,9 @@ static void janus_http_request_completed(void *cls, struct MHD_Connection *conne
janus_refcount_increase(&session->ref);
if(request->timeout != NULL) {
g_source_destroy(request->timeout);
if(session != NULL)
janus_refcount_decrease(&session->ref);
janus_refcount_decrease(&ts->ref);
g_source_unref(request->timeout);
request->timeout = NULL;
}
request->timeout = NULL;
if(session) {
janus_mutex_lock(&session->mutex);
session->longpolls = g_list_remove(session->longpolls, ts);
Expand Down Expand Up @@ -1957,13 +1995,15 @@ static int janus_http_return_error(janus_transport_session *ts, uint64_t session
}

/* Helper to handle timeouts */
static gboolean janus_http_timeout(gpointer user_data) {
janus_transport_session *ts = (janus_transport_session *)user_data;
void janus_http_timeout(janus_transport_session *ts, janus_http_session *session) {
if(g_atomic_int_get(&ts->destroyed))
return;
janus_refcount_increase(&ts->ref);
janus_http_msg *request = (janus_http_msg *)ts->transport_p;
request->timeout = NULL;
/* Is this a long poll timeout, simply meaning we had nothing to send so far? */
janus_http_session *session = (janus_http_session *)g_atomic_pointer_get(&request->longpoll);
if(session != NULL) {
janus_refcount_increase(&session->ref);
g_atomic_pointer_set(&request->longpoll, NULL);
/* Long poll, turn this into a "keepalive" response */
json_t *event = NULL;
Expand Down Expand Up @@ -1997,5 +2037,4 @@ static gboolean janus_http_timeout(gpointer user_data) {
MHD_resume_connection(request->connection);
}
janus_refcount_decrease(&ts->ref);
return G_SOURCE_REMOVE;
}

0 comments on commit 5bc226c

Please sign in to comment.