From 7f5358a054899f41c76abda323bc80f554675f25 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 17 Oct 2023 13:13:14 +0200 Subject: [PATCH 01/11] test(registration): close reg with opened conn --- test/quicer_reg_SUITE.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/quicer_reg_SUITE.erl b/test/quicer_reg_SUITE.erl index 32f7b56a..67d7c071 100644 --- a/test/quicer_reg_SUITE.erl +++ b/test/quicer_reg_SUITE.erl @@ -196,3 +196,14 @@ tc_get_reg_name(_Config) -> ?assertEqual({ok, Name}, quicer:get_registration_name(Reg)), ok = quicer:shutdown_registration(Reg), ?assertEqual({ok, Name}, quicer:get_registration_name(Reg)). + +tc_close_with_opened_conn(_Config) -> + Name = atom_to_list(?FUNCTION_NAME), + Profile = quic_execution_profile_low_latency, + {ok, Reg} = quicer:new_registration(Name, Profile), + {ok, Conn} = quicer_nif:open_connection(#{quic_registration => Reg}), + %% @NOTE This is a hack to make sure the connection is abled to be closed + %% which is triggered by the registration close + _ = timer:apply_after(1000, quicer, connect, + ["localhost", 5060, [{alpn, ["sample"]}, {handle, Conn}], 1000]), + ?assertEqual(ok, quicer:close_registration(Reg)). From 012af2bdae6df02622ec3c59ab19a436109e8e09 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 17 Oct 2023 18:07:34 +0200 Subject: [PATCH 02/11] followup: thread safe conn open and start --- c_src/quicer_config.c | 9 +- c_src/quicer_connection.c | 264 +++++++++++++++++--------------------- c_src/quicer_listener.c | 15 ++- c_src/quicer_stream.c | 29 +++-- 4 files changed, 150 insertions(+), 167 deletions(-) diff --git a/c_src/quicer_config.c b/c_src/quicer_config.c index e720dc94..a54cb687 100644 --- a/c_src/quicer_config.c +++ b/c_src/quicer_config.c @@ -781,15 +781,21 @@ getopt3(ErlNifEnv *env, } else if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { + enif_mutex_lock(((QuicerStreamCTX *)q_ctx)->lock); res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); + enif_mutex_unlock(((QuicerStreamCTX *)q_ctx)->lock); } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { + enif_mutex_lock(((QuicerConnCTX *)q_ctx)->lock); res = get_connection_opt(env, (QuicerConnCTX *)q_ctx, eopt, elevel); + enif_mutex_unlock(((QuicerConnCTX *)q_ctx)->lock); } else if (enif_get_resource(env, ctx, ctx_listener_t, &q_ctx)) { + enif_mutex_lock(((QuicerListenerCTX *)q_ctx)->lock); res = get_listener_opt(env, (QuicerListenerCTX *)q_ctx, eopt, elevel); + enif_mutex_unlock(((QuicerListenerCTX *)q_ctx)->lock); } else { //@todo support GLOBAL, REGISTRATION and CONFIGURATION @@ -1884,14 +1890,11 @@ get_listener_opt(ErlNifEnv *env, return ERROR_TUPLE_2(ATOM_BADARG); } - enif_mutex_lock(l_ctx->lock); if (l_ctx->is_closed) { - enif_mutex_unlock(l_ctx->lock); return ERROR_TUPLE_2(ATOM_CLOSED); } enif_keep_resource(l_ctx); - enif_mutex_unlock(l_ctx->lock); if (!IS_SAME_TERM(ATOM_FALSE, elevel)) { diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 2a5fb2fd..9f393c48 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -116,6 +116,7 @@ peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) { ERL_NIF_TERM ctx = argv[0]; ERL_NIF_TERM DerCert; + ERL_NIF_TERM res = ATOM_UNDEFINED; void *q_ctx; QuicerConnCTX *c_ctx; int len = 0; @@ -134,30 +135,37 @@ peercert1(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } assert(c_ctx); - + enif_mutex_lock(c_ctx->lock); if (!c_ctx->peer_cert) { - return ERROR_TUPLE_2(ATOM_NO_PEERCERT); + res = ERROR_TUPLE_2(ATOM_NO_PEERCERT); + goto exit; } if ((len = i2d_X509(c_ctx->peer_cert, NULL)) < 0) { // unlikely to happen - return ERROR_TUPLE_2(ATOM_ERROR_INTERNAL_ERROR); + res = ERROR_TUPLE_2(ATOM_ERROR_INTERNAL_ERROR); + goto exit; } unsigned char *data = enif_make_new_binary(env, len, &DerCert); if (!data) { - return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); + res = ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); + goto exit; } // note, using tmp is mandatory, see doc for i2d_X590 tmp = data; i2d_X509(c_ctx->peer_cert, &tmp); - return SUCCESS(DerCert); + res = SUCCESS(DerCert); + +exit: + enif_mutex_unlock(c_ctx->lock); + return res; } void @@ -277,7 +285,6 @@ _IRQL_requires_max_(DISPATCH_LEVEL) return status; } - enif_mutex_lock(c_ctx->lock); TP_CB_3(event, (uintptr_t)Connection, Event->Type); switch (Event->Type) @@ -375,11 +382,12 @@ _IRQL_requires_max_(DISPATCH_LEVEL) QuicerConfigCTX *conf_ctx = c_ctx->config_resource; if (is_destroy) { + enif_mutex_lock(c_ctx->lock); c_ctx->is_closed = TRUE; // client shutdown completed c_ctx->Connection = NULL; c_ctx->config_resource = NULL; + enif_mutex_unlock(c_ctx->lock); } - enif_mutex_unlock(c_ctx->lock); if (is_destroy) { @@ -411,7 +419,6 @@ ServerConnectionCallback(HQUIC Connection, return status; } - enif_mutex_lock(c_ctx->lock); TP_CB_3(event, (uintptr_t)Connection, Event->Type); // dbg("server connection event: %d", Event->Type); @@ -494,13 +501,15 @@ ServerConnectionCallback(HQUIC Connection, enif_clear_env(env); QuicerConfigCTX *conf_ctx = c_ctx->config_resource; + if (is_destroy) { + enif_mutex_lock(c_ctx->lock); c_ctx->Connection = NULL; c_ctx->is_closed = TRUE; // server shutdown_complete c_ctx->config_resource = NULL; + enif_mutex_unlock(c_ctx->lock); } - enif_mutex_unlock(c_ctx->lock); if (is_destroy) { @@ -526,48 +535,28 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ERL_NIF_TERM eHandle; ERL_NIF_TERM res = ERROR_TUPLE_2(ATOM_ERROR_INTERNAL_ERROR); QuicerRegistrationCTX *r_ctx = NULL; - HQUIC registration = NULL; ERL_NIF_TERM options = argv[0]; - if (argc == 0) + if (argc == 1) { - if (G_r_ctx) - { - registration = G_r_ctx->Registration; - } - else + // with validate quic_registration arg + if (!parse_registration(env, options, &r_ctx)) { return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } - r_ctx = NULL; + } + + // If r_ctx is unset, default to use global registration + if (!r_ctx && !G_r_ctx) + { + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } else { - assert(argc == 1); - if (!parse_registration(env, options, &r_ctx)) - { - return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - } - if (r_ctx) - { - enif_keep_resource(r_ctx); - registration = r_ctx->Registration; - } - else - { - if (G_r_ctx) - { - registration = G_r_ctx->Registration; - } - else - { - return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - } - } + r_ctx = G_r_ctx; } QuicerConnCTX *c_ctx = init_c_ctx(); - c_ctx->r_ctx = r_ctx; if (!c_ctx) { @@ -587,10 +576,28 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) goto exit; } - if (QUIC_FAILED(Status = MsQuic->ConnectionOpen(registration, - ClientConnectionCallback, - c_ctx, - &(c_ctx->Connection)))) + // It is safe to use r_ctx here since + // a) it is passed as argument which beam still has reference to + // b) G_r_ctx is only destroyed when code is unloaded. + + enif_keep_resource(r_ctx); + c_ctx->r_ctx = r_ctx; + enif_mutex_lock(r_ctx->lock); + + if (!r_ctx->Registration) + { + res = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + enif_mutex_unlock(r_ctx->lock); + goto exit; + } + + Status = MsQuic->ConnectionOpen(r_ctx->Registration, + ClientConnectionCallback, + c_ctx, + &(c_ctx->Connection)); + enif_mutex_unlock(r_ctx->lock); + + if (QUIC_FAILED(Status)) { res = ERROR_TUPLE_2(ATOM_STATUS(Status)); goto exit; @@ -606,10 +613,7 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return SUCCESS(eHandle); exit: - if (r_ctx) - { - enif_release_resource(r_ctx); - } + enif_release_resource(r_ctx); enif_release_resource(c_ctx); return res; } @@ -629,22 +633,27 @@ async_connect3(ErlNifEnv *env, ERL_NIF_TERM res = ERROR_TUPLE_2(ATOM_ERROR_INTERNAL_ERROR); QuicerConnCTX *c_ctx = NULL; + QuicerRegistrationCTX *r_ctx = NULL; BOOLEAN is_reuse_handle = FALSE; int port = 0; char host[256] = { 0 }; HQUIC Registration = NULL; + + // Check Port if (!enif_get_int(env, eport, &port) && port > 0) { return ERROR_TUPLE_2(ATOM_BADARG); } + // Check host if (enif_get_string(env, ehost, host, 256, ERL_NIF_LATIN1) <= 0) { return ERROR_TUPLE_2(ATOM_BADARG); } + // Check option 'handle' for opened connection if (enif_get_map_value(env, eoptions, ATOM_HANDLE, &eHandle)) { // Reuse c_ctx from existing connecion handle @@ -652,22 +661,9 @@ async_connect3(ErlNifEnv *env, { assert(c_ctx->is_closed); assert(c_ctx->owner); + // r_ctx is already kept in open_connectionX + r_ctx = c_ctx->r_ctx; is_reuse_handle = TRUE; - if (c_ctx->r_ctx && c_ctx->r_ctx->Registration) - { - Registration = c_ctx->r_ctx->Registration; - } - else - { - if (G_r_ctx) - { - Registration = G_r_ctx->Registration; - } - else - { - return ERROR_TUPLE_2(ATOM_REG_FAILED); - } - } } else { @@ -677,6 +673,7 @@ async_connect3(ErlNifEnv *env, else // we create new c_ctx and set owner { assert(!c_ctx); + assert(!r_ctx); c_ctx = init_c_ctx(); // Get Reg for c_ctx, quic_registration is optional @@ -686,25 +683,8 @@ async_connect3(ErlNifEnv *env, goto Error; } - if (c_ctx->r_ctx && c_ctx->r_ctx->Registration) - { - // quic_registration is set - enif_keep_resource(c_ctx->r_ctx); - Registration = c_ctx->r_ctx->Registration; - } - else - { - // quic_registration is not set, use global registration - // msquic should reject if global registration is NULL (closed) - if (G_r_ctx && G_r_ctx->Registration) - { - Registration = G_r_ctx->Registration; - } - else - { - Registration = NULL; - } - } + r_ctx = c_ctx->r_ctx ? c_ctx->r_ctx : G_r_ctx; + enif_keep_resource(r_ctx); if ((c_ctx->owner = AcceptorAlloc()) == NULL) { @@ -720,13 +700,22 @@ async_connect3(ErlNifEnv *env, } } + assert(r_ctx); + assert(c_ctx); + + // Now we have c_ctx either + // a) passed in as handle + // b) newly allocated if (is_reuse_handle) { enif_mutex_lock(c_ctx->lock); } - + enif_mutex_lock(r_ctx->lock); + Registration = r_ctx->Registration; assert(c_ctx->owner); - // allocate config_resource for client connection + + // Allocate config_resource for client connection + // @TODO client config handle should be reused if needed. if (NULL == (c_ctx->config_resource = init_config_ctx())) { res = ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); @@ -754,7 +743,6 @@ async_connect3(ErlNifEnv *env, } // convert eoptions to Configuration - ERL_NIF_TERM estatus = ClientLoadConfiguration( env, &eoptions, Registration, &(c_ctx->config_resource->Configuration)); @@ -773,17 +761,20 @@ async_connect3(ErlNifEnv *env, { assert(c_ctx->Connection == NULL); res = ERROR_TUPLE_2(ATOM_CONN_OPEN_ERROR); - goto Error; } - - if (!IS_SAME_TERM( - ATOM_OK, (res = parse_conn_resume_ticket(env, eoptions, c_ctx)))) + else + { + assert(c_ctx->is_closed); + res = parse_conn_resume_ticket(env, eoptions, c_ctx); + // we could only lock it after resume ticket is set + enif_mutex_lock(c_ctx->lock); + } + if (!IS_SAME_TERM(ATOM_OK, res)) { goto Error; } } - assert(c_ctx->is_closed); c_ctx->is_closed = FALSE; // connection opened. // optional set sslkeylogfile @@ -810,9 +801,10 @@ async_connect3(ErlNifEnv *env, // @TODO client async_connect_3 should able to take a config_resource as // input ERL TERM so that we don't need to call ClientLoadConfiguration - // assert(!c_ctx->is_closed && c_ctx->Connection); + // c_ctx->lock should be taken to prevent parallel access from callback as + // work trigged by starting of the connection. if (QUIC_FAILED(Status = MsQuic->ConnectionStart( c_ctx->Connection, c_ctx->config_resource->Configuration, @@ -842,62 +834,42 @@ async_connect3(ErlNifEnv *env, enif_monitor_process(NULL, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon); eHandle = enif_make_resource(env, c_ctx); - if (is_reuse_handle) - { - enif_mutex_unlock(c_ctx->lock); - } + enif_mutex_unlock(r_ctx->lock); + enif_mutex_unlock(c_ctx->lock); return SUCCESS(eHandle); -Error: - if (c_ctx->Connection) - { // when is opened +Error:; + HQUIC Connection = c_ctx->Connection; + if (Connection) + { /* - We should not call *destroy_c_ctx* from here. - because it could cause race cond: - - MsQuic Worker: - - Connection close job will trigger ClientConnectionCallback with event: - QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE - - Beam Schedler: - release c_ctx by calling *destroy_c_ctx* will trigger - *resource_conn_dealloc_callback* - - The c_ctx could be freed (unprotectable) by beam while - ClientConnectionCallback can still access it. - - So the side effect chain will be: - - 'MsQuic->ConnectionClose' triggers 'ClientConnectionCallback' triggers - 'release c_ctx resource' triggers resource_conn_dealloc_callback' and - then 'free c_ctx'. At this point both resources in beam and MsQuic is - released. - - note 1: - - If we only call *destroy_c_ctx* triggers - *resource_conn_dealloc_callback* triggers 'MsQuic->ConnectionClose' - triggers ClientConnectionCallback here it can casue race cond. since - c_ctx has been freed by beam already after - resource_conn_dealloc_callback is finished. - - note 2: + Prevent double ConnectionClose + @NOTE: We could not call MsQuic->SetCallbackHandler to set callback to NULL becasue this function is async, not thread safe. - */ - MsQuic->ConnectionClose(c_ctx->Connection); - // prevent double ConnectionClose c_ctx->Connection = NULL; c_ctx->is_closed = TRUE; } + // Error exit, it must be closed or Handle is NULL assert(c_ctx->is_closed || NULL == c_ctx->Connection); - if (is_reuse_handle) + enif_mutex_unlock(c_ctx->lock); + enif_mutex_unlock(r_ctx->lock); + + if (Connection) { - enif_mutex_unlock(c_ctx->lock); + // @NOTE: + // It will trigger 'connection shutdown completed' callback + // thus it is important to not release the resource c_ctx for callback + // to access the c_ctx + MsQuic->ConnectionClose(Connection); + } + + if (!is_reuse_handle) + { + enif_release_resource(r_ctx); } return res; @@ -981,7 +953,9 @@ shutdown_connection3(ErlNifEnv *env, return ERROR_TUPLE_2(ATOM_BADARG); } + enif_mutex_lock(c_ctx->lock); MsQuic->ConnectionShutdown(c_ctx->Connection, flags, app_errcode); + enif_mutex_unlock(c_ctx->lock); return ATOM_OK; } @@ -991,34 +965,35 @@ sockname1(ErlNifEnv *env, __unused_parm__ int args, const ERL_NIF_TERM argv[]) void *q_ctx; HQUIC Handle = NULL; uint32_t Param; + QUIC_STATUS Status; + QUIC_ADDR addr; + uint32_t addrSize = sizeof(addr); if (enif_get_resource(env, argv[0], ctx_connection_t, &q_ctx)) { enif_mutex_lock(((QuicerConnCTX *)q_ctx)->lock); - enif_mutex_unlock(((QuicerConnCTX *)q_ctx)->lock); Handle = (((QuicerConnCTX *)q_ctx))->Connection; Param = QUIC_PARAM_CONN_LOCAL_ADDRESS; + Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); + enif_mutex_unlock(((QuicerConnCTX *)q_ctx)->lock); } else if (enif_get_resource(env, argv[0], ctx_listener_t, &q_ctx)) { + enif_mutex_lock(((QuicerListenerCTX *)q_ctx)->lock); Handle = ((QuicerListenerCTX *)q_ctx)->Listener; Param = QUIC_PARAM_LISTENER_LOCAL_ADDRESS; + Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); + enif_mutex_unlock(((QuicerListenerCTX *)q_ctx)->lock); } else { return ERROR_TUPLE_2(ATOM_BADARG); } - QUIC_STATUS status; - QUIC_ADDR addr; - uint32_t addrSize = sizeof(addr); - - if (QUIC_FAILED(status = MsQuic->GetParam(Handle, Param, &addrSize, &addr))) + if (QUIC_FAILED(Status)) { - return ERROR_TUPLE_2(ATOM_SOCKNAME_ERROR); // @TODO is this err useful? - // use ATOM_STATUS instead? + return ERROR_TUPLE_2(ATOM_STATUS(Status)); } - return SUCCESS(addr2eterm(env, &addr)); } @@ -1229,12 +1204,11 @@ handle_connection_event_shutdown_initiated_by_peer( } static QUIC_STATUS -handle_connection_event_shutdown_complete( - QuicerConnCTX *c_ctx, __unused_parm__ QUIC_CONNECTION_EVENT *Event) +handle_connection_event_shutdown_complete(QuicerConnCTX *c_ctx, + QUIC_CONNECTION_EVENT *Event) { // For Server Only assert(QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE == Event->Type); - assert(c_ctx->Connection); assert(c_ctx->acceptor_queue); ACCEPTOR *acc = NULL; ErlNifEnv *env = c_ctx->env; diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index 58d46496..b2630422 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -337,19 +337,20 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } else { + // quic_registration is not set, use global registration target_r_ctx = G_r_ctx; + pthread_mutex_lock(&GRegLock); - // quic_registration is not set, use global registration - // msquic should reject if global registration is NULL (closed) - if (G_r_ctx) - { - Registration = G_r_ctx->Registration; - } - else + if (!G_r_ctx) { + pthread_mutex_unlock(&GRegLock); ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); goto exit; } + + enif_keep_resource(G_r_ctx); + Registration = G_r_ctx->Registration; + pthread_mutex_unlock(&GRegLock); } // Now load server config diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index 3d59087f..cbd95dc2 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -333,17 +333,18 @@ async_start_stream2(ErlNifEnv *env, goto ErrorExit; } - if (QUIC_FAILED(Status = MsQuic->StreamOpen(c_ctx->Connection, - open_flag, - ClientStreamCallback, - s_ctx, - &(s_ctx->Stream)))) + enif_mutex_lock(c_ctx->lock); + Status = MsQuic->StreamOpen(c_ctx->Connection, + open_flag, + ClientStreamCallback, + s_ctx, + &(s_ctx->Stream)); + enif_mutex_unlock(c_ctx->lock); + if (QUIC_FAILED(Status)) { - res = ERROR_TUPLE_3(ATOM_STREAM_OPEN_ERROR, ATOM_STATUS(Status)); goto ErrorExit; } - // Now we have Stream handle s_ctx->eHandle = enif_make_resource(s_ctx->imm_env, s_ctx); res = enif_make_copy(env, s_ctx->eHandle); @@ -352,15 +353,19 @@ async_start_stream2(ErlNifEnv *env, // Starts the bidirectional stream. By default, the peer is not notified of // the stream being started until data is sent on the stream. // - if (QUIC_FAILED(Status = MsQuic->StreamStart(s_ctx->Stream, start_flag))) + enif_mutex_lock(s_ctx->lock); + HQUIC Stream = s_ctx->Stream; + Status = MsQuic->StreamStart(Stream, start_flag); + if (QUIC_FAILED(Status)) { - HQUIC Stream = s_ctx->Stream; - enif_mutex_lock(s_ctx->lock); s_ctx->is_closed = TRUE; + s_ctx->Stream = NULL; enif_mutex_unlock(s_ctx->lock); MsQuic->StreamClose(Stream); - return ERROR_TUPLE_3(ATOM_STREAM_START_ERROR, ATOM_STATUS(Status)); + res = ERROR_TUPLE_3(ATOM_STREAM_START_ERROR, ATOM_STATUS(Status)); + goto ErrorExit; } + enif_mutex_unlock(s_ctx->lock); // NOTE: Set is_closed to FALSE (s_ctx->is_closed = FALSE;) // must be done in the worker callback (for // QUICER_STREAM_EVENT_MASK_START_COMPLETE) to avoid race cond. @@ -736,7 +741,7 @@ recv2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) TP_NIF_3(start, (uintptr_t)s_ctx->Stream, size_req); enif_mutex_lock(s_ctx->lock); - if ( !s_ctx->Stream ) + if (!s_ctx->Stream) { res = ERROR_TUPLE_2(ATOM_CLOSED); goto Exit; From 854e92e048b769cab89fb4936837c125dce3490e Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 18 Oct 2023 12:12:38 +0200 Subject: [PATCH 03/11] chore(build): avoid dup EncodeHexBuffer --- CMakeLists.txt | 3 +++ c_src/quicer_connection.c | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ffc07bad..73b8204d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,9 @@ endif() if (DEFINED ENV{QUIC_LOGGING_TYPE}) set(QUIC_ENABLE_LOGGING "ON") set(QUIC_LOGGING_TYPE $ENV{QUIC_LOGGING_TYPE}) + if (${QUIC_LOGGING_TYPE} STREQUAL "stdout") + add_compile_options(-DQUICER_LOGGING_STDOUT) + endif() endif() if (DEFINED ENV{QUICER_USE_LTTNG} AND DEFINED ENV{QUICER_USE_SNK}) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 9f393c48..ebbed7ed 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -25,7 +25,7 @@ limitations under the License. extern QuicerRegistrationCTX *G_r_ctx; -#ifdef DEBUG +#if defined(DEBUG) && !defined(QUICER_LOGGING_STDOUT) extern inline void EncodeHexBuffer(uint8_t *Buffer, uint8_t BufferLen, char *HexString); #endif From 49eff8ddf1048d78b9d3b834af6fa8f23d2339f2 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 19 Oct 2023 20:36:16 +0200 Subject: [PATCH 04/11] feat: ref count stream and conn --- c_src/quicer_config.c | 18 ++++++++----- c_src/quicer_connection.c | 42 ++++++++++++++++++++---------- c_src/quicer_ctx.c | 42 +++++++++++++++++++++++++++++- c_src/quicer_ctx.h | 10 ++++++++ c_src/quicer_listener.c | 1 + c_src/quicer_stream.c | 54 +++++++++++++++++++++++++-------------- test/quicer_SUITE.erl | 10 ++++++-- 7 files changed, 134 insertions(+), 43 deletions(-) diff --git a/c_src/quicer_config.c b/c_src/quicer_config.c index a54cb687..e6ef0af7 100644 --- a/c_src/quicer_config.c +++ b/c_src/quicer_config.c @@ -760,7 +760,7 @@ getopt3(ErlNifEnv *env, ERL_NIF_TERM elevel = argv[2]; void *q_ctx; - ERL_NIF_TERM res = ATOM_ERROR_NOT_FOUND; + ERL_NIF_TERM res = ERROR_TUPLE_2(ATOM_CLOSED); if (!enif_is_atom(env, eopt)) { @@ -781,15 +781,19 @@ getopt3(ErlNifEnv *env, } else if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { - enif_mutex_lock(((QuicerStreamCTX *)q_ctx)->lock); - res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); - enif_mutex_unlock(((QuicerStreamCTX *)q_ctx)->lock); + if (get_stream_handle(q_ctx)) + { + res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); + put_stream_handle(q_ctx); + } } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { - enif_mutex_lock(((QuicerConnCTX *)q_ctx)->lock); - res = get_connection_opt(env, (QuicerConnCTX *)q_ctx, eopt, elevel); - enif_mutex_unlock(((QuicerConnCTX *)q_ctx)->lock); + if (get_conn_handle(q_ctx)) + { + res = get_connection_opt(env, (QuicerConnCTX *)q_ctx, eopt, elevel); + put_conn_handle(q_ctx); + } } else if (enif_get_resource(env, ctx, ctx_listener_t, &q_ctx)) { diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index ebbed7ed..a06d91a1 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -384,14 +384,13 @@ _IRQL_requires_max_(DISPATCH_LEVEL) { enif_mutex_lock(c_ctx->lock); c_ctx->is_closed = TRUE; // client shutdown completed - c_ctx->Connection = NULL; c_ctx->config_resource = NULL; enif_mutex_unlock(c_ctx->lock); } if (is_destroy) { - MsQuic->ConnectionClose(Connection); + put_conn_handle(c_ctx); if (conf_ctx) { enif_release_resource(conf_ctx); @@ -505,15 +504,14 @@ ServerConnectionCallback(HQUIC Connection, if (is_destroy) { enif_mutex_lock(c_ctx->lock); - c_ctx->Connection = NULL; c_ctx->is_closed = TRUE; // server shutdown_complete c_ctx->config_resource = NULL; enif_mutex_unlock(c_ctx->lock); } - if (is_destroy) + if (is_destroy) // merge with upper ,remove if... { - MsQuic->ConnectionClose(Connection); + put_conn_handle(c_ctx); if (conf_ctx) { enif_release_resource(conf_ctx); @@ -609,6 +607,7 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) goto exit; } + CxPlatRefInitialize(&(c_ctx->ref_count)); eHandle = enif_make_resource(env, c_ctx); return SUCCESS(eHandle); @@ -765,6 +764,7 @@ async_connect3(ErlNifEnv *env, else { assert(c_ctx->is_closed); + CxPlatRefInitialize(&c_ctx->ref_count); res = parse_conn_resume_ticket(env, eoptions, c_ctx); // we could only lock it after resume ticket is set enif_mutex_lock(c_ctx->lock); @@ -774,7 +774,6 @@ async_connect3(ErlNifEnv *env, goto Error; } } - c_ctx->is_closed = FALSE; // connection opened. // optional set sslkeylogfile @@ -803,6 +802,12 @@ async_connect3(ErlNifEnv *env, // input ERL TERM so that we don't need to call ClientLoadConfiguration assert(!c_ctx->is_closed && c_ctx->Connection); + assert(c_ctx->owner); + + // Monitor owner before start, so we don't need to race with callbacks + // after start the connection + enif_monitor_process(NULL, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon); + // c_ctx->lock should be taken to prevent parallel access from callback as // work trigged by starting of the connection. if (QUIC_FAILED(Status = MsQuic->ConnectionStart( @@ -830,8 +835,6 @@ async_connect3(ErlNifEnv *env, goto Error; } - assert(c_ctx->owner); - enif_monitor_process(NULL, c_ctx, &c_ctx->owner->Pid, &c_ctx->owner_mon); eHandle = enif_make_resource(env, c_ctx); enif_mutex_unlock(r_ctx->lock); @@ -953,10 +956,16 @@ shutdown_connection3(ErlNifEnv *env, return ERROR_TUPLE_2(ATOM_BADARG); } - enif_mutex_lock(c_ctx->lock); - MsQuic->ConnectionShutdown(c_ctx->Connection, flags, app_errcode); - enif_mutex_unlock(c_ctx->lock); - return ATOM_OK; + if (get_conn_handle(c_ctx)) + { + MsQuic->ConnectionShutdown(c_ctx->Connection, flags, app_errcode); + put_conn_handle(c_ctx); + return ATOM_OK; + } + else + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } } ERL_NIF_TERM @@ -971,11 +980,14 @@ sockname1(ErlNifEnv *env, __unused_parm__ int args, const ERL_NIF_TERM argv[]) if (enif_get_resource(env, argv[0], ctx_connection_t, &q_ctx)) { - enif_mutex_lock(((QuicerConnCTX *)q_ctx)->lock); + if (!get_conn_handle((QuicerConnCTX *)q_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } Handle = (((QuicerConnCTX *)q_ctx))->Connection; Param = QUIC_PARAM_CONN_LOCAL_ADDRESS; Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); - enif_mutex_unlock(((QuicerConnCTX *)q_ctx)->lock); + put_conn_handle((QuicerConnCTX *)q_ctx); } else if (enif_get_resource(env, argv[0], ctx_listener_t, &q_ctx)) { @@ -1305,7 +1317,9 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx, // @TODO Generally, we rely on outer caller to clean the env, // or we should clean the env in this function. env = s_ctx->env; + get_conn_handle(c_ctx); s_ctx->Stream = Event->PEER_STREAM_STARTED.Stream; + CxPlatRefInitialize(&(s_ctx->ref_count)); ACCEPTOR *acc = AcceptorDequeue(c_ctx->acceptor_queue); diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index 285e43a8..86b4bd6f 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -257,7 +257,6 @@ deinit_s_ctx(QuicerStreamCTX *s_ctx) void destroy_s_ctx(QuicerStreamCTX *s_ctx) { - assert(!s_ctx->Stream); enif_free_env(s_ctx->imm_env); enif_release_resource(s_ctx); } @@ -302,3 +301,44 @@ destroy_dgram_send_ctx(QuicerDgramSendCTX *dgram_send_ctx) enif_free_env(dgram_send_ctx->env); CXPLAT_FREE(dgram_send_ctx, QUICER_DGRAM_SEND_CTX); } + +inline void +put_stream_handle(QuicerStreamCTX *s_ctx) +{ + if (CxPlatRefDecrement(&s_ctx->ref_count) && s_ctx->Stream) + { + HQUIC Stream = s_ctx->Stream; + enif_mutex_lock(s_ctx->lock); + Stream = s_ctx->Stream; + s_ctx->Stream = NULL; + enif_mutex_unlock(s_ctx->lock); + MsQuic->StreamClose(Stream); + assert(s_ctx->c_ctx != NULL); + put_conn_handle(s_ctx->c_ctx); + } +} + +inline BOOLEAN +get_stream_handle(QuicerStreamCTX *s_ctx) +{ + return CxPlatRefIncrementNonZero(&s_ctx->ref_count, 1); +} + +inline void +put_conn_handle(QuicerConnCTX *c_ctx) +{ + if (CxPlatRefDecrement(&c_ctx->ref_count) && c_ctx->Connection) + { + HQUIC Connection = c_ctx->Connection; + enif_mutex_lock(c_ctx->lock); + c_ctx->Connection = NULL; + enif_mutex_unlock(c_ctx->lock); + MsQuic->ConnectionClose(Connection); + } +} + +inline BOOLEAN +get_conn_handle(QuicerConnCTX *c_ctx) +{ + return CxPlatRefIncrementNonZero(&c_ctx->ref_count, 1); +} diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index 27d4ed32..78ad8db8 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -94,6 +94,8 @@ typedef struct QuicerConnCTX QUIC_TLS_SECRETS *TlsSecrets; QUIC_BUFFER *ResumptionTicket; BOOLEAN is_closed; + // track lifetime of Connection handle + CXPLAT_REF_COUNT ref_count; uint32_t event_mask; char *ssl_keylogfile; X509 *peer_cert; @@ -123,6 +125,8 @@ typedef struct QuicerStreamCTX _CTX_CALLBACK_READ_ BOOLEAN is_wait_for_data; _CTX_CALLBACK_WRITE_ BOOLEAN is_recv_pending; BOOLEAN is_closed; + // Track lifetime of Stream handle + CXPLAT_REF_COUNT ref_count; uint32_t event_mask; void *reserved1; void *reserved2; @@ -167,4 +171,10 @@ void destroy_send_ctx(QuicerStreamSendCTX *send_ctx); QuicerDgramSendCTX *init_dgram_send_ctx(); void destroy_dgram_send_ctx(QuicerDgramSendCTX *dgram_send_ctx); +void put_stream_handle(QuicerStreamCTX *s_ctx); +BOOLEAN get_stream_handle(QuicerStreamCTX *s_ctx); + +void put_conn_handle(QuicerConnCTX *c_ctx); +BOOLEAN get_conn_handle(QuicerConnCTX *c_ctx); + #endif // __QUICER_CTX_H_ diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index b2630422..80432397 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -56,6 +56,7 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, } c_ctx->Connection = Event->NEW_CONNECTION.Connection; + CxPlatRefInitialize(&(c_ctx->ref_count)); if (l_ctx->trusted_store) { diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index cbd95dc2..e85e1049 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -130,14 +130,13 @@ ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event) if (is_destroy) { s_ctx->is_closed = TRUE; - s_ctx->Stream = NULL; } enif_mutex_unlock(s_ctx->lock); if (is_destroy) { - MsQuic->StreamClose(Stream); + put_stream_handle(s_ctx); // must be called after mutex unlock destroy_s_ctx(s_ctx); } @@ -230,7 +229,6 @@ _IRQL_requires_max_(DISPATCH_LEVEL) if (is_destroy) { s_ctx->is_closed = TRUE; - s_ctx->Stream = NULL; MsQuic->SetCallbackHandler(Stream, NULL, NULL); } @@ -239,7 +237,7 @@ _IRQL_requires_max_(DISPATCH_LEVEL) if (is_destroy) { // must be called after mutex unlock, - MsQuic->StreamClose(Stream); + put_stream_handle(s_ctx); destroy_s_ctx(s_ctx); } return status; @@ -296,9 +294,12 @@ async_start_stream2(ErlNifEnv *env, // @TODO set event mask for some flags } - // - // note, s_ctx is not shared yet, thus no locking is needed. - // + if (!get_conn_handle(c_ctx)) + { + //@TODO maybe other error like conn_closed? + return ERROR_TUPLE_2(ATOM_CLOSED); + } + QuicerStreamCTX *s_ctx = init_s_ctx(); if (!s_ctx) @@ -311,7 +312,6 @@ async_start_stream2(ErlNifEnv *env, enif_keep_resource(c_ctx); s_ctx->c_ctx = c_ctx; - // Caller should be the owner of this stream. s_ctx->owner = AcceptorAlloc(); @@ -333,13 +333,13 @@ async_start_stream2(ErlNifEnv *env, goto ErrorExit; } - enif_mutex_lock(c_ctx->lock); Status = MsQuic->StreamOpen(c_ctx->Connection, open_flag, ClientStreamCallback, s_ctx, &(s_ctx->Stream)); - enif_mutex_unlock(c_ctx->lock); + CxPlatRefInitialize(&s_ctx->ref_count); + if (QUIC_FAILED(Status)) { res = ERROR_TUPLE_3(ATOM_STREAM_OPEN_ERROR, ATOM_STATUS(Status)); @@ -350,30 +350,37 @@ async_start_stream2(ErlNifEnv *env, res = enif_make_copy(env, s_ctx->eHandle); // - // Starts the bidirectional stream. By default, the peer is not notified of + // Starts the stream. By default, the peer is not notified of // the stream being started until data is sent on the stream. // - enif_mutex_lock(s_ctx->lock); + // + // We need to take a refcnt to avoid handle get closed as the StreamStart + // may trigger callback in another thread. + if (!get_stream_handle(s_ctx)) + { + res = ERROR_TUPLE_2(ATOM_CLOSED); + goto ErrorExit; + } HQUIC Stream = s_ctx->Stream; Status = MsQuic->StreamStart(Stream, start_flag); + put_stream_handle(s_ctx); + if (QUIC_FAILED(Status)) { + enif_mutex_lock(s_ctx->lock); s_ctx->is_closed = TRUE; - s_ctx->Stream = NULL; enif_mutex_unlock(s_ctx->lock); - MsQuic->StreamClose(Stream); res = ERROR_TUPLE_3(ATOM_STREAM_START_ERROR, ATOM_STATUS(Status)); goto ErrorExit; } - enif_mutex_unlock(s_ctx->lock); // NOTE: Set is_closed to FALSE (s_ctx->is_closed = FALSE;) // must be done in the worker callback (for // QUICER_STREAM_EVENT_MASK_START_COMPLETE) to avoid race cond. - // return SUCCESS(res); ErrorExit: destroy_s_ctx(s_ctx); + put_conn_handle(c_ctx); return res; } @@ -490,6 +497,10 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) // release in resource_stream_dealloc_callback enif_keep_resource(c_ctx); + if (!get_conn_handle(c_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } s_ctx->c_ctx = c_ctx; QuicerStreamSendCTX *send_ctx = init_send_ctx(); @@ -562,7 +573,7 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) s_ctx->Stream = NULL; goto ErrorExit; } - + CxPlatRefInitialize(&(s_ctx->ref_count)); // Now we have Stream handle s_ctx->eHandle = enif_make_resource(s_ctx->imm_env, s_ctx); @@ -595,6 +606,7 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) send_ctx))) { enif_mutex_unlock(s_ctx->lock); + put_stream_handle(s_ctx); res = ERROR_TUPLE_3(ATOM_STREAM_SEND_ERROR, ATOM_STATUS(Status)); goto ErrorExit; } @@ -612,6 +624,7 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) ErrorExit: destroy_send_ctx(send_ctx); destroy_s_ctx(s_ctx); + put_conn_handle(c_ctx); // Do not close the stream here, it will be done // in resource_stream_dealloc_callback triggered by // destroy_s_ctx @@ -837,13 +850,16 @@ shutdown_stream3(ErlNifEnv *env, ret = ERROR_TUPLE_2(ATOM_BADARG); } - enif_mutex_lock(s_ctx->lock); + if (!get_stream_handle(s_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } if (QUIC_FAILED(Status = MsQuic->StreamShutdown(s_ctx->Stream, flags, app_errcode))) { ret = ERROR_TUPLE_2(ATOM_STATUS(Status)); } - enif_mutex_unlock(s_ctx->lock); + put_stream_handle(s_ctx); return ret; } diff --git a/test/quicer_SUITE.erl b/test/quicer_SUITE.erl index 9830c0cb..a4cdfc2a 100644 --- a/test/quicer_SUITE.erl +++ b/test/quicer_SUITE.erl @@ -1032,7 +1032,7 @@ tc_get_stream_0rtt_length(Config) -> case quicer:getopt(Stm, param_stream_0rtt_length) of {ok, Val} -> ?assert(is_integer(Val)); {error, invalid_state} -> ok; - {error, invalid_parameter} -> ok + {error, closed} -> ok end, quicer:close_connection(Conn), SPid ! done, @@ -1212,6 +1212,9 @@ tc_idle_timeout(Config) -> {error, stm_open_error, invalid_state} -> %% Invalid state ok; + {error, closed} -> + %% Conn is closed + ok; {ok, _Stream} -> ok end, @@ -1679,7 +1682,8 @@ tc_strm_opt_active_badarg(Config) -> ct:pal("Listener Options: ~p", [Options]), {ok, _QuicApp} = quicer:spawn_listener(mqtt, Port, Options), {ok, Conn} = quicer:connect("localhost", Port, default_conn_opts(), 5000), - {error, badarg} = quicer:start_stream(Conn, [{active, twice}]). + {error, badarg} = quicer:start_stream(Conn, [{active, twice}]), + ok. tc_get_conn_rid(Config) -> Port = select_port(), @@ -1861,6 +1865,7 @@ tc_stream_start_flag_shutdown_on_fail(Config) -> {error, stm_send_error, invalid_state} -> ok %% already closed end, receive + %% THEN we should recv event `start_completed' with status: `stream_limit_reached' {quic, start_completed, Stm, #{status := stream_limit_reached, stream_id := StreamID}} -> ct:pal("Stream ~p limit reached", [StreamID]); @@ -1881,6 +1886,7 @@ tc_stream_start_flag_shutdown_on_fail(Config) -> Other -> ct:fail("Unexpected event ~p after stream start complete", [Other]) end, + ?assertEqual({error, closed}, quicer:getopt(Stm, param_configuration_settings, quic_configuration)), ?assert(is_integer(Rid)). tc_stream_start_flag_indicate_peer_accept_1(Config) -> From 3b034262b7968c3d1c1c9e5a21fcc2a59c2e0a3f Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 19 Oct 2023 21:44:04 +0200 Subject: [PATCH 05/11] feat: listener refcnt --- c_src/quicer_config.c | 43 ++++++++++++++++++++++++++++++--------- c_src/quicer_connection.c | 6 +++--- c_src/quicer_ctx.c | 19 +++++++++++++++++ c_src/quicer_ctx.h | 5 +++++ c_src/quicer_listener.c | 17 ++++++---------- 5 files changed, 66 insertions(+), 24 deletions(-) diff --git a/c_src/quicer_config.c b/c_src/quicer_config.c index e6ef0af7..a7189633 100644 --- a/c_src/quicer_config.c +++ b/c_src/quicer_config.c @@ -760,7 +760,7 @@ getopt3(ErlNifEnv *env, ERL_NIF_TERM elevel = argv[2]; void *q_ctx; - ERL_NIF_TERM res = ERROR_TUPLE_2(ATOM_CLOSED); + ERL_NIF_TERM res = ATOM_ERROR_NOT_FOUND; if (!enif_is_atom(env, eopt)) { @@ -781,32 +781,38 @@ getopt3(ErlNifEnv *env, } else if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { - if (get_stream_handle(q_ctx)) + if (!get_stream_handle(q_ctx)) { - res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); - put_stream_handle(q_ctx); + goto Exit; } + res = get_stream_opt(env, (QuicerStreamCTX *)q_ctx, eopt, elevel); + put_stream_handle(q_ctx); } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { - if (get_conn_handle(q_ctx)) + if (!get_conn_handle(q_ctx)) { - res = get_connection_opt(env, (QuicerConnCTX *)q_ctx, eopt, elevel); - put_conn_handle(q_ctx); + goto Exit; } + res = get_connection_opt(env, (QuicerConnCTX *)q_ctx, eopt, elevel); + put_conn_handle(q_ctx); } else if (enif_get_resource(env, ctx, ctx_listener_t, &q_ctx)) { - enif_mutex_lock(((QuicerListenerCTX *)q_ctx)->lock); + if (!get_listener_handle(q_ctx)) + { + goto Exit; + } res = get_listener_opt(env, (QuicerListenerCTX *)q_ctx, eopt, elevel); - enif_mutex_unlock(((QuicerListenerCTX *)q_ctx)->lock); + put_listener_handle(q_ctx); } else { //@todo support GLOBAL, REGISTRATION and CONFIGURATION return ERROR_TUPLE_2(ATOM_BADARG); } - return res; +Exit: + return ERROR_TUPLE_2(ATOM_CLOSED); } ERL_NIF_TERM @@ -873,18 +879,33 @@ setopt4(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } else if (enif_get_resource(env, ctx, ctx_stream_t, &q_ctx)) { + if (!get_stream_handle(q_ctx)) + { + goto Exit; + } res = set_stream_opt( env, (QuicerStreamCTX *)q_ctx, eopt, evalue, elevel); + put_stream_handle(q_ctx); } else if (enif_get_resource(env, ctx, ctx_connection_t, &q_ctx)) { + if (!get_conn_handle(q_ctx)) + { + goto Exit; + } res = set_connection_opt( env, (QuicerConnCTX *)q_ctx, eopt, evalue, elevel); + put_conn_handle(q_ctx); } else if (enif_get_resource(env, ctx, ctx_listener_t, &q_ctx)) { + if (!get_listener_handle(q_ctx)) + { + goto Exit; + } res = set_listener_opt( env, (QuicerListenerCTX *)q_ctx, eopt, evalue, elevel); + put_listener_handle(q_ctx); } else { //@todo support GLOBAL, REGISTRATION and CONFIGURATION @@ -892,6 +913,8 @@ setopt4(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } return res; +Exit: + return ERROR_TUPLE_2(ATOM_CLOSED); } bool diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index a06d91a1..e726f6db 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -981,9 +981,9 @@ sockname1(ErlNifEnv *env, __unused_parm__ int args, const ERL_NIF_TERM argv[]) if (enif_get_resource(env, argv[0], ctx_connection_t, &q_ctx)) { if (!get_conn_handle((QuicerConnCTX *)q_ctx)) - { - return ERROR_TUPLE_2(ATOM_CLOSED); - } + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } Handle = (((QuicerConnCTX *)q_ctx))->Connection; Param = QUIC_PARAM_CONN_LOCAL_ADDRESS; Status = MsQuic->GetParam(Handle, Param, &addrSize, &addr); diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index 86b4bd6f..d4e45921 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -342,3 +342,22 @@ get_conn_handle(QuicerConnCTX *c_ctx) { return CxPlatRefIncrementNonZero(&c_ctx->ref_count, 1); } + +inline void +put_listener_handle(QuicerListenerCTX *l_ctx) +{ + if (CxPlatRefDecrement(&l_ctx->ref_count) && l_ctx->Listener) + { + HQUIC Listener = l_ctx->Listener; + enif_mutex_lock(l_ctx->lock); + l_ctx->Listener = NULL; + enif_mutex_unlock(l_ctx->lock); + MsQuic->ListenerClose(Listener); + } +} + +inline BOOLEAN +get_listener_handle(QuicerListenerCTX *l_ctx) +{ + return CxPlatRefIncrementNonZero(&l_ctx->ref_count, 1); +} diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index 78ad8db8..5a914116 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -56,6 +56,8 @@ typedef struct QuicerListenerCTX QuicerConfigCTX *config_resource; QuicerRegistrationCTX *r_ctx; HQUIC Listener; + // track lifetime of Connection handle + CXPLAT_REF_COUNT ref_count; QUICER_ACCEPTOR_QUEUE *acceptor_queue; ErlNifPid listenerPid; ErlNifMonitor owner_mon; @@ -177,4 +179,7 @@ BOOLEAN get_stream_handle(QuicerStreamCTX *s_ctx); void put_conn_handle(QuicerConnCTX *c_ctx); BOOLEAN get_conn_handle(QuicerConnCTX *c_ctx); +void put_listener_handle(QuicerListenerCTX *l_ctx); +BOOLEAN get_listener_handle(QuicerListenerCTX *l_ctx); + #endif // __QUICER_CTX_H_ diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index 80432397..f5b66802 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -294,6 +294,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) free_certificate(&CredConfig); return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); } + CxPlatRefInitialize(&l_ctx->ref_count); if (is_verify && cacertfile) { @@ -490,20 +491,14 @@ stop_listener1(ErlNifEnv *env, { return ERROR_TUPLE_2(ATOM_BADARG); } - enif_mutex_lock(l_ctx->lock); - if (!l_ctx->Listener) + if (!get_listener_handle(l_ctx)) { ret = ERROR_TUPLE_2(ATOM_CLOSED); - goto exit; + return ret; // follow otp behaviour? } - else if (!l_ctx->is_stopped) - { - l_ctx->is_stopped = TRUE; - // void return - MsQuic->ListenerStop(l_ctx->Listener); - } -exit: - enif_mutex_unlock(l_ctx->lock); + l_ctx->is_stopped = TRUE; + MsQuic->ListenerStop(l_ctx->Listener); + put_listener_handle(l_ctx); return ret; } From d4e14ab5b5628b357d7f0510ebeb8943bd5d0bc5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 20 Oct 2023 16:09:19 +0200 Subject: [PATCH 06/11] feat: thread safe r_ctx --- c_src/quicer_connection.c | 5 +++++ c_src/quicer_ctx.c | 13 +++++++++++++ c_src/quicer_ctx.h | 5 +++++ c_src/quicer_listener.c | 10 ++++++++++ c_src/quicer_reg.c | 11 +++++++---- 5 files changed, 40 insertions(+), 4 deletions(-) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index e726f6db..2e831bbb 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -554,6 +554,11 @@ open_connectionX(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) r_ctx = G_r_ctx; } + if (!get_reg_handle(r_ctx)) + { + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + } + QuicerConnCTX *c_ctx = init_c_ctx(); if (!c_ctx) diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index d4e45921..e1ecdd0c 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -110,6 +110,7 @@ destroy_l_ctx(QuicerListenerCTX *l_ctx) if (r_ctx) { + put_reg_handle(r_ctx); enif_mutex_lock(r_ctx->lock); CxPlatListEntryRemove(&l_ctx->RegistrationLink); enif_mutex_unlock(r_ctx->lock); @@ -361,3 +362,15 @@ get_listener_handle(QuicerListenerCTX *l_ctx) { return CxPlatRefIncrementNonZero(&l_ctx->ref_count, 1); } + +inline void +put_reg_handle(QuicerRegistrationCTX *r_ctx) +{ + CxPlatRefDecrement(&r_ctx->ref_count); +} + +inline BOOLEAN +get_reg_handle(QuicerRegistrationCTX *r_ctx) +{ + return CxPlatRefIncrementNonZero(&r_ctx->ref_count, 1); +} diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index 5a914116..080c7bb2 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -34,6 +34,8 @@ typedef struct QuicerRegistrationCTX { ErlNifEnv *env; HQUIC Registration; + // Tracking lifetime of Registration handle + CXPLAT_REF_COUNT ref_count; BOOLEAN is_released; char name[UINT8_MAX + 1]; ErlNifMutex *lock; @@ -182,4 +184,7 @@ BOOLEAN get_conn_handle(QuicerConnCTX *c_ctx); void put_listener_handle(QuicerListenerCTX *l_ctx); BOOLEAN get_listener_handle(QuicerListenerCTX *l_ctx); +void put_reg_handle(QuicerRegistrationCTX *r_ctx); +BOOLEAN get_reg_handle(QuicerRegistrationCTX *r_ctx); + #endif // __QUICER_CTX_H_ diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index f5b66802..7234dfc5 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -334,6 +334,11 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) { // quic_registration is set enif_keep_resource(l_ctx->r_ctx); + if (!get_reg_handle(l_ctx->r_ctx)) + { + ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + goto exit; + } Registration = l_ctx->r_ctx->Registration; target_r_ctx = l_ctx->r_ctx; } @@ -351,6 +356,11 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) } enif_keep_resource(G_r_ctx); + if (!get_reg_handle(G_r_ctx)) + { + ret = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + goto exit; + } Registration = G_r_ctx->Registration; pthread_mutex_unlock(&GRegLock); } diff --git a/c_src/quicer_reg.c b/c_src/quicer_reg.c index 7377edda..9b87cc6f 100644 --- a/c_src/quicer_reg.c +++ b/c_src/quicer_reg.c @@ -23,7 +23,7 @@ QuicerRegistrationCTX *G_r_ctx = NULL; pthread_mutex_t GRegLock = PTHREAD_MUTEX_INITIALIZER; /* -** For global registration only +** Open global registration. */ ERL_NIF_TERM registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) @@ -64,7 +64,7 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) res = ERROR_TUPLE_2(ATOM_STATUS(status)); goto exit; } - + CxPlatRefInitialize(&r_ctx->ref_count); G_r_ctx = r_ctx; pthread_mutex_unlock(&GRegLock); @@ -147,7 +147,7 @@ new_registration2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) res = ERROR_TUPLE_2(ATOM_STATUS(status)); goto exit; } - + CxPlatRefInitialize(&r_ctx->ref_count); return SUCCESS(enif_make_resource(env, r_ctx)); exit: @@ -231,7 +231,10 @@ get_registration_name1(ErlNifEnv *env, return ERROR_TUPLE_2(ATOM_BADARG); } - return SUCCESS(enif_make_string(env, r_ctx->name, ERL_NIF_LATIN1)); + enif_mutex_lock(r_ctx->lock); + ERL_NIF_TERM name = enif_make_string(env, r_ctx->name, ERL_NIF_LATIN1); + enif_mutex_unlock(r_ctx->lock); + return SUCCESS(name); } BOOLEAN From 8800c6b6759cd524cc7c0329303726e0d4a1dde8 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 20 Oct 2023 19:46:06 +0200 Subject: [PATCH 07/11] fix: refcnt in resource down callbacks --- c_src/quicer_connection.c | 1 + c_src/quicer_listener.c | 3 +++ c_src/quicer_nif.c | 16 +++++++++++----- c_src/quicer_tls.c | 2 ++ 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 2e831bbb..cbe6f223 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -744,6 +744,7 @@ async_connect3(ErlNifEnv *env, goto Error; } free(cacertfile); + cacertfile = NULL; } // convert eoptions to Configuration diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index 7234dfc5..baa6151f 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -281,6 +281,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) { // TLS opt error not file content error free(cacertfile); + cacertfile = NULL; free_certificate(&CredConfig); return ERROR_TUPLE_2(ATOM_CACERTFILE); } @@ -291,6 +292,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) if (!l_ctx) { free(cacertfile); + cacertfile = NULL; free_certificate(&CredConfig); return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); } @@ -314,6 +316,7 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) else { // since we don't use cacertfile, free it free(cacertfile); + cacertfile = NULL; } // Set owner for l_ctx diff --git a/c_src/quicer_nif.c b/c_src/quicer_nif.c index 60876849..4cdb0696 100644 --- a/c_src/quicer_nif.c +++ b/c_src/quicer_nif.c @@ -800,7 +800,8 @@ resource_listener_down_callback(__unused_parm__ ErlNifEnv *env, TP_CB_3(start, (uintptr_t)l_ctx->Listener, 0); // Hold lock for the race of ListenerClose and ListenerStop call enif_mutex_lock(l_ctx->lock); - if (!l_ctx->is_closed && !l_ctx->is_stopped && l_ctx->Listener) + if (!l_ctx->is_closed && !l_ctx->is_stopped && l_ctx->Listener + && get_listener_handle(l_ctx)) { l_ctx->is_stopped = TRUE; /* @@ -814,6 +815,7 @@ resource_listener_down_callback(__unused_parm__ ErlNifEnv *env, // Listener term get GC. */ MsQuic->ListenerStop(l_ctx->Listener); + put_listener_handle(l_ctx); } enif_mutex_unlock(l_ctx->lock); TP_CB_3(end, (uintptr_t)l_ctx->Listener, 0); @@ -886,16 +888,18 @@ resource_conn_down_callback(__unused_parm__ ErlNifEnv *env, __unused_parm__ ErlNifMonitor *mon) { QuicerConnCTX *c_ctx = ctx; + enif_mutex_lock(c_ctx->lock); if (c_ctx && c_ctx->owner && DeadPid - && !enif_compare_pids(&c_ctx->owner->Pid, DeadPid)) + && !enif_compare_pids(&c_ctx->owner->Pid, DeadPid) + && get_conn_handle(c_ctx)) { TP_CB_3(start, (uintptr_t)c_ctx->Connection, (uintptr_t)ctx); - enif_mutex_lock(c_ctx->lock); MsQuic->ConnectionShutdown( c_ctx->Connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); - enif_mutex_unlock(c_ctx->lock); + put_conn_handle(c_ctx); TP_CB_3(end, (uintptr_t)c_ctx->Connection, (uintptr_t)ctx); } + enif_mutex_unlock(c_ctx->lock); } void @@ -927,7 +931,8 @@ resource_stream_down_callback(__unused_parm__ ErlNifEnv *env, enif_mutex_lock(s_ctx->lock); if (s_ctx && s_ctx->owner && DeadPid - && !enif_compare_pids(&s_ctx->owner->Pid, DeadPid)) + && !enif_compare_pids(&s_ctx->owner->Pid, DeadPid) + && get_stream_handle(s_ctx)) { TP_CB_3(start, (uintptr_t)s_ctx->Stream, 0); if (QUIC_FAILED(status = MsQuic->StreamShutdown( @@ -943,6 +948,7 @@ resource_stream_down_callback(__unused_parm__ ErlNifEnv *env, { TP_CB_3(shutdown_success, (uintptr_t)s_ctx->Stream, status); } + put_stream_handle(s_ctx); } enif_mutex_unlock(s_ctx->lock); } diff --git a/c_src/quicer_tls.c b/c_src/quicer_tls.c index e2cc6170..6aba04e3 100644 --- a/c_src/quicer_tls.c +++ b/c_src/quicer_tls.c @@ -214,6 +214,7 @@ free_certificate(QUIC_CREDENTIAL_CONFIG *cc) free((char *)cc->CertificateFile->CertificateFile); free((char *)cc->CertificateFile->PrivateKeyFile); CxPlatFree(cc->CertificateFile, QUICER_CERTIFICATE_FILE); + cc->CertificateFile = NULL; } else if (QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE_PROTECTED == cc->Type) { @@ -222,6 +223,7 @@ free_certificate(QUIC_CREDENTIAL_CONFIG *cc) free((char *)cc->CertificateFileProtected->PrivateKeyPassword); CxPlatFree(cc->CertificateFileProtected, QUICER_CERTIFICATE_FILE_PROTECTED); + cc->CertificateFileProtected = NULL; } } From 312ece399ecfc3e98222e5a53ac9661bef22f9e9 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 20 Oct 2023 16:09:39 +0200 Subject: [PATCH 08/11] test: fix flaky test --- test/quicer_SUITE.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/quicer_SUITE.erl b/test/quicer_SUITE.erl index a4cdfc2a..d6b8fd97 100644 --- a/test/quicer_SUITE.erl +++ b/test/quicer_SUITE.erl @@ -240,7 +240,7 @@ end_per_testcase(tc_lib_registration_neg, _Config) -> end_per_testcase(_TestCase, _Config) -> quicer:terminate_listener(mqtt), quicer_test_lib:report_unhandled_messages(), - quicer_test_lib:report_active_connections(fun ct:pal/2), + quicer_test_lib:report_active_connections(fun ct:comment/2), ct:pal("Counters ~p", [quicer:perf_counters()]), ok. @@ -1886,7 +1886,10 @@ tc_stream_start_flag_shutdown_on_fail(Config) -> Other -> ct:fail("Unexpected event ~p after stream start complete", [Other]) end, - ?assertEqual({error, closed}, quicer:getopt(Stm, param_configuration_settings, quic_configuration)), + {error, closed} = snabbkaffe:retry(100, 10, + fun() -> + {error, closed} = quicer:getopt(Stm, param_configuration_settings, quic_configuration) + end), ?assert(is_integer(Rid)). tc_stream_start_flag_indicate_peer_accept_1(Config) -> From b0010032d35c7300ee5564dcd5e5573bff1cd4a4 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 20 Oct 2023 20:03:57 +0200 Subject: [PATCH 09/11] feat: remove unnecessary locks in dealloc callback --- c_src/quicer_connection.c | 42 +++++++++++++++++++++++++-------------- c_src/quicer_ctx.c | 10 ++++------ c_src/quicer_listener.c | 3 ++- c_src/quicer_nif.c | 3 +-- c_src/quicer_reg.c | 2 ++ c_src/quicer_stream.c | 10 +++++----- 6 files changed, 41 insertions(+), 29 deletions(-) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index cbe6f223..38ad8ae9 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -660,7 +660,7 @@ async_connect3(ErlNifEnv *env, // Check option 'handle' for opened connection if (enif_get_map_value(env, eoptions, ATOM_HANDLE, &eHandle)) { - // Reuse c_ctx from existing connecion handle + /* Reuse c_ctx from existing connecion handle */ if (enif_get_resource(env, eHandle, ctx_connection_t, (void **)&c_ctx)) { assert(c_ctx->is_closed); @@ -668,40 +668,53 @@ async_connect3(ErlNifEnv *env, // r_ctx is already kept in open_connectionX r_ctx = c_ctx->r_ctx; is_reuse_handle = TRUE; + if (!get_reg_handle(r_ctx)) + { + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); + } } else { return ERROR_TUPLE_2(ATOM_PARAM_ERROR); } } - else // we create new c_ctx and set owner + else { + /* Alloc new c_ctx */ + assert(!is_reuse_handle); assert(!c_ctx); assert(!r_ctx); + c_ctx = init_c_ctx(); // Get Reg for c_ctx, quic_registration is optional if (!parse_registration(env, eoptions, &c_ctx->r_ctx)) { - res = ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); - goto Error; + enif_release_resource(c_ctx); + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } r_ctx = c_ctx->r_ctx ? c_ctx->r_ctx : G_r_ctx; - enif_keep_resource(r_ctx); if ((c_ctx->owner = AcceptorAlloc()) == NULL) { - res = ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); - goto Error; + enif_release_resource(c_ctx); + return ERROR_TUPLE_2(ATOM_ERROR_NOT_ENOUGH_MEMORY); } // set owner if (!enif_self(env, &(c_ctx->owner->Pid))) { - res = ERROR_TUPLE_2(ATOM_BAD_PID); - goto Error; + enif_release_resource(c_ctx); + return ERROR_TUPLE_2(ATOM_BAD_PID); + } + + if (!get_reg_handle(r_ctx)) + { + enif_release_resource(c_ctx); + return ERROR_TUPLE_2(ATOM_QUIC_REGISTRATION); } + enif_keep_resource(r_ctx); } assert(r_ctx); @@ -714,7 +727,7 @@ async_connect3(ErlNifEnv *env, { enif_mutex_lock(c_ctx->lock); } - enif_mutex_lock(r_ctx->lock); + Registration = r_ctx->Registration; assert(c_ctx->owner); @@ -843,11 +856,11 @@ async_connect3(ErlNifEnv *env, eHandle = enif_make_resource(env, c_ctx); - enif_mutex_unlock(r_ctx->lock); enif_mutex_unlock(c_ctx->lock); return SUCCESS(eHandle); -Error:; +Error: + put_reg_handle(r_ctx); HQUIC Connection = c_ctx->Connection; if (Connection) { @@ -863,9 +876,7 @@ Error:; // Error exit, it must be closed or Handle is NULL assert(c_ctx->is_closed || NULL == c_ctx->Connection); - enif_mutex_unlock(c_ctx->lock); - enif_mutex_unlock(r_ctx->lock); if (Connection) { @@ -880,7 +891,6 @@ Error:; { enif_release_resource(r_ctx); } - return res; } @@ -1142,7 +1152,9 @@ handle_connection_event_connected(QuicerConnCTX *c_ctx, // A monitor is automatically removed when it triggers or when the // resource is deallocated. + enif_mutex_lock(c_ctx->lock); enif_monitor_process(NULL, c_ctx, acc_pid, &c_ctx->owner_mon); + enif_mutex_unlock(c_ctx->lock); ERL_NIF_TERM ConnHandle = enif_make_resource(c_ctx->env, c_ctx); diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index e1ecdd0c..55246ee3 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -309,10 +309,10 @@ put_stream_handle(QuicerStreamCTX *s_ctx) if (CxPlatRefDecrement(&s_ctx->ref_count) && s_ctx->Stream) { HQUIC Stream = s_ctx->Stream; - enif_mutex_lock(s_ctx->lock); Stream = s_ctx->Stream; s_ctx->Stream = NULL; - enif_mutex_unlock(s_ctx->lock); + s_ctx->is_closed = TRUE; + MsQuic->SetCallbackHandler(Stream, NULL, NULL); MsQuic->StreamClose(Stream); assert(s_ctx->c_ctx != NULL); put_conn_handle(s_ctx->c_ctx); @@ -331,9 +331,9 @@ put_conn_handle(QuicerConnCTX *c_ctx) if (CxPlatRefDecrement(&c_ctx->ref_count) && c_ctx->Connection) { HQUIC Connection = c_ctx->Connection; - enif_mutex_lock(c_ctx->lock); c_ctx->Connection = NULL; - enif_mutex_unlock(c_ctx->lock); + c_ctx->is_closed = TRUE; + MsQuic->SetCallbackHandler(Connection, NULL, NULL); MsQuic->ConnectionClose(Connection); } } @@ -350,9 +350,7 @@ put_listener_handle(QuicerListenerCTX *l_ctx) if (CxPlatRefDecrement(&l_ctx->ref_count) && l_ctx->Listener) { HQUIC Listener = l_ctx->Listener; - enif_mutex_lock(l_ctx->lock); l_ctx->Listener = NULL; - enif_mutex_unlock(l_ctx->lock); MsQuic->ListenerClose(Listener); } } diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index baa6151f..e472d014 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -436,8 +436,9 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[]) l_ctx->Listener, alpn_buffers, alpn_buffer_length, &Address))) { TP_NIF_3(start_fail, (uintptr_t)(l_ctx->Listener), Status); - MsQuic->ListenerClose(l_ctx->Listener); + HQUIC Listener = l_ctx->Listener; l_ctx->Listener = NULL; + MsQuic->ListenerClose(Listener); ret = ERROR_TUPLE_3(ATOM_LISTENER_START_ERROR, ATOM_STATUS(Status)); goto exit; } diff --git a/c_src/quicer_nif.c b/c_src/quicer_nif.c index 4cdb0696..be864f0c 100644 --- a/c_src/quicer_nif.c +++ b/c_src/quicer_nif.c @@ -828,7 +828,6 @@ void resource_listener_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj) { QuicerListenerCTX *l_ctx = (QuicerListenerCTX *)obj; - TP_CB_3(start, (uintptr_t)l_ctx->Listener, 0); // Unlike other resources, it is safe to close listener here @@ -908,7 +907,7 @@ resource_stream_dealloc_callback(__unused_parm__ ErlNifEnv *env, void *obj) QuicerStreamCTX *s_ctx = (QuicerStreamCTX *)obj; TP_CB_3(start, (uintptr_t)s_ctx->Stream, s_ctx->is_closed); assert(s_ctx->is_closed == TRUE); - if (s_ctx->Stream) + if (s_ctx->Stream && !s_ctx->is_closed) { MsQuic->StreamClose(s_ctx->Stream); } diff --git a/c_src/quicer_reg.c b/c_src/quicer_reg.c index 9b87cc6f..905fb952 100644 --- a/c_src/quicer_reg.c +++ b/c_src/quicer_reg.c @@ -61,6 +61,7 @@ registration(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) if (QUIC_FAILED( status = MsQuic->RegistrationOpen(&RegConfig, &r_ctx->Registration))) { + enif_release_resource(r_ctx); res = ERROR_TUPLE_2(ATOM_STATUS(status)); goto exit; } @@ -216,6 +217,7 @@ close_registration(ErlNifEnv *env, r_ctx->Registration = NULL; enif_mutex_unlock(r_ctx->lock); MsQuic->RegistrationClose(Registration); + destroy_r_ctx(r_ctx); return ATOM_OK; } diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index e85e1049..39c4b985 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -279,7 +279,6 @@ async_start_stream2(ErlNifEnv *env, // if set must be valid. return ERROR_TUPLE_2(ATOM_BADARG); } - // @TODO set event mask for some flags } // optional start_flag, @@ -488,6 +487,11 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return ERROR_TUPLE_2(ATOM_BADARG); } + if (!get_conn_handle(c_ctx)) + { + return ERROR_TUPLE_2(ATOM_CLOSED); + } + // Allocate ctxs QuicerStreamCTX *s_ctx = init_s_ctx(); if (!s_ctx) @@ -497,10 +501,6 @@ csend4(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) // release in resource_stream_dealloc_callback enif_keep_resource(c_ctx); - if (!get_conn_handle(c_ctx)) - { - return ERROR_TUPLE_2(ATOM_CLOSED); - } s_ctx->c_ctx = c_ctx; QuicerStreamSendCTX *send_ctx = init_send_ctx(); From aa5ea7de200cd7b3906a456ed8a4a471d0a12730 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 20 Oct 2023 21:02:01 +0200 Subject: [PATCH 10/11] chore(server-callback): put handle when rej conn --- c_src/quicer_listener.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/c_src/quicer_listener.c b/c_src/quicer_listener.c index e472d014..91c975e0 100644 --- a/c_src/quicer_listener.c +++ b/c_src/quicer_listener.c @@ -76,13 +76,14 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener, TP_CB_3(no_acceptor, (uintptr_t)c_ctx->Connection, 0); Status = QUIC_STATUS_UNREACHABLE; // We are going to reject the connection, - // we will not be the owner this connection + // we will not be the owner of this connection // msquic will close the Connection Handle internally. // Set it to NULL to avoid close it in resource_conn_dealloc_callback + // or in the put_conn_handle. c_ctx->Connection = NULL; - + put_conn_handle(c_ctx); // However, we still need to free the c_ctx - // note, we don't hold the lock of c_ctx since it is new conn. + // @NOTE: we don't hold the lock of c_ctx since it is new conn. enif_release_resource(c_ctx); goto Error; } From 97979b531317c5f9e96123e09e3457d39ccbc693 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 23 Oct 2023 09:31:13 +0200 Subject: [PATCH 11/11] chore: clean code --- c_src/quicer_connection.c | 7 ------- 1 file changed, 7 deletions(-) diff --git a/c_src/quicer_connection.c b/c_src/quicer_connection.c index 38ad8ae9..e9cec959 100644 --- a/c_src/quicer_connection.c +++ b/c_src/quicer_connection.c @@ -386,10 +386,7 @@ _IRQL_requires_max_(DISPATCH_LEVEL) c_ctx->is_closed = TRUE; // client shutdown completed c_ctx->config_resource = NULL; enif_mutex_unlock(c_ctx->lock); - } - if (is_destroy) - { put_conn_handle(c_ctx); if (conf_ctx) { @@ -507,10 +504,7 @@ ServerConnectionCallback(HQUIC Connection, c_ctx->is_closed = TRUE; // server shutdown_complete c_ctx->config_resource = NULL; enif_mutex_unlock(c_ctx->lock); - } - if (is_destroy) // merge with upper ,remove if... - { put_conn_handle(c_ctx); if (conf_ctx) { @@ -518,7 +512,6 @@ ServerConnectionCallback(HQUIC Connection, } destroy_c_ctx(c_ctx); } - return status; }