diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index d8d1b2ced6..e9b6e09edf 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -1514,6 +1514,7 @@ room-: { #include "../ip-utils.h" #include #include +#include /* Plugin information */ @@ -1741,6 +1742,7 @@ static struct janus_json_parameter record_parameters[] = { {"record", JANUS_JSON_BOOL, JANUS_JSON_PARAM_REQUIRED} }; static struct janus_json_parameter rtp_forward_parameters[] = { + {"secret", JSON_STRING, 0}, {"host", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, {"host_family", JSON_STRING, 0}, {"simulcast", JANUS_JSON_BOOL, 0}, @@ -1765,6 +1767,7 @@ static struct janus_json_parameter rtp_forward_parameters[] = { {"data_port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, }; static struct janus_json_parameter rtp_forward_stream_parameters[] = { + {"secret", JSON_STRING, 0}, {"mid", JANUS_JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, {"host", JSON_STRING, 0}, {"host_family", JSON_STRING, 0}, @@ -1783,6 +1786,7 @@ static struct janus_json_parameter rtp_forward_stream_parameters[] = { {"pt_3", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} }; static struct janus_json_parameter stop_rtp_forward_parameters[] = { + {"secret", JSON_STRING, 0}, {"stream_id", JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE} }; static struct janus_json_parameter publisher_parameters[] = { @@ -1871,6 +1875,49 @@ static struct janus_json_parameter switch_update_parameters[] = { {"mid", JANUS_JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, {"sub_mid", JANUS_JSON_STRING, JANUS_JSON_PARAM_REQUIRED} }; +static struct janus_json_parameter publish_remotely_parameters[] = { + {"secret", JSON_STRING, 0}, + {"remote_id", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"host", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"host_family", JSON_STRING, 0}, + {"port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE | JANUS_JSON_PARAM_REQUIRED}, + {"rtcp_port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} +}; +static struct janus_json_parameter unpublish_remotely_parameters[] = { + {"secret", JSON_STRING, 0}, + {"remote_id", JSON_STRING, JANUS_JSON_PARAM_REQUIRED} +}; +static struct janus_json_parameter remote_publisher_parameters[] = { + {"secret", JSON_STRING, 0}, + {"display", JANUS_JSON_STRING, 0}, + {"mcast", JANUS_JSON_STRING, 0}, + {"iface", JANUS_JSON_STRING, 0}, + {"port", JANUS_JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, + {"streams", JANUS_JSON_ARRAY, JANUS_JSON_PARAM_REQUIRED}, +}; +static struct janus_json_parameter remote_publisher_update_parameters[] = { + {"secret", JSON_STRING, 0}, + {"display", JANUS_JSON_STRING, 0}, + {"streams", JANUS_JSON_ARRAY, JANUS_JSON_PARAM_REQUIRED} +}; +static struct janus_json_parameter remote_publisher_stream_parameters[] = { + {"mid", JANUS_JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"mindex", JANUS_JSON_INTEGER, JANUS_JSON_PARAM_REQUIRED | JANUS_JSON_PARAM_POSITIVE}, + {"type", JANUS_JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"codec", JANUS_JSON_STRING, 0}, + {"description", JANUS_JSON_STRING, 0}, + {"disabled", JANUS_JSON_BOOL, 0}, + {"stereo", JANUS_JSON_BOOL, 0}, + {"fec", JANUS_JSON_BOOL, 0}, + {"dtx", JANUS_JSON_BOOL, 0}, + {"h264_profile", JANUS_JSON_BOOL, 0}, + {"vp9_profile", JANUS_JSON_BOOL, 0}, + {"simulcast", JANUS_JSON_BOOL, 0}, + {"svc", JANUS_JSON_BOOL, 0}, + {"audiolevel_ext_id", JANUS_JSON_INTEGER, 0}, + {"videoorient_ext_id", JANUS_JSON_INTEGER, 0}, + {"playoutdelay_ext_id", JANUS_JSON_INTEGER, 0}, +}; /* Static configuration instance */ static janus_config *config = NULL; @@ -1923,6 +1970,17 @@ static janus_sdp_mtype janus_videoroom_media_sdptype(janus_videoroom_media type) } return JANUS_SDP_OTHER; } +static janus_videoroom_media janus_videoroom_media_from_str(const char *type) { + if(type == NULL) + return JANUS_VIDEOROOM_MEDIA_NONE; + else if(!strcasecmp(type, "audio")) + return JANUS_VIDEOROOM_MEDIA_AUDIO; + else if(!strcasecmp(type, "video")) + return JANUS_VIDEOROOM_MEDIA_VIDEO; + else if(!strcasecmp(type, "data")) + return JANUS_VIDEOROOM_MEDIA_DATA; + return JANUS_VIDEOROOM_MEDIA_NONE; +} typedef struct janus_videoroom_message { janus_plugin_session *handle; @@ -2019,6 +2077,8 @@ typedef struct janus_videoroom_rtp_forwarder { /* Only needed for SRTP forwarders */ gboolean is_srtp; janus_videoroom_srtp_context *srtp_ctx; + /* In case this is part of the remotization of publisher */ + char *remote_id; /* Reference */ volatile gint destroyed; janus_refcount ref; @@ -2110,6 +2170,14 @@ typedef struct janus_videoroom_publisher { janus_mutex subscribers_mutex; janus_mutex own_subscriptions_mutex; GHashTable *srtp_contexts; /* SRTP contexts that we can share among RTP forwarders */ + /* In case this local publisher is being forwarder remotely */ + GHashTable *remote_recipients; + /* In case this is a remote publisher */ + gboolean remote; /* Whether this is a remote publisher */ + int remote_fd, remote_rtcp_fd, pipefd[2]; /* Remote publisher sockets */ + struct sockaddr_storage rtcp_addr; /* RTCP address of the remote publisher */ + GThread *remote_thread; /* Remote publisher incoming packets thread */ + volatile gint remote_leaving; /* Index of RTP (or data) forwarders for this participant (all streams), if any */ GHashTable *rtp_forwarders; janus_mutex rtp_forwarders_mutex; @@ -2135,7 +2203,6 @@ typedef struct janus_videoroom_publisher_stream { char *fmtp; /* fmtp that ended up being negotiated, if any (for video profiles) */ char *h264_profile; /* H264 profile used for this stream (if video and H264 codec) */ char *vp9_profile; /* VP9 profile this publisher is using (if video and VP9 codec) */ - guint32 ssrc; /* Internal SSRC of this stream */ gint64 fir_latest; /* Time of latest sent PLI (to avoid flooding) */ gint fir_seq; /* FIR sequence number, if needed */ gboolean opusfec; /* Whether this stream is sending inband Opus FEC */ @@ -2165,6 +2232,10 @@ typedef struct janus_videoroom_publisher_stream { /* RTP (or data) forwarders for this stream, if any */ GHashTable *rtp_forwarders; janus_mutex rtp_forwarders_mutex; + /* In case this is a stream from a remote publisher */ + volatile gint need_pli; /* Whether we need to send a PLI later */ + volatile gint sending_pli; /* Whether we're currently sending a PLI */ + gint64 pli_latest; /* Time of latest sent PLI (to avoid flooding) */ /* Subscriptions to this publisher stream (who's receiving it) */ GSList *subscribers; janus_mutex subscribers_mutex; @@ -2180,6 +2251,23 @@ static janus_videoroom_rtp_forwarder *janus_videoroom_rtp_forwarder_add_helper(j static json_t *janus_videoroom_rtp_forwarder_summary(janus_videoroom_rtp_forwarder *f); static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashTable *streams); +/* We support remote publishers as well, for which we use plain RTP, + * which means we need to create and work with generic file descriptors */ +#define DEFAULT_RTP_RANGE_MIN 10000 +#define DEFAULT_RTP_RANGE_MAX 60000 +static uint16_t rtp_range_min = DEFAULT_RTP_RANGE_MIN; +static uint16_t rtp_range_max = DEFAULT_RTP_RANGE_MAX; +static uint16_t rtp_range_slider = DEFAULT_RTP_RANGE_MIN; +static janus_mutex fd_mutex = JANUS_MUTEX_INITIALIZER; +#define REMOTE_PUBLISHER_BASE_SSRC 1000 +#define REMOTE_PUBLISHER_SSRC_STEP 10 +/* Helpers to create a listener filedescriptor */ +static int janus_videoroom_create_fd(int port, in_addr_t mcast, const janus_network_address *iface, char *host, size_t hostlen); +/* Helper to return fd port */ +static int janus_videoroom_get_fd_port(int fd); +/* Thread responsible for a specific remote publisher */ +static void *janus_videoroom_remote_publisher_thread(void *data); + typedef struct janus_videoroom_subscriber { janus_videoroom_session *session; janus_videoroom *room; /* Room */ @@ -2249,6 +2337,23 @@ typedef struct janus_videoroom_rtp_relay_packet { gboolean textdata; } janus_videoroom_rtp_relay_packet; +/* VideoRoom publishers can be forwarder remotely: we use the following + * struct to track specific recipients of a local publisher */ +typedef struct janus_videoroom_remote_recipient { + char *remote_id; /* ID of this publisher remotization */ + char *host; /* Address this publisher is being relayed to */ + uint16_t port; /* Port this publisher is being relayed to */ + uint16_t rtcp_port; /* RTCP port this publisher is going to latch to */ + gboolean rtcp_added; /* Whether we created an RTCP socket for this remotization */ +} janus_videoroom_remote_recipient; +static void janus_videoroom_remote_recipient_free(janus_videoroom_remote_recipient *r) { + if(r) { + g_free(r->remote_id); + g_free(r->host); + g_free(r); + } +} + /* Start / stop recording */ static void janus_videoroom_recorder_create(janus_videoroom_publisher_stream *ps); static void janus_videoroom_recorder_close(janus_videoroom_publisher *participant); @@ -2391,12 +2496,20 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) { if(p->udp_sock > 0) close(p->udp_sock); + g_hash_table_destroy(p->remote_recipients); g_hash_table_destroy(p->rtp_forwarders); - p->rtp_forwarders = NULL; g_hash_table_destroy(p->srtp_contexts); - p->srtp_contexts = NULL; g_slist_free(p->subscriptions); + if(p->remote_fd > 0) + close(p->remote_fd); + if(p->remote_rtcp_fd > 0) + close(p->remote_rtcp_fd); + if(p->pipefd[0] > 0) + close(p->pipefd[0]); + if(p->pipefd[1] > 0) + close(p->pipefd[1]); + janus_mutex_destroy(&p->subscribers_mutex); janus_mutex_destroy(&p->rtp_forwarders_mutex); @@ -2511,13 +2624,58 @@ static void janus_videoroom_codecstr(janus_videoroom *videoroom, char *audio_cod } } +/* Helper method to send an RTCP PLI to a remote publisher */ +static void janus_videoroom_rtcp_pli_send(janus_videoroom_publisher_stream *ps) { + if(ps == NULL || ps->publisher == NULL) + return; + janus_videoroom_publisher *publisher = ps->publisher; + if(publisher->remote_rtcp_fd < 0 || publisher->rtcp_addr.ss_family == 0) + return; + if(!g_atomic_int_compare_and_exchange(&ps->sending_pli, 0, 1)) + return; + gint64 now = janus_get_monotonic_time(); + if(now - ps->pli_latest < G_USEC_PER_SEC) { + /* We just sent a PLI less than a second ago, schedule a new delivery later */ + g_atomic_int_set(&ps->need_pli, 1); + g_atomic_int_set(&ps->sending_pli, 0); + return; + } + /* Update the time of when we last sent a keyframe request */ + g_atomic_int_set(&ps->need_pli, 0); + ps->pli_latest = janus_get_monotonic_time(); + JANUS_LOG(LOG_HUGE, "Sending PLI\n"); + /* Generate a PLI */ + char rtcp_buf[12]; + int rtcp_len = 12; + janus_rtcp_pli((char *)&rtcp_buf, rtcp_len); + uint32_t ssrc = REMOTE_PUBLISHER_BASE_SSRC + (ps->mindex*REMOTE_PUBLISHER_SSRC_STEP); + janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1, 1, ssrc); + /* Send the packet */ + socklen_t addrlen = publisher->rtcp_addr.ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + int sent = 0; + if((sent = sendto(publisher->remote_rtcp_fd, rtcp_buf, rtcp_len, 0, + (struct sockaddr *)&publisher->rtcp_addr, addrlen)) < 0) { + JANUS_LOG(LOG_ERR, "Error in sendto... %d (%s)\n", errno, g_strerror(errno)); + } else { + JANUS_LOG(LOG_HUGE, "Sent %d/%d bytes\n", sent, rtcp_len); + } + g_atomic_int_set(&ps->sending_pli, 0); +} + static void janus_videoroom_reqpli(janus_videoroom_publisher_stream *ps, const char *reason) { if(ps == NULL || g_atomic_int_get(&ps->destroyed) || ps->publisher == NULL || g_atomic_int_get(&ps->publisher->destroyed)) return; /* Send a PLI */ JANUS_LOG(LOG_VERB, "%s sending PLI to %s (#%d, %s)\n", reason, ps->publisher->user_id_str, ps->mindex, ps->publisher->display ? ps->publisher->display : "??"); - gateway->send_pli_stream(ps->publisher->session->handle, ps->mindex); + if(!ps->publisher->remote) { + /* Easy enough, local publisher so we ask the Janus core to send a PLI */ + gateway->send_pli_stream(ps->publisher->session->handle, ps->mindex); + } else { + /* This is a remote publisher, so we'll need to send a PLI to the remote RTCP address */ + JANUS_LOG(LOG_VERB, "Sending PLI to remote publisher\n"); + janus_videoroom_rtcp_pli_send(ps); + } /* Update the time of when we last sent a keyframe request */ ps->fir_latest = janus_get_monotonic_time(); } @@ -2827,6 +2985,7 @@ static void janus_videoroom_rtp_forwarder_free(const janus_refcount *f_ref) { if(forward->srtp_ctx->count == 0 && forward->srtp_ctx->contexts != NULL) g_hash_table_remove(forward->srtp_ctx->contexts, forward->srtp_ctx->id); } + g_free(forward->remote_id); g_free(forward); forward = NULL; } @@ -2879,6 +3038,8 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_publisher_stream_unref); janus_mutex_init(&publisher->streams_mutex); janus_mutex_init(&publisher->rtp_forwarders_mutex); + publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, + (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); publisher->udp_sock = -1; @@ -6003,7 +6164,15 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; janus_mutex_lock(&ps->rtp_forwarders_mutex); - if(g_hash_table_remove(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id))) { + janus_videoroom_rtp_forwarder *f = g_hash_table_lookup(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id)); + if(f != NULL) { + if(f->remote_id != NULL) { + /* This belongs to a remotization, ignore */ + janus_mutex_unlock(&ps->rtp_forwarders_mutex); + found = FALSE; + break; + } + g_hash_table_remove(ps->rtp_forwarders, GUINT_TO_POINTER(stream_id)); janus_mutex_unlock(&ps->rtp_forwarders_mutex); /* Found, remove from global index too */ g_hash_table_remove(publisher->rtp_forwarders, GUINT_TO_POINTER(stream_id)); @@ -6117,14 +6286,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_refcount_increase(&videoroom->ref); janus_mutex_unlock(&rooms_mutex); janus_mutex_lock(&videoroom->mutex); - /* A secret may be required for this action */ - JANUS_CHECK_SECRET(videoroom->room_secret, root, "secret", error_code, error_cause, - JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT, JANUS_VIDEOROOM_ERROR_UNAUTHORIZED); - if(error_code != 0) { - janus_mutex_unlock(&videoroom->mutex); - janus_refcount_decrease(&videoroom->ref); - goto prepare_response; - } if(!strcasecmp(action_text, "enable")) { JANUS_LOG(LOG_VERB, "Enabling the check on allowed authorization tokens for room %s\n", room_id_str); videoroom->check_allowed = TRUE; @@ -6236,14 +6397,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_refcount_increase(&videoroom->ref); janus_mutex_unlock(&rooms_mutex); janus_mutex_lock(&videoroom->mutex); - /* A secret may be required for this action */ - JANUS_CHECK_SECRET(videoroom->room_secret, root, "secret", error_code, error_cause, - JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT, JANUS_VIDEOROOM_ERROR_UNAUTHORIZED); - if(error_code != 0) { - janus_mutex_unlock(&videoroom->mutex); - janus_refcount_decrease(&videoroom->ref); - goto prepare_response; - } guint64 user_id = 0; char user_id_num[30], *user_id_str = NULL; if(!string_ids) { @@ -6363,14 +6516,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_refcount_increase(&videoroom->ref); janus_mutex_unlock(&rooms_mutex); janus_mutex_lock(&videoroom->mutex); - /* A secret may be required for this action */ - JANUS_CHECK_SECRET(videoroom->room_secret, root, "secret", error_code, error_cause, - JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT, JANUS_VIDEOROOM_ERROR_UNAUTHORIZED); - if(error_code != 0) { - janus_mutex_unlock(&videoroom->mutex); - janus_refcount_decrease(&videoroom->ref); - goto prepare_response; - } guint64 user_id = 0; char user_id_num[30], *user_id_str = NULL; if(!string_ids) { @@ -6498,6 +6643,8 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi json_object_set_new(pl, "display", json_string(p->display)); if(p->dummy) json_object_set_new(pl, "dummy", json_true()); + if(p->remote) + json_object_set_new(pl, "remote", json_true()); json_object_set_new(pl, "publisher", g_atomic_int_get(&p->session->started) ? json_true() : json_false()); /* To see if the participant is talking, we need to find the audio stream(s) */ if(g_atomic_int_get(&p->session->started)) { @@ -6590,6 +6737,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi g_hash_table_iter_init(&iter_f, ps->rtp_forwarders); while(g_hash_table_iter_next(&iter_f, &key_f, &value_f)) { janus_videoroom_rtp_forwarder *rpv = value_f; + /* If this belongs to a remotization, skip it */ + if(rpv->remote_id != NULL) + continue; /* Return a different, media-agnostic, format */ json_t *fl = janus_videoroom_rtp_forwarder_summary(rpv); json_array_append_new(flist, fl); @@ -6675,50 +6825,1186 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi json_object_set_new(response, "videoroom", json_string("success")); json_object_set_new(response, "record", json_boolean(recording_active)); goto prepare_response; - } else { - /* Not a request we recognize, don't do anything */ - return NULL; - } - -prepare_response: - { - if(error_code == 0 && !response) { + } else if(!strcasecmp(request_text, "publish_remotely")) { + /* Configure a local publisher to restream to a remote VideoRomm instance as well */ + JANUS_VALIDATE_JSON_OBJECT(root, publish_remotely_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, pid_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, pidstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(lock_rtpfwd && admin_key != NULL) { + /* An admin key was specified: make sure it was provided, and that it's valid */ + JANUS_VALIDATE_JSON_OBJECT(root, adminkey_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + JANUS_CHECK_SECRET(admin_key, root, "admin_key", error_code, error_cause, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT, JANUS_VIDEOROOM_ERROR_UNAUTHORIZED); + if(error_code != 0) + goto prepare_response; + } + const char *remote_id = json_string_value(json_object_get(root, "remote_id")); + json_t *pub_id = json_object_get(root, "publisher_id"); + json_t *json_host = json_object_get(root, "host"); + json_t *json_host_family = json_object_get(root, "host_family"); + const char *host_family = json_string_value(json_host_family); + uint16_t port = json_integer_value(json_object_get(root, "port")); + uint16_t rtcp_port = json_integer_value(json_object_get(root, "rtcp_port")); + if(port == 0) { + JANUS_LOG(LOG_ERR, "Invalid element (port must be a non-zero positive integer)\n"); + error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Invalid element (port must be a non-zero positive integer)"); + goto prepare_response; + } + int family = 0; + if(host_family) { + if(!strcasecmp(host_family, "ipv4")) { + family = AF_INET; + } else if(!strcasecmp(host_family, "ipv6")) { + family = AF_INET6; + } else { + JANUS_LOG(LOG_ERR, "Unsupported protocol family (%s)\n", host_family); + error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Unsupported protocol family (%s)", host_family); + goto prepare_response; + } + } + guint64 publisher_id = 0; + char publisher_id_num[30], *publisher_id_str = NULL; + if(!string_ids) { + publisher_id = json_integer_value(pub_id); + g_snprintf(publisher_id_num, sizeof(publisher_id_num), "%"SCNu64, publisher_id); + publisher_id_str = publisher_id_num; + } else { + publisher_id_str = (char *)json_string_value(pub_id); + } + const char *host = json_string_value(json_host), *resolved_host = NULL; + /* Check if we need to resolve this host address */ + struct addrinfo *res = NULL, *start = NULL; + janus_network_address addr; + janus_network_address_string_buffer addr_buf; + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + if(family != 0) + hints.ai_family = family; + if(getaddrinfo(host, NULL, family != 0 ? &hints : NULL, &res) == 0) { + start = res; + while(res != NULL) { + if(janus_network_address_from_sockaddr(res->ai_addr, &addr) == 0 && + janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) { + /* Resolved */ + resolved_host = janus_network_address_string_from_buffer(&addr_buf); + freeaddrinfo(start); + start = NULL; + break; + } + res = res->ai_next; + } + } + if(resolved_host == NULL) { + if(start) + freeaddrinfo(start); + JANUS_LOG(LOG_ERR, "Could not resolve address (%s)...\n", host); + error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Could not resolve address (%s)...", host); + goto prepare_response; + } + host = resolved_host; + /* Look for room and publisher */ + janus_mutex_lock(&rooms_mutex); + janus_videoroom *videoroom = NULL; + error_code = janus_videoroom_access_room(root, TRUE, FALSE, &videoroom, error_cause, sizeof(error_cause)); + if(error_code != 0) { + janus_mutex_unlock(&rooms_mutex); + goto prepare_response; + } + janus_refcount_increase(&videoroom->ref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)publisher_id_str : (gpointer)&publisher_id); + if(publisher == NULL) { + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such publisher (%s)\n", publisher_id_str); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such feed (%s)", publisher_id_str); + goto prepare_response; + } + janus_refcount_increase(&publisher->ref); /* This is just to handle the request for now */ + janus_mutex_unlock(&videoroom->mutex); + /* FIXME At the moment, we only allow for the remotization of + * local publishers, not remote ones: it may make sense to allow + * the remotization of remote publishers as well in the future + * (e.g., for cascading beyond the source), but that's something + * that in case we'll work on in subsequent code changes */ + if(publisher->remote) { + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Only local publishers can be remotized\n"); + error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT; + g_snprintf(error_cause, 512, "Only local publishers can be remotized"); + goto prepare_response; + } + janus_mutex_lock(&publisher->rtp_forwarders_mutex); + if(g_hash_table_lookup(publisher->remote_recipients, remote_id) != NULL) { + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Remotization already exists (%s)\n", remote_id); + error_code = JANUS_VIDEOROOM_ERROR_ID_EXISTS; + g_snprintf(error_cause, 512, "Remotization already exists (%s)", remote_id); + goto prepare_response; + } + if(publisher->udp_sock <= 0) { + publisher->udp_sock = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP); + int v6only = 0; + if(publisher->udp_sock <= 0 || + (!ipv6_disabled && setsockopt(publisher->udp_sock, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0)) { + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Could not open UDP socket for RTP stream for publisher (%s), %d (%s)\n", + publisher_id_str, errno, g_strerror(errno)); error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; - g_snprintf(error_cause, 512, "Invalid response"); + g_snprintf(error_cause, 512, "Could not open UDP socket for RTP stream"); + goto prepare_response; } - if(error_code != 0) { - /* Prepare JSON error event */ - response = json_object(); - json_object_set_new(response, "videoroom", json_string("event")); - json_object_set_new(response, "error_code", json_integer(error_code)); - json_object_set_new(response, "error", json_string(error_cause)); + } + /* Add a new RTP forwarder for each of the publisher streams */ + janus_mutex_lock(&publisher->streams_mutex); + janus_videoroom_publisher_stream *ps = NULL; + janus_videoroom_rtp_forwarder *f = NULL; + gboolean rtcp_added = FALSE, add_rtcp = FALSE; + GList *temp = publisher->streams; + while(temp) { + ps = (janus_videoroom_publisher_stream *)temp->data; + if(ps == NULL || g_atomic_int_get(&ps->destroyed)) { + temp = temp->next; + continue; } - return response; + if(ps->type == JANUS_VIDEOROOM_MEDIA_AUDIO) { + /* Audio stream */ + f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, + host, port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), + FALSE, 0, NULL, 0, FALSE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(remote_id); + } else if(ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { + /* Video stream */ + add_rtcp = (!rtcp_added && rtcp_port > 0); + f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, + host, port, add_rtcp ? rtcp_port : -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), + FALSE, 0, NULL, 0, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(remote_id); + if(add_rtcp) + rtcp_added = TRUE; + /* Check if there's simulcast substreams we need to relay too */ + if(ps->vssrc[1] || ps->rid[1]) { + f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, + host, port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 1), + FALSE, 0, NULL, 1, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(remote_id); + } + if(ps->vssrc[2] || ps->rid[2]) { + f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, + host, port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 2), + FALSE, 0, NULL, 2, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(remote_id); + } + } else { + /* Data stream */ + f = janus_videoroom_rtp_forwarder_add_helper(publisher, ps, + host, port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), + FALSE, 0, NULL, 0, FALSE, TRUE); + if(f != NULL) + f->remote_id = g_strdup(remote_id); + } + temp = temp->next; } - -} - -struct janus_plugin_result *janus_videoroom_handle_message(janus_plugin_session *handle, char *transaction, json_t *message, json_t *jsep) { - if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) - return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized", NULL); - - /* Pre-parse the message */ - int error_code = 0; - char error_cause[512]; - json_t *root = message; - json_t *response = NULL; - - janus_mutex_lock(&sessions_mutex); - janus_videoroom_session *session = janus_videoroom_lookup_session(handle); - if(!session) { - janus_mutex_unlock(&sessions_mutex); - JANUS_LOG(LOG_ERR, "No session associated with this handle...\n"); - error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; - g_snprintf(error_cause, 512, "%s", "No session associated with this handle..."); - goto plugin_response; - } - /* Increase the reference counter for this session: we'll decrease it after we handle the message */ - janus_refcount_increase(&session->ref); + janus_mutex_unlock(&publisher->streams_mutex); + /* Keep track of this remotization */ + janus_videoroom_remote_recipient *recipient = g_malloc(sizeof(janus_videoroom_remote_recipient)); + recipient->remote_id = g_strdup(remote_id); + recipient->host = g_strdup(host); + recipient->port = port; + recipient->rtcp_port = rtcp_port; + recipient->rtcp_added = rtcp_added; + g_hash_table_insert(publisher->remote_recipients, g_strdup(remote_id), recipient); + /* Done */ + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id)); + json_object_set_new(response, "id", string_ids ? json_string(publisher->user_id_str) : json_integer(publisher->user_id)); + json_object_set_new(response, "remote_id", json_string(remote_id)); + janus_refcount_decrease(&publisher->ref); /* This is just to handle the request for now */ + goto prepare_response; + } else if(!strcasecmp(request_text, "unpublish_remotely")) { + /* Configure a local publisher to stop restreaming to a remote VideoRomm instance */ + JANUS_VALIDATE_JSON_OBJECT(root, unpublish_remotely_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, pid_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, pidstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + janus_mutex_lock(&rooms_mutex); + janus_videoroom *videoroom = NULL; + error_code = janus_videoroom_access_room(root, TRUE, FALSE, &videoroom, error_cause, sizeof(error_cause)); + if(error_code != 0) { + janus_mutex_unlock(&rooms_mutex); + goto prepare_response; + } + janus_refcount_increase(&videoroom->ref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + const char *remote_id = json_string_value(json_object_get(root, "remote_id")); + json_t *pub_id = json_object_get(root, "publisher_id"); + guint64 publisher_id = 0; + char publisher_id_num[30], *publisher_id_str = NULL; + if(!string_ids) { + publisher_id = json_integer_value(pub_id); + g_snprintf(publisher_id_num, sizeof(publisher_id_num), "%"SCNu64, publisher_id); + publisher_id_str = publisher_id_num; + } else { + publisher_id_str = (char *)json_string_value(pub_id); + } + janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)publisher_id_str : (gpointer)&publisher_id); + if(publisher == NULL || g_atomic_int_get(&publisher->destroyed)) { + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such publisher (%s)\n", publisher_id_str); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such publisher (%s)", publisher_id_str); + goto prepare_response; + } + janus_refcount_increase(&publisher->ref); + janus_mutex_unlock(&videoroom->mutex); + janus_mutex_lock(&publisher->rtp_forwarders_mutex); + /* Check if we know of this remotization */ + if(g_hash_table_remove(publisher->remote_recipients, remote_id) == FALSE) { + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such remotization (%s)\n", remote_id); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such remotization (%s)", remote_id); + goto prepare_response; + } + /* Now get rid of all RTP forwarders with that ID */ + GList *temp = publisher->streams; + while(temp) { + janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; + janus_mutex_lock(&ps->rtp_forwarders_mutex); + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, ps->rtp_forwarders); + while(g_hash_table_iter_next(&iter, NULL, &value)) { + janus_videoroom_rtp_forwarder *f = (janus_videoroom_rtp_forwarder *)value; + if(f->remote_id != NULL && !strcmp(f->remote_id, remote_id)) { + /* We found one, get rid of it */ + uint32_t stream_id = f->stream_id; + g_hash_table_iter_remove(&iter); + /* Remove from global index too */ + g_hash_table_remove(publisher->rtp_forwarders, GUINT_TO_POINTER(stream_id)); + } + } + janus_mutex_unlock(&ps->rtp_forwarders_mutex); + temp = temp->next; + } + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + /* Done */ + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id)); + json_object_set_new(response, "id", string_ids ? json_string(publisher->user_id_str) : json_integer(publisher->user_id)); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + goto prepare_response; + } else if(!strcasecmp(request_text, "listremotes")) { + /* List all the remote restreams a local publisher is configured with; + * notice that this is different from RTP forwarders, since this is + * explicitly related to the concept of remote publishers */ + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, pid_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, pidstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + janus_mutex_lock(&rooms_mutex); + janus_videoroom *videoroom = NULL; + error_code = janus_videoroom_access_room(root, TRUE, FALSE, &videoroom, error_cause, sizeof(error_cause)); + if(error_code != 0) { + janus_mutex_unlock(&rooms_mutex); + goto prepare_response; + } + janus_refcount_increase(&videoroom->ref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + json_t *id = json_object_get(root, "publisher_id"); + guint64 publisher_id = 0; + char publisher_id_num[30], *publisher_id_str = NULL; + if(!string_ids) { + publisher_id = json_integer_value(id); + g_snprintf(publisher_id_num, sizeof(publisher_id_num), "%"SCNu64, publisher_id); + publisher_id_str = publisher_id_num; + } else { + publisher_id_str = (char *)json_string_value(id); + } + janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)publisher_id_str : (gpointer)&publisher_id); + if(publisher == NULL || g_atomic_int_get(&publisher->destroyed)) { + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such publisher (%s)\n", publisher_id_str); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such publisher (%s)", publisher_id_str); + goto prepare_response; + } + janus_refcount_increase(&publisher->ref); + janus_mutex_unlock(&videoroom->mutex); + janus_mutex_lock(&publisher->rtp_forwarders_mutex); + /* Return a list of all remotizations for this publisher */ + json_t *list = json_array(); + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, publisher->remote_recipients); + while(g_hash_table_iter_next(&iter, NULL, &value)) { + janus_videoroom_remote_recipient *r = (janus_videoroom_remote_recipient *)value; + if(r) { + json_t *pr = json_object(); + json_object_set_new(pr, "remote_id", json_string(r->remote_id)); + json_object_set_new(pr, "host", json_string(r->host)); + json_object_set_new(pr, "port", json_integer(r->port)); + if(r->rtcp_port > 0) + json_object_set_new(pr, "rtcp_port", json_integer(r->rtcp_port)); + json_array_append_new(list, pr); + } + } + janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + /* Done */ + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id)); + json_object_set_new(response, "id", string_ids ? json_string(publisher->user_id_str) : json_integer(publisher->user_id)); + json_object_set_new(response, "list", list); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + goto prepare_response; + } else if(!strcasecmp(request_text, "add_remote_publisher")) { + /* Add a new remote publisher */ + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, idopt_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, idstropt_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + JANUS_VALIDATE_JSON_OBJECT(root, remote_publisher_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + /* Validate the stream parameters too */ + json_t *streams = json_object_get(root, "streams"); + if(json_array_size(streams) == 0) { + error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT; + JANUS_LOG(LOG_ERR, "Invalid element value (streams can't be empty)\n"); + g_snprintf(error_cause, 512, "Invalid element value (streams can't be empty)"); + goto prepare_response; + } + size_t i = 0; + for(i=0; iref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + /* Prepare a new fake publisher on behalf of the remote one */ + json_t *display = json_object_get(root, "display"); + const char *display_text = display ? json_string_value(display) : NULL; + guint64 user_id = 0; + char user_id_num[30], *user_id_str = NULL; + gboolean user_id_allocated = FALSE; + json_t *id = json_object_get(root, "id"); + if(id) { + if(!string_ids) { + user_id = json_integer_value(id); + g_snprintf(user_id_num, sizeof(user_id_num), "%"SCNu64, user_id); + user_id_str = user_id_num; + } else { + user_id_str = (char *)json_string_value(id); + } + if(g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)user_id_str : (gpointer)&user_id) != NULL) { + /* User ID already taken */ + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + error_code = JANUS_VIDEOROOM_ERROR_ID_EXISTS; + JANUS_LOG(LOG_ERR, "User ID %s already exists\n", user_id_str); + g_snprintf(error_cause, 512, "User ID %s already exists", user_id_str); + goto prepare_response; + } + } + if(!string_ids) { + if(user_id == 0) { + /* Generate a random ID */ + while(user_id == 0) { + user_id = janus_random_uint64(); + if(g_hash_table_lookup(videoroom->participants, &user_id) != NULL) { + /* User ID already taken, try another one */ + user_id = 0; + } + } + g_snprintf(user_id_num, sizeof(user_id_num), "%"SCNu64, user_id); + user_id_str = user_id_num; + } + JANUS_LOG(LOG_VERB, " -- Participant ID: %"SCNu64"\n", user_id); + } else { + if(user_id_str == NULL) { + /* Generate a random ID */ + while(user_id_str == NULL) { + user_id_str = janus_random_uuid(); + if(g_hash_table_lookup(videoroom->participants, user_id_str) != NULL) { + /* User ID already taken, try another one */ + g_clear_pointer(&user_id_str, g_free); + } + } + user_id_allocated = TRUE; + } + JANUS_LOG(LOG_VERB, " -- Participant ID: %s\n", user_id_str); + } + /* Create the socket we'll need for this remote publisher */ + const char *mcast = json_string_value(json_object_get(root, "mcast")); + const char *iface = json_string_value(json_object_get(root, "iface")); + janus_network_address miface; + if(iface) { + struct ifaddrs *ifas = NULL; + if(getifaddrs(&ifas) == -1) { + JANUS_LOG(LOG_ERR, "Unable to acquire list of network devices/interfaces; remote publishers may not work as expected... %d (%s)\n", + errno, g_strerror(errno)); + } + if(janus_network_lookup_interface(ifas, iface, &miface) != 0) { + if(user_id_allocated) + g_free(user_id_str); + if(ifas) + freeifaddrs(ifas); + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Invalid network interface configuration for remote publisher...\n"); + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, ifas ? "Invalid network interface configuration for remote publisher" : "Unable to query network device information"); + goto prepare_response; + } + if(ifas) + freeifaddrs(ifas); + } else { + janus_network_address_nullify(&miface); + } + uint16_t port = json_integer_value(json_object_get(root, "port")); + uint16_t rtcp_port = json_integer_value(json_object_get(root, "rtcp_port")); + char host[46]; + host[0] = '\0'; + int fd = janus_videoroom_create_fd(port, mcast ? inet_addr(mcast) : INADDR_ANY, &miface, host, sizeof(host)); + if(fd < 0) { + if(user_id_allocated) + g_free(user_id_str); + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Could not open UDP socket for RTP stream for remote publisher, %d (%s)\n", + errno, g_strerror(errno)); + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "Could not open UDP socket for RTP stream"); + goto prepare_response; + } + port = janus_videoroom_get_fd_port(fd); + int rtcp_fd = janus_videoroom_create_fd(rtcp_port, mcast ? inet_addr(mcast) : INADDR_ANY, &miface, host, sizeof(host)); + if(rtcp_fd < 0) { + close(fd); + if(user_id_allocated) + g_free(user_id_str); + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "Could not open UDP socket for remote publisher RTCP, %d (%s)\n", + errno, g_strerror(errno)); + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "Could not open UDP socket for RTP stream"); + goto prepare_response; + } + rtcp_port = janus_videoroom_get_fd_port(rtcp_fd); + /* We create a dummy session first, that's not actually bound to anything */ + janus_videoroom_session *session = g_malloc0(sizeof(janus_videoroom_session)); + session->handle = NULL; + session->participant_type = janus_videoroom_p_type_publisher; + g_atomic_int_set(&session->started, 1); + janus_mutex_init(&session->mutex); + janus_refcount_init(&session->ref, janus_videoroom_session_free); + /* We actually create a publisher instance, which has no associated session but looks like it's publishing */ + janus_videoroom_publisher *publisher = g_malloc0(sizeof(janus_videoroom_publisher)); + publisher->session = session; + session->participant = publisher; + publisher->room_id = videoroom->room_id; + publisher->room_id_str = videoroom->room_id_str ? g_strdup(videoroom->room_id_str) : NULL; + publisher->room = videoroom; + janus_refcount_increase(&videoroom->ref); + publisher->user_id = user_id; + publisher->user_id_str = user_id_allocated ? user_id_str : g_strdup(user_id_str); + publisher->display = display_text ? g_strdup(display_text) : NULL; + publisher->acodec = JANUS_AUDIOCODEC_NONE; + publisher->vcodec = JANUS_VIDEOCODEC_NONE; + publisher->data_mindex = -1; + publisher->remote = TRUE; + publisher->remote_fd = fd; + publisher->remote_rtcp_fd = rtcp_fd; + pipe(publisher->pipefd); + janus_mutex_init(&publisher->subscribers_mutex); + janus_mutex_init(&publisher->own_subscriptions_mutex); + publisher->streams_byid = g_hash_table_new_full(NULL, NULL, + NULL, (GDestroyNotify)janus_videoroom_publisher_stream_destroy); + publisher->streams_bymid = g_hash_table_new_full(g_str_hash, g_str_equal, + (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_publisher_stream_unref); + janus_mutex_init(&publisher->streams_mutex); + janus_mutex_init(&publisher->rtp_forwarders_mutex); + publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, + (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); + publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); + publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); + publisher->udp_sock = -1; + g_atomic_int_set(&publisher->destroyed, 0); + janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); + /* Create publisher streams for all the things that the remote publisher is sending */ + janus_videoroom_publisher_stream *ps = NULL; + int mindex = 0; + for(i=0; itype = mtype; + ps->mindex = mindex; + char mid[5]; + g_snprintf(mid, sizeof(mid), "%d", mindex); + ps->mid = g_strdup(mid); + ps->publisher = publisher; + janus_refcount_increase(&publisher->ref); /* Add a reference to the publisher */ + ps->description = desc ? g_strdup(desc) : NULL; + ps->active = TRUE; + ps->disabled = disabled; + ps->acodec = JANUS_AUDIOCODEC_NONE; + ps->vcodec = JANUS_VIDEOCODEC_NONE; + ps->min_delay = -1; + ps->max_delay = -1; + if(mtype == JANUS_VIDEOROOM_MEDIA_AUDIO) { + ps->acodec = janus_audiocodec_from_name(codec); + ps->pt = janus_audiocodec_pt(ps->acodec); + gboolean found = FALSE; + int j = 0; + for(j=0; j<5; j++) { + if(videoroom->acodec[j] == ps->acodec) { + found = TRUE; + break; + } + } + if(!found) { + /* Codec not allowed in this room */ + ps->disabled = TRUE; + } else { + ps->opusstereo = json_is_true(json_object_get(s, "stereo")); + ps->opusfec = json_is_true(json_object_get(s, "fec")); + ps->opusdtx = json_is_true(json_object_get(s, "dtx")); + } + int audio_level_extmap_id = json_integer_value(json_object_get(s, "audiolevel_ext_id")); + if(audio_level_extmap_id > 0) + ps->audio_level_extmap_id = audio_level_extmap_id; + } else if(mtype == JANUS_VIDEOROOM_MEDIA_VIDEO) { + ps->vcodec = janus_videocodec_from_name(codec); + ps->pt = janus_videocodec_pt(ps->vcodec); + gboolean found = FALSE; + int j = 0; + for(j=0; j<5; j++) { + if(videoroom->vcodec[j] == ps->vcodec) { + found = TRUE; + break; + } + } + if(!found) { + /* Codec not allowed in this room */ + ps->disabled = TRUE; + } else { + if(ps->vcodec == JANUS_VIDEOCODEC_H264) { + const char *h264_profile = json_string_value(json_object_get(s, "h264_profile")); + if(h264_profile) + ps->h264_profile = g_strdup(h264_profile); + else if(videoroom->h264_profile) + ps->h264_profile = g_strdup(videoroom->h264_profile); + } else if(ps->vcodec == JANUS_VIDEOCODEC_VP9) { + const char *vp9_profile = json_string_value(json_object_get(s, "vp9_profile")); + if(vp9_profile) + ps->vp9_profile = g_strdup(vp9_profile); + else if(videoroom->vp9_profile) + ps->vp9_profile = g_strdup(videoroom->vp9_profile); + } + ps->simulcast = json_is_true(json_object_get(s, "simulcast")); + ps->svc = json_is_true(json_object_get(s, "svc")); + if(ps->simulcast) { + ps->vssrc[0] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP); + ps->vssrc[1] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP) + 1; + ps->vssrc[2] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP) + 2; + } + } + int video_orient_extmap_id = json_integer_value(json_object_get(s, "videoorient_ext_id")); + if(video_orient_extmap_id > 0) + ps->video_orient_extmap_id = video_orient_extmap_id; + int playout_delay_extmap_id = json_integer_value(json_object_get(s, "playoutdelay_ext_id")); + if(playout_delay_extmap_id > 0) + ps->playout_delay_extmap_id = playout_delay_extmap_id; + } else if(mtype == JANUS_VIDEOROOM_MEDIA_DATA) { + if(publisher->data_mindex == -1) { + publisher->data_mindex = ps->mindex; + } else { + JANUS_LOG(LOG_WARN, "Ignoring extra data channel m-line from remote publisher\n"); + } + } + g_atomic_int_set(&ps->destroyed, 0); + janus_refcount_init(&ps->ref, janus_videoroom_publisher_stream_free); + janus_refcount_increase(&ps->ref); /* This is for the id-indexed hashtable */ + janus_refcount_increase(&ps->ref); /* This is for the mid-indexed hashtable */ + janus_mutex_init(&ps->subscribers_mutex); + janus_mutex_init(&ps->rtp_forwarders_mutex); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + publisher->streams = g_list_append(publisher->streams, ps); + g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); + g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); + mindex++; + } + /* Done, spawn a thread for this remote publisher */ + janus_refcount_increase(&publisher->ref); + janus_refcount_increase(&publisher->session->ref); + GError *error = NULL; + char tname[16]; + g_snprintf(tname, sizeof(tname), "vremote %s", publisher->user_id_str); + publisher->remote_thread = g_thread_try_new(tname, janus_videoroom_remote_publisher_thread, publisher, &error); + if(error != NULL) { + /* Something went wrong */ + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&publisher->session->ref); + janus_videoroom_publisher_destroy(publisher); + JANUS_LOG(LOG_ERR, "Could not spawn thread for remote publisher, %d (%s)\n", + errno, g_strerror(errno)); + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "Could not spawn thread for remote publisher"); + goto prepare_response; + } + /* Done */ + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id)); + json_object_set_new(response, "id", string_ids ? json_string(publisher->user_id_str) : json_integer(publisher->user_id)); + /* Return connectivity information */ + if(strlen(host) > 0) + json_object_set_new(response, "ip", json_string(host)); + json_object_set_new(response, "port", json_integer(port)); + json_object_set_new(response, "rtcp_port", json_integer(rtcp_port)); + goto prepare_response; + } else if(!strcasecmp(request_text, "update_remote_publisher")) { + /* Update an existing remote publisher */ + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, id_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + JANUS_VALIDATE_JSON_OBJECT(root, remote_publisher_update_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + /* Validate the stream parameters too */ + json_t *streams = json_object_get(root, "streams"); + if(streams && json_array_size(streams) > 0) { + size_t i = 0; + for(i=0; iref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + json_t *id = json_object_get(root, "id"); + guint64 publisher_id = 0; + char publisher_id_num[30], *publisher_id_str = NULL; + if(!string_ids) { + publisher_id = json_integer_value(id); + g_snprintf(publisher_id_num, sizeof(publisher_id_num), "%"SCNu64, publisher_id); + publisher_id_str = publisher_id_num; + } else { + publisher_id_str = (char *)json_string_value(id); + } + janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)publisher_id_str : (gpointer)&publisher_id); + if(publisher == NULL || !publisher->remote || g_atomic_int_get(&publisher->remote_leaving)) { + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such remote publisher (%s)\n", publisher_id_str); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such remote publisher (%s)", publisher_id_str); + goto prepare_response; + } + janus_refcount_increase(&publisher->ref); + /* Check if there's a new display, new streams, or changes to existing ones */ + json_t *display = json_object_get(root, "display"); + if(display) { + char *old_display = publisher->display; + char *new_display = g_strdup(json_string_value(display)); + publisher->display = new_display; + g_free(old_display); + } + janus_mutex_unlock(&videoroom->mutex); + janus_mutex_lock(&publisher->streams_mutex); + janus_videoroom_publisher_stream *ps = NULL; + int changes = FALSE; + size_t i = 0; + for(i=0; istreams_bymid, mid); + if(ps != NULL) { + /* Update an existing stream */ + JANUS_LOG(LOG_VERB, "Updating existing stream (mid %s)\n", mid); + const char *desc = json_string_value(json_object_get(s, "description")); + if(ps->description == NULL || (desc && strcmp(ps->description, desc))) { + g_free(ps->description); + ps->description = desc ? g_strdup(desc) : NULL; + changes = TRUE; + } + json_t *disabled = json_object_get(s, "disabled"); + if(disabled && ps->disabled != json_is_true(disabled)) { + ps->disabled = json_is_true(disabled); + changes = TRUE; + } + continue; + } + /* If we're here, we need to create a new stream */ + if(mindex - g_list_length(publisher->streams) > 1) { + JANUS_LOG(LOG_ERR, "Not adding new stream with mindex %d (missing indexes)\n", mindex); + continue; + } + const char *type = json_string_value(json_object_get(s, "type")); + janus_videoroom_media mtype = janus_videoroom_media_from_str(type); + const char *codec = json_string_value(json_object_get(s, "codec")); + const char *desc = json_string_value(json_object_get(s, "description")); + gboolean disabled = json_is_true(json_object_get(s, "disabled")); + /* Create a publisher stream */ + ps = g_malloc0(sizeof(janus_videoroom_publisher_stream)); + ps->type = mtype; + ps->mindex = mindex; + char pmid[5]; + g_snprintf(pmid, sizeof(pmid), "%d", mindex); + ps->mid = g_strdup(pmid); + ps->publisher = publisher; + janus_refcount_increase(&publisher->ref); /* Add a reference to the publisher */ + ps->description = desc ? g_strdup(desc) : NULL; + ps->active = TRUE; + ps->disabled = disabled; + ps->acodec = JANUS_AUDIOCODEC_NONE; + ps->vcodec = JANUS_VIDEOCODEC_NONE; + ps->min_delay = -1; + ps->max_delay = -1; + if(mtype == JANUS_VIDEOROOM_MEDIA_AUDIO) { + ps->acodec = janus_audiocodec_from_name(codec); + ps->pt = janus_audiocodec_pt(ps->acodec); + gboolean found = FALSE; + int j = 0; + for(j=0; j<5; j++) { + if(videoroom->acodec[j] == ps->acodec) { + found = TRUE; + break; + } + } + if(!found) { + /* Codec not allowed in this room */ + ps->disabled = TRUE; + } else { + ps->opusstereo = json_is_true(json_object_get(s, "stereo")); + ps->opusfec = json_is_true(json_object_get(s, "fec")); + ps->opusdtx = json_is_true(json_object_get(s, "dtx")); + } + int audio_level_extmap_id = json_integer_value(json_object_get(s, "audiolevel_ext_id")); + if(audio_level_extmap_id > 0) + ps->audio_level_extmap_id = audio_level_extmap_id; + } else if(mtype == JANUS_VIDEOROOM_MEDIA_VIDEO) { + ps->vcodec = janus_videocodec_from_name(codec); + ps->pt = janus_videocodec_pt(ps->vcodec); + gboolean found = FALSE; + int j = 0; + for(j=0; j<5; j++) { + if(videoroom->vcodec[j] == ps->vcodec) { + found = TRUE; + break; + } + } + if(!found) { + /* Codec not allowed in this room */ + ps->disabled = TRUE; + } else { + if(ps->vcodec == JANUS_VIDEOCODEC_H264) { + const char *h264_profile = json_string_value(json_object_get(s, "h264_profile")); + if(h264_profile) + ps->h264_profile = g_strdup(h264_profile); + else if(videoroom->h264_profile) + ps->h264_profile = g_strdup(videoroom->h264_profile); + } else if(ps->vcodec == JANUS_VIDEOCODEC_VP9) { + const char *vp9_profile = json_string_value(json_object_get(s, "vp9_profile")); + if(vp9_profile) + ps->vp9_profile = g_strdup(vp9_profile); + else if(videoroom->vp9_profile) + ps->vp9_profile = g_strdup(videoroom->vp9_profile); + } + ps->simulcast = json_is_true(json_object_get(s, "simulcast")); + ps->svc = json_is_true(json_object_get(s, "svc")); + if(ps->simulcast) { + ps->vssrc[0] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP); + ps->vssrc[1] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP) + 1; + ps->vssrc[2] = REMOTE_PUBLISHER_BASE_SSRC + (mindex*REMOTE_PUBLISHER_SSRC_STEP) + 2; + } + } + int video_orient_extmap_id = json_integer_value(json_object_get(s, "videoorient_ext_id")); + if(video_orient_extmap_id > 0) + ps->video_orient_extmap_id = video_orient_extmap_id; + int playout_delay_extmap_id = json_integer_value(json_object_get(s, "playoutdelay_ext_id")); + if(playout_delay_extmap_id > 0) + ps->playout_delay_extmap_id = playout_delay_extmap_id; + } else if(mtype == JANUS_VIDEOROOM_MEDIA_DATA) { + if(publisher->data_mindex == -1) { + publisher->data_mindex = ps->mindex; + } else { + JANUS_LOG(LOG_WARN, "Ignoring extra data channel m-line from remote publisher\n"); + } + } + g_atomic_int_set(&ps->destroyed, 0); + janus_refcount_init(&ps->ref, janus_videoroom_publisher_stream_free); + janus_refcount_increase(&ps->ref); /* This is for the id-indexed hashtable */ + janus_refcount_increase(&ps->ref); /* This is for the mid-indexed hashtable */ + janus_mutex_init(&ps->subscribers_mutex); + janus_mutex_init(&ps->rtp_forwarders_mutex); + ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy); + publisher->streams = g_list_append(publisher->streams, ps); + g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); + g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); + changes = TRUE; + } + janus_mutex_unlock(&publisher->streams_mutex); + if(changes) { + /* Notify all other participants this publisher's media has changed */ + janus_videoroom_notify_about_publisher(publisher, TRUE); + } + /* Done */ + janus_refcount_decrease(&publisher->ref); + janus_refcount_decrease(&videoroom->ref); + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + goto prepare_response; + } else if(!strcasecmp(request_text, "remove_remote_publisher")) { + /* Get rid an existing remote publisher */ + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, id_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters, + error_code, error_cause, TRUE, + JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + janus_mutex_lock(&rooms_mutex); + janus_videoroom *videoroom = NULL; + error_code = janus_videoroom_access_room(root, TRUE, FALSE, &videoroom, error_cause, sizeof(error_cause)); + if(error_code != 0) { + janus_mutex_unlock(&rooms_mutex); + goto prepare_response; + } + janus_refcount_increase(&videoroom->ref); + janus_mutex_unlock(&rooms_mutex); + janus_mutex_lock(&videoroom->mutex); + json_t *id = json_object_get(root, "id"); + guint64 publisher_id = 0; + char publisher_id_num[30], *publisher_id_str = NULL; + if(!string_ids) { + publisher_id = json_integer_value(id); + g_snprintf(publisher_id_num, sizeof(publisher_id_num), "%"SCNu64, publisher_id); + publisher_id_str = publisher_id_num; + } else { + publisher_id_str = (char *)json_string_value(id); + } + janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, + string_ids ? (gpointer)publisher_id_str : (gpointer)&publisher_id); + if(publisher == NULL || !publisher->remote || !g_atomic_int_compare_and_exchange(&publisher->remote_leaving, 0, 1)) { + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + JANUS_LOG(LOG_ERR, "No such remote publisher (%s)\n", publisher_id_str); + error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED; + g_snprintf(error_cause, 512, "No such remote publisher (%s)", publisher_id_str); + goto prepare_response; + } + /* Mark the remote publisher as leaving, the thread will do the cleanup */ + g_atomic_int_set(&publisher->remote_leaving, 1); + /* Notify the thread that it's time to go */ + if(publisher->pipefd[1] > 0) { + int code = 1; + ssize_t res = 0; + do { + res = write(publisher->pipefd[1], &code, sizeof(int)); + } while(res == -1 && errno == EINTR); + } + janus_mutex_unlock(&videoroom->mutex); + janus_refcount_decrease(&videoroom->ref); + /* Done */ + response = json_object(); + json_object_set_new(response, "videoroom", json_string("success")); + goto prepare_response; + } else { + /* Not a request we recognize, don't do anything */ + return NULL; + } + +prepare_response: + { + if(error_code == 0 && !response) { + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "Invalid response"); + } + if(error_code != 0) { + /* Prepare JSON error event */ + response = json_object(); + json_object_set_new(response, "videoroom", json_string("event")); + json_object_set_new(response, "error_code", json_integer(error_code)); + json_object_set_new(response, "error", json_string(error_cause)); + } + return response; + } + +} + +struct janus_plugin_result *janus_videoroom_handle_message(janus_plugin_session *handle, char *transaction, json_t *message, json_t *jsep) { + if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) + return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized", NULL); + + /* Pre-parse the message */ + int error_code = 0; + char error_cause[512]; + json_t *root = message; + json_t *response = NULL; + + janus_mutex_lock(&sessions_mutex); + janus_videoroom_session *session = janus_videoroom_lookup_session(handle); + if(!session) { + janus_mutex_unlock(&sessions_mutex); + JANUS_LOG(LOG_ERR, "No session associated with this handle...\n"); + error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "%s", "No session associated with this handle..."); + goto plugin_response; + } + /* Increase the reference counter for this session: we'll decrease it after we handle the message */ + janus_refcount_increase(&session->ref); janus_mutex_unlock(&sessions_mutex); if(g_atomic_int_get(&session->destroyed)) { JANUS_LOG(LOG_ERR, "Session has already been marked as destroyed...\n"); @@ -6908,6 +8194,7 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) { janus_refcount_decrease(&session->ref); } +static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_rtp *pkt); void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *pkt) { if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return; @@ -6917,6 +8204,9 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp janus_videoroom_publisher *participant = janus_videoroom_session_get_publisher_nodebug(session); if(participant == NULL) return; + janus_videoroom_incoming_rtp_internal(session, participant, pkt); +} +static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_rtp *pkt) { if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) { janus_videoroom_publisher_dereference_nodebug(participant); return; @@ -6927,7 +8217,7 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp janus_mutex_lock(&participant->streams_mutex); janus_videoroom_publisher_stream *ps = g_hash_table_lookup(participant->streams_byid, GINT_TO_POINTER(pkt->mindex)); janus_mutex_unlock(&participant->streams_mutex); - if(ps == NULL) { + if(ps == NULL || ps->disabled) { /* No stream..? */ janus_videoroom_publisher_dereference_nodebug(participant); return; @@ -7188,7 +8478,11 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp participant->remb_startup--; } JANUS_LOG(LOG_VERB, "Sending REMB (%s, %"SCNu32")\n", participant->display, bitrate); - gateway->send_remb(handle, bitrate); + if(!participant->remote) { + gateway->send_remb(session->handle, bitrate); + } else { + /* TODO Forward back to the remote publisher */ + } if(participant->remb_startup == 0) participant->remb_latest = janus_get_monotonic_time(); } @@ -7282,17 +8576,25 @@ void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rt } } +static void janus_videoroom_incoming_data_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_data *packet); void janus_videoroom_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet) { if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) return; - if(packet->buffer == NULL || packet->length == 0) - return; janus_videoroom_session *session = (janus_videoroom_session *)handle->plugin_handle; if(!session || g_atomic_int_get(&session->destroyed) || session->participant_type != janus_videoroom_p_type_publisher) return; janus_videoroom_publisher *participant = janus_videoroom_session_get_publisher_nodebug(session); if(participant == NULL) return; + janus_videoroom_incoming_data_internal(session, participant, packet); +} +static void janus_videoroom_incoming_data_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_data *packet) { + if(packet->buffer == NULL || packet->length == 0) + return; + if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) { + janus_videoroom_publisher_dereference_nodebug(participant); + return; + } if(g_atomic_int_get(&participant->destroyed) || participant->data_mindex < 0 || !participant->streams || participant->kicked) { janus_videoroom_publisher_dereference_nodebug(participant); return; @@ -7317,14 +8619,35 @@ void janus_videoroom_incoming_data(janus_plugin_session *handle, janus_plugin_da gpointer value; g_hash_table_iter_init(&iter, ps->rtp_forwarders); while(participant->udp_sock > 0 && g_hash_table_iter_next(&iter, NULL, &value)) { - janus_videoroom_rtp_forwarder* rtp_forward = (janus_videoroom_rtp_forwarder*)value; + janus_videoroom_rtp_forwarder *rtp_forward = (janus_videoroom_rtp_forwarder *)value; if(rtp_forward->is_data) { struct sockaddr *address = (rtp_forward->serv_addr.sin_family == AF_INET ? (struct sockaddr *)&rtp_forward->serv_addr : (struct sockaddr *)&rtp_forward->serv_addr6); size_t addrlen = (rtp_forward->serv_addr.sin_family == AF_INET ? sizeof(rtp_forward->serv_addr) : sizeof(rtp_forward->serv_addr6)); - if(sendto(participant->udp_sock, buf, len, 0, address, addrlen) < 0) { - JANUS_LOG(LOG_HUGE, "Error forwarding data packet for %s... %s (len=%d)...\n", - participant->display, g_strerror(errno), len); + /* Check if this is a regular RTP forwarder, or a publisher remotization */ + if(rtp_forward->remote_id == NULL) { + /* Regular forwarder, send the payload as it is */ + if(sendto(participant->udp_sock, buf, len, 0, address, addrlen) < 0) { + JANUS_LOG(LOG_HUGE, "Error forwarding data packet for %s... %s (len=%d)...\n", + participant->display, g_strerror(errno), len); + } + } else { + /* Remotization, prefix with a fake RTP header so that we can + * set an SRRC (and use the payload type for binary vs. text) */ + char buffer[1500]; + memset(buffer, 0, sizeof(buffer)); + int buflen = len + 12; + if(buflen > (int)sizeof(buffer)) /* FIXME We're going to truncate */ + buflen = sizeof(buffer); + janus_rtp_header *rtp = (janus_rtp_header *)buffer; + rtp->version = 2; + rtp->ssrc = htonl(rtp_forward->ssrc); + rtp->type = packet->binary ? 1 : 0; + memcpy(buffer + 12, buf, buflen - 12); + if(sendto(participant->udp_sock, buffer, buflen, 0, address, addrlen) < 0) { + JANUS_LOG(LOG_HUGE, "Error forwarding data packet for %s... %s (len=%d)...\n", + participant->display, g_strerror(errno), len); + } } } } @@ -7937,6 +9260,8 @@ static void *janus_videoroom_handler(void *data) { publisher->remb_startup = 4; publisher->remb_latest = 0; janus_mutex_init(&publisher->rtp_forwarders_mutex); + publisher->remote_recipients = g_hash_table_new_full(g_str_hash, g_str_equal, + (GDestroyNotify)g_free, (GDestroyNotify)janus_videoroom_remote_recipient_free); publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free); publisher->udp_sock = -1; @@ -10578,6 +11903,7 @@ static void *janus_videoroom_handler(void *data) { const char *audiocodec = NULL, *videocodec = NULL; char *vp9_profile = NULL, *h264_profile = NULL; GList *temp = offer->m_lines; + janus_mutex_lock(&participant->rtp_forwarders_mutex); janus_mutex_lock(&participant->streams_mutex); while(temp) { /* Which media are available? */ @@ -10892,6 +12218,58 @@ static void *janus_videoroom_handler(void *data) { participant->data_mindex = ps->mindex; g_hash_table_insert(participant->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(participant->streams_bymid, g_strdup(ps->mid), ps); + /* Also check if this publisher is remotized, and in case + * automatically create forwarders to the remote recipients */ + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, participant->remote_recipients); + while(g_hash_table_iter_next(&iter, NULL, &value)) { + janus_videoroom_remote_recipient *r = (janus_videoroom_remote_recipient *)value; + janus_videoroom_rtp_forwarder *f = NULL; + if(r) { + if(ps->type == JANUS_VIDEOROOM_MEDIA_AUDIO) { + /* Audio stream */ + f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, + r->host, r->port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), + FALSE, 0, NULL, 0, FALSE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(r->remote_id); + } else if(ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { + /* Video stream */ + gboolean add_rtcp = (!r->rtcp_added && r->rtcp_port > 0); + f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, + r->host, r->port, add_rtcp ? r->rtcp_port : -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP), + FALSE, 0, NULL, 0, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(r->remote_id); + if(add_rtcp) + r->rtcp_added = TRUE; + /* Check if there's simulcast substreams we need to relay too */ + if(ps->vssrc[1] || ps->rid[1]) { + f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, + r->host, r->port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 1), + FALSE, 0, NULL, 1, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(r->remote_id); + } + if(ps->vssrc[2] || ps->rid[2]) { + f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, + r->host, r->port, -1, 0, + (REMOTE_PUBLISHER_BASE_SSRC + ps->mindex*REMOTE_PUBLISHER_SSRC_STEP + 2), + FALSE, 0, NULL, 2, TRUE, FALSE); + if(f != NULL) + f->remote_id = g_strdup(r->remote_id); + } + } else { + /* Data stream */ + f = janus_videoroom_rtp_forwarder_add_helper(participant, ps, + r->host, r->port, 0, 0, 0, FALSE, 0, NULL, 0, FALSE, TRUE); + } + } + } } temp = temp->next; /* Add to the info we send back to the publisher */ @@ -10929,6 +12307,7 @@ static void *janus_videoroom_handler(void *data) { json_array_append_new(media, info); } janus_mutex_unlock(&participant->streams_mutex); + janus_mutex_unlock(&participant->rtp_forwarders_mutex); janus_sdp_destroy(offer); /* Replace the session name */ g_free(answer->s_name); @@ -11340,7 +12719,90 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_videoroom_rtp_forwa /* We only handle incoming video PLIs or FIR at the moment */ if(!janus_rtcp_has_fir(buffer, len) && !janus_rtcp_has_pli(buffer, len)) return; - janus_videoroom_reqpli((janus_videoroom_publisher_stream *)forward->source, "RTCP from forwarder"); + /* Check if this is a regular RTP forwarder, or a publisher remotization */ + if(forward->remote_id == NULL) { + /* Regular forwarder, send the PLI to the stream associated with it */ + janus_videoroom_reqpli((janus_videoroom_publisher_stream *)forward->source, "RTCP from forwarder"); + } else { + /* Remotization, check the SSRC in the request so that we know + * which publisher video stream we should send the PLI to */ + uint32_t ssrc = 0; + janus_rtcp_header *rtcp = (janus_rtcp_header *)buffer; + int pno = 0, total = len; + while(rtcp && ssrc == 0) { + if(!janus_rtcp_check_len(rtcp, total)) + return; /* Invalid RTCP packet */ + if(rtcp->version != 2) + return; /* Invalid RTCP packet */ + pno++; + switch(rtcp->type) { + case RTCP_PSFB: { + gint fmt = rtcp->rc; + if(fmt == 1) { + if(!janus_rtcp_check_fci(rtcp, total, 0)) + return; /* Invalid RTCP packet */ + /* TODO */ + janus_rtcp_fb *rtcpfb = (janus_rtcp_fb *)rtcp; + ssrc = ntohl(rtcpfb->media); + break; + } + } + default: + break; + } + /* Is this a compound packet? */ + int length = ntohs(rtcp->length); + if(length == 0) + break; + total -= length*4+4; + if(total <= 0) + break; + rtcp = (janus_rtcp_header *)((uint32_t*)rtcp + length + 1); + } + if(ssrc > 0) { + /* Look for the right publisher stream instance */ + char *remote_id = forward->remote_id; + janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)forward->source; + janus_videoroom_publisher *p = ps->publisher; + if(p == NULL || g_atomic_int_get(&p->destroyed)) + return; + janus_mutex_lock(&p->rtp_forwarders_mutex); + if(g_hash_table_size(p->rtp_forwarders) == 0) { + janus_mutex_unlock(&p->rtp_forwarders_mutex); + return; + } + gboolean found = FALSE; + GList *temp = p->streams; + while(temp && !found) { + ps = (janus_videoroom_publisher_stream *)temp->data; + janus_mutex_lock(&ps->rtp_forwarders_mutex); + if(g_hash_table_size(ps->rtp_forwarders) == 0) { + janus_mutex_unlock(&ps->rtp_forwarders_mutex); + temp = temp->next; + continue; + } + GHashTableIter iter_f; + gpointer key_f, value_f; + g_hash_table_iter_init(&iter_f, ps->rtp_forwarders); + while(g_hash_table_iter_next(&iter_f, &key_f, &value_f)) { + janus_videoroom_rtp_forwarder *rpv = value_f; + /* We only care about video forwarders used for the same remotization */ + if(!rpv->is_video || rpv->remote_id == NULL || strcasecmp(rpv->remote_id, remote_id)) + continue; + /* Check the SSRC */ + if(rpv->ssrc == ssrc) { + found = TRUE; + break; + } + } + janus_mutex_unlock(&ps->rtp_forwarders_mutex); + temp = temp->next; + } + janus_mutex_unlock(&p->rtp_forwarders_mutex); + if(found && ps) + janus_videoroom_reqpli(ps, "RTCP from remotized forwarder"); + } + } } } @@ -11352,3 +12814,500 @@ static void *janus_videoroom_rtp_forwarder_rtcp_thread(void *data) { JANUS_LOG(LOG_VERB, "Leaving RTCP thread for RTP forwarders...\n"); return NULL; } + +/* Helpers to create a listener filedescriptor */ +static int janus_videoroom_create_fd(int port, in_addr_t mcast, const janus_network_address *iface, char *host, size_t hostlen) { + janus_mutex_lock(&fd_mutex); + struct sockaddr_in address = { 0 }; + struct sockaddr_in6 address6 = { 0 }; + janus_network_address_string_buffer address_representation; + + uint16_t rtp_port_next = rtp_range_slider; /* Read global slider */ + uint16_t rtp_port_start = rtp_port_next; + gboolean use_range = (port == 0), rtp_port_wrap = FALSE; + + int fd = -1, family = 0; + while(1) { + family = 0; /* By default, we bind to both IPv4 and IPv6 */ + if(use_range && rtp_port_wrap && rtp_port_next >= rtp_port_start) { + /* Full range scanned */ + JANUS_LOG(LOG_ERR, "No ports available for RTP/RTCP in range: %u -- %u\n", + rtp_range_min, rtp_range_max); + break; + } + if(!use_range) { + /* Use the port specified in the arguments */ + if(IN_MULTICAST(ntohl(mcast))) { + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if(fd < 0) { + JANUS_LOG(LOG_ERR, "Cannot create socket for remote publisher... %d (%s)\n", errno, g_strerror(errno)); + break; + } +#ifdef IP_MULTICAST_ALL + int mc_all = 0; + if((setsockopt(fd, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all))) < 0) { + JANUS_LOG(LOG_ERR, "setsockopt IP_MULTICAST_ALL failed... %d (%s)\n", + errno, g_strerror(errno)); + close(fd); + janus_mutex_unlock(&fd_mutex); + return -1; + } +#endif + struct ip_mreq mreq; + memset(&mreq, '\0', sizeof(mreq)); + mreq.imr_multiaddr.s_addr = mcast; + if(!janus_network_address_is_null(iface)) { + family = AF_INET; + if(iface->family == AF_INET) { + mreq.imr_interface = iface->ipv4; + (void) janus_network_address_to_string_buffer(iface, &address_representation); /* This is OK: if we get here iface must be non-NULL */ + char *maddr = inet_ntoa(mreq.imr_multiaddr); + JANUS_LOG(LOG_VERB, "Remote publisher using interface address: %s (%s)\n", + janus_network_address_string_from_buffer(&address_representation), maddr); + if(maddr && host && hostlen > 0) + g_strlcpy(host, maddr, hostlen); + } else { + JANUS_LOG(LOG_ERR, "Invalid multicast address type (only IPv4 multicast is currently supported by this plugin)\n"); + close(fd); + janus_mutex_unlock(&fd_mutex); + return -1; + } + } else { + JANUS_LOG(LOG_WARN, "No multicast interface: this may not work as expected if you have multiple network devices (NICs)\n"); + } + if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) == -1) { + JANUS_LOG(LOG_ERR, "IP_ADD_MEMBERSHIP failed... %d (%s)\n", errno, g_strerror(errno)); + close(fd); + janus_mutex_unlock(&fd_mutex); + return -1; + } + } + } else { + /* Pick a port in the configured range */ + port = rtp_port_next; + if((uint32_t)(rtp_port_next) < rtp_range_max) { + rtp_port_next++; + } else { + rtp_port_next = rtp_range_min; + rtp_port_wrap = TRUE; + } + } + address.sin_family = AF_INET; + address.sin_port = htons(port); + address.sin_addr.s_addr = INADDR_ANY; + address6.sin6_family = AF_INET6; + address6.sin6_port = htons(port); + address6.sin6_addr = in6addr_any; + /* If this is multicast, allow a re-use of the same ports (different groups may be used) */ + if(!use_range && IN_MULTICAST(ntohl(mcast))) { + int reuse = 1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) { + JANUS_LOG(LOG_ERR, "setsockopt SO_REUSEADDR failed... %d (%s)\n", errno, g_strerror(errno)); + close(fd); + janus_mutex_unlock(&fd_mutex); + return -1; + } + /* TODO IPv6 */ + family = AF_INET; + address.sin_addr.s_addr = mcast; + } else { + if(!IN_MULTICAST(ntohl(mcast)) && !janus_network_address_is_null(iface)) { + family = iface->family; + if(iface->family == AF_INET) { + address.sin_addr = iface->ipv4; + (void) janus_network_address_to_string_buffer(iface, &address_representation); /* This is OK: if we get here iface must be non-NULL */ + JANUS_LOG(LOG_VERB, "Remote publisher restricted to interface address: %s\n", + janus_network_address_string_from_buffer(&address_representation)); + if(host && hostlen > 0) + g_strlcpy(host, janus_network_address_string_from_buffer(&address_representation), hostlen); + } else if(iface->family == AF_INET6) { + memcpy(&address6.sin6_addr, &iface->ipv6, sizeof(iface->ipv6)); + (void) janus_network_address_to_string_buffer(iface, &address_representation); /* This is OK: if we get here iface must be non-NULL */ + JANUS_LOG(LOG_VERB, "Remote publisher restricted to interface address: %s\n", + janus_network_address_string_from_buffer(&address_representation)); + if(host && hostlen > 0) + g_strlcpy(host, janus_network_address_string_from_buffer(&address_representation), hostlen); + } else { + JANUS_LOG(LOG_ERR, "Invalid address/restriction type\n"); + continue; + } + } + } + /* Bind to the specified port */ + if(fd == -1) { + fd = socket(family == AF_INET ? AF_INET : AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + int v6only = 0; + if(fd < 0) { + JANUS_LOG(LOG_ERR, "Cannot create socket for remote publisher... %d (%s)\n", errno, g_strerror(errno)); + break; + } + if(family != AF_INET && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0) { + JANUS_LOG(LOG_ERR, "setsockopt on socket failed... %d (%s)\n", errno, g_strerror(errno)); + break; + } + } + size_t addrlen = (family == AF_INET ? sizeof(address) : sizeof(address6)); + if(bind(fd, (family == AF_INET ? (struct sockaddr *)&address : (struct sockaddr *)&address6), addrlen) < 0) { + close(fd); + fd = -1; + if(!use_range) { + JANUS_LOG(LOG_ERR, "Bind failed (port %d)... %d (%s)\n", port, errno, g_strerror(errno)); + break; + } + } else { + if(use_range) + rtp_range_slider = port; /* Update global slider */ + break; + } + } + janus_mutex_unlock(&fd_mutex); + return fd; +} +/* Helper to return fd port */ +static int janus_videoroom_get_fd_port(int fd) { + struct sockaddr_in6 server = { 0 }; + socklen_t len = sizeof(server); + if(getsockname(fd, (struct sockaddr *)&server, &len) == -1) { + return -1; + } + return ntohs(server.sin6_port); +} +/* Thread responsible for a specific remote publisher */ +static void *janus_videoroom_remote_publisher_thread(void *user_data) { + janus_videoroom_publisher *publisher = (janus_videoroom_publisher *)user_data; + if(publisher == NULL) { + JANUS_LOG(LOG_ERR, "Invalid publisher instance\n"); + g_thread_unref(g_thread_self()); + return NULL; + } + JANUS_LOG(LOG_VERB, "[%s/%s] Joining remote publisher thread...\n", + publisher->room->room_id_str, publisher->user_id_str); + + /* File descriptors */ + socklen_t addrlen; + struct sockaddr_storage remote = { 0 }; + int resfd = 0, bytes = 0; + struct pollfd fds[3]; + int pipe_fd = publisher->pipefd[0]; + char buffer[1500]; + memset(buffer, 0, 1500); + if(pipe_fd == -1) { + /* If the pipe file descriptor doesn't exist, it means we're done already, + * and/or we may never be notified about sessions being closed, so give up */ + JANUS_LOG(LOG_WARN, "[%s/%s] Leaving remote publisher thread, no pipe file descriptor...\n", + publisher->room->room_id_str, publisher->user_id_str); + janus_videoroom_publisher_destroy(publisher); + janus_refcount_decrease(&publisher->session->ref); + janus_refcount_decrease(&publisher->ref); + g_thread_unref(g_thread_self()); + return NULL; + } + /* RTP stuff */ + janus_rtp_header *rtp = NULL; + uint32_t ssrc = 0, diff = 0; + int mindex = 0, vindex = 0; + janus_videoroom_publisher_stream *ps = NULL; + janus_plugin_rtp pkt = { 0 }; + janus_plugin_data data = { 0 }; + GList *temp = NULL; + + /* As the first thing, we add the remote publisher to the list */ + janus_refcount_increase(&publisher->ref); + janus_videoroom *videoroom = publisher->room; + janus_refcount_increase(&videoroom->ref); + g_hash_table_insert(videoroom->participants, + string_ids ? (gpointer)g_strdup(publisher->user_id_str) : (gpointer)janus_uint64_dup(publisher->user_id), + publisher); + /* Let's also notify all other participants that the publisher is here */ + janus_videoroom_notify_about_publisher(publisher, FALSE); + + /* Loop */ + int num = 0, i = 0; + while(!g_atomic_int_get(&publisher->remote_leaving) && !g_atomic_int_get(&publisher->destroyed)) { + /* Prepare poll */ + num = 0; + if(publisher->remote_fd != -1) { + fds[num].fd = publisher->remote_fd; + fds[num].events = POLLIN; + fds[num].revents = 0; + num++; + } + if(publisher->remote_rtcp_fd != -1) { + fds[num].fd = publisher->remote_rtcp_fd; + fds[num].events = POLLIN; + fds[num].revents = 0; + num++; + } + pipe_fd = publisher->pipefd[0]; + if(pipe_fd == -1) { + /* Pipe was closed? Means the call is over */ + break; + } + fds[num].fd = pipe_fd; + fds[num].events = POLLIN; + fds[num].revents = 0; + num++; + /* Check if we need to send any PLI */ + janus_mutex_lock(&publisher->streams_mutex); + temp = publisher->streams; + while(temp) { + ps = (janus_videoroom_publisher_stream *)temp->data; + /* Any PLI and/or REMB we should send back to the source? */ + if(ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO && g_atomic_int_get(&ps->need_pli)) + janus_videoroom_rtcp_pli_send(ps); + temp = temp->next; + } + janus_mutex_unlock(&publisher->streams_mutex); + /* Wait for some data */ + resfd = poll(fds, num, 1000); + if(resfd < 0) { + if(errno == EINTR) { + JANUS_LOG(LOG_HUGE, "[%s/%s] Got an EINTR (%s), ignoring...\n", + videoroom->room_id_str, publisher->user_id_str, g_strerror(errno)); + continue; + } + JANUS_LOG(LOG_ERR, "[%s/%s] Error polling...\n", videoroom->room_id_str, publisher->user_id_str); + JANUS_LOG(LOG_ERR, "[%s/%s] -- %d (%s)\n", + videoroom->room_id_str, publisher->user_id_str, errno, g_strerror(errno)); + break; + } else if(resfd == 0) { + /* No data, keep going */ + continue; + } + if(g_atomic_int_get(&publisher->remote_leaving) || g_atomic_int_get(&publisher->destroyed)) + break; + for(i=0; iroom_id_str, publisher->user_id_str, + fds[i].revents & POLLERR ? "POLLERR" : "POLLHUP", errno, g_strerror(errno)); + break; + } else if(fds[i].revents & POLLIN) { + if(pipe_fd != -1 && fds[i].fd == pipe_fd) { + /* Poll interrupted for a reason, go on */ + int code = 0; + (void)read(pipe_fd, &code, sizeof(int)); + break; + } else if(fds[i].fd == publisher->remote_rtcp_fd) { + /* Got Something on the RTCP socket, we only use this for latching */ + addrlen = sizeof(remote); + bytes = recvfrom(fds[i].fd, buffer, 1500, 0, (struct sockaddr *)&remote, &addrlen); + if(bytes < 0 || (!janus_is_rtp(buffer, bytes) && !janus_is_rtcp(buffer, bytes))) { + /* For latching we need an RTP or RTCP packet */ + continue; + } + memcpy(&publisher->rtcp_addr, &remote, addrlen); + continue; + } + /* Got an RTP/RTCP packet */ + addrlen = sizeof(remote); + bytes = recvfrom(fds[i].fd, buffer, 1500, 0, (struct sockaddr *)&remote, &addrlen); + if(bytes < 0) { + /* Failed to read? */ + continue; + } + /* Handle packet: check SSRC and do relay_rtp accordingly */ + if(!janus_is_rtp(buffer, bytes)) { + /* Not RTP, drop the packet */ + continue; + } + rtp = (janus_rtp_header *)buffer; + ssrc = ntohl(rtp->ssrc); + if(ssrc < REMOTE_PUBLISHER_BASE_SSRC) { + /* Can't be one of the SSRCs we're waiting for, innore */ + JANUS_LOG(LOG_WARN, "[%s/%s] Invalid SSRC (%"SCNu32")\n", + videoroom->room_id_str, publisher->user_id_str, ssrc); + continue; + } + diff = ssrc - REMOTE_PUBLISHER_BASE_SSRC; + mindex = diff/REMOTE_PUBLISHER_SSRC_STEP; + vindex = diff - (mindex*REMOTE_PUBLISHER_SSRC_STEP); + janus_mutex_lock(&publisher->streams_mutex); + ps = g_hash_table_lookup(publisher->streams_byid, GINT_TO_POINTER(mindex)); + if(ps == NULL) { + janus_mutex_unlock(&publisher->streams_mutex); + JANUS_LOG(LOG_WARN, "[%s/%s] Invalid mindex %d\n", + videoroom->room_id_str, publisher->user_id_str, mindex); + continue; + } + if((!ps->simulcast && vindex > 0) || vindex > 2) { + janus_mutex_unlock(&publisher->streams_mutex); + JANUS_LOG(LOG_WARN, "[%s/%s] Invalid substream %d\n", + videoroom->room_id_str, publisher->user_id_str, vindex); + continue; + } + /* Check if this is an actual RTP packet, or an + * envelope created to relay data channels */ + if(ps->type == JANUS_VIDEOROOM_MEDIA_DATA) { + /* Handle as data channel, stripping the RTP header */ + janus_refcount_increase_nodebug(&publisher->ref); + janus_mutex_unlock(&publisher->streams_mutex); + data.label = NULL; + data.protocol = NULL; + data.binary = rtp->type ? TRUE : FALSE; + data.buffer = buffer + 12; + data.length = bytes - 12; + /* Now handle the packet as if coming from a regular publisher */ + janus_videoroom_incoming_data_internal(publisher->session, publisher, &data); + continue; + } + /* Prepare the RTP packet */ + pkt.mindex = mindex; + pkt.video = (ps->type == JANUS_VIDEOROOM_MEDIA_VIDEO); + pkt.buffer = buffer; + pkt.length = bytes; + janus_plugin_rtp_extensions_reset(&pkt.extensions); + janus_refcount_increase_nodebug(&publisher->ref); + janus_mutex_unlock(&publisher->streams_mutex); + /* Parse RTP extensions before relaying the packet */ + if(!pkt.video && ps->audio_level_extmap_id > 0) { + gboolean vad = FALSE; + int level = -1; + if(janus_rtp_header_extension_parse_audio_level(buffer, bytes, + ps->audio_level_extmap_id, &vad, &level) == 0) { + pkt.extensions.audio_level = level; + pkt.extensions.audio_level_vad = vad; + } + } + if(pkt.video && ps->video_orient_extmap_id > 0) { + gboolean c = FALSE, f = FALSE, r1 = FALSE, r0 = FALSE; + if(janus_rtp_header_extension_parse_video_orientation(buffer, bytes, + ps->video_orient_extmap_id, &c, &f, &r1, &r0) == 0) { + pkt.extensions.video_rotation = 0; + if(r1 && r0) + pkt.extensions.video_rotation = 270; + else if(r1) + pkt.extensions.video_rotation = 180; + else if(r0) + pkt.extensions.video_rotation = 90; + pkt.extensions.video_back_camera = c; + pkt.extensions.video_flipped = f; + } + } + if(pkt.video && ps->playout_delay_extmap_id > 0) { + uint16_t min = 0, max = 0; + if(janus_rtp_header_extension_parse_playout_delay(buffer, bytes, + ps->playout_delay_extmap_id, &min, &max) == 0) { + pkt.extensions.min_delay = min; + pkt.extensions.max_delay = max; + } + } + /* Now handle the packet as if coming from a regular publisher */ + janus_refcount_increase_nodebug(&publisher->ref); + janus_videoroom_incoming_rtp_internal(publisher->session, publisher, &pkt); + } + } + } + /* If we got here, the remote publisher has been removed from the + * room: let's notify all other publishers in the room */ + janus_mutex_lock(&publisher->rec_mutex); + g_free(publisher->recording_base); + publisher->recording_base = NULL; + janus_videoroom_recorder_close(publisher); + janus_mutex_unlock(&publisher->rec_mutex); + publisher->acodec = JANUS_AUDIOCODEC_NONE; + publisher->vcodec = JANUS_VIDEOCODEC_NONE; + publisher->firefox = FALSE; + publisher->e2ee = FALSE; + /* Get rid of streams */ + janus_mutex_lock(&publisher->streams_mutex); + GList *subscribers = NULL; + temp = publisher->streams; + while(temp) { + janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; + /* Close all subscriptions to this stream */ + janus_mutex_lock(&ps->subscribers_mutex); + GSList *temp2 = ps->subscribers; + while(temp2) { + janus_videoroom_subscriber_stream *ss = (janus_videoroom_subscriber_stream *)temp2->data; + temp2 = temp2->next; + if(ss) { + /* Take note of the subscriber, so that we can send an updated offer */ + if(ss->type != JANUS_VIDEOROOM_MEDIA_DATA && g_list_find(subscribers, ss->subscriber) == NULL) { + janus_refcount_increase(&ss->subscriber->ref); + janus_refcount_increase(&ss->subscriber->session->ref); + subscribers = g_list_append(subscribers, ss->subscriber); + } + /* Remove the subscription (turns the m-line to inactive) */ + janus_videoroom_subscriber_stream_remove(ss, ps, FALSE); + } + } + g_slist_free(ps->subscribers); + ps->subscribers = NULL; + int i=0; + for(i=0; i<3; i++) { + ps->vssrc[i] = 0; + g_free(ps->rid[i]); + ps->rid[i] = NULL; + } + ps->rid_extmap_id = 0; + g_free(ps->fmtp); + ps->fmtp = NULL; + janus_mutex_unlock(&ps->subscribers_mutex); + temp = temp->next; + } + /* Any subscriber session to update? */ + if(subscribers != NULL) { + temp = subscribers; + while(temp) { + janus_videoroom_subscriber *subscriber = (janus_videoroom_subscriber *)temp->data; + /* Send (or schedule) a new offer */ + janus_mutex_lock(&subscriber->streams_mutex); + if(!g_atomic_int_get(&subscriber->answered)) { + /* We're still waiting for an answer to a previous offer, postpone this */ + g_atomic_int_set(&subscriber->pending_offer, 1); + janus_mutex_unlock(&subscriber->streams_mutex); + } else { + json_t *event = json_object(); + json_object_set_new(event, "videoroom", json_string("updated")); + json_object_set_new(event, "room", string_ids ? + json_string(subscriber->room_id_str) : json_integer(subscriber->room_id)); + json_t *media = janus_videoroom_subscriber_streams_summary(subscriber, FALSE, NULL); + json_t *media_event = NULL; + if(notify_events && gateway->events_is_enabled()) + media_event = json_deep_copy(media); + json_object_set_new(event, "streams", media); + /* Generate a new offer */ + json_t *jsep = janus_videoroom_subscriber_offer(subscriber); + janus_mutex_unlock(&subscriber->streams_mutex); + /* How long will the Janus core take to push the event? */ + gint64 start = janus_get_monotonic_time(); + int res = gateway->push_event(subscriber->session->handle, &janus_videoroom_plugin, NULL, event, jsep); + JANUS_LOG(LOG_VERB, " >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start); + json_decref(event); + json_decref(jsep); + /* Also notify event handlers */ + if(notify_events && gateway->events_is_enabled()) { + json_t *info = json_object(); + json_object_set_new(info, "event", json_string("updated")); + json_object_set_new(info, "room", string_ids ? + json_string(subscriber->room_id_str) : json_integer(subscriber->room_id)); + json_object_set_new(info, "streams", media_event); + json_object_set_new(info, "private_id", json_integer(subscriber->pvt_id)); + gateway->notify_event(&janus_videoroom_plugin, NULL, info); + } + } + janus_refcount_decrease(&subscriber->session->ref); + janus_refcount_decrease(&subscriber->ref); + temp = temp->next; + } + } + g_list_free(subscribers); + /* Free streams */ + g_list_free(publisher->streams); + publisher->streams = NULL; + g_hash_table_remove_all(publisher->streams_byid); + g_hash_table_remove_all(publisher->streams_bymid); + janus_mutex_unlock(&publisher->streams_mutex); + janus_videoroom_leave_or_unpublish(publisher, TRUE, FALSE); + janus_videoroom_publisher_destroy(publisher); + /* Done */ + JANUS_LOG(LOG_VERB, "[%s/%s] Leaving remote publisher thread...\n", + videoroom->room_id_str, publisher->user_id_str); + janus_refcount_decrease(&videoroom->ref); + janus_refcount_decrease(&publisher->session->ref); + janus_refcount_decrease(&publisher->ref); + g_thread_unref(g_thread_self()); + return NULL; +}