Skip to content

Commit

Permalink
http2: use native pipe instead of synchronous I/O
Browse files Browse the repository at this point in the history
This resolves the issue of using synchronous I/O for
`respondWithFile()` and `respondWithFD()`, and enables
scenarios in which the underlying file does not need
to be a regular file.

PR-URL: #18936
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
addaleax committed Mar 15, 2018
1 parent 67f1d76 commit 1eb6b01
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 162 deletions.
60 changes: 49 additions & 11 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

require('internal/util').assertCrypto();

const { internalBinding } = require('internal/bootstrap_loaders');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const { UV_EOF } = process.binding('uv');
const http = require('http');
const binding = process.binding('http2');
const { FileHandle } = process.binding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const assert = require('assert');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
Expand Down Expand Up @@ -65,6 +69,7 @@ const { onServerStream,
const { utcDate } = require('internal/http');
const { promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { _connectionListener: httpConnectionListener } = require('http');
const { createPromise, promiseResolve } = process.binding('util');
const debug = util.debuglog('http2');
Expand Down Expand Up @@ -345,9 +350,7 @@ function onStreamClose(code) {
stream.end();
}

if (state.fd !== undefined)
tryClose(state.fd);

state.fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
Expand Down Expand Up @@ -1928,6 +1931,26 @@ function processHeaders(headers) {
return headers;
}

function onFileCloseError(stream, err) {
stream.emit(err);
}

function onFileUnpipe() {
const stream = this.sink[kOwner];
if (stream.ownsFd)
this.source.close().catch(onFileCloseError.bind(stream));
else
this.source.releaseFD();
}

// This is only called once the pipe has returned back control, so
// it only has to handle errors and End-of-File.
function onPipedFileHandleRead(err) {
if (err < 0 && err !== UV_EOF) {
this.stream.close(NGHTTP2_INTERNAL_ERROR);
}
}

function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
streamOptions = 0) {
const state = self[kState];
Expand All @@ -1940,18 +1963,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
return;
}


// Close the writable side of the stream
// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
self._final = null;
self.end();

const ret = self[kHandle].respondFD(fd, headersList,
offset, length,
streamOptions);
const ret = self[kHandle].respond(headersList, streamOptions);

if (ret < 0) {
self.destroy(new NghttpError(ret));
return;
}

defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe,
self, fd, offset, length);
}

function startFilePipe(self, fd, offset, length) {
const handle = new FileHandle(fd, offset, length);
handle.onread = onPipedFileHandleRead;
handle.stream = self;

const pipe = new StreamPipe(handle._externalStream,
self[kHandle]._externalStream);
pipe.onunpipe = onFileUnpipe;
pipe.start();

// exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
Expand Down Expand Up @@ -2270,8 +2307,9 @@ class ServerHttp2Stream extends Http2Stream {
throw new ERR_INVALID_ARG_TYPE('fd', 'number');

debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
`${sessionName(session[kType])}]: initiating response from fd`);
this[kUpdateTimer]();
this.ownsFd = false;

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2333,9 +2371,9 @@ class ServerHttp2Stream extends Http2Stream {

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
`${sessionName(session[kType])}]: initiating response from file`);
this[kUpdateTimer]();

this.ownsFd = true;

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down
125 changes: 0 additions & 125 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1888,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
}


// Initiate a response that contains data read from a file descriptor.
inline int Http2Stream::SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length,
int options) {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "submitting file");
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;

if (offset > 0) fd_offset_ = offset;
if (length > -1) fd_length_ = length;

Http2Stream::Provider::FD prov(this, options, fd);
int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
return ret;
}


// Submit informational headers for a stream.
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
CHECK(!this->IsDestroyed());
Expand Down Expand Up @@ -2085,87 +2063,6 @@ Http2Stream::Provider::~Provider() {
provider_.source.ptr = nullptr;
}

// The FD Provider pulls data from a file descriptor using libuv. All of the
// data transfer occurs in C++, without any chunks being passed through JS
// land.
Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd)
: Http2Stream::Provider(stream, options) {
CHECK(!stream->IsDestroyed());
provider_.source.fd = fd;
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
}

Http2Stream::Provider::FD::FD(int options, int fd)
: Http2Stream::Provider(options) {
provider_.source.fd = fd;
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
}

ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data) {
Http2Session* session = static_cast<Http2Session*>(user_data);
Http2Stream* stream = session->FindStream(id);
if (stream->statistics_.first_byte_sent == 0)
stream->statistics_.first_byte_sent = uv_hrtime();

DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id);
CHECK_EQ(id, stream->id());

int fd = source->fd;
int64_t offset = stream->fd_offset_;
ssize_t numchars = 0;

if (stream->fd_length_ >= 0 &&
stream->fd_length_ < static_cast<int64_t>(length))
length = stream->fd_length_;

uv_buf_t data;
data.base = reinterpret_cast<char*>(buf);
data.len = length;

uv_fs_t read_req;

if (length > 0) {
// TODO(addaleax): Never use synchronous I/O on the main thread.
numchars = uv_fs_read(session->event_loop(),
&read_req,
fd, &data, 1,
offset, nullptr);
uv_fs_req_cleanup(&read_req);
}

// Close the stream with an error if reading fails
if (numchars < 0)
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;

// Update the read offset for the next read
stream->fd_offset_ += numchars;
stream->fd_length_ -= numchars;

DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars);

// if numchars < length, assume that we are done.
if (static_cast<size_t>(numchars) < length || length <= 0) {
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
session->GetTrailers(stream, flags);
// If the stream or session gets destroyed during the GetTrailers
// callback, check that here and close down the stream
if (stream->IsDestroyed())
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if (session->IsDestroyed())
return NGHTTP2_ERR_CALLBACK_FAILURE;
}

stream->statistics_.sent_bytes += numchars;
return numchars;
}

// The Stream Provider pulls data from a linked list of uv_buf_t structs
// built via the StreamBase API and the Streams js API.
Http2Stream::Provider::Stream::Stream(int options)
Expand Down Expand Up @@ -2508,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo<Value>& args) {
DEBUG_HTTP2STREAM(stream, "response submitted");
}

// Initiates a response on the Http2Stream using a file descriptor to provide
// outbound DATA frames.
void Http2Stream::RespondFD(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
Isolate* isolate = env->isolate();
Http2Stream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());

int fd = args[0]->Int32Value(context).ToChecked();
Local<Array> headers = args[1].As<Array>();

int64_t offset = args[2]->IntegerValue(context).ToChecked();
int64_t length = args[3]->IntegerValue(context).ToChecked();
int options = args[4]->IntegerValue(context).ToChecked();

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
offset, length, options));
DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd);
}

// Submits informational headers on the Http2Stream
void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -2891,7 +2767,6 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
env->SetProtoMethod(stream, "info", Http2Stream::Info);
env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD);
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
Expand Down
24 changes: 0 additions & 24 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,6 @@ class Http2Stream : public AsyncWrap,
size_t len,
int options);

// Send data read from a file descriptor as the response on this stream.
inline int SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length,
int options);

// Submit informational headers for this stream
inline int SubmitInfo(nghttp2_nv* nva, size_t len);

Expand Down Expand Up @@ -709,7 +702,6 @@ class Http2Stream : public AsyncWrap,
static void PushPromise(const FunctionCallbackInfo<Value>& args);
static void RefreshState(const FunctionCallbackInfo<Value>& args);
static void Info(const FunctionCallbackInfo<Value>& args);
static void RespondFD(const FunctionCallbackInfo<Value>& args);
static void Respond(const FunctionCallbackInfo<Value>& args);
static void RstStream(const FunctionCallbackInfo<Value>& args);

Expand Down Expand Up @@ -753,8 +745,6 @@ class Http2Stream : public AsyncWrap,
// waiting to be written out to the socket.
std::queue<nghttp2_stream_write> queue_;
size_t available_outbound_length_ = 0;
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;

Http2StreamListener stream_listener_;

Expand All @@ -780,20 +770,6 @@ class Http2Stream::Provider {
bool empty_ = false;
};

class Http2Stream::Provider::FD : public Http2Stream::Provider {
public:
FD(int options, int fd);
FD(Http2Stream* stream, int options, int fd);

static ssize_t OnRead(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
};

class Http2Stream::Provider::Stream : public Http2Stream::Provider {
public:
Stream(Http2Stream* stream, int options);
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http2-respond-with-fd-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests);

let currentError;

// mock respondFD because we only care about testing error handling
Http2Stream.prototype.respondFD = () => currentError.ngError;
// mock `respond` because we only care about testing error handling
Http2Stream.prototype.respond = () => currentError.ngError;

const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {
Expand Down

0 comments on commit 1eb6b01

Please sign in to comment.