diff --git a/src/node_http2.cc b/src/node_http2.cc index a695e3990dca5a..56ef79f8a5da03 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -4,7 +4,6 @@ #include "node_http2.h" #include "node_http2_state.h" -#include #include namespace node { @@ -23,6 +22,23 @@ using v8::Undefined; namespace http2 { +namespace { + +const char zero_bytes_256[256] = {}; + +inline Http2Stream* GetStream(Http2Session* session, + int32_t id, + nghttp2_data_source* source) { + Http2Stream* stream = static_cast(source->ptr); + if (stream == nullptr) + stream = session->FindStream(id); + CHECK_NE(stream, nullptr); + CHECK_EQ(id, stream->id()); + return stream; +} + +} // anonymous namespace + // These configure the callbacks required by nghttp2 itself. There are // two sets of callback functions, one that is used if a padding callback // is set, and other that does not include the padding callback. @@ -370,6 +386,8 @@ Http2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) { callbacks, OnInvalidHeader); nghttp2_session_callbacks_set_error_callback( callbacks, OnNghttpError); + nghttp2_session_callbacks_set_send_data_callback( + callbacks, OnSendData); if (kHasGetPaddingCallback) { nghttp2_session_callbacks_set_select_padding_callback( @@ -419,6 +437,9 @@ Http2Session::Http2Session(Environment* env, // be catching before it gets this far. Either way, crash if this // fails. CHECK_EQ(fn(&session_, callbacks, this, *opts), 0); + + outgoing_storage_.reserve(4096); + outgoing_buffers_.reserve(32); } void Http2Session::Unconsume() { @@ -508,6 +529,7 @@ inline ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen, // not be the preferred option. inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen, size_t maxPayloadLen) { + if (frameLen == 0) return 0; DEBUG_HTTP2SESSION(this, "using callback to determine padding"); Isolate* isolate = env()->isolate(); HandleScope handle_scope(isolate); @@ -1033,6 +1055,20 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { MakeCallback(env()->onsettings_string(), arraysize(argv), argv); } +// Callback used when data has been written to the stream. +void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) { + Http2Session* session = static_cast(ctx); + DEBUG_HTTP2SESSION2(session, "write finished with status %d", status); + + // Inform all pending writes about their completion. + session->ClearOutgoing(status); + + if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { + // Schedule a new write if nghttp2 wants to send data. + session->MaybeScheduleWrite(); + } +} + // If the underlying nghttp2_session struct has data pending in its outbound // queue, MaybeScheduleWrite will schedule a SendPendingData() call to occcur // on the next iteration of the Node.js event loop (using the SetImmediate @@ -1040,6 +1076,7 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { void Http2Session::MaybeScheduleWrite() { CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); if (session_ != nullptr && nghttp2_session_want_write(session_)) { + DEBUG_HTTP2SESSION(this, "scheduling write"); flags_ |= SESSION_STATE_WRITE_SCHEDULED; env()->SetImmediate([](Environment* env, void* data) { Http2Session* session = static_cast(data); @@ -1059,6 +1096,39 @@ void Http2Session::MaybeScheduleWrite() { } } +// Unset the sending state, finish up all current writes, and reset +// storage for data and metadata that was associated with these writes. +void Http2Session::ClearOutgoing(int status) { + CHECK_NE(flags_ & SESSION_STATE_SENDING, 0); + flags_ &= ~SESSION_STATE_SENDING; + + for (const nghttp2_stream_write& wr : outgoing_buffers_) { + WriteWrap* wrap = wr.req_wrap; + if (wrap != nullptr) + wrap->Done(status); + } + + outgoing_buffers_.clear(); + outgoing_storage_.clear(); +} + +// Queue a given block of data for sending. This always creates a copy, +// so it is used for the cases in which nghttp2 requests sending of a +// small chunk of data. +void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) { + size_t offset = outgoing_storage_.size(); + outgoing_storage_.resize(offset + src_length); + memcpy(&outgoing_storage_[offset], src, src_length); + + // Store with a base of `nullptr` initially, since future resizes + // of the outgoing_buffers_ vector may invalidate the pointer. + // The correct base pointers will be set later, before writing to the + // underlying socket. + outgoing_buffers_.emplace_back(nghttp2_stream_write { + uv_buf_init(nullptr, src_length) + }); +} + // Prompts nghttp2 to begin serializing it's pending data and pushes each // chunk out to the i/o socket to be sent. This is a particularly hot method // that will generally be called at least twice be event loop iteration. @@ -1075,64 +1145,133 @@ void Http2Session::SendPendingData() { // SendPendingData should not be called recursively. if (flags_ & SESSION_STATE_SENDING) return; + // This is cleared by ClearOutgoing(). flags_ |= SESSION_STATE_SENDING; - WriteWrap* req = nullptr; - char* dest = nullptr; - size_t destRemaining = 0; - size_t destLength = 0; // amount of data stored in dest - size_t destOffset = 0; // current write offset of dest - - const uint8_t* src; // pointer to the serialized data - ssize_t srcLength = 0; // length of serialized data chunk - - // While srcLength is greater than zero - while ((srcLength = nghttp2_session_mem_send(session_, &src)) > 0) { - if (req == nullptr) { - req = AllocateSend(); - destRemaining = req->ExtraSize(); - dest = req->Extra(); - } - DEBUG_HTTP2SESSION2(this, "nghttp2 has %d bytes to send", srcLength); - size_t srcRemaining = srcLength; - size_t srcOffset = 0; - - // The amount of data we have to copy is greater than the space - // remaining. Copy what we can into the remaining space, send it, - // the proceed with the rest. - while (srcRemaining > destRemaining) { - DEBUG_HTTP2SESSION2(this, "pushing %d bytes to the socket", - destLength + destRemaining); - memcpy(dest + destOffset, src + srcOffset, destRemaining); - destLength += destRemaining; - Send(req, dest, destLength); - destOffset = 0; - destLength = 0; - srcRemaining -= destRemaining; - srcOffset += destRemaining; - req = AllocateSend(); - destRemaining = req->ExtraSize(); - dest = req->Extra(); - } + ssize_t src_length; + const uint8_t* src; + + CHECK_EQ(outgoing_buffers_.size(), 0); + CHECK_EQ(outgoing_storage_.size(), 0); - if (srcRemaining > 0) { - memcpy(dest + destOffset, src + srcOffset, srcRemaining); - destLength += srcRemaining; - destOffset += srcRemaining; - destRemaining -= srcRemaining; - srcRemaining = 0; - srcOffset = 0; + // Part One: Gather data from nghttp2 + + while ((src_length = nghttp2_session_mem_send(session_, &src)) > 0) { + DEBUG_HTTP2SESSION2(this, "nghttp2 has %d bytes to send", src_length); + CopyDataIntoOutgoing(src, src_length); + } + + CHECK_NE(src_length, NGHTTP2_ERR_NOMEM); + + if (stream_ == nullptr) { + // It would seem nice to bail out earlier, but `nghttp2_session_mem_send()` + // does take care of things like closing the individual streams after + // a socket has been torn down, so we still need to call it. + ClearOutgoing(UV_ECANCELED); + return; + } + + // Part Two: Pass Data to the underlying stream + + size_t count = outgoing_buffers_.size(); + if (count == 0) { + flags_ &= ~SESSION_STATE_SENDING; + return; + } + MaybeStackBuffer bufs; + bufs.AllocateSufficientStorage(count); + + // Set the buffer base pointers for copied data that ended up in the + // sessions's own storage since it might have shifted around during gathering. + // (Those are marked by having .base == nullptr.) + size_t offset = 0; + size_t i = 0; + for (const nghttp2_stream_write& write : outgoing_buffers_) { + if (write.buf.base == nullptr) { + bufs[i++] = uv_buf_init( + reinterpret_cast(outgoing_storage_.data() + offset), + write.buf.len); + offset += write.buf.len; + } else { + bufs[i++] = write.buf; } } - CHECK_NE(srcLength, NGHTTP2_ERR_NOMEM); - if (destLength > 0 && srcLength >= 0) { - DEBUG_HTTP2SESSION2(this, "pushing %d bytes to the socket", destLength); - Send(req, dest, destLength); + + chunks_sent_since_last_write_++; + + // DoTryWrite may modify both the buffer list start itself and the + // base pointers/length of the individual buffers. + uv_buf_t* writebufs = *bufs; + if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) { + // All writes finished synchronously, nothing more to do here. + ClearOutgoing(0); + return; + } + + WriteWrap* req = AllocateSend(); + if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) { + req->Dispose(); } + DEBUG_HTTP2SESSION2(this, "wants data in return? %d", nghttp2_session_want_read(session_)); +} - flags_ &= ~SESSION_STATE_SENDING; + +// This callback is called from nghttp2 when it wants to send DATA frames for a +// given Http2Stream, when we set the `NGHTTP2_DATA_FLAG_NO_COPY` flag earlier +// in the Http2Stream::Provider::Stream::OnRead callback. +// We take the write information directly out of the stream's data queue. +int Http2Session::OnSendData( + nghttp2_session* session_, + nghttp2_frame* frame, + const uint8_t* framehd, + size_t length, + nghttp2_data_source* source, + void* user_data) { + Http2Session* session = static_cast(user_data); + Http2Stream* stream = GetStream(session, frame->hd.stream_id, source); + + // Send the frame header + a byte that indicates padding length. + session->CopyDataIntoOutgoing(framehd, 9); + if (frame->data.padlen > 0) { + uint8_t padding_byte = frame->data.padlen - 1; + CHECK_EQ(padding_byte, frame->data.padlen - 1); + session->CopyDataIntoOutgoing(&padding_byte, 1); + } + + DEBUG_HTTP2SESSION2(session, "nghttp2 has %d bytes to send directly", length); + while (length > 0) { + // nghttp2 thinks that there is data available (length > 0), which means + // we told it so, which means that we *should* have data available. + CHECK(!stream->queue_.empty()); + + nghttp2_stream_write& write = stream->queue_.front(); + if (write.buf.len <= length) { + // This write does not suffice by itself, so we can consume it completely. + length -= write.buf.len; + session->outgoing_buffers_.emplace_back(std::move(write)); + stream->queue_.pop(); + continue; + } + + // Slice off `length` bytes of the first write in the queue. + session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + uv_buf_init(write.buf.base, length) + }); + write.buf.base += length; + write.buf.len -= length; + break; + } + + if (frame->data.padlen > 0) { + // Send padding if that was requested. + session->outgoing_buffers_.emplace_back(nghttp2_stream_write { + uv_buf_init(const_cast(zero_bytes_256), frame->data.padlen - 1) + }); + } + + return 0; } // Creates a new Http2Stream and submits a new http2 request. @@ -1163,25 +1302,7 @@ WriteWrap* Http2Session::AllocateSend() { Local obj = env()->write_wrap_constructor_function() ->NewInstance(env()->context()).ToLocalChecked(); - // Base the amount allocated on the remote peers max frame size - uint32_t size = - nghttp2_session_get_remote_settings( - session(), - NGHTTP2_SETTINGS_MAX_FRAME_SIZE); - // Max frame size + 9 bytes for the header - return WriteWrap::New(env(), obj, stream_, size + 9); -} - -// Pushes chunks of data to the i/o stream. -void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { - DEBUG_HTTP2SESSION2(this, "attempting to send %d bytes", length); - if (stream_ == nullptr) - return; - chunks_sent_since_last_write_++; - uv_buf_t actual = uv_buf_init(buf, length); - if (stream_->DoWrite(req, &actual, 1, nullptr)) { - req->Dispose(); - } + return WriteWrap::New(env(), obj, stream_); } // Allocates the data buffer used to receive inbound data from the i/o stream @@ -1255,6 +1376,7 @@ void Http2Session::Consume(Local external) { prev_read_cb_ = stream->read_cb(); stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this }); stream->set_read_cb({ Http2Session::OnStreamReadImpl, this }); + stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this }); stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this }); DEBUG_HTTP2SESSION(this, "i/o stream consumed"); } @@ -1422,9 +1544,9 @@ inline void Http2Stream::Destroy() { // here because it's possible for destroy to have been called while // we still have qeueued outbound writes. while (!stream->queue_.empty()) { - nghttp2_stream_write* head = stream->queue_.front(); - head->req_wrap->Done(UV_ECANCELED); - delete head; + nghttp2_stream_write& head = stream->queue_.front(); + if (head.req_wrap != nullptr) + head.req_wrap->Done(UV_ECANCELED); stream->queue_.pop(); } @@ -1616,12 +1738,15 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap, return 0; } DEBUG_HTTP2STREAM2(this, "queuing %d buffers to send", id_, nbufs); - nghttp2_stream_write* item = new nghttp2_stream_write; - item->req_wrap = req_wrap; - item->nbufs = nbufs; - item->bufs.AllocateSufficientStorage(nbufs); - memcpy(*(item->bufs), bufs, nbufs * sizeof(*bufs)); - queue_.push(item); + for (size_t i = 0; i < nbufs; ++i) { + // Store the req_wrap on the last write info in the queue, so that it is + // only marked as finished once all buffers associated with it are finished. + queue_.emplace(nghttp2_stream_write { + i == nbufs - 1 ? req_wrap : nullptr, + bufs[i] + }); + available_outbound_length_ += bufs[i].len; + } CHECK_NE(nghttp2_session_resume_data(**session_, id_), NGHTTP2_ERR_NOMEM); return 0; } @@ -1655,18 +1780,6 @@ inline bool Http2Stream::AddHeader(nghttp2_rcbuf* name, return true; } - -Http2Stream* GetStream(Http2Session* session, - int32_t id, - nghttp2_data_source* source) { - Http2Stream* stream = static_cast(source->ptr); - if (stream == nullptr) - stream = session->FindStream(id); - CHECK_NE(stream, nullptr); - CHECK_EQ(id, stream->id()); - return stream; -} - // A Provider is the thing that provides outbound DATA frame data. Http2Stream::Provider::Provider(Http2Stream* stream, int options) { CHECK(!stream->IsDestroyed()); @@ -1787,30 +1900,16 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, size_t amount = 0; // amount of data being sent in this data frame. - uv_buf_t current; - if (!stream->queue_.empty()) { DEBUG_HTTP2SESSION2(session, "stream %d has pending outbound data", id); - nghttp2_stream_write* head = stream->queue_.front(); - current = head->bufs[stream->queue_index_]; - size_t clen = current.len - stream->queue_offset_; - amount = std::min(clen, length); + amount = std::min(stream->available_outbound_length_, length); DEBUG_HTTP2SESSION2(session, "sending %d bytes for data frame on stream %d", amount, id); if (amount > 0) { - memcpy(buf, current.base + stream->queue_offset_, amount); - stream->queue_offset_ += amount; - } - if (stream->queue_offset_ == current.len) { - stream->queue_index_++; - stream->queue_offset_ = 0; - } - if (stream->queue_index_ == head->nbufs) { - head->req_wrap->Done(0); - delete head; - stream->queue_.pop(); - stream->queue_offset_ = 0; - stream->queue_index_ = 0; + // Just return the length, let Http2Session::OnSendData take care of + // actually taking the buffers out of the queue. + *flags |= NGHTTP2_DATA_FLAG_NO_COPY; + stream->available_outbound_length_ -= amount; } } diff --git a/src/node_http2.h b/src/node_http2.h index 4af8c75d5bc1f5..8cccfb342db3e0 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -126,9 +126,12 @@ enum nghttp2_stream_options { }; struct nghttp2_stream_write { - unsigned int nbufs = 0; WriteWrap* req_wrap = nullptr; - MaybeStackBuffer bufs; + uv_buf_t buf; + + inline explicit nghttp2_stream_write(uv_buf_t buf_) : buf(buf_) {} + inline nghttp2_stream_write(WriteWrap* req, uv_buf_t buf_) : + req_wrap(req), buf(buf_) {} }; struct nghttp2_header { @@ -725,11 +728,12 @@ class Http2Stream : public AsyncWrap, // Outbound Data... This is the data written by the JS layer that is // waiting to be written out to the socket. - std::queue queue_; - unsigned int queue_index_ = 0; - size_t queue_offset_ = 0; + std::queue queue_; + size_t available_outbound_length_ = 0; int64_t fd_offset_ = 0; int64_t fd_length_ = -1; + + friend class Http2Session; }; class Http2Stream::Provider { @@ -860,6 +864,7 @@ class Http2Session : public AsyncWrap { const uv_buf_t* bufs, uv_handle_type pending, void* ctx); + static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx); static void OnStreamDestructImpl(void* ctx); // The JavaScript API @@ -882,7 +887,6 @@ class Http2Session : public AsyncWrap { template static void GetSettings(const FunctionCallbackInfo& args); - void Send(WriteWrap* req, char* buf, size_t length); WriteWrap* AllocateSend(); uv_loop_t* event_loop() const { @@ -957,6 +961,13 @@ class Http2Session : public AsyncWrap { const char* message, size_t len, void* user_data); + static inline int OnSendData( + nghttp2_session* session, + nghttp2_frame* frame, + const uint8_t* framehd, + size_t length, + nghttp2_data_source* source, + void* user_data); static inline ssize_t OnStreamReadFD( @@ -1015,6 +1026,12 @@ class Http2Session : public AsyncWrap { size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; + std::vector outgoing_buffers_; + std::vector outgoing_storage_; + + void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length); + void ClearOutgoing(int status); + friend class Http2Scope; };