diff --git a/proxy/http2/Http2ConnectionState.cc b/proxy/http2/Http2ConnectionState.cc index 7fb41ce6f64..0ef20e1f0f4 100644 --- a/proxy/http2/Http2ConnectionState.cc +++ b/proxy/http2/Http2ConnectionState.cc @@ -160,6 +160,11 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame) stream->decrement_server_rwnd(payload_length); const uint32_t unpadded_length = payload_length - pad_length; + MIOBuffer *writer = stream->read_vio_writer(); + if (writer == nullptr) { + return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_STREAM, Http2ErrorCode::HTTP2_ERROR_INTERNAL_ERROR); + } + // If we call write() multiple times, we must keep the same reader, so we can // update its offset via consume. Otherwise, we will read the same data on the // second time through @@ -168,18 +173,24 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame) if (frame.header().flags & HTTP2_FLAGS_DATA_PADDED) { myreader->consume(HTTP2_DATA_PADLEN_LEN); } - while (nbytes < payload_length - pad_length) { + + while (nbytes < unpadded_length) { size_t read_len = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA]); if (nbytes + read_len > unpadded_length) { read_len -= nbytes + read_len - unpadded_length; } - nbytes += stream->request_buffer.write(myreader, read_len); + nbytes += writer->write(myreader, read_len); myreader->consume(nbytes); - // If there is an outstanding read, update the buffer - stream->update_read_request(INT64_MAX, true); } myreader->writer()->dealloc_reader(myreader); + if (frame.header().flags & HTTP2_FLAGS_DATA_END_STREAM) { + // TODO: set total written size to read_vio.nbytes + stream->signal_read_event(VC_EVENT_READ_COMPLETE); + } else { + stream->signal_read_event(VC_EVENT_READ_READY); + } + uint32_t initial_rwnd = cstate.server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE); uint32_t min_rwnd = std::min(initial_rwnd, cstate.server_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE)); // Connection level WINDOW UPDATE diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc index fad170cf55e..77361d86dd6 100644 --- a/proxy/http2/Http2Stream.cc +++ b/proxy/http2/Http2Stream.cc @@ -52,7 +52,7 @@ Http2Stream::init(Http2StreamId sid, ssize_t initial_rwnd) this->_thread = this_ethread(); this->_client_rwnd = initial_rwnd; - _reader = request_reader = request_buffer.alloc_reader(); + this->_reader = this->_request_buffer.alloc_reader(); // FIXME: Are you sure? every "stream" needs request_header? _req_header.create(HTTP_TYPE_REQUEST); response_header.create(HTTP_TYPE_RESPONSE); @@ -195,21 +195,30 @@ Http2Stream::send_request(Http2ConnectionState &cstate) do { bufindex = 0; tmp = dumpoffset; - IOBufferBlock *block = request_buffer.get_current_block(); + IOBufferBlock *block = this->_request_buffer.get_current_block(); if (!block) { - request_buffer.add_block(); - block = request_buffer.get_current_block(); + this->_request_buffer.add_block(); + block = this->_request_buffer.get_current_block(); } done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp); dumpoffset += bufindex; - request_buffer.fill(bufindex); + this->_request_buffer.fill(bufindex); if (!done) { - request_buffer.add_block(); + this->_request_buffer.add_block(); } } while (!done); - // Is there a read_vio request waiting? - this->update_read_request(INT64_MAX, true); + if (bufindex == 0) { + // No data to signal read event + return; + } + + if (this->recv_end_stream) { + this->read_vio.nbytes = bufindex; + this->signal_read_event(VC_EVENT_READ_COMPLETE); + } else { + this->signal_read_event(VC_EVENT_READ_READY); + } } bool @@ -330,9 +339,7 @@ Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) read_vio.vc_server = this; read_vio.op = VIO::READ; - // Is there already data in the request_buffer? If so, copy it over and then - // schedule a READ_READY or READ_COMPLETE event after we return. - update_read_request(nbytes, false, true); + // TODO: re-enable read_vio return &read_vio; } @@ -523,44 +530,26 @@ Http2Stream::update_read_request(int64_t read_len, bool call_update, bool check_ ink_release_assert(this->_thread == this_ethread()); SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread()); - if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) { - // If this vio has a different buffer, we must copy - ink_release_assert(this_ethread() == this->_thread); - if (read_vio.buffer.writer() != (&request_buffer)) { - int64_t num_to_read = read_vio.nbytes - read_vio.ndone; - if (num_to_read > read_len) { - num_to_read = read_len; - } - if (num_to_read > 0) { - int bytes_added = read_vio.buffer.writer()->write(request_reader, num_to_read); - if (bytes_added > 0 || (check_eos && recv_end_stream)) { - request_reader->consume(bytes_added); - read_vio.ndone += bytes_added; - int send_event = (read_vio.nbytes == read_vio.ndone || recv_end_stream) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; - if (call_update) { // Safe to call vio handler directly - inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (read_vio.cont && this->_sm) { - read_vio.cont->handleEvent(send_event, &read_vio); - } - } else { // Called from do_io_read. Still setting things up. Send event to handle this after the dust settles - read_event = send_tracked_event(read_event, send_event, &read_vio); - } - } - } - } else { - // Try to be smart and only signal if there was additional data - int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY; - if (request_reader->read_avail() > 0 || send_event == VC_EVENT_READ_COMPLETE) { - if (call_update) { // Safe to call vio handler directly - inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; - if (read_vio.cont && this->_sm) { - read_vio.cont->handleEvent(send_event, &read_vio); - } - } else { // Called from do_io_read. Still setting things up. Send event - // to handle this after the dust settles - read_event = send_tracked_event(read_event, send_event, &read_vio); - } + if (read_vio.nbytes == 0) { + return; + } + + // Try to be smart and only signal if there was additional data + int send_event = VC_EVENT_READ_READY; + if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes != INT64_MAX)) { + send_event = VC_EVENT_READ_COMPLETE; + } + + int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail(); + if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) { + if (call_update) { // Safe to call vio handler directly + inactive_timeout_at = Thread::get_hrtime() + inactive_timeout; + if (read_vio.cont && this->_sm) { + read_vio.cont->handleEvent(send_event, &read_vio); } + } else { // Called from do_io_read. Still setting things up. Send event + // to handle this after the dust settles + read_event = send_tracked_event(read_event, send_event, &read_vio); } } } @@ -687,6 +676,24 @@ Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, return; } +void +Http2Stream::signal_read_event(int event) +{ + if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr || this->read_vio.op == VIO::NONE) { + return; + } + + MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread()); + if (lock.is_locked()) { + this->read_vio.cont->handleEvent(event, &this->read_vio); + } else { + if (this->_read_vio_event) { + this->_read_vio_event->cancel(); + } + this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event, &read_vio); + } +} + void Http2Stream::signal_write_event(bool call_update) { @@ -811,7 +818,7 @@ Http2Stream::destroy() response_header.destroy(); // Drop references to all buffer data - request_buffer.clear(); + this->_request_buffer.clear(); // Free the mutexes in the VIO read_vio.mutex.clear(); diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h index 759b0f28171..a893de9161d 100644 --- a/proxy/http2/Http2Stream.h +++ b/proxy/http2/Http2Stream.h @@ -74,7 +74,10 @@ class Http2Stream : public ProxyTransaction void terminate_if_possible(); void update_read_request(int64_t read_len, bool send_update, bool check_eos = false); void update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update); + + void signal_read_event(int event); void signal_write_event(bool call_update); + void restart_sending(); void push_promise(URL &url, const MIMEField *accept_encoding); @@ -123,6 +126,7 @@ class Http2Stream : public ProxyTransaction void update_initial_rwnd(Http2WindowSize new_size); bool has_trailing_header() const; void set_request_headers(HTTPHdr &h2_headers); + MIOBuffer *read_vio_writer() const; ////////////////// // Variables @@ -141,8 +145,6 @@ class Http2Stream : public ProxyTransaction HTTPHdr response_header; IOBufferReader *response_reader = nullptr; - IOBufferReader *request_reader = nullptr; - MIOBuffer request_buffer = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX; Http2DependencyTree::Node *priority_node = nullptr; private: @@ -166,6 +168,8 @@ class Http2Stream : public ProxyTransaction int64_t _http_sm_id = -1; HTTPHdr _req_header; + MIOBuffer _request_buffer = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX; + int64_t read_vio_nbytes; VIO read_vio; VIO write_vio; @@ -330,3 +334,9 @@ Http2Stream::is_first_transaction() const { return is_first_transaction_flag; } + +inline MIOBuffer * +Http2Stream::read_vio_writer() const +{ + return this->read_vio.get_writer(); +}