Skip to content

Commit

Permalink
src: convey potential exceptions during StreamPipe construction
Browse files Browse the repository at this point in the history
This moves the V8 calls during the StreamPipe construction to an
overload of StreamPipe::New(), so that it is possible to indicate if
there is a pending exception/termination.

Refs: nodejs#40425 (comment)
Signed-off-by: Darshan Sen <raisinten@gmail.com>

PR-URL: nodejs#43240
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
  • Loading branch information
RaisinTen authored and italojs committed Jun 6, 2022
1 parent 0999c3f commit a0a237a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 30 deletions.
62 changes: 36 additions & 26 deletions src/stream_pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Just;
using v8::Local;
using v8::Maybe;
using v8::Nothing;
using v8::Object;
using v8::Value;

Expand All @@ -28,31 +31,6 @@ StreamPipe::StreamPipe(StreamBase* source,
sink->PushStreamListener(&writable_listener_);

uses_wants_write_ = sink->HasWantsWrite();

// Set up links between this object and the source/sink objects.
// In particular, this makes sure that they are garbage collected as a group,
// if that applies to the given streams (for example, Http2Streams use
// weak references).
if (obj->Set(env()->context(),
env()->source_string(),
source->GetObject()).IsNothing()) {
return;
}
if (source->GetObject()->Set(env()->context(),
env()->pipe_target_string(),
obj).IsNothing()) {
return;
}
if (obj->Set(env()->context(),
env()->sink_string(),
sink->GetObject()).IsNothing()) {
return;
}
if (sink->GetObject()->Set(env()->context(),
env()->pipe_source_string(),
obj).IsNothing()) {
return;
}
}

StreamPipe::~StreamPipe() {
Expand Down Expand Up @@ -261,14 +239,46 @@ void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
return previous_listener_->OnStreamRead(nread, buf);
}

Maybe<StreamPipe*> StreamPipe::New(StreamBase* source,
StreamBase* sink,
Local<Object> obj) {
std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));

// Set up links between this object and the source/sink objects.
// In particular, this makes sure that they are garbage collected as a group,
// if that applies to the given streams (for example, Http2Streams use
// weak references).
Environment* env = source->stream_env();
if (obj->Set(env->context(), env->source_string(), source->GetObject())
.IsNothing()) {
return Nothing<StreamPipe*>();
}
if (source->GetObject()
->Set(env->context(), env->pipe_target_string(), obj)
.IsNothing()) {
return Nothing<StreamPipe*>();
}
if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
.IsNothing()) {
return Nothing<StreamPipe*>();
}
if (sink->GetObject()
->Set(env->context(), env->pipe_source_string(), obj)
.IsNothing()) {
return Nothing<StreamPipe*>();
}

return Just(stream_pipe.release());
}

void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
CHECK(args[0]->IsObject());
CHECK(args[1]->IsObject());
StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());

new StreamPipe(source, sink, args.This());
if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
}

void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
Expand Down
7 changes: 3 additions & 4 deletions src/stream_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ class StreamPipe : public AsyncWrap {

void Unpipe(bool is_in_deletion = false);

// TODO(RaisinTen): Just like MessagePort, add the following overload:
// static StreamPipe* New(StreamBase* source, StreamBase* sink,
// v8::Local<v8::Object> obj);
// so that we can indicate if there is a pending exception/termination.
static v8::Maybe<StreamPipe*> New(StreamBase* source,
StreamBase* sink,
v8::Local<v8::Object> obj);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down

0 comments on commit a0a237a

Please sign in to comment.