Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions proxy/ProxySession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ ProxySession::state_api_callout(int event, void *data)
if (!schedule_event) { // Don't bother if there is already one
schedule_event = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
}
return 0;
return -1;
}

cur_hook = nullptr; // mark current callback at dispatched.
Expand All @@ -134,7 +134,7 @@ ProxySession::state_api_callout(int event, void *data)
return 0;
}

void
int
ProxySession::do_api_callout(TSHttpHookID id)
{
ink_assert(id == TS_HTTP_SSN_START_HOOK || id == TS_HTTP_SSN_CLOSE_HOOK);
Expand All @@ -143,10 +143,11 @@ ProxySession::do_api_callout(TSHttpHookID id)
cur_hook = hook_state.getNext();
if (nullptr != cur_hook) {
SET_HANDLER(&ProxySession::state_api_callout);
this->state_api_callout(EVENT_NONE, nullptr);
return this->state_api_callout(EVENT_NONE, nullptr);
} else {
this->handle_api_return(TS_EVENT_HTTP_CONTINUE);
}
return 0;
}

void
Expand Down
2 changes: 1 addition & 1 deletion proxy/ProxySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class ProxySession : public VConnection
virtual const char *protocol_contains(std::string_view tag_prefix) const;

// Non-Virtual Methods
void do_api_callout(TSHttpHookID id);
int do_api_callout(TSHttpHookID id);

// Non-Virtual Accessors
void *get_user_arg(unsigned ix) const;
Expand Down
6 changes: 1 addition & 5 deletions proxy/http/Http1ClientSession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ Http1ClientSession::destroy()
in_destroy = true;

HttpSsnDebug("[%" PRId64 "] session destroy", con_id);
ink_release_assert(!client_vc);
ink_assert(read_buffer);
ink_release_assert(transact_count == released_transactions);
do_api_callout(TS_HTTP_SSN_CLOSE_HOOK);
Expand Down Expand Up @@ -287,10 +286,7 @@ Http1ClientSession::do_io_close(int alerrno)
HTTP_SUM_DYN_STAT(http_transactions_per_client_con, transact_count);
HTTP_DECREMENT_DYN_STAT(http_current_client_connections_stat);
conn_decrease = false;
if (client_vc) {
client_vc->do_io_close();
client_vc = nullptr;
}
// the netvc will be closed in the session free
}
if (transact_count == released_transactions) {
this->destroy();
Expand Down
32 changes: 24 additions & 8 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,14 @@ HttpSM::set_ua_half_close_flag()
ua_txn->set_half_close_flag(true);
}

inline void
inline int
HttpSM::do_api_callout()
{
if (hooks_set) {
do_api_callout_internal();
return do_api_callout_internal();
} else {
handle_api_return();
return 0;
}
}

Expand All @@ -424,7 +425,16 @@ HttpSM::state_add_to_list(int event, void * /* data ATS_UNUSED */)
}

t_state.api_next_action = HttpTransact::SM_ACTION_API_SM_START;
do_api_callout();
if (do_api_callout() < 0) {
// Didn't get the hook continuation lock. Clear the read and wait for next event
if (ua_entry->read_vio) {
// Seems like ua_entry->read_vio->disable(); should work, but that was
// not sufficient to stop the state machine from processing IO events until the
// TXN_START hooks had completed
ua_entry->read_vio = ua_entry->vc->do_io_read(nullptr, 0, nullptr);
}
return EVENT_CONT;
}
return EVENT_DONE;
}

Expand Down Expand Up @@ -582,6 +592,7 @@ HttpSM::attach_client_session(ProxyTransaction *client_vc, IOBufferReader *buffe
++reentrancy_count;
// Add our state sm to the sm list
state_add_to_list(EVENT_NONE, nullptr);

// This is another external entry point and it is possible for the state machine to get terminated
// while down the call chain from @c state_add_to_list. So we need to use the reentrancy_count to
// prevent cleanup there and do it here as we return to the external caller.
Expand Down Expand Up @@ -1481,7 +1492,7 @@ plugins required to work with sni_routing.
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_api_callout);
ink_assert(pending_action == nullptr);
pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
return 0;
return -1;
}

SMDebug("http", "[%" PRId64 "] calling plugin on hook %s at hook %p", sm_id, HttpDebugNames::get_api_hook_name(cur_hook_id),
Expand Down Expand Up @@ -5117,7 +5128,7 @@ HttpSM::do_http_server_open(bool raw)
return;
}

void
int
HttpSM::do_api_callout_internal()
{
switch (t_state.api_next_action) {
Expand Down Expand Up @@ -5158,7 +5169,7 @@ HttpSM::do_api_callout_internal()
case HttpTransact::SM_ACTION_API_SM_SHUTDOWN:
if (callout_state == HTTP_API_IN_CALLOUT || callout_state == HTTP_API_DEFERED_SERVER_ERROR) {
callout_state = HTTP_API_DEFERED_CLOSE;
return;
return 0;
} else {
cur_hook_id = TS_HTTP_TXN_CLOSE_HOOK;
}
Expand All @@ -5171,7 +5182,7 @@ HttpSM::do_api_callout_internal()
hook_state.init(cur_hook_id, http_global_hooks, ua_txn ? ua_txn->feature_hooks() : nullptr, &api_hooks);
cur_hook = nullptr;
cur_hooks = 0;
state_api_callout(0, nullptr);
return state_api_callout(0, nullptr);
}

VConnection *
Expand Down Expand Up @@ -6856,7 +6867,12 @@ HttpSM::kill_this()
// the terminate_flag
terminate_sm = false;
t_state.api_next_action = HttpTransact::SM_ACTION_API_SM_SHUTDOWN;
do_api_callout();
if (do_api_callout() < 0) { // Failed to get a continuation lock
// Need to hang out until we can complete the TXN_CLOSE hook
terminate_sm = false;
reentrancy_count--;
return;
}
}
// The reentrancy_count is still valid up to this point since
// the api shutdown hook is asynchronous and double frees can
Expand Down
4 changes: 2 additions & 2 deletions proxy/http/HttpSM.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ class HttpSM : public Continuation
void do_cache_prepare_action(HttpCacheSM *c_sm, CacheHTTPInfo *object_read_info, bool retry, bool allow_multiple = false);
void do_cache_delete_all_alts(Continuation *cont);
void do_auth_callout();
void do_api_callout();
void do_api_callout_internal();
int do_api_callout();
int do_api_callout_internal();
void do_redirect();
void redirect_request(const char *redirect_url, const int redirect_len);
void do_drain_request_body(HTTPHdr &response);
Expand Down
12 changes: 2 additions & 10 deletions proxy/http2/Http2ClientSession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,11 @@ Http2ClientSession::do_io_close(int alerrno)
ink_assert(this->mutex->thread_holding == this_ethread());
send_connection_event(&this->connection_state, HTTP2_SESSION_EVENT_FINI, this);

// Don't send the SSN_CLOSE_HOOK until we got rid of all the streams
// And handled all the TXN_CLOSE_HOOK's
if (client_vc) {
// Copy aside the client address before releasing the vc
cached_client_addr.assign(client_vc->get_remote_addr());
cached_local_addr.assign(client_vc->get_local_addr());
client_vc->do_io_close();
client_vc = nullptr;
}
// client_vc will be closed in Http2ClientSession::free

{
SCOPED_MUTEX_LOCK(lock, this->connection_state.mutex, this_ethread());
this->connection_state.release_stream(nullptr);
this->connection_state.release_stream();
}

this->clear_session_active();
Expand Down
22 changes: 7 additions & 15 deletions proxy/http2/Http2ConnectionState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ rcv_rst_stream_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
Http2StreamDebug(cstate.ua_session, stream_id, "RST_STREAM: Error Code: %u", rst_stream.error_code);

stream->set_rx_error_code({ProxyErrorClass::TXN, static_cast<uint32_t>(rst_stream.error_code)});
cstate.delete_stream(stream);
stream->initiating_close();
}

return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_NONE);
Expand Down Expand Up @@ -997,8 +997,8 @@ Http2ConnectionState::main_event_handler(int event, void *edata)
ink_assert(this->fini_received == false);
this->fini_received = true;
cleanup_streams();
release_stream();
SET_HANDLER(&Http2ConnectionState::state_closed);
this->release_stream(nullptr);
} break;

case HTTP2_SESSION_EVENT_XMIT: {
Expand Down Expand Up @@ -1286,13 +1286,11 @@ Http2ConnectionState::cleanup_streams()
if (this->tx_error_code.cls != ProxyErrorClass::NONE) {
s->set_tx_error_code(this->tx_error_code);
}
this->delete_stream(s);
s->initiating_close();
ink_assert(s != next);
s = next;
}

ink_assert(stream_list.empty());

if (!is_state_closed()) {
SCOPED_MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread());

Expand Down Expand Up @@ -1355,16 +1353,10 @@ Http2ConnectionState::delete_stream(Http2Stream *stream)
}

void
Http2ConnectionState::release_stream(Http2Stream *stream)
Http2ConnectionState::release_stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is called release_stream by historical reason. It might be time to rename because this is a completely connection specific function now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I completely agree. Leaving that for @a-canary to address in his refactoring.

{
REMEMBER(NO_EVENT, this->recursion)

if (stream) {
// Decrement total_client_streams_count here, because it's a counter include streams in the process of shutting down.
// Other counters (client_streams_in_count/client_streams_out_count) are already decremented in delete_stream().
--total_client_streams_count;
}

SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
if (this->ua_session) {
ink_assert(this->mutex == ua_session->mutex);
Expand Down Expand Up @@ -1460,7 +1452,7 @@ Http2ConnectionState::send_data_frames_depends_on_priority()
}
case Http2SendDataFrameResult::DONE: {
dependency_tree->deactivate(node, len);
delete_stream(stream);
stream->initiating_close();
break;
}
default:
Expand Down Expand Up @@ -1557,7 +1549,7 @@ Http2ConnectionState::send_data_frames(Http2Stream *stream)
if (stream->get_state() == Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL ||
stream->get_state() == Http2StreamState::HTTP2_STREAM_STATE_CLOSED) {
Http2StreamDebug(this->ua_session, stream->get_id(), "Shutdown half closed local stream");
this->delete_stream(stream);
stream->initiating_close();
return;
}

Expand All @@ -1572,7 +1564,7 @@ Http2ConnectionState::send_data_frames(Http2Stream *stream)
// RST_STREAM and WINDOW_UPDATE.
// See 'closed' state written at [RFC 7540] 5.1.
Http2StreamDebug(this->ua_session, stream->get_id(), "Shutdown stream");
this->delete_stream(stream);
stream->initiating_close();
}
}

Expand Down
8 changes: 7 additions & 1 deletion proxy/http2/Http2ConnectionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class Http2ConnectionState : public Continuation
Http2Stream *find_stream(Http2StreamId id) const;
void restart_streams();
bool delete_stream(Http2Stream *stream);
void release_stream(Http2Stream *stream);
void release_stream();
void cleanup_streams();

void update_initial_rwnd(Http2WindowSize new_size);
Expand Down Expand Up @@ -230,6 +230,12 @@ class Http2ConnectionState : public Continuation
return client_streams_in_count;
}

void
decrement_stream_count()
{
--total_client_streams_count;
}

double
get_stream_error_rate() const
{
Expand Down
6 changes: 3 additions & 3 deletions proxy/http2/Http2Stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ Http2Stream::terminate_if_possible()

Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
h2_proxy_ssn->connection_state.delete_stream(this);
destroy();
}
}
Expand Down Expand Up @@ -763,8 +762,10 @@ Http2Stream::destroy()
// In many cases, this has been called earlier, so this call is a no-op
h2_proxy_ssn->connection_state.delete_stream(this);

h2_proxy_ssn->connection_state.decrement_stream_count();

// Update session's stream counts, so it accurately goes into keep-alive state
h2_proxy_ssn->connection_state.release_stream(this);
h2_proxy_ssn->connection_state.release_stream();

cid = _proxy_ssn->connection_id();
}
Expand Down Expand Up @@ -911,7 +912,6 @@ void
Http2Stream::release(IOBufferReader *r)
{
super::release(r);
_sm = nullptr; // State machine is on its own way down.
this->do_io_close();
}

Expand Down
10 changes: 5 additions & 5 deletions tests/gold_tests/continuations/openclose.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
# add response to the server dictionary
server.addResponse("sessionfile.log", request_header, response_header)
ts.Disk.records_config.update({
'proxy.config.diags.debug.enabled': 1,
'proxy.config.diags.debug.enabled': 0,
'proxy.config.diags.debug.tags': 'ssntxnorder_verify.*',
'proxy.config.http.cache.http': 0, # disable cache to simply the test.
'proxy.config.cache.enable_read_while_writer': 0
Expand All @@ -52,8 +52,8 @@
ts.Variables.port, server.Variables.Port)
)

cmd = 'curl -vs http://127.0.0.1:{0}'.format(ts.Variables.port)
numberOfRequests = 25
cmd = 'curl -vs -H "host:oc.test" http://127.0.0.1:{0}'.format(ts.Variables.port)
numberOfRequests = 100

tr = Test.AddTestRun()
# Create a bunch of curl commands to be executed in parallel. Default.Process is set in SpawnCommands.
Expand All @@ -66,14 +66,14 @@
# Execution order is: ts/server, ps(curl cmds), Default Process.
tr.Processes.Default.StartBefore(
server, ready=When.PortOpen(server.Variables.Port))
# Adds a delay once the ts port is ready. This is because we cannot test the ts state.
tr.Processes.Default.StartBefore(ts, ready=10)
tr.Processes.Default.StartBefore(Test.Processes.ts, ready=When.PortOpen(ts.Variables.port))
ts.StartAfter(*ps)
server.StartAfter(*ps)
tr.StillRunningAfter = ts

# Signal that all the curl processes have completed
tr = Test.AddTestRun("Curl Done")
tr.DelayStart = 2 # Delaying a couple seconds to make sure the global continuation's lock contention resolves.
tr.Processes.Default.Command = "traffic_ctl plugin msg done done"
tr.Processes.Default.ReturnCode = 0
tr.Processes.Default.Env = ts.Env
Expand Down
16 changes: 9 additions & 7 deletions tests/gold_tests/continuations/openclose_h2.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
# add response to the server dictionary
server.addResponse("sessionfile.log", request_header, response_header)
ts.Disk.records_config.update({
'proxy.config.diags.debug.enabled': 1,
'proxy.config.http2.zombie_debug_timeout_in': 10,
'proxy.config.diags.debug.enabled': 0,
'proxy.config.diags.debug.tags': 'ssntxnorder_verify',
'proxy.config.http.cache.http': 0, # disable cache to simply the test.
'proxy.config.cache.enable_read_while_writer': 0,
Expand All @@ -58,15 +59,15 @@
})

ts.Disk.remap_config.AddLine(
'map http://oc.test:{0} http://127.0.0.1:{1}'.format(
ts.Variables.port, server.Variables.Port)
'map https://oc.test:{0} http://127.0.0.1:{1}'.format(
ts.Variables.ssl_port, server.Variables.Port)
)
ts.Disk.ssl_multicert_config.AddLine(
'dest_ip=* ssl_cert_name=server.pem ssl_key_name=server.key'
)

cmd = 'curl -k --http2 -vs https://127.0.0.1:{0}'.format(ts.Variables.ssl_port)
numberOfRequests = 25
cmd = 'curl -k --resolve oc.test:{0}:127.0.0.1 --http2 https://oc.test:{0}'.format(ts.Variables.ssl_port)
numberOfRequests = 100

tr = Test.AddTestRun()
# Create a bunch of curl commands to be executed in parallel. Default.Process is set in SpawnCommands.
Expand All @@ -79,14 +80,15 @@
# Execution order is: ts/server, ps(curl cmds), Default Process.
tr.Processes.Default.StartBefore(
server, ready=When.PortOpen(server.Variables.Port))
# Adds a delay once the ts port is ready. This is because we cannot test the ts state.
tr.Processes.Default.StartBefore(ts, ready=10)
tr.Processes.Default.StartBefore(Test.Processes.ts, ready=When.PortOpen(ts.Variables.ssl_port))
# Don't know why we need both the start before and the start after
ts.StartAfter(*ps)
server.StartAfter(*ps)
tr.StillRunningAfter = ts

# Signal that all the curl processes have completed
tr = Test.AddTestRun("Curl Done")
tr.DelayStart = 2 # Delaying a couple seconds to make sure the global continuation's lock contention resolves.
tr.Processes.Default.Command = "traffic_ctl plugin msg done done"
tr.Processes.Default.ReturnCode = 0
tr.Processes.Default.Env = ts.Env
Expand Down
Loading