Skip to content

Commit

Permalink
TransformTerminus::do_io_*: handle null buffer
Browse files Browse the repository at this point in the history
This is a very similar change as apache#11789. With this patch,
TransformTerminus properly supports a 0 byte read to cancel VIO. Without
this patch, the logic naively scheduled an event which called the
handler which would crash on what was often a canceled continuation or
otherwise invalid continuation.
  • Loading branch information
bneradt committed Nov 26, 2024
1 parent 29c3e31 commit b94c969
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 26 deletions.
7 changes: 7 additions & 0 deletions include/proxy/TransformInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class TransformTerminus : public VConnection
{
public:
TransformTerminus(TransformVConnection *tvc);
~TransformTerminus() override;

int handle_event(int event, void *edata);

Expand All @@ -49,6 +50,12 @@ class TransformTerminus : public VConnection
int m_deletable;
int m_closed;
int m_called_user;

private:
Event *_read_event = nullptr;
bool _read_disabled = false;
Event *_write_event = nullptr;
bool _write_disabled = false;
};

class TransformVConnection : public TransformVCChain
Expand Down
95 changes: 69 additions & 26 deletions src/proxy/Transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ TransformTerminus::TransformTerminus(TransformVConnection *tvc)
SET_HANDLER(&TransformTerminus::handle_event);
}

TransformTerminus::~TransformTerminus()
{
if (_read_event != nullptr) {
_read_event->cancel();
_read_event = nullptr;
}
if (_write_event != nullptr) {
_write_event->cancel();
_write_event = nullptr;
}
}

#define RETRY() \
if (ink_atomic_increment((int *)&m_event_count, 1) < 0) { \
ink_assert(!"not reached"); \
Expand All @@ -140,8 +152,15 @@ TransformTerminus::TransformTerminus(TransformVConnection *tvc)
return 0;

int
TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
TransformTerminus::handle_event(int event, void *edata)
{
Event *event_p = reinterpret_cast<Event *>(edata);
if (event_p == _read_event) {
_read_event = nullptr;
} else if (event_p == _write_event) {
_write_event = nullptr;
}

int val;

m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
Expand Down Expand Up @@ -212,27 +231,31 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
}
}

if (m_write_vio.ntodo() > 0) {
if (towrite > 0) {
m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
if (!_write_disabled) {
if (m_write_vio.ntodo() > 0) {
if (towrite > 0) {
m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
}
} else {
m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
}
} else {
m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
}

// We could have closed on the write callback
if (m_closed != 0 && m_tvc->m_closed != 0) {
return 0;
}

if (m_read_vio.ntodo() > 0) {
if (m_write_vio.ntodo() <= 0) {
m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
} else if (towrite > 0) {
m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
if (!_read_disabled) {
if (m_read_vio.ntodo() > 0) {
if (m_write_vio.ntodo() <= 0) {
m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
} else if (towrite > 0) {
m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
}
} else {
m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
}
} else {
m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
}
}
} else {
Expand All @@ -256,7 +279,7 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
if (!m_called_user) {
m_called_user = 1;
m_tvc->m_cont->handleEvent(ev, &m_read_vio);
} else {
} else if (!_read_disabled) {
ink_assert(m_read_vio.cont != nullptr);
m_read_vio.cont->handleEvent(ev, &m_read_vio);
}
Expand All @@ -275,19 +298,29 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
VIO *
TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
m_read_vio.set_writer(buf);
m_read_vio.op = VIO::READ;
m_read_vio.set_continuation(c);
m_read_vio.nbytes = nbytes;
m_read_vio.ndone = 0;
m_read_vio.vc_server = this;

if (ink_atomic_increment(&m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_read] event_count %d", m_event_count);
if (buf != nullptr) {
_read_disabled = false;
m_read_vio.set_writer(buf);
if (ink_atomic_increment(&m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_read] event_count %d", m_event_count);

this_ethread()->schedule_imm_local(this);
this_ethread()->schedule_imm_local(this);
} else {
_read_disabled = true;
if (_read_event != nullptr) {
_read_event->cancel();
_read_event = nullptr;
}
m_read_vio.buffer.clear();
}

return &m_read_vio;
}
Expand All @@ -300,19 +333,29 @@ TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *
{
// In the process of eliminating 'owner' mode so asserting against it
ink_assert(!owner);
m_write_vio.set_reader(buf);
m_write_vio.op = VIO::WRITE;
m_write_vio.set_continuation(c);
m_write_vio.nbytes = nbytes;
m_write_vio.ndone = 0;
m_write_vio.vc_server = this;

if (ink_atomic_increment(&m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_write] event_count %d", m_event_count);
if (buf != nullptr) {
_write_disabled = false;
m_write_vio.set_reader(buf);
if (ink_atomic_increment(&m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_write] event_count %d", m_event_count);

this_ethread()->schedule_imm_local(this);
this_ethread()->schedule_imm_local(this);
} else {
_write_disabled = true;
if (_write_event != nullptr) {
_write_event->cancel();
_write_event = nullptr;
}
m_write_vio.buffer.clear();
}

return &m_write_vio;
}
Expand Down

0 comments on commit b94c969

Please sign in to comment.