diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index 339a6e45f23..e3ca87e5d15 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -4,11 +4,239 @@ #include "container.h" +#include #include namespace workerd::api { +// ======================================================================================= +// Basic lifecycle methods + Container::Container(rpc::Container::Client rpcClient) : rpcClient(IoContext::current().addObject(kj::heap(kj::mv(rpcClient)))) {} +void Container::start(jsg::Lock& js, jsg::Optional maybeOptions) { + JSG_REQUIRE(!running, Error, "start() cannot be called on a container that is already running."); + + StartupOptions options = kj::mv(maybeOptions).orDefault({}); + + auto req = rpcClient->startRequest(); + KJ_IF_SOME(entrypoint, options.entrypoint) { + auto list = req.initEntrypoint(entrypoint.size()); + for (auto i: kj::indices(entrypoint)) { + list.set(i, entrypoint[i]); + } + } + req.setEnableInternet(options.enableInternet); + + IoContext::current().addTask(req.send().ignoreResult()); + + running = true; +} + +jsg::Promise Container::monitor(jsg::Lock& js) { + JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running."); + + return IoContext::current() + .awaitIo(js, rpcClient->monitorRequest(capnp::MessageSize{4, 0}).send().ignoreResult()) + .then(js, [this](jsg::Lock& js) { + running = false; + KJ_IF_SOME(d, destroyReason) { + jsg::Value error = kj::mv(d); + destroyReason = kj::none; + js.throwException(kj::mv(error)); + } + }, [this](jsg::Lock& js, jsg::Value&& error) { + running = false; + destroyReason = kj::none; + js.throwException(kj::mv(error)); + }); +} + +jsg::Promise Container::destroy(jsg::Lock& js, jsg::Optional error) { + if (!running) return js.resolvedPromise(); + + if (destroyReason == kj::none) { + destroyReason = kj::mv(error); + } + + return IoContext::current().awaitIo( + js, rpcClient->destroyRequest(capnp::MessageSize{4, 0}).send().ignoreResult()); +} + +void Container::signal(jsg::Lock& js, int signo) { + JSG_REQUIRE(signo > 0 && signo <= 64, RangeError, "Invalid signal number."); + JSG_REQUIRE(running, Error, "signal() cannot be called on a container that is not running."); + + auto req = rpcClient->signalRequest(capnp::MessageSize{4, 0}); + req.setSigno(signo); + IoContext::current().addTask(req.send().ignoreResult()); +} + +// ======================================================================================= +// getTcpPort() + +// `getTcpPort()` returns a `Fetcher`, on which `fetch()` and `connect()` can be called. `Fetcher` +// is a JavaScript wrapper around `WorkerInterface`, so we need to implement that. +class Container::TcpPortWorkerInterface final: public WorkerInterface { + public: + TcpPortWorkerInterface(capnp::ByteStreamFactory& byteStreamFactory, + const kj::HttpHeaderTable& headerTable, + rpc::Container::Port::Client port) + : byteStreamFactory(byteStreamFactory), + headerTable(headerTable), + port(kj::mv(port)) {} + + // Implements fetch(), i.e., HTTP requests. We form a TCP connection, then run HTTP over it + // (as opposed to, say, speaking http-over-capnp to the container service). + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + kj::HttpService::Response& response) override { + // URLs should have been validated earlier in the stack, so parsing the URL should succeed. + auto parsedUrl = KJ_REQUIRE_NONNULL(kj::Url::tryParse(url, kj::Url::Context::HTTP_PROXY_REQUEST, + {.percentDecode = false, .allowEmpty = true}), + "invalid url?", url); + + // We don't support TLS. + JSG_REQUIRE(parsedUrl.scheme != "https", Error, + "Connencting to a container using HTTPS is not currently supported; use HTTP instead. " + "TLS is unnecessary anyway, as the connection is already secure by default."); + + // Schemes other than http: and https: should have been rejected earlier, but let's verify. + KJ_REQUIRE(parsedUrl.scheme == "http"); + + // We need to convert the URL from proxy format (full URL in request line) to host format + // (path in request line, hostname in Host header). + auto newHeaders = headers.cloneShallow(); + newHeaders.set(kj::HttpHeaderId::HOST, parsedUrl.host); + auto noHostUrl = parsedUrl.toString(kj::Url::Context::HTTP_REQUEST); + + // Make a TCP connection... + auto pipe = kj::newTwoWayPipe(); + auto connectionPromise = + connectImpl(*pipe.ends[1]).then([]() -> kj::Promise { return kj::NEVER_DONE; }); + + // ... and then stack an HttpClient on it ... + auto client = kj::newHttpClient(headerTable, *pipe.ends[0]); + + // ... and then adapt that to an HttpService ... + auto service = kj::newHttpService(*client); + + // ... and now we can just forward our call to that. + co_await connectionPromise.exclusiveJoin( + service->request(method, noHostUrl, newHeaders, requestBody, response)); + } + + // Implements connect(), i.e., forms a raw socket. + kj::Promise connect(kj::StringPtr host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + ConnectResponse& response, + kj::HttpConnectSettings settings) override { + JSG_REQUIRE(!settings.useTls, Error, + "Connencting to a container using TLS is not currently supported. It is unnecessary " + "anyway, as the connection is already secure by default."); + + auto promise = connectImpl(connection); + + kj::HttpHeaders responseHeaders(headerTable); + response.accept(200, "OK", responseHeaders); + + return promise; + } + + // The only `CustomEvent` that can happen through `Fetcher` is a JSRPC call. Maybe we will + // support this someday? But not today. + kj::Promise customEvent(kj::Own event) override { + return event->notSupported(); + } + + // There's no way to invoke the remaining event types via `Fetcher`. + kj::Promise prewarm(kj::StringPtr url) override { + KJ_UNREACHABLE; + } + kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { + KJ_UNREACHABLE; + } + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { + KJ_UNREACHABLE; + } + + private: + capnp::ByteStreamFactory& byteStreamFactory; + const kj::HttpHeaderTable& headerTable; + rpc::Container::Port::Client port; + + // Connect to the port and pump bytes to/from `connection`. Used by both request() and + // connect(). + kj::Promise connectImpl(kj::AsyncIoStream& connection) { + // A lot of the following is copied from + // capnp::HttpOverCapnpFactory::KjToCapnpHttpServiceAdapter::connect(). + auto req = port.connectRequest(capnp::MessageSize{4, 1}); + auto downPipe = kj::newOneWayPipe(); + req.setDown(byteStreamFactory.kjToCapnp(kj::mv(downPipe.out))); + auto pipeline = req.send(); + + // Make sure the request message isn't pinned into memory through the co_await below. + { auto drop = kj::mv(req); } + + auto downPumpTask = + downPipe.in->pumpTo(connection) + .then([&connection, down = kj::mv(downPipe.in)](uint64_t) -> kj::Promise { + connection.shutdownWrite(); + return kj::NEVER_DONE; + }); + auto up = pipeline.getUp(); + + auto upStream = byteStreamFactory.capnpToKjExplicitEnd(up); + auto upPumpTask = connection.pumpTo(*upStream) + .then([&upStream = *upStream](uint64_t) mutable { + return upStream.end(); + }).then([up = kj::mv(up), upStream = kj::mv(upStream)]() mutable -> kj::Promise { + return kj::NEVER_DONE; + }); + + co_await pipeline.ignoreResult(); + } +}; + +// `Fetcher` actually wants us to give it a factory that creates a new `WorkerInterface` for each +// request, so this is that. +class Container::TcpPortOutgoingFactory final: public Fetcher::OutgoingFactory { + public: + TcpPortOutgoingFactory(capnp::ByteStreamFactory& byteStreamFactory, + const kj::HttpHeaderTable& headerTable, + rpc::Container::Port::Client port) + : byteStreamFactory(byteStreamFactory), + headerTable(headerTable), + port(kj::mv(port)) {} + + kj::Own newSingleUseClient(kj::Maybe cfStr) override { + // At present we have no use for `cfStr`. + return kj::heap(byteStreamFactory, headerTable, port); + } + + private: + capnp::ByteStreamFactory& byteStreamFactory; + const kj::HttpHeaderTable& headerTable; + rpc::Container::Port::Client port; +}; + +jsg::Ref Container::getTcpPort(jsg::Lock& js, int port) { + JSG_REQUIRE(port > 0 && port < 65536, TypeError, "Invalid port number: ", port); + + auto req = rpcClient->getTcpPortRequest(capnp::MessageSize{4, 0}); + req.setPort(port); + + auto& ioctx = IoContext::current(); + + kj::Own factory = kj::heap( + ioctx.getByteStreamFactory(), ioctx.getHeaderTable(), req.send().getPort()); + + return jsg::alloc( + ioctx.addObject(kj::mv(factory)), Fetcher::RequiresHostAndProtocol::YES, true); +} + } // namespace workerd::api diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 758cb8b6b7f..54d44f187ea 100644 --- a/src/workerd/api/container.h +++ b/src/workerd/api/container.h @@ -23,14 +23,57 @@ class Container: public jsg::Object { public: Container(rpc::Container::Client rpcClient); + struct StartupOptions { + jsg::Optional> entrypoint; + bool enableInternet = false; + + // TODO(containers): Allow intercepting stdin/stdout/stderr by specifying streams here. + + JSG_STRUCT(entrypoint, enableInternet); + }; + + bool getRunning() { + return running; + } + + // Methods correspond closely to the RPC interface in `container.capnp`. + void start(jsg::Lock& js, jsg::Optional options); + jsg::Promise monitor(jsg::Lock& js); + jsg::Promise destroy(jsg::Lock& js, jsg::Optional error); + void signal(jsg::Lock& js, int signo); + jsg::Ref getTcpPort(jsg::Lock& js, int port); + + // TODO(containers): listenTcp() + JSG_RESOURCE_TYPE(Container) { - // TODO(now): Implement the API. + JSG_READONLY_PROTOTYPE_PROPERTY(running, getRunning); + JSG_METHOD(start); + JSG_METHOD(monitor); + JSG_METHOD(destroy); + JSG_METHOD(signal); + JSG_METHOD(getTcpPort); + } + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("destroyReason", destroyReason); } private: IoOwn rpcClient; + + // TODO(containers): Actually check if the container is already running when the DO starts. + bool running = false; + + kj::Maybe destroyReason; + + void visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(destroyReason); + } + + class TcpPortWorkerInterface; + class TcpPortOutgoingFactory; }; -#define EW_CONTAINER_ISOLATE_TYPES api::Container +#define EW_CONTAINER_ISOLATE_TYPES api::Container, api::Container::StartupOptions } // namespace workerd::api