diff --git a/src/env-inl.h b/src/env-inl.h index 31f2af402417dc..05c7a90c52fee7 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -484,6 +484,11 @@ Environment::fs_stats_field_array() { return &fs_stats_field_array_; } +inline std::vector>& +Environment::file_handle_read_wrap_freelist() { + return file_handle_read_wrap_freelist_; +} + void Environment::CreateImmediate(native_immediate_callback cb, void* data, v8::Local obj, diff --git a/src/env.cc b/src/env.cc index 36a3612a780731..d361262e22ba09 100644 --- a/src/env.cc +++ b/src/env.cc @@ -2,6 +2,7 @@ #include "async_wrap.h" #include "node_buffer.h" #include "node_platform.h" +#include "node_file.h" #include #include diff --git a/src/env.h b/src/env.h index c778e7ba112340..e0f6856f8d0b1d 100644 --- a/src/env.h +++ b/src/env.h @@ -48,6 +48,10 @@ struct nghttp2_rcbuf; namespace node { +namespace fs { +class FileHandleReadWrap; +} + namespace performance { class performance_state; } @@ -297,6 +301,7 @@ struct PackageConfig { V(context, v8::Context) \ V(domain_callback, v8::Function) \ V(fd_constructor_template, v8::ObjectTemplate) \ + V(filehandlereadwrap_template, v8::ObjectTemplate) \ V(fsreqpromise_constructor_template, v8::ObjectTemplate) \ V(fdclose_constructor_template, v8::ObjectTemplate) \ V(host_import_module_dynamically_callback, v8::Function) \ @@ -642,6 +647,9 @@ class Environment { inline AliasedBuffer* fs_stats_field_array(); + inline std::vector>& + file_handle_read_wrap_freelist(); + inline performance::performance_state* performance_state(); inline std::map* performance_marks(); @@ -822,6 +830,9 @@ class Environment { static const int kFsStatsFieldsLength = 2 * 14; AliasedBuffer fs_stats_field_array_; + std::vector> + file_handle_read_wrap_freelist_; + struct ExitCallback { void (*cb_)(void* arg); void* arg_; diff --git a/src/node_file.cc b/src/node_file.cc index fe3b0e1383e8cb..36bb326aa51517 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -26,6 +26,7 @@ #include "node_file.h" #include "req_wrap-inl.h" +#include "stream_base-inl.h" #include "string_bytes.h" #include "string_search.h" @@ -41,7 +42,6 @@ #endif #include -#include namespace node { @@ -115,11 +115,13 @@ using v8::Value; // The FileHandle object wraps a file descriptor and will close it on garbage // collection if necessary. If that happens, a process warning will be // emitted (or a fatal exception will occur if the fd cannot be closed.) -FileHandle::FileHandle(Environment* env, int fd) +FileHandle::FileHandle(Environment* env, int fd, Local obj) : AsyncWrap(env, - env->fd_constructor_template() - ->NewInstance(env->context()).ToLocalChecked(), - AsyncWrap::PROVIDER_FILEHANDLE), fd_(fd) { + obj.IsEmpty() ? env->fd_constructor_template() + ->NewInstance(env->context()).ToLocalChecked() : obj, + AsyncWrap::PROVIDER_FILEHANDLE), + StreamBase(env), + fd_(fd) { MakeWeak(this); v8::PropertyAttribute attr = static_cast(v8::ReadOnly | v8::DontDelete); @@ -129,6 +131,19 @@ FileHandle::FileHandle(Environment* env, int fd) attr).FromJust(); } +void FileHandle::New(const v8::FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsInt32()); + + FileHandle* handle = + new FileHandle(env, args[0].As()->Value(), args.This()); + if (args[1]->IsNumber()) + handle->read_offset_ = args[1]->IntegerValue(env->context()).FromJust(); + if (args[2]->IsNumber()) + handle->read_length_ = args[2]->IntegerValue(env->context()).FromJust(); +} + FileHandle::~FileHandle() { CHECK(!closing_); // We should not be deleting while explicitly closing! Close(); // Close synchronously and emit warning @@ -142,10 +157,10 @@ FileHandle::~FileHandle() { // will crash the process immediately. inline void FileHandle::Close() { if (closed_) return; - closed_ = true; uv_fs_t req; int ret = uv_fs_close(env()->event_loop(), &req, fd_, nullptr); uv_fs_req_cleanup(&req); + AfterClose(); struct err_detail { int ret; int fd; }; @@ -219,18 +234,18 @@ inline MaybeLocal FileHandle::ClosePromise() { CHECK(!maybe_resolver.IsEmpty()); Local resolver = maybe_resolver.ToLocalChecked(); Local promise = resolver.As(); + CHECK(!reading_); if (!closed_ && !closing_) { closing_ = true; CloseReq* req = new CloseReq(env(), promise, object()); auto AfterClose = [](uv_fs_t* req) { CloseReq* close = static_cast(req->data); CHECK_NE(close, nullptr); - close->file_handle()->closing_ = false; + close->file_handle()->AfterClose(); Isolate* isolate = close->env()->isolate(); if (req->result < 0) { close->Reject(UVException(isolate, req->result, "close")); } else { - close->file_handle()->closed_ = true; close->Resolve(); } delete close; @@ -256,6 +271,162 @@ void FileHandle::Close(const FunctionCallbackInfo& args) { } +void FileHandle::ReleaseFD(const FunctionCallbackInfo& args) { + FileHandle* fd; + ASSIGN_OR_RETURN_UNWRAP(&fd, args.Holder()); + // Just act as if this FileHandle has been closed. + fd->AfterClose(); +} + + +void FileHandle::AfterClose() { + closing_ = false; + closed_ = true; + if (reading_ && !persistent().IsEmpty()) + EmitRead(UV_EOF); +} + + +FileHandleReadWrap::FileHandleReadWrap(FileHandle* handle, Local obj) + : ReqWrap(handle->env(), obj, AsyncWrap::PROVIDER_FSREQWRAP), + file_handle_(handle) {} + +int FileHandle::ReadStart() { + if (!IsAlive() || IsClosing()) + return UV_EOF; + + reading_ = true; + + if (current_read_) + return 0; + + std::unique_ptr read_wrap; + + if (read_length_ == 0) { + EmitRead(UV_EOF); + return 0; + } + + { + // Create a new FileHandleReadWrap or re-use one. + // Either way, we need these two scopes for AsyncReset() or otherwise + // for creating the new instance. + HandleScope handle_scope(env()->isolate()); + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this); + + auto& freelist = env()->file_handle_read_wrap_freelist(); + if (freelist.size() > 0) { + read_wrap = std::move(freelist.back()); + freelist.pop_back(); + read_wrap->AsyncReset(); + read_wrap->file_handle_ = this; + } else { + Local wrap_obj = env()->filehandlereadwrap_template() + ->NewInstance(env()->context()).ToLocalChecked(); + read_wrap.reset(new FileHandleReadWrap(this, wrap_obj)); + } + } + int64_t recommended_read = 65536; + if (read_length_ >= 0 && read_length_ <= recommended_read) + recommended_read = read_length_; + + read_wrap->buffer_ = EmitAlloc(recommended_read); + read_wrap->Dispatched(); + + current_read_ = std::move(read_wrap); + + uv_fs_read(env()->event_loop(), + current_read_->req(), + fd_, + ¤t_read_->buffer_, + 1, + read_offset_, + [](uv_fs_t* req) { + FileHandle* handle; + { + FileHandleReadWrap* req_wrap = FileHandleReadWrap::from_req(req); + handle = req_wrap->file_handle_; + CHECK_EQ(handle->current_read_.get(), req_wrap); + } + + // ReadStart() checks whether current_read_ is set to determine whether + // a read is in progress. Moving it into a local variable makes sure that + // the ReadStart() call below doesn’t think we’re still actively reading. + std::unique_ptr read_wrap = + std::move(handle->current_read_); + + int result = req->result; + uv_buf_t buffer = read_wrap->buffer_; + + uv_fs_req_cleanup(req); + + // Push the read wrap back to the freelist, or let it be destroyed + // once we’re exiting the current scope. + constexpr size_t wanted_freelist_fill = 100; + auto& freelist = handle->env()->file_handle_read_wrap_freelist(); + if (freelist.size() < wanted_freelist_fill) + freelist.emplace_back(std::move(read_wrap)); + + if (result >= 0) { + // Read at most as many bytes as we originally planned to. + if (handle->read_length_ >= 0 && handle->read_length_ < result) + result = handle->read_length_; + + // If we read data and we have an expected length, decrease it by + // how much we have read. + if (handle->read_length_ >= 0) + handle->read_length_ -= result; + + // If we have an offset, increase it by how much we have read. + if (handle->read_offset_ >= 0) + handle->read_offset_ += result; + } + + // Reading 0 bytes from a file always means EOF, or that we reached + // the end of the requested range. + if (result == 0) + result = UV_EOF; + + handle->EmitRead(result, buffer); + + // Start over, if EmitRead() didn’t tell us to stop. + if (handle->reading_) + handle->ReadStart(); + }); + + return 0; +} + +int FileHandle::ReadStop() { + reading_ = false; + return 0; +} + +typedef SimpleShutdownWrap> FileHandleCloseWrap; + +ShutdownWrap* FileHandle::CreateShutdownWrap(Local object) { + return new FileHandleCloseWrap(this, object); +} + +int FileHandle::DoShutdown(ShutdownWrap* req_wrap) { + FileHandleCloseWrap* wrap = static_cast(req_wrap); + closing_ = true; + wrap->Dispatched(); + uv_fs_close(env()->event_loop(), wrap->req(), fd_, [](uv_fs_t* req) { + FileHandleCloseWrap* wrap = static_cast( + FileHandleCloseWrap::from_req(req)); + FileHandle* handle = static_cast(wrap->stream()); + handle->AfterClose(); + + int result = req->result; + uv_fs_req_cleanup(req); + wrap->Done(result); + }); + + return 0; +} + + void FSReqWrap::Reject(Local reject) { MakeCallback(env()->oncomplete_string(), 1, &reject); } @@ -1730,6 +1901,17 @@ void InitFs(Local target, fst->SetClassName(wrapString); target->Set(context, wrapString, fst->GetFunction()).FromJust(); + // Create FunctionTemplate for FileHandleReadWrap. There’s no need + // to do anything in the constructor, so we only store the instance template. + Local fh_rw = FunctionTemplate::New(env->isolate()); + fh_rw->InstanceTemplate()->SetInternalFieldCount(1); + AsyncWrap::AddWrapMethods(env, fh_rw); + Local fhWrapString = + FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandleReqWrap"); + fh_rw->SetClassName(fhWrapString); + env->set_filehandlereadwrap_template( + fst->InstanceTemplate()); + // Create Function Template for FSReqPromise Local fpt = FunctionTemplate::New(env->isolate()); AsyncWrap::AddWrapMethods(env, fpt); @@ -1741,14 +1923,16 @@ void InitFs(Local target, env->set_fsreqpromise_constructor_template(fpo); // Create FunctionTemplate for FileHandle - Local fd = FunctionTemplate::New(env->isolate()); + Local fd = env->NewFunctionTemplate(FileHandle::New); AsyncWrap::AddWrapMethods(env, fd); env->SetProtoMethod(fd, "close", FileHandle::Close); + env->SetProtoMethod(fd, "releaseFD", FileHandle::ReleaseFD); Local fdt = fd->InstanceTemplate(); fdt->SetInternalFieldCount(1); Local handleString = FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandle"); fd->SetClassName(handleString); + StreamBase::AddMethods(env, fd, StreamBase::kFlagNone); target->Set(context, handleString, fd->GetFunction()).FromJust(); env->set_fd_constructor_template(fdt); diff --git a/src/node_file.h b/src/node_file.h index bf277a0e433525..fa373d46ad0003 100644 --- a/src/node_file.h +++ b/src/node_file.h @@ -4,6 +4,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "node.h" +#include "stream_base.h" #include "req_wrap-inl.h" namespace node { @@ -20,6 +21,7 @@ using v8::Value; namespace fs { + class FSReqBase : public ReqWrap { public: FSReqBase(Environment* env, Local req, AsyncWrap::ProviderType type) @@ -120,13 +122,38 @@ class FSReqAfterScope { Context::Scope context_scope_; }; +class FileHandle; + +// A request wrap specifically for uv_fs_read()s scheduled for reading +// from a FileHandle. +class FileHandleReadWrap : public ReqWrap { + public: + FileHandleReadWrap(FileHandle* handle, v8::Local obj); + + static inline FileHandleReadWrap* from_req(uv_fs_t* req) { + return static_cast(ReqWrap::from_req(req)); + } + + size_t self_size() const override { return sizeof(*this); } + + private: + FileHandle* file_handle_; + uv_buf_t buffer_; + + friend class FileHandle; +}; + // A wrapper for a file descriptor that will automatically close the fd when // the object is garbage collected -class FileHandle : public AsyncWrap { +class FileHandle : public AsyncWrap, public StreamBase { public: - FileHandle(Environment* env, int fd); + FileHandle(Environment* env, + int fd, + v8::Local obj = v8::Local()); virtual ~FileHandle(); + static void New(const v8::FunctionCallbackInfo& args); + int fd() const { return fd_; } size_t self_size() const override { return sizeof(*this); } @@ -134,9 +161,32 @@ class FileHandle : public AsyncWrap { // be resolved once closing is complete. static void Close(const FunctionCallbackInfo& args); + // Releases ownership of the FD. + static void ReleaseFD(const FunctionCallbackInfo& args); + + // StreamBase interface: + int ReadStart() override; + int ReadStop() override; + + bool IsAlive() override { return !closed_; } + bool IsClosing() override { return closing_; } + AsyncWrap* GetAsyncWrap() override { return this; } + + // In the case of file streams, shutting down corresponds to closing. + ShutdownWrap* CreateShutdownWrap(v8::Local object) override; + int DoShutdown(ShutdownWrap* req_wrap) override; + + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override { + return UV_ENOSYS; // Not implemented (yet). + } + private: // Synchronous close that emits a warning - inline void Close(); + void Close(); + void AfterClose(); class CloseReq : public ReqWrap { public: @@ -176,6 +226,12 @@ class FileHandle : public AsyncWrap { int fd_; bool closing_ = false; bool closed_ = false; + int64_t read_offset_ = -1; + int64_t read_length_ = -1; + + bool reading_ = false; + std::unique_ptr current_read_ = nullptr; + DISALLOW_COPY_AND_ASSIGN(FileHandle); }; diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index c87393e6fc1c72..b7495a80ac63e0 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -146,6 +146,9 @@ inline Environment* StreamBase::stream_env() const { inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); + + HandleScope handle_scope(env->isolate()); + if (req_wrap_obj.IsEmpty()) { req_wrap_obj = env->shutdown_wrap_constructor_function() @@ -183,6 +186,8 @@ inline StreamWriteResult StreamBase::Write( } } + HandleScope handle_scope(env->isolate()); + if (req_wrap_obj.IsEmpty()) { req_wrap_obj = env->write_wrap_constructor_function() diff --git a/src/stream_base.h b/src/stream_base.h index 6962648650e1a6..f3e010d5bc94c9 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -95,7 +95,7 @@ class StreamListener { public: virtual ~StreamListener(); - // This is called when a stream wants to allocate memory immediately before + // This is called when a stream wants to allocate memory before // reading data into the freshly allocated buffer (i.e. it is always followed // by a `OnStreamRead()` call). // This memory may be statically or dynamically allocated; for example, @@ -105,6 +105,9 @@ class StreamListener { // The returned buffer does not need to contain `suggested_size` bytes. // The default implementation of this method returns a buffer that has exactly // the suggested size and is allocated using malloc(). + // It is not valid to return a zero-length buffer from this method. + // It is not guaranteed that the corresponding `OnStreamRead()` call + // happens in the same event loop turn as this call. virtual uv_buf_t OnStreamAlloc(size_t suggested_size); // `OnStreamRead()` is called when data is available on the socket and has diff --git a/src/util.h b/src/util.h index 7c679952d5fb1f..c822390ec56f8b 100644 --- a/src/util.h +++ b/src/util.h @@ -34,6 +34,7 @@ #include #include +#include // std::function #include // std::remove_reference namespace node { @@ -433,7 +434,6 @@ class BufferValue : public MaybeStackBuffer { // Use this when a variable or parameter is unused in order to explicitly // silence a compiler warning about that. template inline void USE(T&&) {} - } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS