Skip to content

Commit

Permalink
Send websocket message in multiple fragments when needed. (#2355)
Browse files Browse the repository at this point in the history
  • Loading branch information
atoppi authored Sep 15, 2020
1 parent b204c8e commit 8b9549a
Showing 1 changed file with 56 additions and 39 deletions.
95 changes: 56 additions & 39 deletions transports/janus_websockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ typedef struct janus_websockets_client {
GAsyncQueue *messages; /* Queue of outgoing messages to push */
char *incoming; /* Buffer containing the incoming message to process (in case there are fragments) */
unsigned char *buffer; /* Buffer containing the message to send */
int buflen; /* Length of the buffer (may be resized after re-allocations) */
int bufpending; /* Data an interrupted previous write couldn't send */
int bufoffset; /* Offset from where the interrupted previous write should resume */
size_t buflen; /* Length of the buffer (may be resized after re-allocations) */
size_t bufpending; /* Data an interrupted previous write couldn't send */
size_t bufoffset; /* Offset from where the interrupted previous write should resume */
volatile gint destroyed; /* Whether this libwebsockets client instance has been closed */
janus_transport_session *ts; /* Janus core-transport session */
} janus_websockets_client;
Expand Down Expand Up @@ -1057,6 +1057,9 @@ static int janus_websockets_callback_https(
return janus_websockets_callback_http(wsi, reason, user, in, len);
}

/* Use ~ 2xMTU as chunk size */
#define MESSAGE_CHUNK_SIZE 2800

/* This callback handles Janus API requests */
static int janus_websockets_common_callback(
struct lws *wsi,
Expand Down Expand Up @@ -1184,7 +1187,7 @@ static int janus_websockets_common_callback(
/* Check if Websockets send pipe is choked */
if(lws_send_pipe_choked(wsi)) {
if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
JANUS_LOG(LOG_WARN, "Websockets choked with buffer: %d, trying again\n", ws_client->bufpending);
JANUS_LOG(LOG_WARN, "Websockets choked with buffer: %zu, trying again\n", ws_client->bufpending);
lws_callback_on_writable(wsi);
} else {
gint qlen = g_async_queue_length(ws_client->messages);
Expand All @@ -1198,56 +1201,70 @@ static int janus_websockets_common_callback(
}

/* Check if we have a pending/partial write to complete first */
if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0
&& !g_atomic_int_get(&ws_client->destroyed) && !g_atomic_int_get(&stopping)) {
JANUS_LOG(LOG_HUGE, "[%s-%p] Completing pending WebSocket write (still need to write last %d bytes)...\n",
if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
JANUS_LOG(LOG_HUGE, "[%s-%p] Completing pending WebSocket write (still need to write last %zu bytes)...\n",
log_prefix, wsi, ws_client->bufpending);
int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, ws_client->bufpending, LWS_WRITE_TEXT);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Sent %d/%d bytes\n", log_prefix, wsi, sent, ws_client->bufpending);
if(sent > -1 && sent < ws_client->bufpending) {
/* We still couldn't send everything that was left, we'll try and complete this in the next round */
ws_client->bufpending -= sent;
ws_client->bufoffset += sent;
} else {
/* Clear the pending/partial write queue */
ws_client->bufpending = 0;
ws_client->bufoffset = 0;
} else {
/* Shoot all the pending messages */
char *response = g_async_queue_try_pop(ws_client->messages);
if (!response) {
/* No messages found */
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
if (g_atomic_int_get(&ws_client->destroyed) || g_atomic_int_get(&stopping)) {
free(response);
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
/* Shoot all the pending messages */
char *response = g_async_queue_try_pop(ws_client->messages);
if(response && !g_atomic_int_get(&ws_client->destroyed) && !g_atomic_int_get(&stopping)) {
/* Gotcha! */
int buflen = LWS_PRE + strlen(response);
JANUS_LOG(LOG_HUGE, "[%s-%p] Sending WebSocket message (%zu bytes)...\n", log_prefix, wsi, strlen(response));
size_t buflen = LWS_PRE + strlen(response);
if (buflen > ws_client->buflen) {
/* We need a larger shared buffer */
JANUS_LOG(LOG_HUGE, "[%s-%p] Re-allocating to %d bytes (was %d, response is %zu bytes)\n", log_prefix, wsi, buflen, ws_client->buflen, strlen(response));
JANUS_LOG(LOG_HUGE, "[%s-%p] Re-allocating to %zu bytes (was %zu, response is %zu bytes)\n", log_prefix, wsi, buflen, ws_client->buflen, strlen(response));
ws_client->buflen = buflen;
ws_client->buffer = g_realloc(ws_client->buffer, buflen);
}
memcpy(ws_client->buffer + LWS_PRE, response, strlen(response));
JANUS_LOG(LOG_HUGE, "[%s-%p] Sending WebSocket message (%zu bytes)...\n", log_prefix, wsi, strlen(response));
int sent = lws_write(wsi, ws_client->buffer + LWS_PRE, strlen(response), LWS_WRITE_TEXT);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Sent %d/%zu bytes\n", log_prefix, wsi, sent, strlen(response));
if(sent > -1 && sent < (int)strlen(response)) {
/* We couldn't send everything in a single write, we'll complete this in the next round */
ws_client->bufpending = strlen(response) - sent;
ws_client->bufoffset = LWS_PRE + sent;
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Couldn't write all bytes (%d missing), setting offset %d\n",
log_prefix, wsi, ws_client->bufpending, ws_client->bufoffset);
}
/* Initialize pending bytes count and buffer offset */
ws_client->bufpending = strlen(response);
ws_client->bufoffset = LWS_PRE;
/* We can get rid of the message */
free(response);
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
}

if (g_atomic_int_get(&ws_client->destroyed) || g_atomic_int_get(&stopping)) {
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}

/* Evaluate amount of data to send according to MESSAGE_CHUNK_SIZE */
int amount = ws_client->bufpending <= MESSAGE_CHUNK_SIZE ? ws_client->bufpending : MESSAGE_CHUNK_SIZE;
/* Set fragment flags */
int flags = lws_write_ws_flags(LWS_WRITE_TEXT, ws_client->bufoffset == LWS_PRE, ws_client->bufpending <= (size_t)amount);
/* Send the fragment with proper flags */
int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, (size_t)amount, flags);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- First=%d, Last=%d, Requested=%d bytes, Sent=%d bytes, Missing=%zu bytes\n", log_prefix, wsi, ws_client->bufoffset <= LWS_PRE, ws_client->bufpending <= (size_t)amount, amount, sent, ws_client->bufpending - amount);
if(sent < amount) {
/* Error on sending, abort operation */
JANUS_LOG(LOG_ERR, "Websocket sent only %d bytes (expected %d)\n", sent, amount);
ws_client->bufpending = 0;
ws_client->bufoffset = 0;
} else {
/* Fragment successfully sent, update status */
ws_client->bufpending -= amount;
ws_client->bufoffset += amount;
if(ws_client->bufpending > 0) {
/* We couldn't send everything in a single write, we'll complete this in the next round */
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Couldn't write all bytes (%zu missing), setting offset %zu\n",
log_prefix, wsi, ws_client->bufpending, ws_client->bufoffset);
}
}
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
return 0;
}
Expand Down

0 comments on commit 8b9549a

Please sign in to comment.