Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions proxy/http2/Http2ConnectionState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
stream->decrement_server_rwnd(payload_length);

const uint32_t unpadded_length = payload_length - pad_length;
MIOBuffer *writer = stream->read_vio_writer();
if (writer == nullptr) {
return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_STREAM, Http2ErrorCode::HTTP2_ERROR_INTERNAL_ERROR);
}

// If we call write() multiple times, we must keep the same reader, so we can
// update its offset via consume. Otherwise, we will read the same data on the
// second time through
Expand All @@ -168,18 +173,24 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
if (frame.header().flags & HTTP2_FLAGS_DATA_PADDED) {
myreader->consume(HTTP2_DATA_PADLEN_LEN);
}
while (nbytes < payload_length - pad_length) {

while (nbytes < unpadded_length) {
size_t read_len = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA]);
if (nbytes + read_len > unpadded_length) {
read_len -= nbytes + read_len - unpadded_length;
}
nbytes += stream->request_buffer.write(myreader, read_len);
nbytes += writer->write(myreader, read_len);
myreader->consume(nbytes);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-topic, but, consuming nbytes in the while loop looks wrong.

// If there is an outstanding read, update the buffer
stream->update_read_request(INT64_MAX, true);
}
myreader->writer()->dealloc_reader(myreader);

if (frame.header().flags & HTTP2_FLAGS_DATA_END_STREAM) {
// TODO: set total written size to read_vio.nbytes
stream->signal_read_event(VC_EVENT_READ_COMPLETE);
} else {
stream->signal_read_event(VC_EVENT_READ_READY);
}

uint32_t initial_rwnd = cstate.server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
uint32_t min_rwnd = std::min(initial_rwnd, cstate.server_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE));
// Connection level WINDOW UPDATE
Expand Down
105 changes: 56 additions & 49 deletions proxy/http2/Http2Stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Http2Stream::init(Http2StreamId sid, ssize_t initial_rwnd)
this->_thread = this_ethread();
this->_client_rwnd = initial_rwnd;

_reader = request_reader = request_buffer.alloc_reader();
this->_reader = this->_request_buffer.alloc_reader();
// FIXME: Are you sure? every "stream" needs request_header?
_req_header.create(HTTP_TYPE_REQUEST);
response_header.create(HTTP_TYPE_RESPONSE);
Expand Down Expand Up @@ -195,21 +195,30 @@ Http2Stream::send_request(Http2ConnectionState &cstate)
do {
bufindex = 0;
tmp = dumpoffset;
IOBufferBlock *block = request_buffer.get_current_block();
IOBufferBlock *block = this->_request_buffer.get_current_block();
if (!block) {
request_buffer.add_block();
block = request_buffer.get_current_block();
this->_request_buffer.add_block();
block = this->_request_buffer.get_current_block();
}
done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
dumpoffset += bufindex;
request_buffer.fill(bufindex);
this->_request_buffer.fill(bufindex);
if (!done) {
request_buffer.add_block();
this->_request_buffer.add_block();
}
} while (!done);

// Is there a read_vio request waiting?
this->update_read_request(INT64_MAX, true);
if (bufindex == 0) {
// No data to signal read event
return;
}

if (this->recv_end_stream) {
this->read_vio.nbytes = bufindex;
this->signal_read_event(VC_EVENT_READ_COMPLETE);
} else {
this->signal_read_event(VC_EVENT_READ_READY);
}
}

bool
Expand Down Expand Up @@ -330,9 +339,7 @@ Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
read_vio.vc_server = this;
read_vio.op = VIO::READ;

// Is there already data in the request_buffer? If so, copy it over and then
// schedule a READ_READY or READ_COMPLETE event after we return.
update_read_request(nbytes, false, true);
// TODO: re-enable read_vio

return &read_vio;
}
Expand Down Expand Up @@ -523,44 +530,26 @@ Http2Stream::update_read_request(int64_t read_len, bool call_update, bool check_
ink_release_assert(this->_thread == this_ethread());

SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) {
// If this vio has a different buffer, we must copy
ink_release_assert(this_ethread() == this->_thread);
if (read_vio.buffer.writer() != (&request_buffer)) {
int64_t num_to_read = read_vio.nbytes - read_vio.ndone;
if (num_to_read > read_len) {
num_to_read = read_len;
}
if (num_to_read > 0) {
int bytes_added = read_vio.buffer.writer()->write(request_reader, num_to_read);
if (bytes_added > 0 || (check_eos && recv_end_stream)) {
request_reader->consume(bytes_added);
read_vio.ndone += bytes_added;
int send_event = (read_vio.nbytes == read_vio.ndone || recv_end_stream) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio.cont && this->_sm) {
read_vio.cont->handleEvent(send_event, &read_vio);
}
} else { // Called from do_io_read. Still setting things up. Send event to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
} else {
// Try to be smart and only signal if there was additional data
int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (request_reader->read_avail() > 0 || send_event == VC_EVENT_READ_COMPLETE) {
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio.cont && this->_sm) {
read_vio.cont->handleEvent(send_event, &read_vio);
}
} else { // Called from do_io_read. Still setting things up. Send event
// to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
if (read_vio.nbytes == 0) {
return;
}

// Try to be smart and only signal if there was additional data
int send_event = VC_EVENT_READ_READY;
if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes != INT64_MAX)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not (read_vio.ntodo() == 0 || this->recv_end_stream) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this is for some AuTest failures. This checks the upper layer (HttpSM/HttpTunnel) called do_io_read(this, INT64_MAX, XXXX). Yes, it's a bit unreliable. ( I should have left some comments )

send_event = VC_EVENT_READ_COMPLETE;
}

int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail();
if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) {
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio.cont && this->_sm) {
read_vio.cont->handleEvent(send_event, &read_vio);
}
} else { // Called from do_io_read. Still setting things up. Send event
// to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
Expand Down Expand Up @@ -687,6 +676,24 @@ Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len,
return;
}

void
Http2Stream::signal_read_event(int event)
{
if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr || this->read_vio.op == VIO::NONE) {
return;
}

MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread());
if (lock.is_locked()) {
this->read_vio.cont->handleEvent(event, &this->read_vio);
} else {
if (this->_read_vio_event) {
this->_read_vio_event->cancel();
}
this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
}
}

void
Http2Stream::signal_write_event(bool call_update)
{
Expand Down Expand Up @@ -811,7 +818,7 @@ Http2Stream::destroy()
response_header.destroy();

// Drop references to all buffer data
request_buffer.clear();
this->_request_buffer.clear();

// Free the mutexes in the VIO
read_vio.mutex.clear();
Expand Down
14 changes: 12 additions & 2 deletions proxy/http2/Http2Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ class Http2Stream : public ProxyTransaction
void terminate_if_possible();
void update_read_request(int64_t read_len, bool send_update, bool check_eos = false);
void update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update);

void signal_read_event(int event);
void signal_write_event(bool call_update);

void restart_sending();
void push_promise(URL &url, const MIMEField *accept_encoding);

Expand Down Expand Up @@ -123,6 +126,7 @@ class Http2Stream : public ProxyTransaction
void update_initial_rwnd(Http2WindowSize new_size);
bool has_trailing_header() const;
void set_request_headers(HTTPHdr &h2_headers);
MIOBuffer *read_vio_writer() const;

//////////////////
// Variables
Expand All @@ -141,8 +145,6 @@ class Http2Stream : public ProxyTransaction

HTTPHdr response_header;
IOBufferReader *response_reader = nullptr;
IOBufferReader *request_reader = nullptr;
MIOBuffer request_buffer = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
Http2DependencyTree::Node *priority_node = nullptr;

private:
Expand All @@ -166,6 +168,8 @@ class Http2Stream : public ProxyTransaction
int64_t _http_sm_id = -1;

HTTPHdr _req_header;
MIOBuffer _request_buffer = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
int64_t read_vio_nbytes;
VIO read_vio;
VIO write_vio;

Expand Down Expand Up @@ -330,3 +334,9 @@ Http2Stream::is_first_transaction() const
{
return is_first_transaction_flag;
}

inline MIOBuffer *
Http2Stream::read_vio_writer() const
{
return this->read_vio.get_writer();
}