Skip to content

Commit

Permalink
Revert "Add class to normalize handling of pending action (apache#7667)"
Browse files Browse the repository at this point in the history
This reverts commit 739994f.
  • Loading branch information
maskit committed May 10, 2021
1 parent 99efb64 commit 747dff2
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 87 deletions.
159 changes: 115 additions & 44 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,9 @@ HttpSM::state_api_callout(int event, void *data)
// This is a reschedule via the tunnel. Just fall through
//
case EVENT_INTERVAL:
if (data != pending_action) {
pending_action->cancel();
}
pending_action = nullptr;
// FALLTHROUGH
case EVENT_NONE:
Expand Down Expand Up @@ -1531,7 +1534,7 @@ plugins required to work with sni_routing.
if (!lock.is_locked()) {
api_timer = -Thread::get_hrtime_updated();
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_api_callout);
ink_release_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);
pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
return -1;
}
Expand Down Expand Up @@ -1811,7 +1814,7 @@ HttpSM::state_http_server_open(int event, void *data)
SMDebug("http_track", "entered inside state_http_server_open");
STATE_ENTER(&HttpSM::state_http_server_open, event);
ink_release_assert(event == EVENT_INTERVAL || event == NET_EVENT_OPEN || event == NET_EVENT_OPEN_FAILED ||
pending_action.is_empty());
pending_action == nullptr);
if (event != NET_EVENT_OPEN) {
pending_action = nullptr;
}
Expand All @@ -1832,7 +1835,7 @@ HttpSM::state_http_server_open(int event, void *data)
// Since the UnixNetVConnection::action_ or SocksEntry::action_ may be returned from netProcessor.connect_re, and the
// SocksEntry::action_ will be copied into UnixNetVConnection::action_ before call back NET_EVENT_OPEN from SocksEntry::free(),
// so we just compare the Continuation between pending_action and VC's action_.
ink_release_assert(pending_action.is_empty() || pending_action.get_continuation() == vc->get_action()->continuation);
ink_release_assert(pending_action == nullptr || pending_action->continuation == vc->get_action()->continuation);
pending_action = nullptr;

session->new_connection(vc, nullptr, nullptr);
Expand Down Expand Up @@ -2359,8 +2362,12 @@ HttpSM::state_hostdb_lookup(int event, void *data)
opt.timeout = (t_state.api_txn_dns_timeout_value != -1) ? t_state.api_txn_dns_timeout_value : 0;
opt.host_res_style = ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (pending_action.is_empty()) {
Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
call_transact_and_set_next_state(nullptr);
}
} break;
Expand Down Expand Up @@ -2494,13 +2501,13 @@ HttpSM::state_cache_open_write(int event, void *data)

// Make sure we are on the "right" thread
if (ua_txn) {
pending_action = ua_txn->adjust_thread(this, event, data);
if (!pending_action.is_empty()) {
if (pending_action) {
pending_action->cancel();
}
if ((pending_action = ua_txn->adjust_thread(this, event, data))) {
HTTP_INCREMENT_DYN_STAT(http_cache_open_write_adjust_thread_stat);
return 0; // Go away if we reschedule
}
NetVConnection *vc = ua_txn->get_netvc();
ink_release_assert(vc && vc->thread == this_ethread());
}

milestones[TS_MILESTONE_CACHE_OPEN_WRITE_END] = Thread::get_hrtime();
Expand Down Expand Up @@ -4185,7 +4192,13 @@ HttpSM::do_remap_request(bool run_inline)
}

SMDebug("url_rewrite", "Found a remap map entry for [%" PRId64 "], attempting to remap request and call any plugins", sm_id);
pending_action = remapProcessor.perform_remap(this, &t_state);
Action *remap_action_handle = remapProcessor.perform_remap(this, &t_state);

if (remap_action_handle != ACTION_RESULT_DONE) {
SMDebug("url_rewrite", "Still more remapping needed for [%" PRId64 "]", sm_id);
ink_assert(!pending_action);
pending_action = remap_action_handle;
}

return;
}
Expand All @@ -4194,7 +4207,7 @@ void
HttpSM::do_hostdb_lookup()
{
ink_assert(t_state.dns_info.lookup_name != nullptr);
ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);

milestones[TS_MILESTONE_DNS_LOOKUP_BEGIN] = Thread::get_hrtime();

Expand All @@ -4211,8 +4224,13 @@ HttpSM::do_hostdb_lookup()
if (t_state.api_txn_dns_timeout_value != -1) {
opt.timeout = t_state.api_txn_dns_timeout_value;
}
pending_action = hostDBProcessor.getSRVbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_srv_info, d, 0, opt);
if (pending_action.is_empty()) {
Action *srv_lookup_action_handle =
hostDBProcessor.getSRVbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_srv_info, d, 0, opt);

if (srv_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = srv_lookup_action_handle;
} else {
char *host_name = t_state.dns_info.srv_lookup_success ? t_state.dns_info.srv_hostname : t_state.dns_info.lookup_name;
opt.port = t_state.dns_info.srv_lookup_success ?
t_state.dns_info.srv_port :
Expand All @@ -4224,8 +4242,12 @@ HttpSM::do_hostdb_lookup()
opt.host_res_style =
ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (pending_action.is_empty()) {
Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
call_transact_and_set_next_state(nullptr);
}
}
Expand Down Expand Up @@ -4256,9 +4278,13 @@ HttpSM::do_hostdb_lookup()

opt.host_res_style = ats_host_res_from(ua_txn->get_netvc()->get_local_addr()->sa_family, t_state.txn_conf->host_res_data.order);

pending_action = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info,
t_state.dns_info.lookup_name, 0, opt);
if (pending_action.is_empty()) {
Action *dns_lookup_action_handle = hostDBProcessor.getbyname_imm(this, (cb_process_result_pfn)&HttpSM::process_hostdb_info,
t_state.dns_info.lookup_name, 0, opt);

if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
} else {
call_transact_and_set_next_state(nullptr);
}
return;
Expand All @@ -4271,14 +4297,18 @@ void
HttpSM::do_hostdb_reverse_lookup()
{
ink_assert(t_state.dns_info.lookup_name != nullptr);
ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);

SMDebug("http_seq", "[HttpSM::do_hostdb_reverse_lookup] Doing reverse DNS Lookup");

IpEndpoint addr;
ats_ip_pton(t_state.dns_info.lookup_name, &addr.sa);
pending_action = hostDBProcessor.getbyaddr_re(this, &addr.sa);
Action *dns_lookup_action_handle = hostDBProcessor.getbyaddr_re(this, &addr.sa);

if (dns_lookup_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = dns_lookup_action_handle;
}
return;
}

Expand Down Expand Up @@ -4664,7 +4694,7 @@ HttpSM::do_cache_lookup_and_read()
{
// TODO decide whether to uncomment after finish testing redirect
// ink_assert(server_session == NULL);
ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);

HTTP_INCREMENT_DYN_STAT(http_cache_lookups_stat);

Expand All @@ -4687,7 +4717,7 @@ HttpSM::do_cache_lookup_and_read()
HttpCacheKey key;
Cache::generate_key(&key, c_url, t_state.txn_conf->cache_generation_number);

pending_action = cache_sm.open_read(
Action *cache_action_handle = cache_sm.open_read(
&key, c_url, &t_state.hdr_info.client_request, t_state.txn_conf,
static_cast<time_t>((t_state.cache_control.pin_in_cache_for < 0) ? 0 : t_state.cache_control.pin_in_cache_for));
//
Expand All @@ -4696,7 +4726,11 @@ HttpSM::do_cache_lookup_and_read()
// optimize the typical open_read/open_read failed/open_write
// sequence.
//
REMEMBER((long)pending_action.get(), reentrancy_count);
if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
REMEMBER((long)pending_action, reentrancy_count);

return;
}
Expand All @@ -4710,9 +4744,17 @@ HttpSM::do_cache_delete_all_alts(Continuation *cont)
SMDebug("http_seq", "[HttpSM::do_cache_delete_all_alts] Issuing cache delete for %s",
t_state.cache_info.lookup_url->string_get_ref());

Action *cache_action_handle = nullptr;

HttpCacheKey key;
Cache::generate_key(&key, t_state.cache_info.lookup_url, t_state.txn_conf->cache_generation_number);
pending_action = cacheProcessor.remove(cont, &key);
cache_action_handle = cacheProcessor.remove(cont, &key);
if (cont != nullptr) {
if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
}

return;
}
Expand Down Expand Up @@ -4762,7 +4804,7 @@ HttpSM::do_cache_prepare_action(HttpCacheSM *c_sm, CacheHTTPInfo *object_read_in
URL *o_url, *s_url;
bool restore_client_request = false;

ink_assert(pending_action.is_empty());
ink_assert(!pending_action);

if (t_state.redirect_info.redirect_in_process) {
o_url = &(t_state.redirect_info.original_url);
Expand Down Expand Up @@ -4791,10 +4833,15 @@ HttpSM::do_cache_prepare_action(HttpCacheSM *c_sm, CacheHTTPInfo *object_read_in
HttpCacheKey key;
Cache::generate_key(&key, s_url, t_state.txn_conf->cache_generation_number);

pending_action =
Action *cache_action_handle =
c_sm->open_write(&key, s_url, &t_state.hdr_info.client_request, object_read_info,
static_cast<time_t>((t_state.cache_control.pin_in_cache_for < 0) ? 0 : t_state.cache_control.pin_in_cache_for),
retry, allow_multiple);

if (cache_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = cache_action_handle;
}
}

void
Expand Down Expand Up @@ -4901,9 +4948,15 @@ HttpSM::do_http_server_open(bool raw)
auto fam_name = ats_ip_family_name(ip_family);
SMDebug("http_track", "entered inside do_http_server_open ][%.*s]", static_cast<int>(fam_name.size()), fam_name.data());

NetVConnection *vc = ua_txn->get_netvc();
ink_release_assert(vc && vc->thread == this_ethread());
// Make sure we are on the "right" thread
if (ua_txn) {
if ((pending_action = ua_txn->adjust_thread(this, EVENT_INTERVAL, nullptr))) {
HTTP_INCREMENT_DYN_STAT(http_origin_connect_adjust_thread_stat);
return; // Go away if we reschedule
}
}
pending_action = nullptr;
ink_assert(server_entry == nullptr);

// Clean up connection tracking info if any. Need to do it now so the selected group
// is consistent with the actual upstream in case of retry.
Expand All @@ -4915,7 +4968,7 @@ HttpSM::do_http_server_open(bool raw)
ink_assert(ua_entry != nullptr || t_state.req_flavor == HttpTransact::REQ_FLAVOR_SCHEDULED_UPDATE ||
t_state.req_flavor == HttpTransact::REQ_FLAVOR_REVPROXY);

ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);
ink_assert(t_state.current.server->dst_addr.port() != 0);

char addrbuf[INET6_ADDRPORTSTRLEN];
Expand Down Expand Up @@ -5132,7 +5185,7 @@ HttpSM::do_http_server_open(bool raw)
if (ccount > t_state.txn_conf->outbound_conntrack.max) {
ct_state.release();

ink_assert(pending_action.is_empty()); // in case of reschedule must not have already pending.
ink_assert(pending_action == nullptr); // in case of reschedule must not have already pending.

ct_state.blocked();
HTTP_INCREMENT_DYN_STAT(http_origin_connections_throttled_stat);
Expand All @@ -5149,6 +5202,8 @@ HttpSM::do_http_server_open(bool raw)
}

// We did not manage to get an existing session and need to open a new connection
Action *connect_action_handle;

NetVCOptions opt;
opt.f_blocking_connect = false;
opt.set_sock_param(t_state.txn_conf->sock_recv_buffer_size_out, t_state.txn_conf->sock_send_buffer_size_out,
Expand Down Expand Up @@ -5241,14 +5296,19 @@ HttpSM::do_http_server_open(bool raw)
opt.set_ssl_servername(t_state.server_info.name);
}

pending_action = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
connect_action_handle = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
SMDebug("http", "calling netProcessor.connect_re");
pending_action = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
connect_action_handle = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
}

if (connect_action_handle != ACTION_RESULT_DONE) {
ink_assert(!pending_action);
pending_action = connect_action_handle;
}

return;
Expand Down Expand Up @@ -6968,10 +7028,11 @@ HttpSM::kill_this()
// state. This is because we are depending on the
// callout to complete for the state machine to
// get killed.
if (callout_state == HTTP_API_NO_CALLOUT && !pending_action.is_empty()) {
if (callout_state == HTTP_API_NO_CALLOUT && pending_action) {
pending_action->cancel();
pending_action = nullptr;
} else if (!pending_action.is_empty()) {
ink_assert(pending_action.is_empty());
} else if (pending_action) {
ink_assert(pending_action == nullptr);
}

cache_sm.end_both();
Expand Down Expand Up @@ -7033,7 +7094,10 @@ HttpSM::kill_this()
// then the value of kill_this_async_done has changed so
// we must check it again
if (kill_this_async_done == true) {
pending_action = nullptr;
if (pending_action) {
pending_action->cancel();
pending_action = nullptr;
}
if (t_state.http_config_param->enable_http_stats) {
update_stats();
}
Expand Down Expand Up @@ -7067,7 +7131,7 @@ HttpSM::kill_this()
plugin_tunnel = nullptr;
}

ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);
ink_release_assert(vc_table.is_table_clear() == true);
ink_release_assert(tunnel.is_tunnel_active() == false);

Expand Down Expand Up @@ -7658,10 +7722,17 @@ HttpSM::set_next_state()
break;
}

case HttpTransact::SM_ACTION_INTERNAL_REQUEST:
case HttpTransact::SM_ACTION_INTERNAL_REQUEST: {
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_handle_stat_page);
pending_action = statPagesManager.handle_http(this, &t_state.hdr_info.client_request);
Action *action_handle = statPagesManager.handle_http(this, &t_state.hdr_info.client_request);

if (action_handle != ACTION_RESULT_DONE) {
ink_assert(pending_action == nullptr);
pending_action = action_handle;
}

break;
}

case HttpTransact::SM_ACTION_ORIGIN_SERVER_RR_MARK_DOWN: {
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_mark_os_down);
Expand Down Expand Up @@ -8064,7 +8135,7 @@ HttpSM::get_http_schedule(int event, void * /* data ATS_UNUSED */)

if (!plugin_lock) {
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::get_http_schedule);
ink_assert(pending_action.is_empty());
ink_assert(pending_action == nullptr);
pending_action = mutex->thread_holding->schedule_in(this, HRTIME_MSECONDS(10));
return 0;
} else {
Expand Down
Loading

0 comments on commit 747dff2

Please sign in to comment.