Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalize multi-stream & QoS Cancel Timeout #1102

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/mqtt/protocol/mqtt/mqtt_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct mqtt_pipe_s {
#endif
};


// QUIC is still retrying, Although we abort sending in protocol layer.
static void
mqtt_quic_cancel_send(nni_aio *aio, void *arg, int rv)
{
Expand Down Expand Up @@ -290,7 +290,8 @@ mqtt_quic_send_msg(nni_aio *aio, mqtt_sock_t *s)
nni_msg_free(msg);
nni_aio_finish_error(aio, NNG_ECANCELED);
return NNG_ECANCELED;
} else {
} else if (!s->qos_first) {
// Cancel Timeout is simply not compatible with QoS first logic
int rv;
if ((rv = nni_aio_schedule(aio, mqtt_quic_cancel_send, s)) != 0) {
log_warn("Cancel_Func scheduling failed, send abort!");
Expand Down Expand Up @@ -319,7 +320,7 @@ mqtt_quic_send_msg(nni_aio *aio, mqtt_sock_t *s)
// this make cancel_send impossible
nni_aio_set_prov_data(aio, &prior_flags);
nni_pipe_send(p->qpipe, aio);
log_debug("sending highpriority QoS msg in parallel");
log_debug("sending high priority QoS msg in parallel");
nni_mtx_unlock(&s->mtx);
return 0;
}
Expand Down Expand Up @@ -740,6 +741,7 @@ mqtt_quic_recv_cb(void *arg)
if (packet_type == NNG_MQTT_PUBLISH)
if (s->cb.msg_recv_cb) // Trigger cb
s->cb.msg_recv_cb(msg, s->cb.recvarg);
return;
}

// Timer callback, we use it for retransmition.
Expand Down
11 changes: 2 additions & 9 deletions src/supplemental/quic/msquic_dial.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,7 @@ quic_stream_dowrite(nni_quic_conn *c)
naiov, QUIC_SEND_FLAG_NONE, NULL))) {
log_error("[sid%d] Failed in StreamSend, 0x%x!", c->id, rv);
free(buf);
// nni_aio_list_remove(aio);
// nni_aio_finish_error(aio, NNG_ECLOSED);
// Verify: send_complete cb triggered later?
return;
}

Expand Down Expand Up @@ -1335,15 +1334,9 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,
nni_aio_set_input(aio, 0, NULL);
free(buf);
nni_msg *msg = nni_aio_get_msg(aio);
// free SUBSCRIBE/UNSUBSCRIBE QoS 1/2 PUBLISH msg here
// nni_mqtt_packet_type t = nni_mqtt_msg_get_packet_type(msg);
nni_msg_free(msg);
// Do not set nni_aio_set_msg(aio, NULL) here! leave it to cancel!
if (canceled)
nni_aio_finish_error(aio, NNG_ECANCELED);
// XXX Protocol will finish aio when received ack.
//else
// nni_aio_finish(aio, 0, nni_aio_count(aio));
// Do not finish user aio here!
break;
}
// Ordinary sending
Expand Down
Loading