Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 31 additions & 38 deletions proxy/http2/Http2Stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ Http2Stream::main_event_handler(int event, void *edata)

Event *e = static_cast<Event *>(edata);
reentrancy_count++;
if (e == cross_thread_event) {
if (e == _read_vio_event) {
this->signal_read_event(e->callback_event);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to set _read_vio_event nullptr before signal_read_event() call. Otherwise, _read_vio_event->cancel() will be called when the mutex could not be locked again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpicking] e->callback_event is same to event, right? Using event looks clear in these event handlers.

return 0;
} else if (e == _write_vio_event) {
this->signal_write_event(e->callback_event);
return 0;
} else if (e == cross_thread_event) {
cross_thread_event = nullptr;
} else if (e == active_event) {
event = VC_EVENT_ACTIVE_TIMEOUT;
Expand All @@ -95,41 +101,17 @@ Http2Stream::main_event_handler(int event, void *edata)
case VC_EVENT_ACTIVE_TIMEOUT:
case VC_EVENT_INACTIVITY_TIMEOUT:
if (_sm && read_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked()) {
read_vio.cont->handleEvent(event, &read_vio);
} else {
if (this->_read_vio_event) {
this->_read_vio_event->cancel();
}
this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
}
this->signal_read_event(event);
} else if (_sm && write_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked()) {
write_vio.cont->handleEvent(event, &write_vio);
} else {
if (this->_write_vio_event) {
this->_write_vio_event->cancel();
}
this->_write_vio_event = this_ethread()->schedule_imm(write_vio.cont, event, &write_vio);
}
this->signal_write_event(event);
}
break;
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE:
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &write_vio) {
if (write_vio.mutex && write_vio.cont && this->_sm) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked()) {
write_vio.cont->handleEvent(event, &write_vio);
} else {
if (this->_write_vio_event) {
this->_write_vio_event->cancel();
}
this->_write_vio_event = this_ethread()->schedule_imm(write_vio.cont, event, &write_vio);
}
this->signal_write_event(event);
}
} else {
update_write_request(write_vio.get_reader(), INT64_MAX, true);
Expand All @@ -140,15 +122,7 @@ Http2Stream::main_event_handler(int event, void *edata)
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &read_vio) {
if (read_vio.mutex && read_vio.cont && this->_sm) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked()) {
read_vio.cont->handleEvent(event, &read_vio);
} else {
if (this->_read_vio_event) {
this->_read_vio_event->cancel();
}
this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
}
signal_read_event(event);
}
} else {
this->update_read_request(INT64_MAX, true);
Expand Down Expand Up @@ -671,7 +645,26 @@ Http2Stream::signal_read_event(int event)
if (this->_read_vio_event) {
this->_read_vio_event->cancel();
}
this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
this->_read_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &read_vio);
}
}

void
Http2Stream::signal_write_event(int event)
{
if (this->write_vio.cont == nullptr || this->write_vio.cont->mutex == nullptr || this->write_vio.op == VIO::NONE) {
return;
}

MUTEX_TRY_LOCK(lock, write_vio.cont->mutex, this_ethread());
if (lock.is_locked()) {
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
this->write_vio.cont->handleEvent(event, &this->write_vio);
} else {
if (this->_write_vio_event) {
this->_write_vio_event->cancel();
}
this->_write_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &write_vio);
Copy link
Member

@maskit maskit Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly, we don't need to pass write_vio because it always use THE write_vio that Http2Stream has, right?

Is retry_delay required? Should we change signal_read_event to use it too (on maybe another PR)?

Edit: I found signal_read_event already use retry_delay on this PR.

}
}

Expand Down
4 changes: 3 additions & 1 deletion proxy/http2/Http2Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ enum class Http2StreamMilestone {
class Http2Stream : public ProxyTransaction
{
public:
using super = ProxyTransaction; ///< Parent type.
const int retry_delay = HRTIME_MSECONDS(10);
using super = ProxyTransaction; ///< Parent type.

Http2Stream(Http2StreamId sid = 0, ssize_t initial_rwnd = Http2::initial_window_size);

Expand All @@ -75,6 +76,7 @@ class Http2Stream : public ProxyTransaction
void update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update);

void signal_read_event(int event);
void signal_write_event(int event);
void signal_write_event(bool call_update);

void restart_sending();
Expand Down