diff --git a/node.gyp b/node.gyp
index 5efe2323599cff..1b047fe9ac52f2 100644
--- a/node.gyp
+++ b/node.gyp
@@ -338,6 +338,7 @@
         'src/string_decoder.cc',
         'src/string_search.cc',
         'src/stream_base.cc',
+        'src/stream_pipe.cc',
         'src/stream_wrap.cc',
         'src/tcp_wrap.cc',
         'src/timer_wrap.cc',
@@ -394,6 +395,7 @@
         'src/string_decoder-inl.h',
         'src/stream_base.h',
         'src/stream_base-inl.h',
+        'src/stream_pipe.h',
         'src/stream_wrap.h',
         'src/tracing/agent.h',
         'src/tracing/node_trace_buffer.h',
diff --git a/src/async_wrap.h b/src/async_wrap.h
index 608764bab5361c..f0689d32f3c69f 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -58,6 +58,7 @@ namespace node {
   V(SHUTDOWNWRAP)                                                             \
   V(SIGNALWRAP)                                                               \
   V(STATWATCHER)                                                              \
+  V(STREAMPIPE)                                                               \
   V(TCPCONNECTWRAP)                                                           \
   V(TCPSERVERWRAP)                                                            \
   V(TCPWRAP)                                                                  \
diff --git a/src/env.h b/src/env.h
index e0f6856f8d0b1d..4fc6b31ffebd21 100644
--- a/src/env.h
+++ b/src/env.h
@@ -222,6 +222,7 @@ struct PackageConfig {
   V(onstop_string, "onstop")                                                  \
   V(onstreamclose_string, "onstreamclose")                                    \
   V(ontrailers_string, "ontrailers")                                          \
+  V(onunpipe_string, "onunpipe")                                              \
   V(onwrite_string, "onwrite")                                                \
   V(openssl_error_stack, "opensslErrorStack")                                 \
   V(output_string, "output")                                                  \
@@ -233,6 +234,8 @@ struct PackageConfig {
   V(pbkdf2_error_string, "PBKDF2 Error")                                      \
   V(pid_string, "pid")                                                        \
   V(pipe_string, "pipe")                                                      \
+  V(pipe_target_string, "pipeTarget")                                         \
+  V(pipe_source_string, "pipeSource")                                         \
   V(port_string, "port")                                                      \
   V(preference_string, "preference")                                          \
   V(priority_string, "priority")                                              \
@@ -255,9 +258,11 @@ struct PackageConfig {
   V(session_id_string, "sessionId")                                           \
   V(shell_string, "shell")                                                    \
   V(signal_string, "signal")                                                  \
+  V(sink_string, "sink")                                                      \
   V(size_string, "size")                                                      \
   V(sni_context_err_string, "Invalid SNI context")                            \
   V(sni_context_string, "sni_context")                                        \
+  V(source_string, "source")                                                  \
   V(stack_string, "stack")                                                    \
   V(status_string, "status")                                                  \
   V(stdio_string, "stdio")                                                    \
diff --git a/src/node_http2.cc b/src/node_http2.cc
index 8dd222a692a52c..d6df93cf3804a7 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -1813,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) {
 }
 
 int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
-  CHECK(!this->IsDestroyed());
+  if (IsDestroyed())
+    return UV_EPIPE;
+
   {
     Http2Scope h2scope(this);
     flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
diff --git a/src/node_internals.h b/src/node_internals.h
index 2faa6f93475ad7..79c2ce553200f3 100644
--- a/src/node_internals.h
+++ b/src/node_internals.h
@@ -120,6 +120,7 @@ struct sockaddr;
     V(serdes)                                                                 \
     V(signal_wrap)                                                            \
     V(spawn_sync)                                                             \
+    V(stream_pipe)                                                            \
     V(stream_wrap)                                                            \
     V(string_decoder)                                                         \
     V(tcp_wrap)                                                               \
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index f0d522a7b06b6c..7523b3a545355f 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
 
 inline StreamResource::~StreamResource() {
   while (listener_ != nullptr) {
-    listener_->OnStreamDestroy();
-    RemoveStreamListener(listener_);
+    StreamListener* listener = listener_;
+    listener->OnStreamDestroy();
+    // Remove the listener if it didn’t remove itself. This makes the logic
+    // logic in `OnStreamDestroy()` implementations easier, because they
+    // may call generic cleanup functions which can just remove the
+    // listener unconditionally.
+    if (listener == listener_)
+      RemoveStreamListener(listener_);
   }
 }
 
diff --git a/src/stream_base.h b/src/stream_base.h
index 96a7787e5bb41c..7264824265a579 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -141,6 +141,9 @@ class StreamListener {
   // This is called immediately before the stream is destroyed.
   virtual void OnStreamDestroy() {}
 
+  // The stream this is currently associated with, or nullptr if there is none.
+  inline StreamResource* stream() { return stream_; }
+
  protected:
   // Pass along a read error to the `StreamListener` instance that was active
   // before this one. For example, a protocol parser does not care about read
diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc
new file mode 100644
index 00000000000000..8f0263cd9ae99b
--- /dev/null
+++ b/src/stream_pipe.cc
@@ -0,0 +1,266 @@
+#include "stream_pipe.h"
+#include "stream_base-inl.h"
+#include "node_buffer.h"
+#include "node_internals.h"
+
+using v8::Context;
+using v8::External;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::Local;
+using v8::Object;
+using v8::Value;
+
+namespace node {
+
+StreamPipe::StreamPipe(StreamBase* source,
+                       StreamBase* sink,
+                       Local<Object> obj)
+    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
+  MakeWeak(this);
+
+  CHECK_NE(sink, nullptr);
+  CHECK_NE(source, nullptr);
+
+  source->PushStreamListener(&readable_listener_);
+  sink->PushStreamListener(&writable_listener_);
+
+  CHECK(sink->HasWantsWrite());
+
+  // Set up links between this object and the source/sink objects.
+  // In particular, this makes sure that they are garbage collected as a group,
+  // if that applies to the given streams (for example, Http2Streams use
+  // weak references).
+  obj->Set(env()->context(), env()->source_string(), source->GetObject())
+      .FromJust();
+  source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
+      .FromJust();
+  obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
+      .FromJust();
+  sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
+      .FromJust();
+}
+
+StreamPipe::~StreamPipe() {
+  CHECK(is_closed_);
+}
+
+StreamBase* StreamPipe::source() {
+  return static_cast<StreamBase*>(readable_listener_.stream());
+}
+
+StreamBase* StreamPipe::sink() {
+  return static_cast<StreamBase*>(writable_listener_.stream());
+}
+
+void StreamPipe::Unpipe() {
+  if (is_closed_)
+    return;
+
+  // Note that we cannot use virtual methods on `source` and `sink` here,
+  // because this function can be called from their destructors via
+  // `OnStreamDestroy()`.
+
+  is_closed_ = true;
+  is_reading_ = false;
+  source()->RemoveStreamListener(&readable_listener_);
+  sink()->RemoveStreamListener(&writable_listener_);
+
+  // Delay the JS-facing part with SetImmediate, because this might be from
+  // inside the garbage collector, so we can’t run JS here.
+  HandleScope handle_scope(env()->isolate());
+  env()->SetImmediate([](Environment* env, void* data) {
+    StreamPipe* pipe = static_cast<StreamPipe*>(data);
+
+    HandleScope handle_scope(env->isolate());
+    Context::Scope context_scope(env->context());
+    Local<Object> object = pipe->object();
+
+    if (object->Has(env->context(), env->onunpipe_string()).FromJust()) {
+      pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked();
+    }
+
+    // Set all the links established in the constructor to `null`.
+    Local<Value> null = Null(env->isolate());
+
+    Local<Value> source_v;
+    Local<Value> sink_v;
+    source_v = object->Get(env->context(), env->source_string())
+        .ToLocalChecked();
+    sink_v = object->Get(env->context(), env->sink_string())
+        .ToLocalChecked();
+    CHECK(source_v->IsObject());
+    CHECK(sink_v->IsObject());
+
+    object->Set(env->context(), env->source_string(), null).FromJust();
+    object->Set(env->context(), env->sink_string(), null).FromJust();
+    source_v.As<Object>()->Set(env->context(),
+                               env->pipe_target_string(),
+                               null).FromJust();
+    sink_v.As<Object>()->Set(env->context(),
+                             env->pipe_source_string(),
+                             null).FromJust();
+  }, static_cast<void*>(this), object());
+}
+
+uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+  size_t size = std::min(suggested_size, pipe->wanted_data_);
+  CHECK_GT(size, 0);
+  return uv_buf_init(Malloc(size), size);
+}
+
+void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
+                                                const uv_buf_t& buf) {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+  AsyncScope async_scope(pipe);
+  if (nread < 0) {
+    // EOF or error; stop reading and pass the error to the previous listener
+    // (which might end up in JS).
+    free(buf.base);
+    pipe->is_eof_ = true;
+    stream()->ReadStop();
+    CHECK_NE(previous_listener_, nullptr);
+    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
+    // If we’re not writing, close now. Otherwise, we’ll do that in
+    // `OnStreamAfterWrite()`.
+    if (!pipe->is_writing_) {
+      pipe->ShutdownWritable();
+      pipe->Unpipe();
+    }
+    return;
+  }
+
+  pipe->ProcessData(nread, buf);
+}
+
+void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
+  uv_buf_t buffer = uv_buf_init(buf.base, nread);
+  StreamWriteResult res = sink()->Write(&buffer, 1);
+  if (!res.async) {
+    free(buf.base);
+    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
+  } else {
+    is_writing_ = true;
+    is_reading_ = false;
+    res.wrap->SetAllocatedStorage(buf.base, buf.len);
+    source()->ReadStop();
+  }
+}
+
+void StreamPipe::ShutdownWritable() {
+  sink()->Shutdown();
+}
+
+void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
+                                                      int status) {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+  pipe->is_writing_ = false;
+  if (pipe->is_eof_) {
+    AsyncScope async_scope(pipe);
+    pipe->ShutdownWritable();
+    pipe->Unpipe();
+    return;
+  }
+
+  if (status != 0) {
+    CHECK_NE(previous_listener_, nullptr);
+    StreamListener* prev = previous_listener_;
+    pipe->Unpipe();
+    prev->OnStreamAfterWrite(w, status);
+    return;
+  }
+}
+
+void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
+                                                         int status) {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+  CHECK_NE(previous_listener_, nullptr);
+  StreamListener* prev = previous_listener_;
+  pipe->Unpipe();
+  prev->OnStreamAfterShutdown(w, status);
+}
+
+void StreamPipe::ReadableListener::OnStreamDestroy() {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
+  if (!pipe->is_eof_) {
+    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
+  }
+}
+
+void StreamPipe::WritableListener::OnStreamDestroy() {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+  pipe->is_eof_ = true;
+  pipe->Unpipe();
+}
+
+void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
+  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
+  pipe->wanted_data_ = suggested_size;
+  if (pipe->is_reading_ || pipe->is_closed_)
+    return;
+  AsyncScope async_scope(pipe);
+  pipe->is_reading_ = true;
+  pipe->source()->ReadStart();
+}
+
+uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
+  CHECK_NE(previous_listener_, nullptr);
+  return previous_listener_->OnStreamAlloc(suggested_size);
+}
+
+void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
+                                                const uv_buf_t& buf) {
+  CHECK_NE(previous_listener_, nullptr);
+  return previous_listener_->OnStreamRead(nread, buf);
+}
+
+void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
+  CHECK(args.IsConstructCall());
+  CHECK(args[0]->IsExternal());
+  CHECK(args[1]->IsExternal());
+  auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
+  auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
+
+  new StreamPipe(source, sink, args.This());
+}
+
+void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
+  StreamPipe* pipe;
+  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+  pipe->is_closed_ = false;
+  if (pipe->wanted_data_ > 0)
+    pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
+}
+
+void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
+  StreamPipe* pipe;
+  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
+  pipe->Unpipe();
+}
+
+namespace {
+
+void InitializeStreamPipe(Local<Object> target,
+                          Local<Value> unused,
+                          Local<Context> context) {
+  Environment* env = Environment::GetCurrent(context);
+
+  // Create FunctionTemplate for FileHandle::CloseReq
+  Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
+  Local<String> stream_pipe_string =
+      FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
+  env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
+  env->SetProtoMethod(pipe, "start", StreamPipe::Start);
+  AsyncWrap::AddWrapMethods(env, pipe);
+  pipe->SetClassName(stream_pipe_string);
+  pipe->InstanceTemplate()->SetInternalFieldCount(1);
+  target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust();
+}
+
+}  // anonymous namespace
+
+}  // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
+                                   node::InitializeStreamPipe)
diff --git a/src/stream_pipe.h b/src/stream_pipe.h
new file mode 100644
index 00000000000000..98d6dae11be841
--- /dev/null
+++ b/src/stream_pipe.h
@@ -0,0 +1,68 @@
+#ifndef SRC_STREAM_PIPE_H_
+#define SRC_STREAM_PIPE_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "stream_base.h"
+
+namespace node {
+
+class StreamPipe : public AsyncWrap {
+ public:
+  StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
+  ~StreamPipe();
+
+  void Unpipe();
+
+  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+  size_t self_size() const override { return sizeof(*this); }
+
+ private:
+  StreamBase* source();
+  StreamBase* sink();
+
+  void ShutdownWritable();
+  void FlushToWritable();
+
+  bool is_reading_ = false;
+  bool is_writing_ = false;
+  bool is_eof_ = false;
+  bool is_closed_ = true;
+
+  // Set a default value so that when we’re coming from Start(), we know
+  // that we don’t want to read just yet.
+  // This will likely need to be changed when supporting streams without
+  // `OnStreamWantsWrite()` support.
+  size_t wanted_data_ = 0;
+
+  void ProcessData(size_t nread, const uv_buf_t& buf);
+
+  class ReadableListener : public StreamListener {
+   public:
+    uv_buf_t OnStreamAlloc(size_t suggested_size) override;
+    void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+    void OnStreamDestroy() override;
+  };
+
+  class WritableListener : public StreamListener {
+   public:
+    uv_buf_t OnStreamAlloc(size_t suggested_size) override;
+    void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+    void OnStreamAfterWrite(WriteWrap* w, int status) override;
+    void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
+    void OnStreamWantsWrite(size_t suggested_size) override;
+    void OnStreamDestroy() override;
+  };
+
+  ReadableListener readable_listener_;
+  WritableListener writable_listener_;
+};
+
+}  // namespace node
+
+#endif
+
+#endif  // SRC_STREAM_PIPE_H_
diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js
index 66eaabec25d977..64c4fd5cd8ab50 100644
--- a/test/sequential/test-async-wrap-getasyncid.js
+++ b/test/sequential/test-async-wrap-getasyncid.js
@@ -35,6 +35,7 @@ common.crashOnUnhandledRejection();
     delete providers.HTTP2STREAM;
     delete providers.HTTP2PING;
     delete providers.HTTP2SETTINGS;
+    delete providers.STREAMPIPE;
 
     const objKeys = Object.keys(providers);
     if (objKeys.length > 0)