Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions iocore/net/I_NetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class NetProcessor : public Processor
call back with success. If this behaviour is desired use
synchronous connect connet_s method.

@see connect_s()

@param cont Continuation to be called back with events.
@param addr target address and port to connect to.
@param options @see NetVCOptions.
Expand All @@ -184,6 +186,27 @@ class NetProcessor : public Processor

inkcoreapi Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *options = nullptr);

/**
Open a NetVConnection for connection oriented I/O. This call
is simliar to connect method except that the cont is called
back only after the connections has been established. In the
case of connect the cont could be called back with NET_EVENT_OPEN
event and OS could still be in the process of establishing the
connection. Re-entrant Callbacks: same as connect. If unix
asynchronous type connect is desired use connect_re().

@param cont Continuation to be called back with events.
@param addr Address to which to connect (includes port).
@param timeout for connect, the cont will get NET_EVENT_OPEN_FAILED
if connection could not be established for timeout msecs. The
default is 30 secs.
@param options @see NetVCOptions.

@see connect_re()

*/
Action *connect_s(Continuation *cont, sockaddr const *addr, int timeout = NET_CONNECT_TIMEOUT, NetVCOptions *opts = nullptr);

/**
Initializes the net processor. This must be called before the event threads are started.

Expand Down
10 changes: 0 additions & 10 deletions iocore/net/P_SSLNetVConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,6 @@ class SSLNetVConnection : public UnixNetVConnection
write.enabled = 1;
}

bool
trackFirstHandshake() override
{
bool retval = sslHandshakeBeginTime == 0;
if (retval) {
sslHandshakeBeginTime = Thread::get_hrtime();
}
return retval;
}

bool
getSSLHandShakeComplete() const override
{
Expand Down
6 changes: 0 additions & 6 deletions iocore/net/P_UnixNetVConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,6 @@ class UnixNetVConnection : public NetVConnection
return (true);
}

virtual bool
trackFirstHandshake()
{
return false;
}

virtual void net_read_io(NetHandler *nh, EThread *lthread);
virtual int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs);
void readDisable(NetHandler *nh);
Expand Down
116 changes: 116 additions & 0 deletions iocore/net/UnixNetProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,122 @@ UnixNetProcessor::connect(Continuation *cont, UnixNetVConnection ** /* avc */, s
return connect_re(cont, target, opt);
}

struct CheckConnect : public Continuation {
UnixNetVConnection *vc;
Action action_;
MIOBuffer *buf;
IOBufferReader *reader;
int connect_status;
int recursion;
ink_hrtime timeout;

int
handle_connect(int event, Event *e)
{
connect_status = event;
switch (event) {
case NET_EVENT_OPEN:
vc = (UnixNetVConnection *)e;
Debug("iocore_net_connect", "connect Net open");
vc->do_io_write(this, 10, /* some non-zero number just to get the poll going */
reader);
/* dont wait for more than timeout secs */
vc->set_inactivity_timeout(timeout);
return EVENT_CONT;
break;

case NET_EVENT_OPEN_FAILED:
Debug("iocore_net_connect", "connect Net open failed");
if (!action_.cancelled) {
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)e);
}
break;

case VC_EVENT_WRITE_READY:
int sl, ret;
socklen_t sz;
if (!action_.cancelled) {
sz = sizeof(int);
ret = getsockopt(vc->con.fd, SOL_SOCKET, SO_ERROR, (char *)&sl, &sz);
if (!ret && sl == 0) {
Debug("iocore_net_connect", "connection established");
/* disable write on vc */
vc->write.enabled = 0;
vc->cancel_inactivity_timeout();
// write_disable(get_NetHandler(this_ethread()), vc);
/* clean up vc fields */
vc->write.vio.nbytes = 0;
vc->write.vio.op = VIO::NONE;
vc->write.vio.buffer.clear();

action_.continuation->handleEvent(NET_EVENT_OPEN, vc);
delete this;
return EVENT_DONE;
}
}
vc->do_io_close();
if (!action_.cancelled) {
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED);
}
break;
case VC_EVENT_INACTIVITY_TIMEOUT:
Debug("iocore_net_connect", "connect timed out");
vc->do_io_close();
if (!action_.cancelled) {
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_TIMEOUT);
}
break;
default:
ink_assert(!"unknown connect event");
if (!action_.cancelled) {
action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)-ENET_CONNECT_FAILED);
}
}
if (!recursion) {
delete this;
}
return EVENT_DONE;
}

Action *
connect_s(Continuation *cont, sockaddr const *target, int _timeout, NetVCOptions *opt)
{
action_ = cont;
timeout = HRTIME_SECONDS(_timeout);
recursion++;
netProcessor.connect_re(this, target, opt);
recursion--;
if (connect_status != NET_EVENT_OPEN_FAILED) {
return &action_;
} else {
delete this;
return ACTION_RESULT_DONE;
}
}

explicit CheckConnect(Ptr<ProxyMutex> &m) : Continuation(m.get()), vc(nullptr), connect_status(-1), recursion(0), timeout(0)
{
SET_HANDLER(&CheckConnect::handle_connect);
buf = new_empty_MIOBuffer(1);
reader = buf->alloc_reader();
}

~CheckConnect() override
{
buf->dealloc_all_readers();
buf->clear();
free_MIOBuffer(buf);
}
};

Action *
NetProcessor::connect_s(Continuation *cont, sockaddr const *target, int timeout, NetVCOptions *opt)
{
Debug("iocore_net_connect", "NetProcessor::connect_s called");
CheckConnect *c = new CheckConnect(cont->mutex);
return c->connect_s(cont, target, timeout, opt);
}

struct PollCont;

// This needs to be called before the ET_NET threads are started.
Expand Down
8 changes: 0 additions & 8 deletions iocore/net/UnixNetVConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,6 @@ write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
// This function will always return true unless
// vc is an SSLNetVConnection.
if (!vc->getSSLHandShakeComplete()) {
if (vc->trackFirstHandshake()) {
// Eat the first write-ready. Until the TLS handshake is complete,
// we should still be under the connect timeout and shouldn't bother
// the state machine until the TLS handshake is complete
vc->write.triggered = 0;
nh->write_ready_list.remove(vc);
}

int err, ret;

if (vc->get_context() == NET_VCONNECTION_OUT) {
Expand Down
52 changes: 27 additions & 25 deletions proxy/http/HttpSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ HttpSM::state_read_push_response_header(int event, void *data)

//////////////////////////////////////////////////////////////////////////////
//
// HttpSM::state_raw_http_server_open()
// HttpSM::state_http_server_open()
//
//////////////////////////////////////////////////////////////////////////////
int
Expand Down Expand Up @@ -1751,29 +1751,6 @@ HttpSM::state_http_server_open(int event, void *data)
} else {
session->to_parent_proxy = false;
}
if (plugin_tunnel_type == HTTP_NO_PLUGIN_TUNNEL) {
SMDebug("http", "[%" PRId64 "] setting handler for TCP handshake", sm_id);
// Just want to get a write-ready event so we know that the TCP handshake is complete.
server_entry->vc_handler = &HttpSM::state_http_server_open;
server_entry->write_vio = server_session->do_io_write(this, 1, server_session->get_reader());
} else { // in the case of an intercept plugin don't to the connect timeout change
SMDebug("http", "[%" PRId64 "] not setting handler for TCP handshake", sm_id);
handle_http_server_open();
}
return 0;
case VC_EVENT_READ_COMPLETE:
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE:
// Update the time out to the regular connection timeout.
SMDebug("http_ss", "[%" PRId64 "] TCP Handshake complete", sm_id);
server_entry->vc_handler = &HttpSM::state_send_server_request_header;

// Reset the timeout to the non-connect timeout
if (t_state.api_txn_no_activity_timeout_value != -1) {
server_session->get_netvc()->set_inactivity_timeout(HRTIME_MSECONDS(t_state.api_txn_no_activity_timeout_value));
} else {
server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_out));
}
handle_http_server_open();
return 0;
case EVENT_INTERVAL: // Delayed call from another thread
Expand Down Expand Up @@ -5021,11 +4998,36 @@ HttpSM::do_http_server_open(bool raw)
connect_action_handle = sslNetProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
} else if (t_state.method != HTTP_WKSIDX_CONNECT && t_state.method != HTTP_WKSIDX_POST && t_state.method != HTTP_WKSIDX_PUT) {
SMDebug("http", "calling netProcessor.connect_re");
connect_action_handle = netProcessor.connect_re(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
&opt);
} else {
// The request transform would be applied to POST and/or PUT request.
// The server_vc should be established (writeable) before request transform start.
// The CheckConnect is created by connect_s,
// It will callback NET_EVENT_OPEN to HttpSM if server_vc is WRITE_READY,
// Otherwise NET_EVENT_OPEN_FAILED is callbacked.
MgmtInt connect_timeout;

ink_assert(t_state.method == HTTP_WKSIDX_CONNECT || t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT);

// Set the inactivity timeout to the connect timeout so that we
// we fail this server if it doesn't start sending the response
// header
if (t_state.method == HTTP_WKSIDX_POST || t_state.method == HTTP_WKSIDX_PUT) {
connect_timeout = t_state.txn_conf->post_connect_attempts_timeout;
} else if (t_state.current.server == &t_state.parent_info) {
connect_timeout = t_state.txn_conf->parent_connect_timeout;
} else {
connect_timeout = t_state.txn_conf->connect_attempts_timeout;
}

SMDebug("http", "calling netProcessor.connect_s");
connect_action_handle = netProcessor.connect_s(this, // state machine
&t_state.current.server->dst_addr.sa, // addr + port
connect_timeout, &opt);
}

if (connect_action_handle != ACTION_RESULT_DONE) {
Expand Down