From 85c021123fd94c4d97a6015484eb1d8054bec9eb Mon Sep 17 00:00:00 2001 From: shinrich Date: Fri, 15 Jul 2016 15:52:09 -0500 Subject: [PATCH] TS-4507: Fix SSN and TXN hook ordering. --- proxy/ProxyClientSession.cc | 6 +- proxy/ProxyClientSession.h | 4 +- proxy/ProxyClientTransaction.cc | 12 +- proxy/ProxyClientTransaction.h | 8 +- proxy/http/Http1ClientSession.cc | 37 ++++-- proxy/http/Http1ClientSession.h | 17 ++- proxy/http/Http1ClientTransaction.cc | 19 +++ proxy/http/Http1ClientTransaction.h | 6 +- proxy/http/HttpSM.cc | 21 ++-- proxy/http2/Http2ClientSession.cc | 74 +++++++++-- proxy/http2/Http2ClientSession.h | 22 +++- proxy/http2/Http2ConnectionState.cc | 37 ++++-- proxy/http2/Http2ConnectionState.h | 20 ++- proxy/http2/Http2Stream.cc | 175 +++++++++++++++------------ proxy/http2/Http2Stream.h | 9 +- 15 files changed, 323 insertions(+), 144 deletions(-) diff --git a/proxy/ProxyClientSession.cc b/proxy/ProxyClientSession.cc index 5d649a8a07c..635894b1892 100644 --- a/proxy/ProxyClientSession.cc +++ b/proxy/ProxyClientSession.cc @@ -67,7 +67,7 @@ is_valid_hook(TSHttpHookID hookid) } void -ProxyClientSession::destroy() +ProxyClientSession::free() { if (schedule_event) { schedule_event->cancel(); @@ -137,7 +137,7 @@ ProxyClientSession::state_api_callout(int event, void *data) // coverity[unterminated_default] default: - ink_assert(false); + ink_release_assert(false); } return 0; @@ -185,7 +185,7 @@ ProxyClientSession::handle_api_return(int event) vc->do_io_close(); this->release_netvc(); } - this->destroy(); + free(); // You can now clean things up break; } default: diff --git a/proxy/ProxyClientSession.h b/proxy/ProxyClientSession.h index 2be15a9a693..d6feef70e44 100644 --- a/proxy/ProxyClientSession.h +++ b/proxy/ProxyClientSession.h @@ -43,7 +43,8 @@ class ProxyClientSession : public VConnection public: ProxyClientSession(); - virtual void destroy(); + virtual void destroy() = 0; + virtual void free(); virtual void start() = 0; virtual void new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor) = 0; @@ -188,6 +189,7 @@ class ProxyClientSession : public VConnection int64_t con_id; Event *schedule_event; + bool in_destroy; private: APIHookScope api_scope; diff --git a/proxy/ProxyClientTransaction.cc b/proxy/ProxyClientTransaction.cc index e1bad21f43b..3723c605976 100644 --- a/proxy/ProxyClientTransaction.cc +++ b/proxy/ProxyClientTransaction.cc @@ -63,7 +63,7 @@ ProxyClientTransaction::release(IOBufferReader *r) DebugHttpTxn("[%" PRId64 "] session released by sm [%" PRId64 "]", parent ? parent->connection_id() : 0, current_reader ? current_reader->sm_id : 0); - current_reader = NULL; // Clear reference to SM + // current_reader = NULL; // Clear reference to SM // Pass along the release to the session if (parent) { @@ -77,6 +77,16 @@ ProxyClientTransaction::attach_server_session(HttpServerSession *ssession, bool parent->attach_server_session(ssession, transaction_done); } +void +ProxyClientTransaction::destroy() +{ + if (current_reader) { + current_reader->ua_session = NULL; + current_reader = NULL; + } + this->mutex.clear(); +} + Action * ProxyClientTransaction::adjust_thread(Continuation *cont, int event, void *data) { diff --git a/proxy/ProxyClientTransaction.h b/proxy/ProxyClientTransaction.h index 9dac03d3775..1f2a717e66f 100644 --- a/proxy/ProxyClientTransaction.h +++ b/proxy/ProxyClientTransaction.h @@ -174,11 +174,9 @@ class ProxyClientTransaction : public VConnection return true; } - virtual void - destroy() - { - this->mutex.clear(); - } + virtual void destroy(); + + virtual void transaction_done() = 0; ProxyClientSession * get_parent() diff --git a/proxy/http/Http1ClientSession.cc b/proxy/http/Http1ClientSession.cc index 7c9fdbe115c..2fca5b96925 100644 --- a/proxy/http/Http1ClientSession.cc +++ b/proxy/http/Http1ClientSession.cc @@ -31,15 +31,12 @@ ****************************************************************************/ #include -//#include "ink_config.h" -//#include "Allocator.h" #include "Http1ClientSession.h" #include "Http1ClientTransaction.h" #include "HttpSM.h" #include "HttpDebugNames.h" #include "HttpServerSession.h" #include "Plugin.h" -//#include "Http2ClientSession.h" #define DebugHttpSsn(fmt, ...) DebugSsn(this, "http_cs", fmt, __VA_ARGS__) @@ -84,11 +81,23 @@ Http1ClientSession::Http1ClientSession() void Http1ClientSession::destroy() { - DebugHttpSsn("[%" PRId64 "] session destroy", con_id); + if (read_state != HCS_CLOSED) { + return; + } + if (!in_destroy) { + in_destroy = true; + DebugHttpSsn("[%" PRId64 "] session destroy", con_id); - ink_release_assert(!client_vc); - ink_assert(read_buffer); + ink_release_assert(!client_vc); + ink_assert(read_buffer); + do_api_callout(TS_HTTP_SSN_CLOSE_HOOK); + } +} + +void +Http1ClientSession::free() +{ magic = HTTP_CS_MAGIC_DEAD; if (read_buffer) { free_MIOBuffer(read_buffer); @@ -112,7 +121,7 @@ Http1ClientSession::destroy() // Free the transaction resources this->trans.cleanup(); - super::destroy(); + super::free(); THREAD_FREE(this, http1ClientSessionAllocator, this_thread()); } @@ -126,6 +135,7 @@ Http1ClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOB mutex = new_vc->mutex; trans.mutex = mutex; // Share this mutex with the transaction ssn_start_time = Thread::get_hrtime(); + in_destroy = false; MUTEX_TRY_LOCK(lock, mutex, this_ethread()); ink_assert(lock.is_locked()); @@ -215,6 +225,8 @@ Http1ClientSession::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader void Http1ClientSession::set_tcp_init_cwnd() { + if (!trans.get_sm()) + return; int desired_tcp_init_cwnd = trans.get_sm()->t_state.txn_conf->server_tcp_init_cwnd; DebugHttpSsn("desired TCP congestion window is %d", desired_tcp_init_cwnd); if (desired_tcp_init_cwnd == 0) { @@ -234,6 +246,8 @@ Http1ClientSession::do_io_shutdown(ShutdownHowTo_t howto) void Http1ClientSession::do_io_close(int alerrno) { + if (read_state == HCS_CLOSED) + return; // Don't double call session close if (read_state == HCS_ACTIVE_READER) { if (trans.m_active) { trans.m_active = false; @@ -285,7 +299,13 @@ Http1ClientSession::do_io_close(int alerrno) HTTP_SUM_DYN_STAT(http_transactions_per_client_con, transact_count); HTTP_DECREMENT_DYN_STAT(http_current_client_connections_stat); conn_decrease = false; - do_api_callout(TS_HTTP_SSN_CLOSE_HOOK); + if (client_vc) { + client_vc->do_io_close(); + client_vc = NULL; + } + } + if (trans.get_sm() == NULL) { // Destroying from keep_alive state + this->destroy(); } } @@ -314,6 +334,7 @@ Http1ClientSession::state_wait_for_close(int event, void *data) // Drain any data read sm_reader->consume(sm_reader->read_avail()); break; + default: ink_release_assert(0); break; diff --git a/proxy/http/Http1ClientSession.h b/proxy/http/Http1ClientSession.h index 30113d5ad02..8e963eb9e78 100644 --- a/proxy/http/Http1ClientSession.h +++ b/proxy/http/Http1ClientSession.h @@ -56,6 +56,7 @@ class Http1ClientSession : public ProxyClientSession // Implement ProxyClientSession interface. virtual void destroy(); + virtual void free(); virtual void start() @@ -92,7 +93,14 @@ class Http1ClientSession : public ProxyClientSession virtual void release_netvc() { - client_vc = NULL; + // Make sure the vio's are also released to avoid + // later surprises in inactivity timeout + if (client_vc) { + client_vc->do_io_read(NULL, 0, NULL); + client_vc->do_io_write(NULL, 0, NULL); + client_vc->set_action(NULL); + client_vc = NULL; + } } int @@ -187,7 +195,12 @@ class Http1ClientSession : public ProxyClientSession MIOBuffer *read_buffer; IOBufferReader *sm_reader; - C_Read_State read_state; + + /* + * Volatile should not be necessary, but there appears to be a bug in the 4.9 rhel gcc + * compiler that was using an old version of read_state to make decisions in really_destroy + */ + volatile C_Read_State read_state; VIO *ka_vio; VIO *slave_ka_vio; diff --git a/proxy/http/Http1ClientTransaction.cc b/proxy/http/Http1ClientTransaction.cc index a5ae268c560..cb0de3e6384 100644 --- a/proxy/http/Http1ClientTransaction.cc +++ b/proxy/http/Http1ClientTransaction.cc @@ -62,3 +62,22 @@ Http1ClientTransaction::set_parent(ProxyClientSession *new_parent) } super::set_parent(new_parent); } + +void +Http1ClientTransaction::transaction_done() +{ + current_reader = NULL; + // If the parent session is not in the closed state, the destroy will not occur. + if (parent) { + parent->destroy(); + } +} + +void +Http1ClientTransaction::destroy() +{ + if (current_reader) { + current_reader->ua_session = NULL; + current_reader = NULL; + } +} diff --git a/proxy/http/Http1ClientTransaction.h b/proxy/http/Http1ClientTransaction.h index 9826b7fa5cd..8fa73190b3e 100644 --- a/proxy/http/Http1ClientTransaction.h +++ b/proxy/http/Http1ClientTransaction.h @@ -55,10 +55,7 @@ class Http1ClientTransaction : public ProxyClientTransaction // Don't destroy your elements. Rely on the Http1ClientSession to clean up the // Http1ClientTransaction class as necessary - virtual void - destroy() - { - } + virtual void destroy(); // Clean up the transaction elements when the ClientSession shuts down void @@ -164,6 +161,7 @@ class Http1ClientTransaction : public ProxyClientTransaction if (parent) parent->cancel_inactivity_timeout(); } + void transaction_done(); protected: uint16_t outbound_port; diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index 3c19cf73db6..b21ac3d7531 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -910,11 +910,7 @@ HttpSM::state_watch_for_client_abort(int event, void *data) } ua_entry->eos = true; } else { - if (netvc) { - netvc->do_io_close(); - } ua_session->do_io_close(); - ua_session = NULL; ua_buffer_reader = NULL; vc_table.cleanup_entry(ua_entry); ua_entry = NULL; @@ -3007,9 +3003,8 @@ HttpSM::tunnel_handler_server(int event, HttpTunnelProducer *p) // Note: This is a hack. The correct solution is for the UA session to signal back to the SM // when the UA is about to be destroyed and clean up the pointer there. That should be done once // the TS-3612 changes are in place (and similarly for the server session). - if (ua_entry->in_tunnel) { - ua_session = NULL; - } + /*if (ua_entry->in_tunnel) + ua_session = NULL; */ t_state.current.server->abort = HttpTransact::ABORTED; t_state.client_info.keep_alive = HTTP_NO_KEEPALIVE; @@ -3325,12 +3320,11 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer *c) } ua_session->do_io_close(); - ua_session = NULL; } else { ink_assert(ua_buffer_reader != NULL); ua_session->release(ua_buffer_reader); ua_buffer_reader = NULL; - ua_session = NULL; + // ua_session = NULL; } return 0; @@ -6160,8 +6154,8 @@ HttpSM::setup_error_transfer() } else { DebugSM("http", "[setup_error_transfer] Now closing connection ..."); vc_table.cleanup_entry(ua_entry); - ua_entry = NULL; - ua_session = NULL; + ua_entry = NULL; + // ua_session = NULL; terminate_sm = true; t_state.source = HttpTransact::SOURCE_INTERNAL; } @@ -6775,7 +6769,6 @@ HttpSM::kill_this() plugin_tunnel = NULL; } - ua_session = NULL; server_session = NULL; // So we don't try to nuke the state machine @@ -6804,6 +6797,10 @@ HttpSM::kill_this() // then the value of kill_this_async_done has changed so // we must check it again if (kill_this_async_done == true) { + if (ua_session) { + ua_session->transaction_done(); + } + // In the async state, the plugin could have been // called resulting in the creation of a plugin_tunnel. // So it needs to be deleted now. diff --git a/proxy/http2/Http2ClientSession.cc b/proxy/http2/Http2ClientSession.cc index 635e86db93a..891516c246d 100644 --- a/proxy/http2/Http2ClientSession.cc +++ b/proxy/http2/Http2ClientSession.cc @@ -66,14 +66,41 @@ Http2ClientSession::Http2ClientSession() sm_reader(NULL), write_buffer(NULL), sm_writer(NULL), - upgrade_context() + upgrade_context(), + kill_me(false), + recursion(0) { } void Http2ClientSession::destroy() { - DebugHttp2Ssn("session destroy"); + if (!in_destroy) { + in_destroy = true; + DebugHttp2Ssn("session destroy"); + // Let everyone know we are going down + do_api_callout(TS_HTTP_SSN_CLOSE_HOOK); + } +} + +void +Http2ClientSession::free() +{ + DebugHttp2Ssn("session free"); + + if (client_vc) { + release_netvc(); + client_vc->do_io_close(); + client_vc = NULL; + } + + // Make sure the we are at the bottom of the stack + if (connection_state.is_recursing() || this->recursion != 0) { + // Note that we are ready to be cleaned up + // One of the event handlers will catch it + kill_me = true; + return; + } HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_SESSION_COUNT, this->mutex->thread_holding); @@ -105,7 +132,7 @@ Http2ClientSession::destroy() this->connection_state.destroy(); - super::destroy(); + super::free(); free_MIOBuffer(this->read_buffer); free_MIOBuffer(this->write_buffer); @@ -150,8 +177,9 @@ Http2ClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOB this->con_id = ProxyClientSession::next_connection_id(); this->client_vc = new_vc; client_vc->set_inactivity_timeout(HRTIME_SECONDS(Http2::accept_no_activity_timeout)); - this->mutex = new_vc->mutex; this->schedule_event = NULL; + this->mutex = new_vc->mutex; + this->in_destroy = false; this->connection_state.mutex = new_ProxyMutex(); @@ -235,7 +263,15 @@ Http2ClientSession::do_io_close(int alerrno) ink_assert(this->mutex->thread_holding == this_ethread()); send_connection_event(&this->connection_state, HTTP2_SESSION_EVENT_FINI, this); - do_api_callout(TS_HTTP_SSN_CLOSE_HOOK); + + // Don't send the SSN_CLOSE_HOOK until we got rid of all the streams + // And handled all the TXN_CLOSE_HOOK's + if (client_vc) { + this->release_netvc(); + client_vc->do_io_close(); + client_vc = NULL; + } + this->connection_state.release_stream(NULL); } void @@ -248,6 +284,9 @@ int Http2ClientSession::main_event_handler(int event, void *edata) { ink_assert(this->mutex->thread_holding == this_ethread()); + int retval; + + recursion++; Event *e = static_cast(edata); if (e == schedule_event) { @@ -257,7 +296,8 @@ Http2ClientSession::main_event_handler(int event, void *edata) switch (event) { case VC_EVENT_READ_COMPLETE: case VC_EVENT_READ_READY: - return (this->*session_handler)(event, edata); + retval = (this->*session_handler)(event, edata); + break; case HTTP2_SESSION_EVENT_XMIT: { Http2Frame *frame = (Http2Frame *)edata; @@ -265,7 +305,8 @@ Http2ClientSession::main_event_handler(int event, void *edata) write_vio->nbytes = total_write_len; frame->xmit(this->write_buffer); write_reenable(); - return 0; + retval = 0; + break; } case VC_EVENT_ACTIVE_TIMEOUT: @@ -276,18 +317,25 @@ Http2ClientSession::main_event_handler(int event, void *edata) return 0; case VC_EVENT_WRITE_READY: - return 0; + retval = 0; + break; + case VC_EVENT_WRITE_COMPLETE: - if (this->connection_state.is_state_closed()) { - this->do_io_close(); - } - return 0; + // Seems as this is being closed already + retval = 0; + break; default: DebugHttp2Ssn("unexpected event=%d edata=%p", event, edata); ink_release_assert(0); - return 0; + retval = 0; + break; + } + recursion--; + if (!connection_state.is_recursing() && this->recursion == 0 && kill_me) { + this->free(); } + return retval; } int diff --git a/proxy/http2/Http2ClientSession.h b/proxy/http2/Http2ClientSession.h index 088f87e0513..e8ce529bd99 100644 --- a/proxy/http2/Http2ClientSession.h +++ b/proxy/http2/Http2ClientSession.h @@ -159,8 +159,15 @@ class Http2ClientSession : public ProxyClientSession // Implement ProxyClientSession interface. void start(); virtual void destroy(); + virtual void free(); void new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor); + bool + ready_to_free() const + { + return kill_me; + } + // Implement VConnection interface. VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0); VIO *do_io_write(Continuation *c = NULL, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false); @@ -175,7 +182,13 @@ class Http2ClientSession : public ProxyClientSession virtual void release_netvc() { - client_vc = NULL; + // Make sure the vio's are also released to avoid + // later surprises in inactivity timeout + if (client_vc) { + client_vc->do_io_read(NULL, 0, NULL); + client_vc->do_io_write(NULL, 0, NULL); + client_vc->set_action(NULL); + } } sockaddr const * @@ -218,6 +231,11 @@ class Http2ClientSession : public ProxyClientSession { return dying_event; } + bool + is_recursing() const + { + return recursion > 0; + } virtual const char * get_protocol_string() const @@ -250,6 +268,8 @@ class Http2ClientSession : public ProxyClientSession VIO *write_vio; int dying_event; + bool kill_me; + int recursion; }; extern ClassAllocator http2ClientSessionAllocator; diff --git a/proxy/http2/Http2ConnectionState.cc b/proxy/http2/Http2ConnectionState.cc index d5ed261ff57..fc99d6b5281 100644 --- a/proxy/http2/Http2ConnectionState.cc +++ b/proxy/http2/Http2ConnectionState.cc @@ -764,6 +764,7 @@ static const http2_frame_dispatch frame_handlers[HTTP2_FRAME_TYPE_MAX] = { int Http2ConnectionState::main_event_handler(int event, void *edata) { + ++recursion; switch (event) { // Initialize HTTP/2 Connection case HTTP2_SESSION_EVENT_INIT: { @@ -788,16 +789,16 @@ Http2ConnectionState::main_event_handler(int event, void *edata) send_window_update_frame(0, server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE) - HTTP2_INITIAL_WINDOW_SIZE); } - return 0; + break; } // Finalize HTTP/2 Connection case HTTP2_SESSION_EVENT_FINI: { - this->ua_session = NULL; + this->fini_received = true; cleanup_streams(); SET_HANDLER(&Http2ConnectionState::state_closed); - return 0; - } + this->release_stream(NULL); + } break; case HTTP2_SESSION_EVENT_XMIT: { SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); @@ -817,7 +818,7 @@ Http2ConnectionState::main_event_handler(int event, void *edata) // Implementations MUST discard frames that have unknown or unsupported types. if (frame->header().type >= HTTP2_FRAME_TYPE_MAX) { DebugHttp2Stream(ua_session, stream_id, "Discard a frame which has unknown type, type=%x", frame->header().type); - return 0; + break; } if (frame_handlers[frame->header().type]) { @@ -845,13 +846,22 @@ Http2ConnectionState::main_event_handler(int event, void *edata) } } - return 0; + break; } default: DebugHttp2Con(ua_session, "unexpected event=%d edata=%p", event, edata); ink_release_assert(0); - return 0; + break; + } + + --recursion; + if (recursion == 0 && ua_session && !ua_session->is_recursing()) { + if (this->ua_session->ready_to_free()) { + this->ua_session->free(); + // After the free, the Http2ConnectionState object is also freed. + // The Http2ConnectionState object is allocted within the Http2ClientSession object + } } return 0; @@ -887,6 +897,7 @@ Http2ConnectionState::create_stream(Http2StreamId new_id) ink_assert(client_streams_count < UINT32_MAX); ++client_streams_count; + ++total_client_streams_count; new_stream->set_parent(ua_session); new_stream->mutex = ua_session->mutex; ua_session->get_netvc()->add_to_active_queue(); @@ -956,6 +967,18 @@ Http2ConnectionState::delete_stream(Http2Stream *stream) } } +void +Http2ConnectionState::release_stream(Http2Stream *stream) +{ + if (stream) { + --total_client_streams_count; + } + if (ua_session && fini_received && total_client_streams_count == 0) { + // We were shutting down, go ahead and terminate the session + ua_session->destroy(); + } +} + void Http2ConnectionState::update_initial_rwnd(Http2WindowSize new_size) { diff --git a/proxy/http2/Http2ConnectionState.h b/proxy/http2/Http2ConnectionState.h index 1594b9f7df0..5c50d810fc6 100644 --- a/proxy/http2/Http2ConnectionState.h +++ b/proxy/http2/Http2ConnectionState.h @@ -119,8 +119,11 @@ class Http2ConnectionState : public Continuation stream_list(), latest_streamid(0), client_streams_count(0), + total_client_streams_count(0), continued_stream_id(0), - _scheduled(false) + _scheduled(false), + fini_received(false), + recursion(0) { SET_HANDLER(&Http2ConnectionState::main_event_handler); } @@ -169,6 +172,7 @@ class Http2ConnectionState : public Continuation Http2Stream *find_stream(Http2StreamId id) const; void restart_streams(); void delete_stream(Http2Stream *stream); + void release_stream(Http2Stream *stream); void cleanup_streams(); void update_initial_rwnd(Http2WindowSize new_size); @@ -214,7 +218,13 @@ class Http2ConnectionState : public Continuation bool is_state_closed() const { - return ua_session == NULL; + return ua_session == NULL || fini_received; + } + + bool + is_recursing() const + { + return recursion > 0; } private: @@ -232,8 +242,10 @@ class Http2ConnectionState : public Continuation DLL stream_list; Http2StreamId latest_streamid; - // Counter for current acive streams which is started by client + // Counter for current active streams which is started by client uint32_t client_streams_count; + // Counter for current active streams and streams in the process of shutting down + uint32_t total_client_streams_count; // NOTE: Id of stream which MUST receive CONTINUATION frame. // - [RFC 7540] 6.2 HEADERS @@ -245,6 +257,8 @@ class Http2ConnectionState : public Continuation Http2StreamId continued_stream_id; IOVec continued_buffer; bool _scheduled; + bool fini_received; + int recursion; }; #endif // __HTTP2_CONNECTION_STATE_H__ diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc index 1e4d0a8981d..7a228ff3e0e 100644 --- a/proxy/http2/Http2Stream.cc +++ b/proxy/http2/Http2Stream.cc @@ -33,6 +33,15 @@ Http2Stream::main_event_handler(int event, void *edata) { Event *e = static_cast(edata); + Thread *this_thread = this_ethread(); + if (this->get_thread() != this_thread) { + // Send on to the owning thread + if (cross_thread_event == NULL) { + cross_thread_event = this->get_thread()->schedule_imm(this, event, edata); + } + return 0; + } + ink_release_assert(this->get_thread() == this_ethread()); SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); if (e == cross_thread_event) { cross_thread_event = NULL; @@ -53,11 +62,19 @@ Http2Stream::main_event_handler(int event, void *edata) case VC_EVENT_ACTIVE_TIMEOUT: case VC_EVENT_INACTIVITY_TIMEOUT: if (current_reader && read_vio.ntodo() > 0) { - SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread()); - read_vio._cont->handleEvent(event, &read_vio); + MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread()); + if (lock.is_locked()) { + read_vio._cont->handleEvent(event, &read_vio); + } else { + this_ethread()->schedule_imm(read_vio._cont, event, &read_vio); + } } else if (current_reader && write_vio.ntodo() > 0) { - SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread()); - write_vio._cont->handleEvent(event, &write_vio); + MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread()); + if (lock.is_locked()) { + write_vio._cont->handleEvent(event, &write_vio); + } else { + this_ethread()->schedule_imm(write_vio._cont, event, &write_vio); + } } break; case VC_EVENT_WRITE_READY: @@ -65,9 +82,11 @@ Http2Stream::main_event_handler(int event, void *edata) inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; if (e->cookie == &write_vio) { if (write_vio.mutex) { - SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread()); - if (write_vio._cont && this->current_reader) { + MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread()); + if (lock.is_locked() && write_vio._cont && this->current_reader) { write_vio._cont->handleEvent(event, &write_vio); + } else { + this_ethread()->schedule_imm(write_vio._cont, event, &write_vio); } } } else { @@ -79,9 +98,11 @@ Http2Stream::main_event_handler(int event, void *edata) inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; if (e->cookie == &read_vio) { if (read_vio.mutex) { - SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread()); - if (read_vio._cont && this->current_reader) { + MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread()); + if (lock.is_locked() && read_vio._cont && this->current_reader) { read_vio._cont->handleEvent(event, &read_vio); + } else { + this_ethread()->schedule_imm(read_vio._cont, event, &read_vio); } } } else { @@ -92,6 +113,8 @@ Http2Stream::main_event_handler(int event, void *edata) SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); // Clean up after yourself if this was an EOS ink_release_assert(this->closed); + // Safe to initiate SSN_CLOSE if this is the last stream + static_cast(parent)->connection_state.release_stream(this); this->destroy(); break; } @@ -244,48 +267,54 @@ void Http2Stream::do_io_close(int /* flags */) { SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); - current_reader = NULL; // SM on the way out + // disengage us from the SM + super::release(NULL); if (!sent_delete) { - sent_delete = true; Debug("http2_stream", "do_io_close stream %d", this->get_id()); - // Only close if we are done sending data back to the client - if (parent && (!this->is_body_done() || this->response_is_data_available())) { - Debug("http2_stream", "%d: Undo close to pass data", this->get_id()); - closed = false; // "unclose" so this gets picked up later when the netvc side is done - // If chunking is playing games with us, make sure we noticed when the end of message has happened - if (!this->is_body_done() && this->write_vio.ndone == this->write_vio.nbytes) { - this->mark_body_done(); - } else { - lock.release(); - this->reenable(&write_vio); // Kick the mechanism to get any remaining data pushed out - Warning("Re-enabled to get data pushed out is_done=%d", this->is_body_done()); - return; - } - } - closed = true; + // When we get here, the SM has initiated the shutdown. Either it received a WRITE_COMPLETE, or it is shutting down. Any + // remaining IO operations back to client should be abandoned. The SM-side buffers backing these operations will be deleted + // by the time this is called from transaction_done. + + sent_delete = true; + closed = true; if (parent) { // Make sure any trailing end of stream frames are sent - // Ourselve will be removed at send_data_frames or closing connection phase + // Wee will be removed at send_data_frames or closing connection phase static_cast(parent)->connection_state.send_data_frames(this); } - parent = NULL; // Check to see if the stream is in the closed state ink_assert(get_state() == HTTP2_STREAM_STATE_CLOSED); clear_timers(); clear_io_events(); - if (cross_thread_event != NULL) { - cross_thread_event->cancel(); - } - cross_thread_event = NULL; + // Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed + } +} + +/* + * HttpSM has called TXN_close hooks. + */ +void +Http2Stream::transaction_done() +{ + SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); + if (cross_thread_event != NULL) + cross_thread_event->cancel(); - // Send an event to get the stream to kill itself - // Thus if any events for the stream are in the queue, they will be handled first. - // We have marked the stream closed, so no new events should be queued - cross_thread_event = this_ethread()->schedule_imm(this, VC_EVENT_EOS); + if (!closed) + do_io_close(); // Make sure we've been closed. If we didn't close the parent session better still be open + ink_release_assert(closed || !static_cast(parent)->connection_state.is_state_closed()); + current_reader = NULL; + + if (closed) { + // Safe to initiate SSN_CLOSE if this is the last stream + if (cross_thread_event) + cross_thread_event->cancel(); + // Schedule the destroy to occur after we unwind here. IF we call directly, may delete with reference on the stack. + cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_EOS, NULL); } } @@ -302,10 +331,11 @@ Http2Stream::initiating_close() closed = true; _state = HTTP2_STREAM_STATE_CLOSED; - parent = NULL; - // leaving the reference to the SM, so we can detatch from the SM when we actually destroy // current_reader = NULL; + // Leaving reference to client session as well, so we can signal once the + // TXN_CLOSE has beent sent + // parent = NULL; clear_timers(); clear_io_events(); @@ -339,15 +369,10 @@ Http2Stream::initiating_close() } } else if (current_reader) { SCOPED_MUTEX_LOCK(lock, current_reader->mutex, this_ethread()); - current_reader->handleEvent(VC_EVENT_EOS); + current_reader->handleEvent(VC_EVENT_ERROR); } else if (!sent_write_complete) { - // Send an event to get the stream to kill itself - // Thus if any events for the stream are in the queue, they will be handled first. - // We have marked the stream closed, so no new events should be queued - if (cross_thread_event != NULL) { - cross_thread_event->cancel(); - } - cross_thread_event = this_ethread()->schedule_imm(this, VC_EVENT_EOS); + // Transaction is already gone. Kill yourself + do_io_close(); } } } @@ -372,7 +397,7 @@ Http2Stream::send_tracked_event(Event *in_event, int send_event, VIO *vio) void Http2Stream::update_read_request(int64_t read_len, bool call_update) { - if (closed || this->current_reader == NULL) { + if (closed || sent_delete || parent == NULL || current_reader == NULL) { return; } if (this->get_thread() != this_ethread()) { @@ -399,14 +424,11 @@ Http2Stream::update_read_request(int64_t read_len, bool call_update) request_reader->consume(bytes_added); read_vio.ndone += bytes_added; int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; - // If call_update is true, should be safe to call the read_io continuation handler directly - // However, I was seeing performance regressions, so backed out this change to track that down - // Probably not the cause of performance regression, but need to test some more - /*if (call_update) { // Safe to call vio handler directly + if (call_update) { // Safe to call vio handler directly inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (read_vio._cont && this->current_reader) read_vio._cont->handleEvent(send_event, &read_vio); - } else */ { // Called from do_io_read. Still setting things up. Send - // event to handle this after the dust settles + if (read_vio._cont && this->current_reader) + read_vio._cont->handleEvent(send_event, &read_vio); + } else { // Called from do_io_read. Still setting things up. Send event to handle this after the dust settles read_event = send_tracked_event(read_event, send_event, &read_vio); } } @@ -415,12 +437,12 @@ Http2Stream::update_read_request(int64_t read_len, bool call_update) // Try to be smart and only signal if there was additional data int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; if (request_reader->read_avail() > 0 || send_event == VC_EVENT_READ_COMPLETE) { - // Same comment of call_update as above - /*if (call_update) { // Safe to call vio handler directly + if (call_update) { // Safe to call vio handler directly inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (read_vio._cont && this->current_reader) read_vio._cont->handleEvent(send_event, &read_vio); - } else */ { // Called from do_io_read. Still setting things up. Send event - // to handle this after the dust settles + if (read_vio._cont && this->current_reader) + read_vio._cont->handleEvent(send_event, &read_vio); + } else { // Called from do_io_read. Still setting things up. Send event + // to handle this after the dust settles read_event = send_tracked_event(read_event, send_event, &read_vio); } } @@ -432,7 +454,7 @@ bool Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool call_update) { bool retval = true; - if (closed || parent == NULL) { + if (closed || sent_delete || parent == NULL) { return retval; } if (this->get_thread() != this_ethread()) { @@ -483,7 +505,7 @@ Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, parent->connection_state.send_headers_frame(this); // See if the response is chunked. Set up the dechunking logic if it is - this->response_initialize_data_handling(); + is_done = this->response_initialize_data_handling(); // If there is additional data, send it along in a data frame. Or if this was header only // make sure to send the end of stream @@ -491,12 +513,11 @@ Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, if (send_event != VC_EVENT_WRITE_COMPLETE) { // As with update_read_request, should be safe to call handler directly here if // call_update is true. Commented out for now while tracking a performance regression - /*if (call_update) { // Coming from reenable. Safe to call the handler directly + if (call_update) { // Coming from reenable. Safe to call the handler directly inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (write_vio._cont && this->current_reader) write_vio._cont->handleEvent(send_event, &write_vio); - } else */ { // Called from do_io_write. Might - // still be setting up state. Send - // an event to let the dust settle + if (write_vio._cont && this->current_reader) + write_vio._cont->handleEvent(send_event, &write_vio); + } else { // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle write_event = send_tracked_event(write_event, send_event, &write_vio); } } else { @@ -523,13 +544,11 @@ Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, retval = false; } else { send_response_body(); - // Same comment about call_update as above - /*if (call_update) { // Coming from reenable. Safe to call the handler directly + if (call_update) { // Coming from reenable. Safe to call the handler directly inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (write_vio._cont && this->current_reader) write_vio._cont->handleEvent(send_event, &write_vio); - } else */ { // Called from do_io_write. Might still - // be setting up state. Send an event to - // let the dust settle + if (write_vio._cont && this->current_reader) + write_vio._cont->handleEvent(send_event, &write_vio); + } else { // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle write_event = send_tracked_event(write_event, send_event, &write_vio); } } @@ -575,12 +594,6 @@ Http2Stream::destroy() // Clean up the write VIO in case of inactivity timeout this->do_io_write(NULL, 0, NULL); - if (m_active) { - m_active = false; - HTTP_DECREMENT_DYN_STAT(http_current_active_client_connections_stat); - } - HTTP_DECREMENT_DYN_STAT(http_current_client_transactions_stat); - HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread); ink_hrtime end_time = Thread::get_hrtime(); HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, _thread, end_time - _start_time); @@ -738,3 +751,11 @@ Http2Stream::clear_io_events() } write_event = NULL; } + +void +Http2Stream::release(IOBufferReader *r) +{ + super::release(r); + current_reader = NULL; // State machine is on its own way down. + this->do_io_close(); +} diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h index a5a0f7a505d..51e495afa5d 100644 --- a/proxy/http2/Http2Stream.h +++ b/proxy/http2/Http2Stream.h @@ -172,6 +172,7 @@ class Http2Stream : public ProxyClientTransaction void update_read_request(int64_t read_len, bool send_update); bool update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update); void reenable(VIO *vio); + virtual void transaction_done(); void send_response_body(); // Stream level window size @@ -211,13 +212,7 @@ class Http2Stream : public ProxyClientTransaction bool response_initialize_data_handling(); bool response_process_data(); bool response_is_data_available() const; - // For Http2 releasing the transaction should go ahead and delete it - void - release(IOBufferReader *r) - { - current_reader = NULL; // State machine is on its own way down. - this->do_io_close(); - } + void release(IOBufferReader *r); virtual bool allow_half_open() const