diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc index e317c2304a3..83722b89256 100644 --- a/proxy/http2/Http2Stream.cc +++ b/proxy/http2/Http2Stream.cc @@ -73,7 +73,13 @@ Http2Stream::main_event_handler(int event, void *edata) Event *e = static_cast(edata); reentrancy_count++; - if (e == cross_thread_event) { + if (e == _read_vio_event) { + this->signal_read_event(e->callback_event); + return 0; + } else if (e == _write_vio_event) { + this->signal_write_event(e->callback_event); + return 0; + } else if (e == cross_thread_event) { cross_thread_event = nullptr; } else if (e == active_event) { event = VC_EVENT_ACTIVE_TIMEOUT; @@ -95,25 +101,9 @@ Http2Stream::main_event_handler(int event, void *edata) case VC_EVENT_ACTIVE_TIMEOUT: case VC_EVENT_INACTIVITY_TIMEOUT: if (_sm && read_vio.ntodo() > 0) { - MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread()); - if (lock.is_locked()) { - read_vio.cont->handleEvent(event, &read_vio); - } else { - if (this->_read_vio_event) { - this->_read_vio_event->cancel(); - } - this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio); - } + this->signal_read_event(event); } else if (_sm && write_vio.ntodo() > 0) { - MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread()); - if (lock.is_locked()) { - write_vio.cont->handleEvent(event, &write_vio); - } else { - if (this->_write_vio_event) { - this->_write_vio_event->cancel(); - } - this->_write_vio_event = this_ethread()->schedule_imm(write_vio.cont, event, &write_vio); - } + this->signal_write_event(event); } break; case VC_EVENT_WRITE_READY: @@ -121,15 +111,7 @@ Http2Stream::main_event_handler(int event, void *edata) inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; if (e->cookie == &write_vio) { if (write_vio.mutex && write_vio.cont && this->_sm) { - MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread()); - if (lock.is_locked()) { - write_vio.cont->handleEvent(event, &write_vio); - } else { - if (this->_write_vio_event) { - this->_write_vio_event->cancel(); - } - this->_write_vio_event = this_ethread()->schedule_imm(write_vio.cont, event, &write_vio); - } + this->signal_write_event(event); } } else { update_write_request(write_vio.get_reader(), INT64_MAX, true); @@ -140,15 +122,7 @@ Http2Stream::main_event_handler(int event, void *edata) inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; if (e->cookie == &read_vio) { if (read_vio.mutex && read_vio.cont && this->_sm) { - MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread()); - if (lock.is_locked()) { - read_vio.cont->handleEvent(event, &read_vio); - } else { - if (this->_read_vio_event) { - this->_read_vio_event->cancel(); - } - this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio); - } + signal_read_event(event); } } else { this->update_read_request(INT64_MAX, true); @@ -671,7 +645,26 @@ Http2Stream::signal_read_event(int event) if (this->_read_vio_event) { this->_read_vio_event->cancel(); } - this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio); + this->_read_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &read_vio); + } +} + +void +Http2Stream::signal_write_event(int event) +{ + if (this->write_vio.cont == nullptr || this->write_vio.cont->mutex == nullptr || this->write_vio.op == VIO::NONE) { + return; + } + + MUTEX_TRY_LOCK(lock, write_vio.cont->mutex, this_ethread()); + if (lock.is_locked()) { + inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; + this->write_vio.cont->handleEvent(event, &this->write_vio); + } else { + if (this->_write_vio_event) { + this->_write_vio_event->cancel(); + } + this->_write_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &write_vio); } } diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h index cc8bffc2c43..df560a1c386 100644 --- a/proxy/http2/Http2Stream.h +++ b/proxy/http2/Http2Stream.h @@ -49,7 +49,8 @@ enum class Http2StreamMilestone { class Http2Stream : public ProxyTransaction { public: - using super = ProxyTransaction; ///< Parent type. + const int retry_delay = HRTIME_MSECONDS(10); + using super = ProxyTransaction; ///< Parent type. Http2Stream(Http2StreamId sid = 0, ssize_t initial_rwnd = Http2::initial_window_size); @@ -75,6 +76,7 @@ class Http2Stream : public ProxyTransaction void update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update); void signal_read_event(int event); + void signal_write_event(int event); void signal_write_event(bool call_update); void restart_sending();