Skip to content

Commit

Permalink
ISSUE-1619: AMQP session flow control qd_session_t lifecycle (#1620)
Browse files Browse the repository at this point in the history
Fix the qd_session_t lifecycle and encapsulate session windowing
parameters within it.

This does not fix the issue but is the start of the solution.
  • Loading branch information
kgiusti authored Oct 9, 2024
1 parent 4fbde57 commit 3cf90e6
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 204 deletions.
117 changes: 59 additions & 58 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ static void deferred_AMQP_rx_handler(void *context, bool discard)

if (!discard) {
qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl);
if (!!qdl) {
if (!!qdl && !!qd_link_pn(qdl)) {
assert(qd_link_direction(qdl) == QD_INCOMING);
while (true) {
if (!AMQP_rx_handler(amqp_adaptor.router, qdl))
Expand Down Expand Up @@ -1169,70 +1169,66 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
}

// check if Q3 can be unblocked
pn_session_t *pn_ssn = pn_link_session(pnlink);
if (pn_ssn) {
qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn);
if (qd_ssn && qd_session_is_q3_blocked(qd_ssn)) {
// Q3 blocked - have we drained enough outgoing bytes?
const size_t q3_lower = QD_BUFFER_SIZE * QD_QLIMIT_Q3_LOWER;
if (pn_session_outgoing_bytes(pn_ssn) < q3_lower) {
// yes. We must now unblock all links that have been blocked by Q3

qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
qd_link_t *blink = DEQ_HEAD(*blinks);
qd_connection_t *conn = qd_link_connection(blink);

while (blink) {
qd_link_q3_unblock(blink); // removes from blinks list!
pnlink = qd_link_pn(blink);
if (blink != link) { // already flowed this link
rlink = (qdr_link_t *) qd_link_get_context(blink);
if (rlink) {
// signalling flow to the core causes the link to be re-activated
qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
}
qd_session_t *qd_ssn = qd_link_get_session(link);
if (qd_session_is_q3_blocked(qd_ssn)) {
// Q3 blocked - have we drained enough outgoing bytes?
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_threshold(qd_ssn)) {
// yes. We must now unblock all links that have been blocked by Q3

qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
qd_link_t *blink = DEQ_HEAD(*blinks);
qd_connection_t *conn = qd_link_connection(blink);

while (blink) {
qd_link_q3_unblock(blink); // removes from blinks list!
pnlink = qd_link_pn(blink);
if (blink != link) { // already flowed this link
rlink = (qdr_link_t *) qd_link_get_context(blink);
if (rlink) {
// signalling flow to the core causes the link to be re-activated
qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
}
}

pn_delivery_t *pdlv = pn_link_current(pnlink);
if (!!pdlv) {
qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv);
//
//https://github.com/skupperproject/skupper-router/issues/1221
// Add the delivery/delivery_ref to the outbound_cutthrough_worklist
// only if the delivery is a cut-through delivery.
// Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed
// on the conn->outbound_cutthrough_worklist.
//
if (qdr_delivery_is_unicast_cutthrough(qdlv)) {
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);
pn_delivery_t *pdlv = pn_link_current(pnlink);
if (!!pdlv) {
qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv);
//
//https://github.com/skupperproject/skupper-router/issues/1221
// Add the delivery/delivery_ref to the outbound_cutthrough_worklist
// only if the delivery is a cut-through delivery.
// Pure all-AMQP deliveries/delivery_refs will never be cut-through and hence will never be placed
// on the conn->outbound_cutthrough_worklist.
//
if (qdr_delivery_is_unicast_cutthrough(qdlv)) {
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);

if (!used) {
free_qdr_delivery_ref_t(dref);
}
if (!used) {
free_qdr_delivery_ref_t(dref);
}
}

blink = DEQ_HEAD(*blinks);
}

//
// Wake the connection for outgoing cut-through
//
SET_ATOMIC_FLAG(&conn->wake_cutthrough_outbound);
AMQP_conn_wake_handler(router, conn, 0);
blink = DEQ_HEAD(*blinks);
}

//
// Wake the connection for outgoing cut-through
//
SET_ATOMIC_FLAG(&conn->wake_cutthrough_outbound);
AMQP_conn_wake_handler(router, conn, 0);
}
}
return 0;
Expand Down Expand Up @@ -2386,7 +2382,6 @@ static void qd_amqp_adaptor_final(void *adaptor_context)
pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
if (tport)
pn_transport_set_context(tport, 0); /* for transport_tracer */
qd_session_cleanup(ctx);
pn_connection_set_context(ctx->pn_conn, 0);
}
qd_connection_invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
Expand All @@ -2404,6 +2399,12 @@ static void qd_amqp_adaptor_final(void *adaptor_context)
qd_listener_remove_connection(ctx->listener, ctx);
ctx->listener = 0;
}

// free up any still active sessions
for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i)
qd_session_decref(ctx->qd_sessions[i]);
qd_connection_release_sessions(ctx);

qd_tls_session_free(ctx->ssl);
sys_atomic_destroy(&ctx->wake_core);
sys_atomic_destroy(&ctx->wake_cutthrough_inbound);
Expand Down
Loading

0 comments on commit 3cf90e6

Please sign in to comment.