Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miniflare 3] Implement new storage system #555

Merged
merged 7 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/tre/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,4 @@ export * from "./plugins";
export * from "./runtime";
export * from "./shared";
export * from "./storage";
export * from "./storage2";
213 changes: 145 additions & 68 deletions packages/tre/src/plugins/kv/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Clock, HttpError, Log, millisToSeconds } from "../../shared";
import { Storage, StoredKeyMeta, StoredValueMeta } from "../../storage";
import { ReadableStream, TransformStream } from "stream/web";
import {
Clock,
HttpError,
Log,
maybeApply,
millisToSeconds,
secondsToMillis,
} from "../../shared";
import { Storage } from "../../storage";
import { KeyValueStorage } from "../../storage2";
import {
MAX_KEY_SIZE,
MAX_LIST_KEYS,
Expand Down Expand Up @@ -47,57 +56,132 @@ function normaliseInt(value: string | number | undefined): number | undefined {
}
}

function createMaxValueSizeError(length: number) {
return new KVError(
413,
`Value length of ${length} exceeds limit of ${MAX_VALUE_SIZE}.`
);
}
penalosa marked this conversation as resolved.
Show resolved Hide resolved
class MaxLengthStream extends TransformStream<Uint8Array, Uint8Array> {
constructor(maxLength: number, errorFactory: (length: number) => Error) {
let length = 0;
super({
transform(chunk, controller) {
length += chunk.byteLength;
// If we exceeded the maximum length, don't enqueue the chunk as we'll
// be aborting the stream, but don't error just yet, so we get the
// correct final length in the error
if (length <= maxLength) controller.enqueue(chunk);
},
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
flush(controller) {
// If we exceeded the maximum length, abort the stream
if (length > maxLength) controller.error(errorFactory(length));
},
});
}
}

export interface KVGatewayGetOptions {
cacheTtl?: number;
}
export interface KVGatewayGetResult<Metadata = unknown> {
value: ReadableStream<Uint8Array>;
expiration?: number; // seconds since unix epoch
metadata?: Metadata;
}

export interface KVGatewayPutOptions<Meta = unknown> {
expiration?: string | number;
expirationTtl?: string | number;
metadata?: Meta;
export interface KVGatewayPutOptions<Metadata = unknown> {
expiration?: string | number; // seconds since unix epoch
expirationTtl?: string | number; // seconds relative to now
metadata?: Metadata;
valueLengthHint?: number;
}

export interface KVGatewayListOptions {
limit?: number;
prefix?: string;
cursor?: string;
}
export interface KVGatewayListResult<Meta = unknown> {
keys: StoredKeyMeta<Meta>[];
cursor?: string;
list_complete: boolean;
export interface KVGatewayListKey {
name: string;
expiration?: number; // seconds since unix epoch
metadata?: string; // JSON-stringified metadata
}
export type KVGatewayListResult = {
keys: KVGatewayListKey[];
} & (
| { list_complete: false; cursor: string }
| { list_complete: true; cursor: undefined }
);

export function validateGetOptions(
key: string,
options?: KVGatewayGetOptions
): void {
validateKey(key);
// Validate cacheTtl, but ignore it as there's only one "edge location":
// the user's computer
Comment on lines +122 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if at some point we might want to add some random jitter to KV writes, to simulate the real consistency guarantees?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've so far avoided adding random failures to Miniflare, because I wanted tests using it to be as deterministic as possible, but it would definitely be interesting to explore. Could try simulate things like transient network failures too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep it deterministic, the API could perhaps include a way to set the next operation (specific or any) to fail the next one or more times?

const cacheTtl = options?.cacheTtl;
if (cacheTtl !== undefined && (isNaN(cacheTtl) || cacheTtl < MIN_CACHE_TTL)) {
throw new KVError(
400,
`Invalid ${PARAM_CACHE_TTL} of ${cacheTtl}. Cache TTL must be at least ${MIN_CACHE_TTL}.`
);
}
}

export function validateListOptions(options: KVGatewayListOptions): void {
// Validate key limit
const limit = options.limit;
if (limit !== undefined) {
if (isNaN(limit) || limit < 1) {
throw new KVError(
400,
`Invalid ${PARAM_LIST_LIMIT} of ${limit}. Please specify an integer greater than 0.`
);
}
if (limit > MAX_LIST_KEYS) {
throw new KVError(
400,
`Invalid ${PARAM_LIST_LIMIT} of ${limit}. Please specify an integer less than ${MAX_LIST_KEYS}.`
);
}
}

// Validate key prefix
const prefix = options.prefix;
if (prefix !== undefined) validateKeyLength(prefix);
}

export class KVGateway {
private readonly storage: KeyValueStorage;

constructor(
private readonly log: Log,
private readonly storage: Storage,
legacyStorage: Storage,
private readonly clock: Clock
) {}
) {
const storage = legacyStorage.getNewStorage();
this.storage = new KeyValueStorage(storage, clock);
}

async get(
async get<Metadata = unknown>(
key: string,
options?: KVGatewayGetOptions
): Promise<StoredValueMeta | undefined> {
validateKey(key);
// Validate cacheTtl, but ignore it as there's only one "edge location":
// the user's computer
const cacheTtl = options?.cacheTtl;
if (
cacheTtl !== undefined &&
(isNaN(cacheTtl) || cacheTtl < MIN_CACHE_TTL)
) {
throw new KVError(
400,
`Invalid ${PARAM_CACHE_TTL} of ${cacheTtl}. Cache TTL must be at least ${MIN_CACHE_TTL}.`
);
}
return this.storage.get(key, false, cacheTtl);
): Promise<KVGatewayGetResult<Metadata> | undefined> {
validateGetOptions(key, options);
const entry = await this.storage.get(key);
if (entry === null) return;
return {
value: entry.value,
expiration: maybeApply(millisToSeconds, entry.expiration),
metadata: entry.metadata as Metadata,
};
}

async put(
key: string,
value: Uint8Array,
value: ReadableStream<Uint8Array>,
options: KVGatewayPutOptions = {}
): Promise<void> {
validateKey(key);
Expand Down Expand Up @@ -135,13 +219,7 @@ export class KVGateway {
}
}

// Validate value and metadata size
if (value.byteLength > MAX_VALUE_SIZE) {
throw new KVError(
413,
`Value length of ${value.byteLength} exceeds limit of ${MAX_VALUE_SIZE}.`
);
}
// Validate metadata size
if (options.metadata !== undefined) {
const metadataJSON = JSON.stringify(options.metadata);
const metadataLength = Buffer.byteLength(metadataJSON);
Expand All @@ -153,9 +231,25 @@ export class KVGateway {
}
}

return this.storage.put(key, {
// Validate value size
const valueLengthHint = options.valueLengthHint;
if (valueLengthHint !== undefined && valueLengthHint > MAX_VALUE_SIZE) {
// If we know the size of the value (i.e. from `Content-Length`) use that
throw createMaxValueSizeError(valueLengthHint);
} else {
// Otherwise, pipe through a transform stream that counts the number of
// bytes and errors if it exceeds the max. This error will be thrown
// within the `storage.put()` call below and will be propagated up to the
// caller.
value = value.pipeThrough(
new MaxLengthStream(MAX_VALUE_SIZE, createMaxValueSizeError)
);
}

return this.storage.put({
key,
value,
expiration,
expiration: maybeApply(secondsToMillis, expiration),
metadata: options.metadata,
});
}
Expand All @@ -166,36 +260,19 @@ export class KVGateway {
}

async list(options: KVGatewayListOptions = {}): Promise<KVGatewayListResult> {
// Validate key limit
const limit = options.limit ?? MAX_LIST_KEYS;
if (isNaN(limit) || limit < 1) {
throw new KVError(
400,
`Invalid ${PARAM_LIST_LIMIT} of ${limit}. Please specify an integer greater than 0.`
);
}
if (limit > MAX_LIST_KEYS) {
throw new KVError(
400,
`Invalid ${PARAM_LIST_LIMIT} of ${limit}. Please specify an integer less than ${MAX_LIST_KEYS}.`
);
}

// Validate key prefix
const prefix = options.prefix;
if (prefix !== undefined) validateKeyLength(prefix);

const cursor = options.cursor;
validateListOptions(options);
const { limit = MAX_LIST_KEYS, prefix, cursor } = options;
const res = await this.storage.list({ limit, prefix, cursor });
return {
keys: res.keys.map((key) => ({
...key,
// workerd expects metadata to be a JSON-serialised string
metadata:
key.metadata === undefined ? undefined : JSON.stringify(key.metadata),
})),
cursor: res.cursor === "" ? undefined : res.cursor,
list_complete: res.cursor === "",
};
const keys = res.keys.map<KVGatewayListKey>((key) => ({
name: key.key,
expiration: maybeApply(millisToSeconds, key.expiration),
// workerd expects metadata to be a JSON-serialised string
metadata: maybeApply(JSON.stringify, key.metadata),
}));
if (res.cursor === undefined) {
return { keys, list_complete: true, cursor: undefined };
} else {
return { keys, list_complete: false, cursor: res.cursor };
}
}
}
70 changes: 51 additions & 19 deletions packages/tre/src/plugins/kv/router.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "assert";
import { Headers, Response } from "../../http";
import {
DELETE,
Expand All @@ -19,7 +20,15 @@ import {
PARAM_LIST_PREFIX,
PARAM_URL_ENCODED,
} from "./constants";
import { KVError, KVGateway } from "./gateway";
import {
KVError,
KVGateway,
KVGatewayGetOptions,
KVGatewayGetResult,
KVGatewayListOptions,
KVGatewayListResult,
} from "./gateway";
import { sitesGatewayGet, sitesGatewayList } from "./sites";

export interface KVParams {
namespace: string;
Expand All @@ -44,20 +53,24 @@ export class KVRouter extends Router<KVGateway> {
get: RouteHandler<KVParams> = async (req, params, url) => {
// Get gateway with (persistent) storage
const persist = decodePersist(req.headers);
// Workers Sites: if this is a sites request, persist should be used as the
// root without any additional namespace
const namespace =
req.headers.get(HEADER_SITES) === null ? params.namespace : "";
const gateway = this.gatewayFactory.get(namespace, persist);

// Decode URL parameters
const key = decodeKey(params, url.searchParams);
const cacheTtlParam = url.searchParams.get(PARAM_CACHE_TTL);
const cacheTtl =
cacheTtlParam === null ? undefined : parseInt(cacheTtlParam);
const options: KVGatewayGetOptions = {
cacheTtl: cacheTtlParam === null ? undefined : parseInt(cacheTtlParam),
};

// Get value from storage
const value = await gateway.get(key, { cacheTtl });
let value: KVGatewayGetResult | undefined;
if (req.headers.get(HEADER_SITES) === null) {
const gateway = this.gatewayFactory.get(params.namespace, persist);
value = await gateway.get(key, options);
} else {
// Workers Sites: if this is a sites request, persist should be used as
// the root without any additional namespace
value = await sitesGatewayGet(persist, key, options);
}
if (value === undefined) throw new KVError(404, "Not Found");

// Return value in runtime-friendly format
Expand Down Expand Up @@ -88,9 +101,24 @@ export class KVRouter extends Router<KVGateway> {
const metadata =
metadataHeader === null ? undefined : JSON.parse(metadataHeader);

// Read body and put value into storage
const value = new Uint8Array(await req.arrayBuffer());
await gateway.put(key, value, { expiration, expirationTtl, metadata });
// Put value into storage
const value = req.body;
assert(value !== null);

// If we know the value length, avoid passing the body through a transform
// stream to count it (trusting `workerd` to send correct value here).
// Safety of `!`: `parseInt(null)` is `NaN`
const contentLength = parseInt(req.headers.get("Content-Length")!);
const valueLengthHint = Number.isNaN(contentLength)
? undefined
: contentLength;

await gateway.put(key, value, {
expiration,
expirationTtl,
metadata,
valueLengthHint,
});

return new Response();
};
Expand All @@ -113,20 +141,24 @@ export class KVRouter extends Router<KVGateway> {
list: RouteHandler<Omit<KVParams, "key">> = async (req, params, url) => {
// Get gateway with (persistent) storage
const persist = decodePersist(req.headers);
// Workers Sites: if this is a sites request, persist should be used as the
// root without any additional namespace
const namespace =
req.headers.get(HEADER_SITES) === null ? params.namespace : "";
const gateway = this.gatewayFactory.get(namespace, persist);

// Decode URL parameters
const limitParam = url.searchParams.get(PARAM_LIST_LIMIT);
const limit = limitParam === null ? undefined : parseInt(limitParam);
const prefix = url.searchParams.get(PARAM_LIST_PREFIX) ?? undefined;
const cursor = url.searchParams.get(PARAM_LIST_CURSOR) ?? undefined;
const options: KVGatewayListOptions = { limit, prefix, cursor };

// List keys from storage
const res = await gateway.list({ limit, prefix, cursor });
return Response.json(res);
let result: KVGatewayListResult;
if (req.headers.get(HEADER_SITES) === null) {
const gateway = this.gatewayFactory.get(params.namespace, persist);
result = await gateway.list(options);
} else {
// Workers Sites: if this is a sites request, persist should be used as
// the root without any additional namespace
result = await sitesGatewayList(persist, options);
}
return Response.json(result);
};
}
Loading