Skip to content

Commit

Permalink
Adding support for NG trace to Homer
Browse files Browse the repository at this point in the history
Support is desired for Kamailio/Rtpengine traffic via UDP.
Adding homer-enable-rtcp-stats and homer-enable-ng config params to
separately control sending to Homer each traffic type. By default rtcp
is `on` when homer parameter is configured. NG is by default `off`.
  • Loading branch information
lbalaceanu committed Mar 4, 2024
1 parent 527225a commit d30f05b
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 14 deletions.
124 changes: 121 additions & 3 deletions daemon/control_ng.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "statistics.h"
#include "streambuf.h"
#include "str.h"
#include "homer.h"
#include "tcp_listener.h"
#include "main.h"

Expand Down Expand Up @@ -62,6 +63,104 @@ const char *ng_command_strings_short[NGC_COUNT] = {
"Pub", "SubReq", "SubAns", "Unsub",
};

typedef struct ng_ctx {
str callid;
str command;
str cookie;
bool should_trace;
const endpoint_t *sin_ep;
const endpoint_t *local_ep;
} ng_ctx;

#define CH(func, ...) do { \
if (rtpe_config.homer_ng) \
func( __VA_ARGS__); \
} while (0)

static GString *create_homer_msg(str *cookie, str *data) {
GString *msg = g_string_sized_new(cookie->len + 1 + data->len);
g_string_append_printf(msg, "%.*s %.*s", STR_FMT(cookie), STR_FMT(data));
return msg;
}

static bool should_trace_msg(str *pcmd) {
switch (__csh_lookup(pcmd)) {
case CSH_LOOKUP("ping"):
return false;
default:
return true;
}
}

static void homer_fill_values(ng_ctx *hctx, str *callid, str *cmd) {
if (hctx) {
hctx->command = *cmd;
hctx->callid = *callid;
}
}

static void homer_extract_values(ng_ctx *hctx, str* data, struct obj *ref, struct ng_buffer **ngbufp) {
bencode_item_t *dict, *resp;

struct ng_buffer *ngbuf = *ngbufp = ng_buffer_new(ref);

resp = bencode_dictionary(&ngbuf->buffer);
assert(resp != NULL);

//"Invalid data (no payload)";
if (data->len <= 0)
goto end_function;

if (data->s[0] == 'd') {
dict = bencode_decode_expect_str(&ngbuf->buffer, data, BENCODE_DICTIONARY);
//"Could not decode bencode dictionary";
if (!dict)
goto end_function;
}
else if (data->s[0] == '{') {
JsonParser *json = json_parser_new();
bencode_buffer_destroy_add(&ngbuf->buffer, g_object_unref, json);
//"Failed to parse JSON document";
if (!json_parser_load_from_data(json, data->s, data->len, NULL))
goto end_function;
dict = bencode_convert_json(&ngbuf->buffer, json);
//"Could not decode bencode dictionary";
if (!dict || dict->type != BENCODE_DICTIONARY)
goto end_function;
}
else {
//"Invalid NG data format";
goto end_function;
}

bencode_dictionary_get_str(dict, "command", &hctx->command);
// "Dictionary contains no key \"command\"";
if (!hctx->command.s)
goto end_function;

bencode_dictionary_get_str(dict, "call-id", &hctx->callid);
end_function:
return;
}

static void homer_trace_msg_in(ng_ctx *hctx, str *data) {
hctx->should_trace = should_trace_msg(&hctx->command);
if (hctx->should_trace) {
struct timeval tv;
gettimeofday(&tv, NULL);
GString *msg = create_homer_msg(&hctx->cookie, data);
homer_send(msg, &hctx->callid, hctx->sin_ep, hctx->local_ep, &tv, PROTO_NG);
}
}

static void homer_trace_msg_out(ng_ctx *hctx, str *data) {
if (hctx->should_trace) {
struct timeval tv;
gettimeofday(&tv, NULL);
GString *msg = create_homer_msg(&hctx->cookie, data);
homer_send(msg, &hctx->callid, hctx->local_ep, hctx->sin_ep, &tv, PROTO_NG);
}
}

static void pretty_print(bencode_item_t *el, GString *s) {
bencode_item_t *chld;
Expand Down Expand Up @@ -148,7 +247,7 @@ ng_buffer *ng_buffer_new(struct obj *ref) {
return ngbuf;
}

static void control_ng_process_payload(str *reply, str *data, const endpoint_t *sin, char *addr, struct obj *ref,
static void control_ng_process_payload(ng_ctx *hctx, str *reply, str *data, const endpoint_t *sin, char *addr, struct obj *ref,
struct ng_buffer **ngbufp)
{
bencode_item_t *dict, *resp;
Expand Down Expand Up @@ -203,6 +302,9 @@ static void control_ng_process_payload(str *reply, str *data, const endpoint_t *

ilogs(control, LOG_INFO, "Received command '"STR_FORMAT"' from %s", STR_FMT(&cmd), addr);

CH(homer_fill_values, hctx, &callid, &cmd);
CH(homer_trace_msg_in, hctx, data);

if (get_log_level(control) >= LOG_DEBUG) {
log_str = g_string_sized_new(256);
g_string_append_printf(log_str, "Dump for '"STR_FORMAT"' from %s: %s", STR_FMT(&cmd), addr,
Expand Down Expand Up @@ -385,6 +487,7 @@ static void control_ng_process_payload(str *reply, str *data, const endpoint_t *

release_closed_sockets();
log_info_pop_until(&callid);
CH(homer_trace_msg_out ,hctx, reply);
}

int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockaddr_t *local,
Expand All @@ -407,14 +510,29 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockad
str *cached = cookie_cache_lookup(&ng_cookie_cache, &cookie);
if (cached) {
ilogs(control, LOG_INFO, "Detected command from %s as a duplicate", addr);

ng_ctx hctx = {.sin_ep = sin,
.local_ep = p1 ? &(((socket_t*)p1)->local) : NULL,
.cookie = cookie};
g_autoptr(ng_buffer) ngbuf = NULL;
CH(homer_extract_values, &hctx, &data, ref, &ngbuf);
CH(homer_trace_msg_in, &hctx, &data);
cb(&cookie, cached, sin, local, p1);
CH(homer_trace_msg_out, &hctx, cached);
free(cached);
return 0;
}

str reply;
g_autoptr(ng_buffer) ngbuf = NULL;
control_ng_process_payload(&reply, &data, sin, addr, ref, &ngbuf);

ng_ctx hctx = {.sin_ep = sin,
.local_ep = p1 ? &(((socket_t*)p1)->local) : NULL,
.cookie = cookie};

control_ng_process_payload(rtpe_config.homer_ng ? &hctx : NULL,
&reply, &data, sin, addr, ref, &ngbuf);

cb(&cookie, &reply, sin, local, p1);
cookie_cache_insert(&ng_cookie_cache, &cookie, &reply);

Expand All @@ -428,7 +546,7 @@ int control_ng_process_plain(str *data, const endpoint_t *sin, char *addr, const
g_autoptr(ng_buffer) ngbuf = NULL;

str reply;
control_ng_process_payload(&reply, data, sin, addr, ref, &ngbuf);
control_ng_process_payload(NULL, &reply, data, sin, addr, ref, &ngbuf);
cb(NULL, &reply, sin, local, p1);

return 0;
Expand Down
12 changes: 5 additions & 7 deletions daemon/homer.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static struct homer_sender *main_homer_sender;


static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *);
const struct timeval *, int hep_capture_proto);

// state handlers
static int __established(struct homer_sender *hs);
Expand Down Expand Up @@ -203,7 +203,7 @@ void homer_sender_init(const endpoint_t *ep, int protocol, int capture_id) {

// takes over the GString
int homer_send(GString *s, const str *id, const endpoint_t *src,
const endpoint_t *dst, const struct timeval *tv)
const endpoint_t *dst, const struct timeval *tv, int hep_capture_proto)
{
if (!main_homer_sender)
goto out;
Expand All @@ -214,7 +214,7 @@ int homer_send(GString *s, const str *id, const endpoint_t *src,

ilog(LOG_DEBUG, "JSON to send to Homer: '"STR_FORMAT"'", G_STR_FMT(s));

if (send_hepv3(s, id, main_homer_sender->capture_id, src, dst, tv))
if (send_hepv3(s, id, main_homer_sender->capture_id, src, dst, tv, hep_capture_proto))
goto out;

mutex_lock(&main_homer_sender->lock);
Expand Down Expand Up @@ -318,11 +318,9 @@ struct hep_generic {

typedef struct hep_generic hep_generic_t;

#define PROTO_RTCP_JSON 0x05

// modifies the GString in place
static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *tv)
const struct timeval *tv, int hep_capture_proto)
{

struct hep_generic *hg=NULL;
Expand Down Expand Up @@ -417,7 +415,7 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t
/* Protocol TYPE */
hg->proto_t.chunk.vendor_id = htons(0x0000);
hg->proto_t.chunk.type_id = htons(0x000b);
hg->proto_t.data = PROTO_RTCP_JSON;
hg->proto_t.data = hep_capture_proto;
hg->proto_t.chunk.length = htons(sizeof(hg->proto_t));

/* Capture ID */
Expand Down
15 changes: 15 additions & 0 deletions daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct rtpengine_config rtpe_config = {
.mqtt_publish_interval = 5000,
.dtmf_digit_delay = 2500,
.rtcp_interval = 5000,
.homer_rtcp = -1,
.common = {
.log_levels = {
[log_level_index_internals] = -1,
Expand Down Expand Up @@ -578,6 +579,8 @@ static void options(int *argc, char ***argv) {
{ "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46|HOSTNAME:PORT"},
{ "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" },
{ "homer-id", 0, 0, G_OPTION_ARG_INT, &rtpe_config.homer_id, "'Capture ID' to use within the HEP protocol", "INT" },
{ "homer-enable-rtcp-stats", 0, 0, G_OPTION_ARG_INT, &rtpe_config.homer_rtcp, "Enable/disable RTCP stats to Homer (enabled by default if homer server enabled, else disabled)", "0|1" },
{ "homer-enable-ng", 0, 0, G_OPTION_ARG_INT, &rtpe_config.homer_ng, "Enable/disable NG to Homer", "0|1" },
{ "recording-dir", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.spooldir, "Directory for storing pcap and metadata files", "FILE" },
{ "recording-method",0, 0, G_OPTION_ARG_STRING, &rtpe_config.rec_method, "Strategy for call recording", "pcap|proc|all" },
{ "recording-format",0, 0, G_OPTION_ARG_STRING, &rtpe_config.rec_format, "File format for stored pcap files", "raw|eth" },
Expand Down Expand Up @@ -788,6 +791,9 @@ static void options(int *argc, char ***argv) {
if (homerp) {
if (endpoint_parse_any_getaddrinfo_full(&rtpe_config.homer_ep, homerp))
die("Invalid IP or port '%s' (--homer)", homerp);
if (rtpe_config.homer_rtcp == -1) {
rtpe_config.homer_rtcp = 1;
}
}
if (homerproto) {
if (!strcmp(homerproto, "tcp"))
Expand All @@ -797,6 +803,15 @@ static void options(int *argc, char ***argv) {
else
die("Invalid protocol '%s' (--homer-protocol)", homerproto);
}
/* homer_rtcp == -1 means homer-enable-rtcp-stats is enabled by default if homer enabled */
if (rtpe_config.homer_rtcp == -1)
rtpe_config.homer_rtcp = 0;

if (rtpe_config.homer_rtcp < 0 || rtpe_config.homer_rtcp > 1 )
die("Invalid homer-enable-rtcp-stats value");

if (rtpe_config.homer_ng < 0 || rtpe_config.homer_ng > 1 )
die("Invalid homer-enable-ng value");

if (rtpe_config.default_tos < 0 || rtpe_config.default_tos > 255)
die("Invalid TOS value");
Expand Down
4 changes: 2 additions & 2 deletions daemon/rtcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ static void homer_finish(struct rtcp_process_ctx *ctx, call_t *c, const endpoint
str_sanitize(ctx->json);
g_string_append(ctx->json, " }");
if (ctx->json->len > ctx->json_init_len + 2)
homer_send(ctx->json, &c->callid, src, dst, tv);
homer_send(ctx->json, &c->callid, src, dst, tv, PROTO_RTCP_JSON);
else
g_string_free(ctx->json, TRUE);
ctx->json = NULL;
Expand Down Expand Up @@ -1407,7 +1407,7 @@ static void transcode_sr_wrap(struct rtcp_process_ctx *ctx, struct sender_report

void rtcp_init(void) {
rtcp_handlers.logging = _log_facility_rtcp ? &log_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() ? &homer_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() && rtpe_config.homer_rtcp ? &homer_handlers : &dummy_handlers;
}


Expand Down
7 changes: 5 additions & 2 deletions include/homer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

#include "socket.h"

#define PROTO_RTCP_JSON 0x05
#define PROTO_NG 0x3d

void homer_sender_init(const endpoint_t *, int, int);
int homer_send(GString *, const str *, const endpoint_t *, const endpoint_t *,
const struct timeval *tv);
const struct timeval *, int);
int has_homer(void);

#endif
#endif
2 changes: 2 additions & 0 deletions include/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct rtpengine_config {
endpoint_t homer_ep;
int homer_protocol;
int homer_id;
int homer_rtcp;
int homer_ng;
gboolean no_fallback;
gboolean reject_invalid_sdp;
gboolean save_interface_ports;
Expand Down

0 comments on commit d30f05b

Please sign in to comment.