Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add class to normalize handling of pending action #7667

Merged
merged 2 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 44 additions & 115 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1474,9 +1474,6 @@ HttpSM::state_api_callout(int event, void *data)
// This is a reschedule via the tunnel. Just fall through
//
case EVENT_INTERVAL:
if (data != pending_action) {
pending_action->cancel();
}
pending_action = nullptr;
// FALLTHROUGH
case EVENT_NONE:
Expand Down Expand Up @@ -1527,7 +1524,7 @@ plugins required to work with sni_routing.
if (!lock.is_locked()) {
api_timer = -Thread::get_hrtime_updated();
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_api_callout);
ink_assert(pending_action == nullptr);
ink_release_assert(pending_action.is_empty());
pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
return -1;
}
Expand Down Expand Up @@ -1807,7 +1804,7 @@ HttpSM::state_http_server_open(int event, void *data)
SMDebug("http_track", "entered inside state_http_server_open");
STATE_ENTER(&HttpSM::state_http_server_open, event);
ink_release_assert(event == EVENT_INTERVAL || event == NET_EVENT_OPEN || event == NET_EVENT_OPEN_FAILED ||
pending_action == nullptr);
pending_action.is_empty());
if (event != NET_EVENT_OPEN) {
pending_action = nullptr;
}
Expand All @@ -1828,7 +1825,7 @@ HttpSM::state_http_server_open(int event, void *data)
// Since the UnixNetVConnection::action_ or SocksEntry::action_ may be returned from netProcessor.connect_re, and the
// SocksEntry::action_ will be copied into UnixNetVConnection::action_ before call back NET_EVENT_OPEN from SocksEntry::free(),
// so we just compare the Continuation between pending_action and VC's action_.
ink_release_assert(pending_action == nullptr || pending_action->continuation == vc->get_action()->continuation);
ink_release_assert(pending_action.is_empty() || pending_action.get_continuation() == vc->get_action()->continuation);
pending_action = nullptr;

session->new_connection(vc, nullptr, nullptr);
Expand Down Expand Up @@ -2343,12 +2340,8 @@ HttpSM::state_hostdb_lookup(int event, void *data)
opt.timeout = (t_state.api_txn_dns_timeout_value != -1) ? t_state.api_txn_dns_timeout_value : 0;
opt.host_res_style = ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (pending_action.is_empty()) {
call_transact_and_set_next_state(nullptr);
}
} break;
Expand Down Expand Up @@ -2482,13 +2475,13 @@ HttpSM::state_cache_open_write(int event, void *data)

// Make sure we are on the "right" thread
if (ua_txn) {
if (pending_action) {
pending_action->cancel();
}
if ((pending_action = ua_txn->adjust_thread(this, event, data))) {
pending_action = ua_txn->adjust_thread(this, event, data);
if (!pending_action.is_empty()) {
HTTP_INCREMENT_DYN_STAT(http_cache_open_write_adjust_thread_stat);
return 0; // Go away if we reschedule
}
NetVConnection *vc = ua_txn->get_netvc();
ink_release_assert(vc && vc->thread == this_ethread());
}

milestones[TS_MILESTONE_CACHE_OPEN_WRITE_END] = Thread::get_hrtime();
Expand Down Expand Up @@ -4149,13 +4142,7 @@ HttpSM::do_remap_request(bool run_inline)
}

SMDebug("url_rewrite", "Found a remap map entry for [%" PRId64 "], attempting to remap request and call any plugins", sm_id);
Action *remap_action_handle = remapProcessor.perform_remap(this, &t_state);

if (remap_action_handle != ACTION_RESULT_DONE) {
SMDebug("url_rewrite", "Still more remapping needed for [%" PRId64 "]", sm_id);
ink_assert(!pending_action);
pending_action = remap_action_handle;
}
pending_action = remapProcessor.perform_remap(this, &t_state);

return;
}
Expand All @@ -4164,7 +4151,7 @@ void
HttpSM::do_hostdb_lookup()
{
ink_assert(t_state.dns_info.lookup_name != nullptr);
ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());

milestones[TS_MILESTONE_DNS_LOOKUP_BEGIN] = Thread::get_hrtime();

Expand All @@ -4181,13 +4168,8 @@ HttpSM::do_hostdb_lookup()
if (t_state.api_txn_dns_timeout_value != -1) {
opt.timeout = t_state.api_txn_dns_timeout_value;
}
Action *srv_lookup_action_handle =
hostDBProcessor.getSRVbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_srv_info, d, 0, opt);

if (srv_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = srv_lookup_action_handle;
} else {
pending_action = hostDBProcessor.getSRVbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_srv_info, d, 0, opt);
if (pending_action.is_empty()) {
char *host_name = t_state.dns_info.srv_lookup_success ? t_state.dns_info.srv_hostname : t_state.dns_info.lookup_name;
opt.port = t_state.dns_info.srv_lookup_success ?
t_state.dns_info.srv_port :
Expand All @@ -4199,12 +4181,8 @@ HttpSM::do_hostdb_lookup()
opt.host_res_style =
ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (pending_action.is_empty()) {
call_transact_and_set_next_state(nullptr);
}
}
Expand Down Expand Up @@ -4235,13 +4213,9 @@ HttpSM::do_hostdb_lookup()

opt.host_res_style = ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

Action *dns_lookup_action_handle = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info,
t_state.dns_info.lookup_name, 0, opt);

if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info,
t_state.dns_info.lookup_name, 0, opt);
if (pending_action.is_empty()) {
call_transact_and_set_next_state(nullptr);
}
return;
Expand All @@ -4254,18 +4228,14 @@ void
HttpSM::do_hostdb_reverse_lookup()
{
ink_assert(t_state.dns_info.lookup_name != nullptr);
ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());

SMDebug("http_seq", "[HttpSM::do_hostdb_reverse_lookup] Doing reverse DNS Lookup");

IpEndpoint addr;
ats_ip_pton(t_state.dns_info.lookup_name, &addr.sa);
Action *dns_lookup_action_handle = hostDBProcessor.getbyaddr_re(this, &addr.sa);
pending_action = hostDBProcessor.getbyaddr_re(this, &addr.sa);

if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
}
return;
}

Expand Down Expand Up @@ -4651,7 +4621,7 @@ HttpSM::do_cache_lookup_and_read()
{
// TODO decide whether to uncomment after finish testing redirect
// ink_assert(server_session == NULL);
ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());

HTTP_INCREMENT_DYN_STAT(http_cache_lookups_stat);

Expand All @@ -4674,7 +4644,7 @@ HttpSM::do_cache_lookup_and_read()
HttpCacheKey key;
Cache::generate_key(&key, c_url, t_state.txn_conf->cache_generation_number);

Action *cache_action_handle = cache_sm.open_read(
pending_action = cache_sm.open_read(
&key, c_url, &t_state.hdr_info.client_request, t_state.txn_conf,
static_cast<time_t>((t_state.cache_control.pin_in_cache_for < 0) ? 0 : t_state.cache_control.pin_in_cache_for));
//
Expand All @@ -4683,11 +4653,7 @@ HttpSM::do_cache_lookup_and_read()
// optimize the typical open_read/open_read failed/open_write
// sequence.
//
if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
REMEMBER((long)pending_action, reentrancy_count);
REMEMBER((long)pending_action.get(), reentrancy_count);

return;
}
Expand All @@ -4701,17 +4667,9 @@ HttpSM::do_cache_delete_all_alts(Continuation *cont)
SMDebug("http_seq", "[HttpSM::do_cache_delete_all_alts] Issuing cache delete for %s",
t_state.cache_info.lookup_url->string_get_ref());

Action *cache_action_handle = nullptr;

HttpCacheKey key;
Cache::generate_key(&key, t_state.cache_info.lookup_url, t_state.txn_conf->cache_generation_number);
cache_action_handle = cacheProcessor.remove(cont, &key);
if (cont != nullptr) {
if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
}
pending_action = cacheProcessor.remove(cont, &key);

return;
}
Expand Down Expand Up @@ -4761,7 +4719,7 @@ HttpSM::do_cache_prepare_action(HttpCacheSM *c_sm, CacheHTTPInfo *object_read_in
URL *o_url, *s_url;
bool restore_client_request = false;

ink_assert(!pending_action);
ink_assert(pending_action.is_empty());

if (t_state.redirect_info.redirect_in_process) {
o_url = &(t_state.redirect_info.original_url);
Expand Down Expand Up @@ -4790,15 +4748,10 @@ HttpSM::do_cache_prepare_action(HttpCacheSM *c_sm, CacheHTTPInfo *object_read_in
HttpCacheKey key;
Cache::generate_key(&key, s_url, t_state.txn_conf->cache_generation_number);

Action *cache_action_handle =
pending_action =
c_sm->open_write(&key, s_url, &t_state.hdr_info.client_request, object_read_info,
static_cast<time_t>((t_state.cache_control.pin_in_cache_for < 0) ? 0 : t_state.cache_control.pin_in_cache_for),
retry, allow_multiple);

if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
}

void
Expand Down Expand Up @@ -4898,15 +4851,9 @@ HttpSM::do_http_server_open(bool raw)
auto fam_name = ats_ip_family_name(ip_family);
SMDebug("http_track", "entered inside do_http_server_open ][%.*s]", static_cast<int>(fam_name.size()), fam_name.data());

// Make sure we are on the "right" thread
if (ua_txn) {
if ((pending_action = ua_txn->adjust_thread(this, EVENT_INTERVAL, nullptr))) {
HTTP_INCREMENT_DYN_STAT(http_origin_connect_adjust_thread_stat);
return; // Go away if we reschedule
}
}
NetVConnection *vc = ua_txn->get_netvc();
ink_release_assert(vc && vc->thread == this_ethread());
pending_action = nullptr;
ink_assert(server_entry == nullptr);

// Clean up connection tracking info if any. Need to do it now so the selected group
// is consistent with the actual upstream in case of retry.
Expand All @@ -4918,7 +4865,7 @@ HttpSM::do_http_server_open(bool raw)
ink_assert(ua_entry != nullptr || t_state.req_flavor == HttpTransact::REQ_FLAVOR_SCHEDULED_UPDATE ||
t_state.req_flavor == HttpTransact::REQ_FLAVOR_REVPROXY);

ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());
ink_assert(t_state.current.server->dst_addr.port() != 0);

char addrbuf[INET6_ADDRPORTSTRLEN];
Expand Down Expand Up @@ -5135,7 +5082,7 @@ HttpSM::do_http_server_open(bool raw)
if (ccount > t_state.txn_conf->outbound_conntrack.max) {
ct_state.release();

ink_assert(pending_action == nullptr); // in case of reschedule must not have already pending.
ink_assert(pending_action.is_empty()); // in case of reschedule must not have already pending.

ct_state.blocked();
HTTP_INCREMENT_DYN_STAT(http_origin_connections_throttled_stat);
Expand All @@ -5152,8 +5099,6 @@ HttpSM::do_http_server_open(bool raw)
}

// We did not manage to get an existing session and need to open a new connection
Action *connect_action_handle;

NetVCOptions opt;
opt.f_blocking_connect = false;
opt.set_sock_param(t_state.txn_conf->sock_recv_buffer_size_out, t_state.txn_conf->sock_send_buffer_size_out,
Expand Down Expand Up @@ -5246,19 +5191,14 @@ HttpSM::do_http_server_open(bool raw)
opt.set_ssl_servername(t_state.server_info.name);
}

connect_action_handle = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
pending_action = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
SMDebug("http", "calling netProcessor.connect_re");
connect_action_handle = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
}

if (connect_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = connect_action_handle;
pending_action = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
}

return;
Expand Down Expand Up @@ -7002,11 +6942,10 @@ HttpSM::kill_this()
// state. This is because we are depending on the
// callout to complete for the state machine to
// get killed.
if (callout_state == HTTP_API_NO_CALLOUT && pending_action) {
pending_action->cancel();
if (callout_state == HTTP_API_NO_CALLOUT && !pending_action.is_empty()) {
pending_action = nullptr;
} else if (pending_action) {
ink_assert(pending_action == nullptr);
} else if (!pending_action.is_empty()) {
ink_assert(pending_action.is_empty());
}

cache_sm.end_both();
Expand Down Expand Up @@ -7068,10 +7007,7 @@ HttpSM::kill_this()
// then the value of kill_this_async_done has changed so
// we must check it again
if (kill_this_async_done == true) {
if (pending_action) {
pending_action->cancel();
pending_action = nullptr;
}
pending_action = nullptr;
if (t_state.http_config_param->enable_http_stats) {
update_stats();
}
Expand Down Expand Up @@ -7105,7 +7041,7 @@ HttpSM::kill_this()
plugin_tunnel = nullptr;
}

ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());
ink_release_assert(vc_table.is_table_clear() == true);
ink_release_assert(tunnel.is_tunnel_active() == false);

Expand Down Expand Up @@ -7702,17 +7638,10 @@ HttpSM::set_next_state()
break;
}

case HttpTransact::SM_ACTION_INTERNAL_REQUEST: {
case HttpTransact::SM_ACTION_INTERNAL_REQUEST:
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_handle_stat_page);
Action *action_handle = statPagesManager.handle_http(this, &t_state.hdr_info.client_request);

if (action_handle != ACTION_RESULT_DONE) {
ink_assert(pending_action == nullptr);
pending_action = action_handle;
}

pending_action = statPagesManager.handle_http(this, &t_state.hdr_info.client_request);
break;
}

case HttpTransact::SM_ACTION_ORIGIN_SERVER_RR_MARK_DOWN: {
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_mark_os_down);
Expand Down Expand Up @@ -8112,7 +8041,7 @@ HttpSM::get_http_schedule(int event, void * /* data ATS_UNUSED */)

if (!plugin_lock) {
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::get_http_schedule);
ink_assert(pending_action == nullptr);
ink_assert(pending_action.is_empty());
pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
return 0;
} else {
Expand Down
Loading