Skip to content

Commit

Permalink
feat(ext/http): upgradeHttpRaw
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Apr 26, 2023
1 parent 33f1c70 commit 2e45e95
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 55 deletions.
12 changes: 0 additions & 12 deletions cli/bench/testdata/deno_upgrade_http.js

This file was deleted.

7 changes: 6 additions & 1 deletion cli/tests/unit/serve_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import {
} from "./test_util.ts";
import { consoleSize } from "../../../runtime/js/40_tty.js";

const {
upgradeHttpRaw,
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
} = Deno[Deno.internal];

function createOnErrorCb(ac: AbortController): (err: unknown) => Response {
return (err) => {
console.error(err);
Expand Down Expand Up @@ -810,7 +815,7 @@ Deno.test(
const listeningPromise = deferred();
const server = Deno.serve({
handler: async (request) => {
const { conn, response } = Deno.upgradeHttpRaw(request);
const { conn, response } = upgradeHttpRaw(request);
const buf = new Uint8Array(1024);
let read;

Expand Down
26 changes: 0 additions & 26 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1517,32 +1517,6 @@ declare namespace Deno {
headers?: HeadersInit,
): HttpUpgrade;

/** **UNSTABLE**: New API, yet to be vetted.
*
* Allows "hijacking" the connection that the request is associated with.
* This can be used to implement protocols that build on top of HTTP (eg.
* {@linkcode WebSocket}).
*
* This method can only be called on requests originating the
* {@linkcode Deno.serve} server.
*
* @category HTTP Server
*/
export function upgradeHttpRaw(request: Request): HttpUpgrade;

/** The object that is returned from a {@linkcode Deno.upgradeHttp} or {@linkcode Deno.upgradeHttpRaw}
* request.
*
* @category Web Sockets */
export interface HttpUpgrade {
/** The response object that represents the HTTP response to the client,
* which should be used to the {@linkcode RequestEvent} `.respondWith()` for
* the upgrade to be successful. */
response: Response;
/** The socket interface to communicate to the client via. */
conn: Deno.Conn;
}

/** **UNSTABLE**: New API, yet to be vetted.
*
* Open a new {@linkcode Deno.Kv} connection to persist data.
Expand Down
21 changes: 18 additions & 3 deletions ext/http/00_serve.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const core = globalThis.Deno.core;
const primordials = globalThis.__bootstrap.primordials;
const internals = globalThis.__bootstrap.internals;

const { BadResourcePrototype } = core;
import { InnerBody } from "ext:deno_fetch/22_body.js";
Expand All @@ -10,7 +11,7 @@ import {
newInnerResponse,
toInnerResponse,
} from "ext:deno_fetch/23_response.js";
import { fromInnerRequest } from "ext:deno_fetch/23_request.js";
import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js";
import { AbortController } from "ext:deno_web/03_abort_signal.js";
import {
_eventLoop,
Expand Down Expand Up @@ -83,6 +84,14 @@ const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(
"immutable",
);

function upgradeHttpRaw(req, conn) {
const inner = toInnerRequest(req);
if (inner._wantsUpgrade) {
return inner._wantsUpgrade("upgradeHttpRaw", conn);
}
throw new TypeError("upgradeHttpRaw may only be used with Deno.serve");
}

class InnerRequest {
#slabId;
#context;
Expand Down Expand Up @@ -126,6 +135,7 @@ class InnerRequest {
// upgradeHttpRaw is sync
if (upgradeType == "upgradeHttpRaw") {
const slabId = this.#slabId;
const underlyingConn = originalArgs[0];

this.url();
this.headerList;
Expand All @@ -135,8 +145,11 @@ class InnerRequest {

const upgradeRid = core.ops.op_upgrade_raw(slabId);

// TODO(mmastrac): remoteAddr
const conn = new TcpConn(upgradeRid, null, null);
const conn = new TcpConn(
upgradeRid,
underlyingConn?.remoteAddr,
underlyingConn?.localAddr,
);

return { response: UPGRADE_RESPONSE_SENTINEL, conn };
}
Expand Down Expand Up @@ -626,4 +639,6 @@ async function serve(arg1, arg2) {
}
}

internals.upgradeHttpRaw = upgradeHttpRaw;

export { serve };
9 changes: 1 addition & 8 deletions ext/http/01_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,6 @@ function upgradeHttp(req) {
return req[_deferred].promise;
}

function upgradeHttpRaw(req, tcpConn) {
const inner = toInnerRequest(req);
if (inner._wantsUpgrade) {
return inner._wantsUpgrade("upgradeHttpRaw", arguments);
}
}

const spaceCharCode = StringPrototypeCharCodeAt(" ", 0);
const tabCharCode = StringPrototypeCharCodeAt("\t", 0);
const commaCharCode = StringPrototypeCharCodeAt(",", 0);
Expand Down Expand Up @@ -563,4 +556,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) {
internals.buildCaseInsensitiveCommaValueFinder =
buildCaseInsensitiveCommaValueFinder;

export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket };
export { _ws, HttpConn, serve, upgradeHttp, upgradeWebSocket };
11 changes: 7 additions & 4 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use hyper1::server::conn::http2;
use hyper1::service::service_fn;
use hyper1::service::HttpService;
use hyper1::upgrade::OnUpgrade;
use hyper1::upgrade::Upgraded;

use hyper1::StatusCode;
use pin_project::pin_project;
use pin_project::pinned_drop;
Expand All @@ -55,10 +55,10 @@ use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::rc::Rc;
use tokio::io::AsyncRead;

use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;

use tokio::task::spawn_local;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -250,11 +250,13 @@ pub fn op_upgrade_raw(
})?;

let (read, write) = tokio::io::duplex(1024);
let (mut read_rx, mut write_tx) = tokio::io::split(read);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);

spawn_local(async move {
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();

// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
let upgraded = loop {
let read = Pin::new(&mut write_rx).read(&mut buf).await?;
Expand All @@ -271,6 +273,7 @@ pub fn op_upgrade_raw(
}
};

// Stage 3: Pump the data
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);

spawn_local(async move {
Expand Down
1 change: 0 additions & 1 deletion runtime/js/90_deno_ns.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ const denoNsUnstable = {
funlock: fs.funlock,
funlockSync: fs.funlockSync,
upgradeHttp: http.upgradeHttp,
upgradeHttpRaw: http.upgradeHttpRaw,
serve: http.serve,
openKv: kv.openKv,
Kv: kv.Kv,
Expand Down

0 comments on commit 2e45e95

Please sign in to comment.