Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An initial implementation of the cache API #428

Merged
merged 3 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/tre/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"extends": "../../package.json"
},
"dependencies": {
"@types/http-cache-semantics": "^4.0.1",
"acorn": "^8.8.0",
"acorn-walk": "^8.2.0",
"capnp-ts": "^0.7.0",
Expand All @@ -53,6 +52,7 @@
"@types/debug": "^4.1.7",
"@types/estree": "^1.0.0",
"@types/glob-to-regexp": "^0.4.1",
"@types/stoppable": "^1.1.1"
"@types/stoppable": "^1.1.1",
"@types/http-cache-semantics": "^4.0.1"
}
}
10 changes: 3 additions & 7 deletions packages/tre/src/plugins/cache/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Response } from "undici";
import { HeadersInit, Response } from "undici";
import { CfHeader } from "../shared/constants";

enum Status {
Expand All @@ -19,17 +19,13 @@ export async function fallible<T>(promise: Promise<T>): Promise<T | Response> {
}

export class CacheError extends Error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do this for R2Error too, but this should probably extend MiniflareError (or HttpError?), and use code instead of status. MiniflareError will automatically set the name correctly too based off new.target.

Copy link
Contributor Author

@penalosa penalosa Nov 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this over from R2Error, but the reason it uses status rather than code is because it is the http status code. R2Error has another property v4Code (or v4code depending on context, the API is maddeningly inconsistent), so using code there would be a bit confusing, I think. For consistency, it might make sense to keep this as status. I'll have a look at extending from MiniflareError though

status: number;
headers: [string, string][];
constructor(
status: number,
private status: number,
message: string,
headers: [string, string][] = []
readonly headers: HeadersInit = []
) {
super(message);
this.name = "CacheError";
this.status = status;
this.headers = headers;
}

toResponse() {
Expand Down
172 changes: 112 additions & 60 deletions packages/tre/src/plugins/cache/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,81 @@
import crypto from "crypto";
import http from "http";
import { AddressInfo } from "net";
import http from "node:http";
import CachePolicy from "http-cache-semantics";
import { Headers, Request, Response, fetch } from "undici";
import { Headers, HeadersInit, Request, Response, fetch } from "undici";
import { Clock, millisToSeconds } from "../../shared";
import { Storage } from "../../storage";
import { CacheMiss, PurgeFailure, StorageFailure } from "./errors";
import { _getRangeResponse } from "./range";

interface CacheMetadata {
value: Uint8Array;
metadata: {
headers: string[][];
status: number;
headers: string[][];
status: number;
}

function getExpiration(
clock: Clock,
req: Request,
res: Response
):
| {
storable: true;
expiration: number;
headers: HeadersInit;
}
| {
storable: false;
expiration: number | undefined;
headers: HeadersInit;
} {
// Cloudflare ignores request Cache-Control
const reqHeaders = normaliseHeaders(req.headers);
delete reqHeaders["cache-control"];

// Cloudflare never caches responses with Set-Cookie headers
// If Cache-Control contains private=set-cookie, Cloudflare will remove
// the Set-Cookie header automatically
const resHeaders = normaliseHeaders(res.headers);
if (
resHeaders["cache-control"]?.toLowerCase().includes("private=set-cookie")
) {
resHeaders["cache-control"] = resHeaders["cache-control"]
?.toLowerCase()
.replace(/private=set-cookie;?/i, "");
delete resHeaders["set-cookie"];
}

// Build request and responses suitable for CachePolicy
const cacheReq: CachePolicy.Request = {
url: req.url,
// If a request gets to the Cache service, it's method will be GET. See README.md for details
method: "GET",
headers: reqHeaders,
};
const cacheRes: CachePolicy.Response = {
status: res.status,
headers: resHeaders,
};

// @ts-expect-error `now` isn't included in CachePolicy's type definitions
const originalNow = CachePolicy.prototype.now;
// @ts-expect-error `now` isn't included in CachePolicy's type definitions
CachePolicy.prototype.now = clock;
try {
const policy = new CachePolicy(cacheReq, cacheRes, { shared: true });

return {
// Check if the request & response is cacheable
storable: policy.storable() && !("set-cookie" in resHeaders),
expiration: policy.timeToLive(),
// Cache Policy Headers is typed as [header: string]: string | string[] | undefined
// It's safe to ignore the undefined here, which is what casting to HeadersInit does
headers: policy.responseHeaders() as HeadersInit,
};
} finally {
// @ts-expect-error `now` isn't included in CachePolicy's type definitions
CachePolicy.prototype.now = originalNow;
}
}

// Normalises headers to object mapping lower-case names to single values.
Expand Down Expand Up @@ -90,10 +153,10 @@ function getMatchResponse(
return new Response(resBody, { status: resStatus, headers: resHeaders });
}

class CacheResponse implements CacheMetadata {
metadata: CacheMetadata["metadata"];
class CacheResponse {
metadata: CacheMetadata;
value: Uint8Array;
constructor(metadata: CacheMetadata["metadata"], value: Uint8Array) {
constructor(metadata: CacheMetadata, value: Uint8Array) {
penalosa marked this conversation as resolved.
Show resolved Hide resolved
this.metadata = metadata;
this.value = value;
}
Expand All @@ -111,52 +174,54 @@ interface ParsedHttpResponse {
body: Uint8Array;
}
class HttpParser {
server: http.Server;
responses: Record<string, Uint8Array> = {};
parsing: Promise<ParsedHttpResponse> =
Promise.resolve() as unknown as Promise<ParsedHttpResponse>;
connected: Promise<void>;
readonly server: http.Server;
readonly responses: Map<string, Uint8Array> = new Map();
readonly connected: Promise<void>;
constructor() {
this.server = http.createServer(this.listen.bind(this));
this.connected = new Promise((accept) => {
this.server.listen(0, "localhost", () => {
accept();
});
this.server.listen(0, "localhost", accept);
});
}
private listen(request: http.IncomingMessage, response: http.ServerResponse) {
if (request.url) {
response?.socket?.write(this.responses[request.url] ?? new Uint8Array());
response?.socket?.write(
this.responses.get(request.url) ?? new Uint8Array()
);
}
response.end();
}
public async parse(response: Uint8Array): Promise<ParsedHttpResponse> {
await this.connected;
// Since multiple parses can be in-flight at once, an identifier is needed
const id = `/${crypto.randomBytes(16).toString("hex")}`;
penalosa marked this conversation as resolved.
Show resolved Hide resolved
this.responses[id] = response;
this.responses.set(id, response);
const address = this.server.address()! as AddressInfo;
const parsedResponse = await fetch(`http://localhost:${address.port}${id}`);
const body = await parsedResponse.arrayBuffer();
delete this.responses[id];
return {
headers: parsedResponse.headers,
status: parsedResponse.status,
body: new Uint8Array(body),
};
try {
const parsedResponse = await fetch(
`http://localhost:${address.port}${id}`
);
const body = await parsedResponse.arrayBuffer();
return {
headers: parsedResponse.headers,
status: parsedResponse.status,
body: new Uint8Array(body),
};
} finally {
this.responses.delete(id);
}
}
}

export class CacheGateway {
parser: HttpParser;
constructor(private readonly storage: Storage) {
this.parser = new HttpParser();
}
parser: HttpParser = new HttpParser();
penalosa marked this conversation as resolved.
Show resolved Hide resolved
constructor(
private readonly storage: Storage,
private readonly clock: Clock
) {}

async match(request: Request): Promise<Response> {
const cached = await this.storage.get<CacheMetadata["metadata"]>(
request.url
);
const cached = await this.storage.get<CacheMetadata>(request.url);
if (!cached || !cached?.metadata) throw new CacheMiss();

const response = new CacheResponse(
Expand All @@ -165,47 +230,34 @@ export class CacheGateway {
).toResponse();
response.headers.set("CF-Cache-Status", "HIT");

const res = getMatchResponse(
return getMatchResponse(
request.headers,
cached.metadata.status,
response.headers,
cached.value
);
return res;
}

async put(request: Request, value: ArrayBuffer): Promise<Response> {
const response = await this.parser.parse(new Uint8Array(value));
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
const responseHeaders = Object.fromEntries([...response.headers.entries()]);
if (
responseHeaders["cache-control"]
?.toLowerCase()
.includes("private=set-cookie")
) {
responseHeaders["cache-control"] = responseHeaders[
"cache-control"
].replace(/private=set-cookie/i, "");
delete responseHeaders["set-cookie"];
}
const policy = new CachePolicy(
{ url: request.url, headers: normaliseHeaders(request.headers) },
{ ...response, headers: responseHeaders },
{ shared: true }
);

const headers = Object.entries(policy.responseHeaders()) as [
string,
string
][];

if (!policy.storable() || !!headers.find(([h]) => h == "set-cookie")) {
const { storable, expiration, headers } = getExpiration(
this.clock,
request,
new Response(response.body, {
status: response.status,
headers: response.headers,
})
);
if (!storable) {
throw new StorageFailure();
}

await this.storage.put<CacheMetadata["metadata"]>(request.url, {
await this.storage.put<CacheMetadata>(request.url, {
value: response.body,
penalosa marked this conversation as resolved.
Show resolved Hide resolved
expiration: millisToSeconds(this.clock() + expiration),
metadata: {
headers: headers,
headers: Object.entries(headers),
status: response.status,
},
});
Expand Down
8 changes: 3 additions & 5 deletions packages/tre/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ import { Worker_Binding } from "../../runtime";
import { SERVICE_LOOPBACK } from "../core";
import {
BINDING_SERVICE_LOOPBACK,
BINDING_TEXT_NAMESPACE,
BINDING_TEXT_PERSIST,
BINDING_TEXT_PLUGIN,
HEADER_PERSIST,
PersistenceSchema,
Plugin,
SCRIPT_PLUGIN_NAMESPACE_PERSIST,
} from "../shared";
import { CacheGateway } from "./gateway";
import { CacheRouter } from "./router";
Expand Down Expand Up @@ -41,17 +39,17 @@ export const CACHE_PLUGIN: Plugin<
router: CacheRouter,
options: CacheOptionsSchema,
sharedOptions: CacheSharedOptionsSchema,
async getBindings(options) {
getBindings() {
return [];
},
getServices({ options, sharedOptions }) {
getServices() {
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};
return [
{
name: `cache`,
name: "cache",
worker: {
serviceWorkerScript: CACHE_LOOPBACK_SCRIPT,
bindings: [
Expand Down
10 changes: 4 additions & 6 deletions packages/tre/src/plugins/cache/router.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { Request, RequestInit, Response } from "undici";
import { Request, RequestInit } from "undici";
import {
DELETE,
CfHeader,
GET,
PURGE,
PUT,
RouteHandler,
Router,
decodePersist,
PURGE,
CfHeader,
} from "../shared";
import { CacheError, fallible } from "./errors";
import { fallible } from "./errors";
import { CacheGateway } from "./gateway";

export interface CacheParams {
Expand All @@ -23,7 +22,6 @@ export class CacheRouter extends Router<CacheGateway> {
const uri = decodeURIComponent(params.uri);
const persist = decodePersist(req.headers);
const ns = req.headers.get(CfHeader.CacheNamespace);
console.log(req.headers);
const gateway = this.gatewayFactory.get(
params.namespace + ns ? `:ns:${ns}` : `:default`,
persist
Expand Down
1 change: 1 addition & 0 deletions packages/tre/src/plugins/r2/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Response } from "undici";
import { HttpError } from "../../shared";
import { CfHeader } from "../shared/constants";
import { R2Object } from "./r2Object";

Expand Down
Loading