-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes #1700: refactor the AMQP link lifecycle #1701
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
This change removes some of the old link attach routing logic and attempts to clean up the link API. The logic that used to track the exchange of Attach/Detach performatives has been simplified. The various counters and booleans maintained by the qdr_link_t structure for tracking this exchange has been reduced to a mask/flag implementation similar to Protons implementation of endpoint state. This patch refactors the link detach adaptor API to be more like the existing AMQP connection API: there is now an explict API call to release the link instance at the end of its lifecycle. The adaptor API is modified by separating the AMQP detach handling from the release of the link instance. The old qdr_link_detach() adaptor function has been refactored into two functions: qdr_link_detach_received() and qdr_link_close(). The qdr_link_detach_received() call is made by the AMQP adaptor when a Detach Peformative has been received by the peer. It is only used by the AMQP adaptor. The new qdr_link_closed() API call is made by all adaptors when the link instance is destroyed. This is similar to the existing qdr_connection_closed() call but for links. It is used by all adaptors to indicate to the core that the link is no longer in use and can be cleaned up. In the case of the AMQP adaptor this call will be made after the link detach handshake has completed. Test coverage by the test-sender AMQP client has been increased by adding a clean connection close function.
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t * | |
****************************************************************************** | ||
*/ | ||
|
||
typedef enum { | ||
QD_DETACHED, // Protocol detach | ||
QD_CLOSED, // Protocol close | ||
QD_LOST // Connection or session closed | ||
} qd_detach_type_t; | ||
|
||
/** | ||
* qdr_link_set_context | ||
* | ||
|
@@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, | |
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); | ||
|
||
/** | ||
* qdr_link_detach | ||
* qdr_link_detach_received | ||
* | ||
* This function is invoked when a link detach arrives. | ||
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach | ||
* (peer-initiated link detach) or in response to a detach sent by the router (second detach). | ||
* | ||
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. | ||
* @param dt The type of detach that occurred. | ||
* @param error The link error from the detach frame or 0 if none. | ||
*/ | ||
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error); | ||
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error); | ||
|
||
|
||
/** | ||
* qdr_link_closed | ||
* | ||
* This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the | ||
* adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will | ||
* also be called during adaptor shutdown on any outstanding links. | ||
* | ||
* The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this | ||
* call. | ||
* | ||
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. | ||
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake. | ||
*/ | ||
void qdr_link_closed(qdr_link_t *link, bool forced); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked for the usage of this function in the tcp adaptor, say in
In the above usage, we are calling the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Indeed that's the idea: the adaptor has detected the link is closed and needs to notify the core that the link no longer exists (hence qdr_link_closed rather than qd_link_closed: it's a Core API call not an adaptor call). I chose qdr_link_closed because the function performs essentially the same thing as the exising core function qdr_connection_closed except the new one deals with qdr_link_t's not qdr_connection_t's). You'll see that both adaptors call qdr_connection_close() right before they clean up their connection state objects (qd_tcp_connection_t or qd_connection_t - depending on the adaptor). The TCP adaptor used to call qdr_link_detach() even though there's not such thing as a detach performative for TCP! For TCP we basically simulate a network connection drop since there's no AMQP close handshake going on. What do you think? |
||
|
||
|
||
/** | ||
* qdr_link_deliver | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv) | |
return ref ? (qdr_delivery_t*) ref->ref : 0; | ||
} | ||
|
||
// clean up all qdr_delivery/pn_delivery bindings for the link | ||
// | ||
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link) | ||
{ | ||
qd_link_ref_list_t *list = qd_link_get_ref_list(link); | ||
qd_link_ref_t *ref = DEQ_HEAD(*list); | ||
|
||
while (ref) { | ||
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; | ||
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); | ||
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); | ||
|
||
// this will remove and release the ref | ||
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv); | ||
ref = DEQ_HEAD(*list); | ||
} | ||
} | ||
|
||
|
||
// read the delivery-state set by the remote endpoint | ||
// | ||
|
@@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) | |
/** | ||
* Link Detached Handler | ||
*/ | ||
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt) | ||
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) | ||
{ | ||
if (!link) | ||
return 0; | ||
assert(link); | ||
|
||
pn_link_t *pn_link = qd_link_pn(link); | ||
if (!pn_link) | ||
|
@@ -1257,29 +1238,59 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det | |
} | ||
} | ||
|
||
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); | ||
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; | ||
// Notify the core that a detach has been received. | ||
|
||
qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link); | ||
if (rlink) { | ||
// | ||
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent | ||
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link | ||
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer. | ||
|
||
if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { | ||
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is | ||
// the second detach! | ||
qd_link_set_context(link, 0); | ||
qdr_link_set_context(rlink, 0); | ||
} | ||
|
||
qdr_error_t *error = qdr_error_from_pn(cond); | ||
qdr_link_detach(rlink, dt, error); | ||
pn_condition_t *cond = pn_link_remote_condition(pn_link); | ||
qdr_error_t *error = qdr_error_from_pn(cond); | ||
qdr_link_detach_received(rlink, error); | ||
} else if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) { | ||
// Normally the core would be responsible for sending the response detach to close the link (via | ||
// CORE_link_detach) but since there is no core link that will not happen. | ||
pn_link_close(pn_link); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
|
||
/** | ||
* Link closed handler | ||
* | ||
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the | ||
* link has not properly closed (detach handshake completed). | ||
*/ | ||
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
{ | ||
assert(qd_link); | ||
|
||
// Clean up all qdr_delivery/pn_delivery bindings for the link. | ||
|
||
qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link); | ||
qd_link_ref_t *ref = DEQ_HEAD(*list); | ||
|
||
while (ref) { | ||
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; | ||
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); | ||
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); | ||
|
||
// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call! | ||
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv); | ||
ref = DEQ_HEAD(*list); | ||
} | ||
|
||
qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link); | ||
if (qdr_link) { | ||
// Notify core that this link no longer exists | ||
qdr_link_set_context(qdr_link, 0); | ||
qd_link_set_context(qd_link, 0); | ||
qdr_link_closed(qdr_link, forced); | ||
// This will cause the core to free qdr_link at some point so: | ||
qdr_link = 0; | ||
} | ||
} | ||
|
||
static void bind_connection_context(qdr_connection_t *qdrc, void* token) | ||
{ | ||
qd_connection_t *conn = (qd_connection_t*) token; | ||
|
@@ -1776,8 +1787,8 @@ static const qd_node_type_t router_node = {"router", 0, | |
AMQP_outgoing_link_handler, | ||
AMQP_conn_wake_handler, | ||
AMQP_link_detach_handler, | ||
AMQP_link_closed_handler, | ||
AMQP_link_attach_handler, | ||
qd_link_abandoned_deliveries_handler, | ||
AMQP_link_flow_handler, | ||
0, // node_created_handler | ||
0, // node_destroyed_handler | ||
|
@@ -1920,7 +1931,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error | |
return; | ||
|
||
pn_link_t *pn_link = qd_link_pn(qlink); | ||
if (!pn_link) | ||
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached | ||
return; | ||
|
||
if (error) { | ||
|
@@ -1945,17 +1956,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error | |
} | ||
} | ||
|
||
// | ||
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor | ||
// linkage. If this is the response attach then there will be no further proton link events to send to the core so | ||
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so | ||
// we can notify the core when the second (response) detach arrives | ||
// | ||
qdr_link_set_context(link, 0); | ||
if (!first) { | ||
qd_link_set_context(qlink, 0); | ||
} | ||
|
||
qd_link_close(qlink); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like renaming the function
qdr_link_detach()
toqdr_link_detach_received()
. The new function name makes it clear that a detach (first or second) has been received from the remote peer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!