Skip to content

Commit

Permalink
* NEW [conf] add ctx_msgs as a app layer cache mq for bridging
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <jaylin@emqx.io>
Signed-off-by: JaylinYu <letrangerjaylin@gmail.com>
  • Loading branch information
JaylinYu committed Nov 28, 2024
1 parent 1abff3d commit e4ab702
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
4 changes: 4 additions & 0 deletions include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ typedef struct {
struct conf_bridge_node {
bool enable;
bool dynamic;
bool busy;
bool clean_start;
bool transparent;
bool will_flag;
Expand Down Expand Up @@ -291,6 +292,9 @@ struct conf_bridge_node {
conf_tcp tcp;
conf_sqlite *sqlite;
nng_aio **bridge_aio;
nng_aio *resend_aio;
nng_dialer dialer; // in order to postpone bridging client start after local broker
nng_lmq *ctx_msgs; // only cache qos msg blocked by aio busy
nng_mtx *mtx;

bool hybrid; // enable/disable hybrid bridging
Expand Down
8 changes: 8 additions & 0 deletions src/supplemental/nanolib/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2738,6 +2738,7 @@ conf_bridge_node_init(conf_bridge_node *node)
{
node->sock = NULL;
node->name = NULL;
node->busy = false;
node->enable = false;
node->parallel = 2;
node->address = NULL;
Expand Down Expand Up @@ -2789,6 +2790,8 @@ conf_bridge_node_init(conf_bridge_node *node)
node->conn_properties = NULL;
node->will_properties = NULL;
node->sub_properties = NULL;
// TODO compatible with bridge reload
node->ctx_msgs = NULL;
}

static void
Expand Down Expand Up @@ -2964,6 +2967,7 @@ conf_bridge_node_parse_with_name(const char *path,const char *key_prefix, const
} else if ((value = get_conf_value_with_prefix2(line, sz,
key_prefix, name, ".max_send_queue_len")) != NULL) {
node->max_send_queue_len = atoi(value);
nng_lmq_alloc(&node->ctx_msgs, node->max_send_queue_len);
free(value);
} else if ((value = get_conf_value_with_prefix2(line, sz,
key_prefix, name, ".resend_interval")) != NULL) {
Expand Down Expand Up @@ -3167,6 +3171,10 @@ conf_bridge_node_destroy(conf_bridge_node *node)
free(node->will_payload);
node->will_payload = NULL;
}
if (node->ctx_msgs) {
nng_lmq_flush(node->ctx_msgs);
nng_lmq_free(node->ctx_msgs);
}
if (node->forwards_count > 0 && node->forwards_list) {
for (size_t i = 0; i < node->forwards_count; i++) {
topics *s = node->forwards_list[i];
Expand Down

0 comments on commit e4ab702

Please sign in to comment.