Skip to content

Commit

Permalink
minor optimization to short circuit link processing
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 7, 2024
1 parent 5b8cc53 commit 2c310bd
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 37 deletions.
84 changes: 52 additions & 32 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ static void AMQP_disposition_handler(qd_router_t *router, qd_link_t *link, pn_de
/**
* Handle session window update events
*/
extern void qdr_link_schedule_hack(qdr_link_t *link);
static int AMQP_session_flow_handler(qd_router_t *router, qd_session_t *qd_ssn)
{
// ignore this event unless there are links blocked on output capacity
Expand All @@ -1093,45 +1094,44 @@ static int AMQP_session_flow_handler(qd_router_t *router, qd_session_t *qd_ssn)
// there are blocked links and there is enough session output capacity to restart sending

bool activate_cutthrough = false;
bool activate_conn = false;
qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
qd_link_t *blink = DEQ_HEAD(*blinks);
qd_connection_t *conn = qd_link_connection(blink);

while (blink) {
qd_link_q3_unblock(blink); // removes from blinks list!
pn_link_t *pnlink = qd_link_pn(blink);
pn_delivery_t *pdlv = pn_link_current(pnlink);
if (!!pdlv) {
// How we restart output on a link depends on whether the delivery is cut-through or the legacy
// "through to core" path
qdr_delivery_t *qdlv = qdr_node_delivery_qdr_from_pn(pdlv);
if (qdr_delivery_is_unicast_cutthrough(qdlv)) {
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
activate_cutthrough = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);

if (!used) {
free_qdr_delivery_ref_t(dref);
}
pn_delivery_t *pdlv = pn_link_current(pnlink);
qdr_delivery_t *qdlv = !!pdlv ? qdr_node_delivery_qdr_from_pn(pdlv) : 0;

} else {
// non-cutthrough delivery: slowpath through core by hijacking existing link flow path
qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(blink);
if (rlink) {
// signalling flow to the core causes the link to be re-activated
qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
}
if (qdlv && qdr_delivery_is_unicast_cutthrough(qdlv)) {
qdr_delivery_ref_t *dref = new_qdr_delivery_ref_t();
bool used = false;

sys_spinlock_lock(&conn->outbound_cutthrough_spinlock);
if (!qdlv->cutthrough_list_ref) {
DEQ_ITEM_INIT(dref);
dref->dlv = qdlv;
qdlv->cutthrough_list_ref = dref;
DEQ_INSERT_TAIL(conn->outbound_cutthrough_worklist, dref);
qdr_delivery_incref(qdlv, "Recover from Q3 stall");
used = true;
activate_cutthrough = true;
}
sys_spinlock_unlock(&conn->outbound_cutthrough_spinlock);

if (!used) {
free_qdr_delivery_ref_t(dref);
}

} else {
// non-cutthrough delivery: slowpath through core by hijacking existing link flow path
qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(blink);
if (rlink) {
qdr_link_schedule_hack(rlink);
activate_conn = true;
}
}

Expand All @@ -1140,6 +1140,11 @@ static int AMQP_session_flow_handler(qd_router_t *router, qd_session_t *qd_ssn)

if (activate_cutthrough) {
SET_ATOMIC_FLAG(&conn->wake_cutthrough_outbound);
}
if (activate_conn) {
SET_ATOMIC_FLAG(&conn->wake_core);
}
if (activate_cutthrough || activate_conn) {
AMQP_conn_wake_handler(router, conn, 0);
}

Expand Down Expand Up @@ -1246,8 +1251,11 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
static int AMQP_link_work_handler(qd_router_t *router, qd_link_t *link)
{
// Sending frames may have opened up more outgoing capacity on the parent session.
// Check for any blocked outgoing links
return AMQP_session_flow_handler(router, qd_link_get_session(link));
qd_session_t *qd_ssn = qd_link_get_session(link);
if (qd_session_is_q3_blocked(qd_ssn)) {
return AMQP_session_flow_handler(router, qd_ssn);
}
return 0;
}

/**
Expand Down Expand Up @@ -1855,6 +1863,7 @@ static void CORE_link_first_attach(void *context,
//
qd_link_t *qlink = qd_link(qconn, qdr_link_direction(link), qdr_link_name(link), ssn_class);

qd_link_set_link_id(qlink, link->identity); // TODO(kgiusti): this fixes an unrelated bug
//
// Copy the source and target termini to the link
//
Expand Down Expand Up @@ -2066,6 +2075,17 @@ static int CORE_link_push(void *context, qdr_link_t *link, int limit)
if (!qlink)
return 0;

if (qd_link_is_q3_blocked(qlink))
return 0;

// Cannot write output data if parent session is blocked,
qd_session_t *qd_ssn = qd_link_get_session(qlink);
if (qd_session_is_q3_blocked(qd_ssn)) {
// Mark link as blocked so when the session unblocks the link will get re-pushed:
qd_link_q3_block(qlink);
return 0;
}

pn_link_t *plink = qd_link_pn(qlink);

if (plink) {
Expand Down
7 changes: 7 additions & 0 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,13 @@ void qd_link_q3_unblock(qd_link_t *link)
}


bool qd_link_is_q3_blocked(const qd_link_t *link)
{
assert(link);
return link->q3_blocked;
}


uint64_t qd_link_link_id(const qd_link_t *link)
{
return link->link_id;
Expand Down
1 change: 1 addition & 0 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ void qd_link_close(qd_link_t *link);
void qd_link_detach(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
bool qd_link_is_q3_blocked(const qd_link_t *link);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
uint64_t qd_link_link_id(const qd_link_t *link);
Expand Down
23 changes: 23 additions & 0 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ int qdr_connection_process(qdr_connection_t *conn)
conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, false);
break;

case QDR_CONNECTION_WORK_SCHEDULE_LINK: {
if (work->link->ref[QDR_LINK_LIST_CLASS_LOCAL] == 0) {
qdr_add_link_ref(&links_with_work[work->link->priority], work->link, QDR_LINK_LIST_CLASS_LOCAL);
work->link->processing = true;
}
break;
}

}

qdr_connection_work_free_CT(work);
Expand Down Expand Up @@ -825,6 +833,21 @@ void qdr_connection_enqueue_work_CT(qdr_core_t *core,
}


void qdr_link_schedule_hack(qdr_link_t *link)
{
qdr_connection_t *conn = link->conn;
if (conn) {
qdr_connection_work_t *work = new_qdr_connection_work_t();
ZERO(work);
work->work_type = QDR_CONNECTION_WORK_SCHEDULE_LINK;
work->link = link;

sys_mutex_lock(&conn->work_lock);
DEQ_INSERT_TAIL(conn->work_list, work);
sys_mutex_unlock(&conn->work_lock);
}
}

void qdr_link_enqueue_work_CT(qdr_core_t *core,
qdr_link_t *link,
qdr_link_work_t *work)
Expand Down
3 changes: 2 additions & 1 deletion src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ typedef enum {
QDR_CONNECTION_WORK_FIRST_ATTACH,
QDR_CONNECTION_WORK_SECOND_ATTACH,
QDR_CONNECTION_WORK_TRACING_ON,
QDR_CONNECTION_WORK_TRACING_OFF
QDR_CONNECTION_WORK_TRACING_OFF,
QDR_CONNECTION_WORK_SCHEDULE_LINK

} qdr_connection_work_type_t;

Expand Down
3 changes: 1 addition & 2 deletions tests/clogger.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ static bool event_handler(pn_event_t *event)
debug("Remote MAX FRAME=%u\n", remote_max_frame);
} break;

case PN_LINK_FLOW:
case PN_LINK_WORK: {
case PN_LINK_FLOW: {
// the remote has given us some credit, now we can send messages
//
if (limit == 0 || sent < limit) {
Expand Down
3 changes: 1 addition & 2 deletions tests/test-sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ static bool event_handler(pn_event_t *event)
}
} break;

case PN_LINK_FLOW:
case PN_LINK_WORK: {
case PN_LINK_FLOW: {
// the remote has given us some credit, now we can send messages
//
static long tag = 0; // a simple tag generator
Expand Down

0 comments on commit 2c310bd

Please sign in to comment.