Skip to content

Commit

Permalink
Merge pull request #1049 from nanomq/wangha/dev4
Browse files Browse the repository at this point in the history
Add hybrid_servers to bridge config.
  • Loading branch information
JaylinYu authored Aug 19, 2024
2 parents 899937c + 494d1d0 commit 3c6db51
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 13 deletions.
3 changes: 2 additions & 1 deletion include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ struct conf_bridge_node {
nng_aio **bridge_aio;
nng_mtx *mtx;

bool hybrid; // enable/disable hybrid bridging
char **hybrid_servers;

// MQTT v5 property
conf_bridge_conn_properties * conn_properties;
Expand All @@ -301,7 +303,6 @@ struct conf_bridge_node {
bool multi_stream;
bool stream_auto_genid; // generate stream id automatically for each stream
bool qos_first; // send QoS msg in high priority
bool hybrid; // hybrid bridging affects auto-reconnect of QUIC transport
uint64_t qkeepalive; //keepalive timeout interval of QUIC transport
uint64_t qconnect_timeout; // HandshakeIdleTimeoutMs of QUIC
uint32_t qdiscon_timeout; // DisconnectTimeoutMs
Expand Down
1 change: 0 additions & 1 deletion src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,6 @@ mqtt_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
nni_aio_list_remove(aio);
}
nni_mtx_unlock(&s->mtx);

}

static void
Expand Down
29 changes: 27 additions & 2 deletions src/mqtt/protocol/mqtt/mqtt_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,7 @@ quic_mqtt_pipe_fini(void *arg)

// emulate disconnect notify msg as a normal publish
while ((ctx = nni_list_first(&p->recv_queue)) != NULL) {
// Pipe was closed. just push an error back to the
// Pipe was closed. just push an error back to the
// entire socket, because we only have one pipe
nni_list_remove(&p->recv_queue, ctx);
aio = ctx->raio;
Expand Down Expand Up @@ -1901,7 +1901,32 @@ mqtt_quic_ctx_init(void *arg, void *sock)
static void
mqtt_quic_ctx_fini(void *arg)
{
NNI_ARG_UNUSED(arg);
mqtt_quic_ctx *ctx = arg;
mqtt_sock_t *s = ctx->mqtt_sock;
mqtt_pipe_t *p = s->pipe;
nni_aio * aio;

nni_mtx_lock(&s->mtx);
if (nni_list_active(&s->recv_queue, ctx)) {
if ((aio = ctx->raio) != NULL) {
ctx->raio = NULL;
nni_list_remove(&s->recv_queue, ctx);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
}
nni_mtx_unlock(&s->mtx);

if (p) {
nni_mtx_lock(&p->lk);
if (nni_list_active(&p->recv_queue, ctx)) {
if ((aio = ctx->raio) != NULL) {
ctx->raio = NULL;
nni_list_remove(&p->recv_queue, ctx);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
}
nni_mtx_unlock(&p->lk);
}
}

static void
Expand Down
28 changes: 20 additions & 8 deletions src/supplemental/nanolib/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2763,14 +2763,16 @@ conf_bridge_node_init(conf_bridge_node *node)
node->will_qos = 0;
node->will_retain = false;

node->sqlite = NULL;
node->bridge_aio = NULL;
node->bridge_arg = NULL;

node->bridge_aio = NULL;
node->bridge_arg = NULL;
node->sqlite = NULL;

node->hybrid = false;
node->hybrid_servers = NULL;

#if defined(SUPP_QUIC)
node->multi_stream = false;
node->hybrid = false;
node->qkeepalive = 30;
node->qconnect_timeout = 20; // HandshakeIdleTimeoutMs of QUIC
node->qdiscon_timeout = 20; // DisconnectTimeoutMs
Expand Down Expand Up @@ -2918,10 +2920,6 @@ conf_bridge_node_parse_with_name(const char *path,const char *key_prefix, const
key_prefix, name, ".quic_max_ack_delay_ms")) != NULL) {
node->qmax_ack_delay_ms = atoi(value);
free(value);
} else if ((value = get_conf_value_with_prefix2(line, sz,
key_prefix, name, ".hybrid_bridging")) != NULL) {
node->hybrid = nni_strcasecmp(value, "true") == 0;
free(value);
} else if ((value = get_conf_value_with_prefix2(line, sz,
key_prefix, name, ".quic_multi_stream")) != NULL) {
node->multi_stream = nni_strcasecmp(value, "true") == 0;
Expand Down Expand Up @@ -3228,6 +3226,12 @@ conf_bridge_node_destroy(conf_bridge_node *node)
free(node->sub_properties);
node->sub_properties = NULL;
}
if (node->hybrid_servers) {
for (size_t i = 0; i < cvector_size(node->hybrid_servers); ++i)
if (node->hybrid_servers[i])
free(node->hybrid_servers[i]);
cvector_free(node->hybrid_servers);
}
conf_tls_destroy(&node->tls);
}

Expand Down Expand Up @@ -3274,6 +3278,14 @@ print_bridge_conf(conf_bridge *bridge, const char *prefix)
node->name, node->backoff_max);
log_info("%sbridge.mqtt.%s.max_parallel_processes: %ld", prefix,
node->name, node->parallel);
log_info("%sbridge.mqtt.%s.hybrid_bridging : %s", prefix,
node->name, node->hybrid ? "true" : "false");
log_info("%sbridge.mqtt.%s.hybrid_servers: ", prefix, node->name);
for (size_t j = 0; j < cvector_size(node->hybrid_servers); j++) {
log_info(
"\t[%ld] hybrid servers: %s", j,
node->hybrid_servers[j]);
}

#if defined(SUPP_QUIC)
log_info("%sbridge.mqtt.%s.quic_multi_stream: %s", prefix,
Expand Down
9 changes: 8 additions & 1 deletion src/supplemental/nanolib/conf_ver2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,6 @@ conf_bridge_quic_parse_ver2(conf_bridge_node *node, cJSON *jso_bridge_node)
node, qmax_ack_delay_ms, "quic_max_ack_delay_ms", jso_bridge_node);
hocon_read_time_base(
node, qconnect_timeout, "quic_handshake_timeout", jso_bridge_node);
hocon_read_bool_base(node, hybrid, "hybrid_bridging", jso_bridge_node);
hocon_read_bool_base(node, quic_0rtt, "quic_0rtt", jso_bridge_node);
hocon_read_bool_base(
node, multi_stream, "quic_multi_stream", jso_bridge_node);
Expand Down Expand Up @@ -1282,6 +1281,14 @@ conf_bridge_node_parse(
conf_bridge_sub_properties_parse_ver2(node, jso_prop);
}

hocon_read_bool_base(node, hybrid, "hybrid_bridging", obj);
cJSON *hybrid_servers = hocon_get_obj("hybrid_servers", obj);
cJSON *hybrid_server = NULL;
cJSON_ArrayForEach(hybrid_server, hybrid_servers)
{
cvector_push_back(node->hybrid_servers, strdup(hybrid_server->valuestring));
}

hocon_read_num_base(node, parallel, "max_parallel_processes", obj);
update_bridge_node_vin(node, CONF_NODE_SUBSCRIPTION);
update_bridge_node_vin(node, CONF_NODE_FORWARD);
Expand Down

0 comments on commit 3c6db51

Please sign in to comment.