diff --git a/proxy/http/Http1ClientSession.cc b/proxy/http/Http1ClientSession.cc index e7e88096b41..1e5601dd6ee 100644 --- a/proxy/http/Http1ClientSession.cc +++ b/proxy/http/Http1ClientSession.cc @@ -74,7 +74,6 @@ Http1ClientSession::destroy() in_destroy = true; HttpSsnDebug("[%" PRId64 "] session destroy", con_id); - ink_release_assert(!client_vc); ink_assert(read_buffer); ink_release_assert(transact_count == released_transactions); do_api_callout(TS_HTTP_SSN_CLOSE_HOOK); @@ -287,10 +286,7 @@ Http1ClientSession::do_io_close(int alerrno) HTTP_SUM_DYN_STAT(http_transactions_per_client_con, transact_count); HTTP_DECREMENT_DYN_STAT(http_current_client_connections_stat); conn_decrease = false; - if (client_vc) { - client_vc->do_io_close(); - client_vc = nullptr; - } + // the netvc will be closed in the session free } if (transact_count == released_transactions) { this->destroy(); diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index c03bd23cff4..6c559f43834 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -425,6 +425,17 @@ HttpSM::state_add_to_list(int event, void * /* data ATS_UNUSED */) t_state.api_next_action = HttpTransact::SM_ACTION_API_SM_START; do_api_callout(); + if (api_timer < 0) { + // Didn't get the hook continuation lock. Clear the read and wait for next event + if (ua_entry->read_vio) { + ua_entry->read_vio = ua_entry->vc->do_io_read(nullptr, 0, nullptr); + } + return EVENT_CONT; + } else { + if (ua_entry->read_vio) { + ua_entry->read_vio = ua_entry->vc->do_io_read(this, 0, ua_buffer_reader->mbuf); + } + } return EVENT_DONE; } @@ -1478,7 +1489,7 @@ plugins required to work with sni_routing. // Have a mutex but didn't get the lock, reschedule if (!lock.is_locked()) { api_timer = -Thread::get_hrtime_updated(); - HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_api_callout); + HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_api_callback); ink_assert(pending_action == nullptr); pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10)); return 0; @@ -6857,6 +6868,12 @@ HttpSM::kill_this() terminate_sm = false; t_state.api_next_action = HttpTransact::SM_ACTION_API_SM_SHUTDOWN; do_api_callout(); + if (api_timer < 0) { + // Need to hang out until we can complete the TXN_CLOSE hook + terminate_sm = false; + reentrancy_count--; + return; + } } // The reentrancy_count is still valid up to this point since // the api shutdown hook is asynchronous and double frees can diff --git a/proxy/http2/Http2ClientSession.cc b/proxy/http2/Http2ClientSession.cc index 10378c7ac75..2d3c2c0c68b 100644 --- a/proxy/http2/Http2ClientSession.cc +++ b/proxy/http2/Http2ClientSession.cc @@ -87,6 +87,11 @@ Http2ClientSession::free() this->_reenable_event = nullptr; } + // Don't clean up yet if we still have a hook outstanding + if (schedule_event) { + return; + } + if (client_vc) { client_vc->do_io_close(); client_vc = nullptr; @@ -299,19 +304,11 @@ Http2ClientSession::do_io_close(int alerrno) ink_assert(this->mutex->thread_holding == this_ethread()); send_connection_event(&this->connection_state, HTTP2_SESSION_EVENT_FINI, this); - // Don't send the SSN_CLOSE_HOOK until we got rid of all the streams - // And handled all the TXN_CLOSE_HOOK's - if (client_vc) { - // Copy aside the client address before releasing the vc - cached_client_addr.assign(client_vc->get_remote_addr()); - cached_local_addr.assign(client_vc->get_local_addr()); - client_vc->do_io_close(); - client_vc = nullptr; - } + // client_vc will be closed in Http2ClientSession::free { SCOPED_MUTEX_LOCK(lock, this->connection_state.mutex, this_ethread()); - this->connection_state.release_stream(nullptr); + this->connection_state.release_stream(); } this->clear_session_active(); diff --git a/proxy/http2/Http2ConnectionState.cc b/proxy/http2/Http2ConnectionState.cc index fed9cd211ec..c32fec9913c 100644 --- a/proxy/http2/Http2ConnectionState.cc +++ b/proxy/http2/Http2ConnectionState.cc @@ -526,7 +526,7 @@ rcv_rst_stream_frame(Http2ConnectionState &cstate, const Http2Frame &frame) Http2StreamDebug(cstate.ua_session, stream_id, "RST_STREAM: Error Code: %u", rst_stream.error_code); stream->set_rx_error_code({ProxyErrorClass::TXN, static_cast(rst_stream.error_code)}); - cstate.delete_stream(stream); + stream->initiating_close(); } return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_NONE); @@ -998,7 +998,6 @@ Http2ConnectionState::main_event_handler(int event, void *edata) this->fini_received = true; cleanup_streams(); SET_HANDLER(&Http2ConnectionState::state_closed); - this->release_stream(nullptr); } break; case HTTP2_SESSION_EVENT_XMIT: { @@ -1286,13 +1285,11 @@ Http2ConnectionState::cleanup_streams() if (this->tx_error_code.cls != ProxyErrorClass::NONE) { s->set_tx_error_code(this->tx_error_code); } - this->delete_stream(s); + s->initiating_close(); ink_assert(s != next); s = next; } - ink_assert(stream_list.empty()); - if (!is_state_closed()) { SCOPED_MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); @@ -1355,16 +1352,10 @@ Http2ConnectionState::delete_stream(Http2Stream *stream) } void -Http2ConnectionState::release_stream(Http2Stream *stream) +Http2ConnectionState::release_stream() { REMEMBER(NO_EVENT, this->recursion) - if (stream) { - // Decrement total_client_streams_count here, because it's a counter include streams in the process of shutting down. - // Other counters (client_streams_in_count/client_streams_out_count) are already decremented in delete_stream(). - --total_client_streams_count; - } - SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); if (this->ua_session) { ink_assert(this->mutex == ua_session->mutex); @@ -1460,7 +1451,7 @@ Http2ConnectionState::send_data_frames_depends_on_priority() } case Http2SendDataFrameResult::DONE: { dependency_tree->deactivate(node, len); - delete_stream(stream); + stream->initiating_close(); break; } default: @@ -1557,7 +1548,7 @@ Http2ConnectionState::send_data_frames(Http2Stream *stream) if (stream->get_state() == Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL || stream->get_state() == Http2StreamState::HTTP2_STREAM_STATE_CLOSED) { Http2StreamDebug(this->ua_session, stream->get_id(), "Shutdown half closed local stream"); - this->delete_stream(stream); + stream->initiating_close(); return; } @@ -1572,7 +1563,7 @@ Http2ConnectionState::send_data_frames(Http2Stream *stream) // RST_STREAM and WINDOW_UPDATE. // See 'closed' state written at [RFC 7540] 5.1. Http2StreamDebug(this->ua_session, stream->get_id(), "Shutdown stream"); - this->delete_stream(stream); + stream->initiating_close(); } } diff --git a/proxy/http2/Http2ConnectionState.h b/proxy/http2/Http2ConnectionState.h index f68db97e539..8a83fa77ea4 100644 --- a/proxy/http2/Http2ConnectionState.h +++ b/proxy/http2/Http2ConnectionState.h @@ -178,7 +178,7 @@ class Http2ConnectionState : public Continuation Http2Stream *find_stream(Http2StreamId id) const; void restart_streams(); bool delete_stream(Http2Stream *stream); - void release_stream(Http2Stream *stream); + void release_stream(); void cleanup_streams(); void update_initial_rwnd(Http2WindowSize new_size); @@ -230,6 +230,12 @@ class Http2ConnectionState : public Continuation return client_streams_in_count; } + void + decrement_stream_count() + { + --total_client_streams_count; + } + double get_stream_error_rate() const { diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc index ecbec35a37d..2aa3db40634 100644 --- a/proxy/http2/Http2Stream.cc +++ b/proxy/http2/Http2Stream.cc @@ -429,7 +429,6 @@ Http2Stream::terminate_if_possible() Http2ClientSession *h2_proxy_ssn = static_cast(this->_proxy_ssn); SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread()); - h2_proxy_ssn->connection_state.delete_stream(this); destroy(); } } @@ -763,8 +762,10 @@ Http2Stream::destroy() // In many cases, this has been called earlier, so this call is a no-op h2_proxy_ssn->connection_state.delete_stream(this); + h2_proxy_ssn->connection_state.decrement_stream_count(); + // Update session's stream counts, so it accurately goes into keep-alive state - h2_proxy_ssn->connection_state.release_stream(this); + h2_proxy_ssn->connection_state.release_stream(); cid = _proxy_ssn->connection_id(); } @@ -911,7 +912,6 @@ void Http2Stream::release(IOBufferReader *r) { super::release(r); - _sm = nullptr; // State machine is on its own way down. this->do_io_close(); } diff --git a/tests/gold_tests/continuations/openclose.test.py b/tests/gold_tests/continuations/openclose.test.py index b27b018304d..b75b95ebf30 100644 --- a/tests/gold_tests/continuations/openclose.test.py +++ b/tests/gold_tests/continuations/openclose.test.py @@ -41,7 +41,7 @@ # add response to the server dictionary server.addResponse("sessionfile.log", request_header, response_header) ts.Disk.records_config.update({ - 'proxy.config.diags.debug.enabled': 1, + 'proxy.config.diags.debug.enabled': 0, 'proxy.config.diags.debug.tags': 'ssntxnorder_verify.*', 'proxy.config.http.cache.http': 0, # disable cache to simply the test. 'proxy.config.cache.enable_read_while_writer': 0 @@ -52,8 +52,8 @@ ts.Variables.port, server.Variables.Port) ) -cmd = 'curl -vs http://127.0.0.1:{0}'.format(ts.Variables.port) -numberOfRequests = 25 +cmd = 'curl -vs -H "host:oc.test" http://127.0.0.1:{0}'.format(ts.Variables.port) +numberOfRequests = 100 tr = Test.AddTestRun() # Create a bunch of curl commands to be executed in parallel. Default.Process is set in SpawnCommands. @@ -66,14 +66,14 @@ # Execution order is: ts/server, ps(curl cmds), Default Process. tr.Processes.Default.StartBefore( server, ready=When.PortOpen(server.Variables.Port)) -# Adds a delay once the ts port is ready. This is because we cannot test the ts state. -tr.Processes.Default.StartBefore(ts, ready=10) +tr.Processes.Default.StartBefore(Test.Processes.ts, ready=When.PortOpen(ts.Variables.port)) ts.StartAfter(*ps) server.StartAfter(*ps) tr.StillRunningAfter = ts # Signal that all the curl processes have completed tr = Test.AddTestRun("Curl Done") +tr.DelayStart = 2 # Delaying a couple seconds to make sure the global continuation's lock contention resolves. tr.Processes.Default.Command = "traffic_ctl plugin msg done done" tr.Processes.Default.ReturnCode = 0 tr.Processes.Default.Env = ts.Env diff --git a/tests/gold_tests/continuations/openclose_h2.test.py b/tests/gold_tests/continuations/openclose_h2.test.py index 932670cfd3b..6f07ce97a8e 100644 --- a/tests/gold_tests/continuations/openclose_h2.test.py +++ b/tests/gold_tests/continuations/openclose_h2.test.py @@ -49,7 +49,8 @@ # add response to the server dictionary server.addResponse("sessionfile.log", request_header, response_header) ts.Disk.records_config.update({ - 'proxy.config.diags.debug.enabled': 1, + 'proxy.config.http2.zombie_debug_timeout_in': 10, + 'proxy.config.diags.debug.enabled': 0, 'proxy.config.diags.debug.tags': 'ssntxnorder_verify', 'proxy.config.http.cache.http': 0, # disable cache to simply the test. 'proxy.config.cache.enable_read_while_writer': 0, @@ -58,15 +59,15 @@ }) ts.Disk.remap_config.AddLine( - 'map http://oc.test:{0} http://127.0.0.1:{1}'.format( - ts.Variables.port, server.Variables.Port) + 'map https://oc.test:{0} http://127.0.0.1:{1}'.format( + ts.Variables.ssl_port, server.Variables.Port) ) ts.Disk.ssl_multicert_config.AddLine( 'dest_ip=* ssl_cert_name=server.pem ssl_key_name=server.key' ) -cmd = 'curl -k --http2 -vs https://127.0.0.1:{0}'.format(ts.Variables.ssl_port) -numberOfRequests = 25 +cmd = 'curl -k --resolve oc.test:{0}:127.0.0.1 --http2 https://oc.test:{0}'.format(ts.Variables.ssl_port) +numberOfRequests = 100 tr = Test.AddTestRun() # Create a bunch of curl commands to be executed in parallel. Default.Process is set in SpawnCommands. @@ -79,14 +80,15 @@ # Execution order is: ts/server, ps(curl cmds), Default Process. tr.Processes.Default.StartBefore( server, ready=When.PortOpen(server.Variables.Port)) -# Adds a delay once the ts port is ready. This is because we cannot test the ts state. -tr.Processes.Default.StartBefore(ts, ready=10) +tr.Processes.Default.StartBefore(Test.Processes.ts, ready=When.PortOpen(ts.Variables.ssl_port)) +# Don't know why we need both the start before and the start after ts.StartAfter(*ps) server.StartAfter(*ps) tr.StillRunningAfter = ts # Signal that all the curl processes have completed tr = Test.AddTestRun("Curl Done") +tr.DelayStart = 2 # Delaying a couple seconds to make sure the global continuation's lock contention resolves. tr.Processes.Default.Command = "traffic_ctl plugin msg done done" tr.Processes.Default.ReturnCode = 0 tr.Processes.Default.Env = ts.Env diff --git a/tests/gold_tests/pluginTest/test_hooks/log.gold b/tests/gold_tests/pluginTest/test_hooks/log.gold index b798539374a..8f3ab5bdac7 100644 --- a/tests/gold_tests/pluginTest/test_hooks/log.gold +++ b/tests/gold_tests/pluginTest/test_hooks/log.gold @@ -28,3 +28,22 @@ Global: event=TS_EVENT_HTTP_SSN_CLOSE Session: event=TS_EVENT_HTTP_SSN_CLOSE Global: event=TS_EVENT_VCONN_CLOSE Global: ssl flag=1 +Global: event=TS_EVENT_VCONN_START +Global: ssl flag=1 +Global: event=TS_EVENT_SSL_SERVERNAME +Global: ssl flag=1 +Global: event=TS_EVENT_SSL_CERT +Global: ssl flag=1 +Global: event=TS_EVENT_HTTP_SSN_START +Global: event=TS_EVENT_HTTP_TXN_START +Session: event=TS_EVENT_HTTP_TXN_START +Global: event=TS_EVENT_HTTP_READ_REQUEST_HDR +Session: event=TS_EVENT_HTTP_READ_REQUEST_HDR +Transaction: event=TS_EVENT_HTTP_READ_REQUEST_HDR +Global: event=TS_EVENT_HTTP_TXN_CLOSE +Session: event=TS_EVENT_HTTP_TXN_CLOSE +Transaction: event=TS_EVENT_HTTP_TXN_CLOSE +Global: event=TS_EVENT_HTTP_SSN_CLOSE +Session: event=TS_EVENT_HTTP_SSN_CLOSE +Global: event=TS_EVENT_VCONN_CLOSE +Global: ssl flag=1 diff --git a/tests/gold_tests/pluginTest/test_hooks/test_hooks.test.py b/tests/gold_tests/pluginTest/test_hooks/test_hooks.test.py index a35c87f16e7..903e9fa7f9f 100644 --- a/tests/gold_tests/pluginTest/test_hooks/test_hooks.test.py +++ b/tests/gold_tests/pluginTest/test_hooks/test_hooks.test.py @@ -78,6 +78,12 @@ ) tr.Processes.Default.ReturnCode = 0 +tr = Test.AddTestRun() +tr.Processes.Default.Command = ( + 'curl --verbose --ipv4 --http1.1 --insecure --header "Host: one" https://localhost:{0}/argh'.format(ts.Variables.ssl_port) +) +tr.Processes.Default.ReturnCode = 0 + # The probing of the ATS port to detect when ATS is ready may be seen by ATS as a VCONN start/close, so filter out these # events from the log file. # diff --git a/tests/tools/plugins/ssntxnorder_verify.cc b/tests/tools/plugins/ssntxnorder_verify.cc index fd0a2ce9749..5d9a53948fe 100644 --- a/tests/tools/plugins/ssntxnorder_verify.cc +++ b/tests/tools/plugins/ssntxnorder_verify.cc @@ -37,7 +37,8 @@ static const char DEBUG_TAG_INIT[] = "ssntxnorder_verify.init"; // debug messages on every request serviced -static const char DEBUG_TAG_HOOK[] = "ssntxnorder_verify.hook"; +static const char DEBUG_TAG_HOOK[] = "ssntxnorder_verify.hook"; +static const char DEBUG_TAG_CLOSE[] = "ssntxnorder_verify.close"; // plugin registration info static char plugin_name[] = "ssntxnorder_verify"; @@ -45,8 +46,8 @@ static char vendor_name[] = "Apache"; static char support_email[] = "shinrich@apache.org"; // List of started sessions, SSN_START seen, SSN_CLOSE not seen yet. -static std::set started_ssns; -static int ssn_balance = 0; // +1 on SSN_START, -1 on SSN_CLOSE +thread_local std::set started_ssns; +thread_local int ssn_balance = 0; // +1 on SSN_START, -1 on SSN_CLOSE // Metadata for active transactions. Stored upon start to persist improper // closing behavior. @@ -67,8 +68,9 @@ struct txn_compare { } }; // List of started transactions, TXN_START seen, TXN_CLOSE not seen yet. -static std::set started_txns; -static int txn_balance = 0; // +1 on TXN_START -1 on TXN_CLOSE +thread_local std::set started_txns; +thread_local std::set closed_txns; +thread_local int txn_balance = 0; // +1 on TXN_START -1 on TXN_CLOSE // Statistics provided by the plugin static int stat_ssn_close = 0; // number of TS_HTTP_SSN_CLOSE hooks caught @@ -138,17 +140,19 @@ handle_order(TSCont contp, TSEvent event, void *edata) case TS_EVENT_HTTP_SSN_CLOSE: // End of session { ssnp = reinterpret_cast(edata); - TSDebug(DEBUG_TAG_HOOK, "event TS_EVENT_HTTP_SSN_CLOSE [ SSNID = %p ]", ssnp); + TSDebug(DEBUG_TAG_CLOSE, "event TS_EVENT_HTTP_SSN_CLOSE [ SSNID = %p ]", ssnp); TSStatIntIncrement(stat_ssn_close, 1); if (started_ssns.erase(ssnp) == 0) { // No record existsted for this session - TSError("Session [ SSNID = %p ] closing was not previously started", ssnp); + TSDebug(DEBUG_TAG_HOOK, "Session [ SSNID = %p ] closing was not previously started", ssnp); TSStatIntIncrement(stat_err, 1); + abort(); } if (--ssn_balance < 0) { - TSError("More sessions have been closed than started."); + TSDebug(DEBUG_TAG_HOOK, "More sessions have been closed than started."); TSStatIntIncrement(stat_err, 1); + abort(); } TSHttpSsnReenable(ssnp, TS_EVENT_HTTP_CONTINUE); @@ -163,8 +167,9 @@ handle_order(TSCont contp, TSEvent event, void *edata) if (!started_ssns.insert(ssnp).second) { // Insert failed. Session already existed in the record. - TSError("Session [ SSNID = %p ] has previously started.", ssnp); + TSDebug(DEBUG_TAG_HOOK, "Session [ SSNID = %p ] has previously started.", ssnp); TSStatIntIncrement(stat_err, 1); + abort(); } ++ssn_balance; @@ -178,32 +183,45 @@ handle_order(TSCont contp, TSEvent event, void *edata) TSDebug(DEBUG_TAG_HOOK, "event TS_EVENT_HTTP_TXN_CLOSE [ TXNID = %" PRIu64 " ]", TSHttpTxnIdGet(txnp)); TSStatIntIncrement(stat_txn_close, 1); + std::set::iterator closed_txn = closed_txns.find(started_txn(TSHttpTxnIdGet(txnp))); + if (closed_txn != closed_txns.end()) { + // Double close? + TSStatIntIncrement(stat_err, 1); + abort(); + } + + closed_txns.insert(started_txn(TSHttpTxnIdGet(txnp))); std::set::iterator current_txn = started_txns.find(started_txn(TSHttpTxnIdGet(txnp))); - if (current_txn != started_txns.end()) { + if (current_txn != started_txns.end() && current_txn->id == TSHttpTxnIdGet(txnp)) { // Transaction exists. ssnp = current_txn->ssnp; if (started_ssns.find(ssnp) == started_ssns.end()) { // The session of the transaction was either not started, or was // already closed. - TSError("Transaction [ TXNID = %" PRIu64 " ] closing not in an " + TSDebug(DEBUG_TAG_HOOK, + "Transaction [ TXNID = %" PRIu64 " ] closing not in an " "active session [ SSNID = %p ].", current_txn->id, ssnp); TSStatIntIncrement(stat_err, 1); + abort(); } started_txns.erase(current_txn); // Stop monitoring the transaction } else { // Transaction does not exists. - TSError("Transaction [ TXNID = %" PRIu64 " ] closing not " + TSDebug(DEBUG_TAG_HOOK, + "Transaction [ TXNID = %" PRIu64 " ] closing not " "previously started.", - current_txn->id); + TSHttpTxnIdGet(txnp)); TSStatIntIncrement(stat_err, 1); + abort(); } if (--txn_balance < 0) { - TSError("More transactions have been closed than started."); + TSDebug(DEBUG_TAG_HOOK, "More transactions have been closed than started."); TSStatIntIncrement(stat_err, 1); + abort(); } TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); @@ -221,16 +239,19 @@ handle_order(TSCont contp, TSEvent event, void *edata) if (started_ssns.find(ssnp) == started_ssns.end()) { // Session of the transaction has not started. - TSError("Transaction [ TXNID = %" PRIu64 " ] starting not in an " + TSDebug(DEBUG_TAG_HOOK, + "Transaction [ TXNID = %" PRIu64 " ] starting not in an " "active session [ SSNID = %p ].", new_txn.id, ssnp); TSStatIntIncrement(stat_err, 1); + abort(); } if (!started_txns.insert(new_txn).second) { // Insertion failed. Transaction has previously started. - TSError("Transaction [ TXNID = %" PRIu64 " ] has previously started.", new_txn.id); + TSDebug(DEBUG_TAG_HOOK, "Transaction [ TXNID = %" PRIu64 " ] has previously started.", new_txn.id); TSStatIntIncrement(stat_err, 1); + abort(); } ++txn_balance; @@ -262,6 +283,7 @@ handle_order(TSCont contp, TSEvent event, void *edata) // Just release the lock for all other states and do nothing default: + abort(); break; } @@ -293,13 +315,13 @@ TSPluginInit(int argc, const char *argv[]) #else if (TSPluginRegister(&info) != TS_SUCCESS) { #endif - TSError("[%s] Plugin registration failed. \n", plugin_name); + TSDebug(DEBUG_TAG_HOOK, "[%s] Plugin registration failed. \n", plugin_name); } TSCont contp = TSContCreate(handle_order, TSMutexCreate()); if (contp == nullptr) { // Continuation initialization failed. Unrecoverable, report and exit. - TSError("[%s] could not create continuation", plugin_name); + TSDebug(DEBUG_TAG_HOOK, "[%s] could not create continuation", plugin_name); abort(); } else { // Continuation initialization succeeded.