Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): implement cloud.Bucket inflight method signedUrl for sim target #7137

Merged
merged 17 commits into from
Sep 17, 2024
47 changes: 5 additions & 42 deletions packages/@winglang/sdk/src/target-sim/api.inflight.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as fs from "fs";
import { Server } from "http";
import { AddressInfo, Socket } from "net";
import { join } from "path";
import express from "express";
import { IEventPublisher } from "./event-mapping";
Expand All @@ -10,7 +9,7 @@ import {
ApiRoute,
EventSubscription,
} from "./schema-resources";
import { exists } from "./util";
import { exists, isPortAvailable, listenExpress } from "./util";
import {
API_FQN,
ApiRequest,
Expand All @@ -28,8 +27,6 @@ import {
} from "../simulator/simulator";
import { LogLevel, Json, TraceType } from "../std";

const LOCALHOST_ADDRESS = "127.0.0.1";

const STATE_FILENAME = "state.json";

/**
Expand Down Expand Up @@ -114,21 +111,10 @@ export class Api
}
}

// `server.address()` returns `null` until the server is listening
// on a port. We use a promise to wait for the server to start
// listening before returning the URL.
const addrInfo: AddressInfo = await new Promise((resolve, reject) => {
this.server = this.app.listen(lastPort ?? 0, LOCALHOST_ADDRESS, () => {
const addr = this.server?.address();
if (addr && typeof addr === "object" && (addr as AddressInfo).port) {
resolve(addr);
} else {
reject(new Error("No address found"));
}
});
});
this.port = addrInfo.port;
this.url = `http://${addrInfo.address}:${addrInfo.port}`;
const { server, address } = await listenExpress(this.app, lastPort);
this.server = server;
this.port = address.port;
this.url = `http://${address.address}:${address.port}`;

this.addTrace(`Server listening on ${this.url}`, LogLevel.VERBOSE);

Expand Down Expand Up @@ -343,26 +329,3 @@ function asyncMiddleware(
Promise.resolve(fn(req, res, next)).catch(next);
};
}

async function isPortAvailable(port: number): Promise<boolean> {
return new Promise((resolve, _reject) => {
const s = new Socket();
s.once("error", (err) => {
s.destroy();
if ((err as any).code !== "ECONNREFUSED") {
resolve(false);
} else {
// connection refused means the port is not used
resolve(true);
}
});

s.once("connect", () => {
s.destroy();
// connection successful means the port is used
resolve(false);
});

s.connect(port, LOCALHOST_ADDRESS);
});
}
223 changes: 212 additions & 11 deletions packages/@winglang/sdk/src/target-sim/bucket.inflight.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import * as crypto from "crypto";
import * as fs from "fs";
import { Server } from "http";
import { dirname, join } from "path";
import * as url from "url";
import { pathToFileURL } from "url";
import express from "express";
import mime from "mime-types";
import { BucketAttributes, BucketSchema } from "./schema-resources";
import { exists } from "./util";
import { exists, isPortAvailable, listenExpress } from "./util";
import {
ITopicClient,
BucketSignedUrlOptions,
Expand All @@ -16,6 +18,8 @@ import {
BucketGetOptions,
BucketTryGetOptions,
BUCKET_FQN,
BucketSignedUrlAction,
CorsHeaders,
} from "../cloud";
import { deserialize, serialize } from "../simulator/serialization";
import {
Expand All @@ -27,19 +31,133 @@ import { Datetime, Json, LogLevel, TraceType } from "../std";

export const METADATA_FILENAME = "metadata.json";

const STATE_FILENAME = "state.json";

/**
* Contents of the state file for this resource.
*/
interface StateFileContents {
/**
* The last port used by the API server on a previous simulator run.
*/
readonly lastPort?: number;
}

export class Bucket implements IBucketClient, ISimulatorResourceInstance {
private _fileDir!: string;
private _context: ISimulatorContext | undefined;
private readonly initialObjects: Record<string, string>;
private readonly _public: boolean;
private readonly topicHandlers: Partial<Record<BucketEventType, string>>;
private _metadata: Map<string, ObjectMetadata>;
private readonly app: express.Application;
private server: Server | undefined;
private url: string | undefined;
private port: number | undefined;

public constructor(props: BucketSchema) {
this.initialObjects = props.initialObjects ?? {};
this._public = props.public ?? false;
this.topicHandlers = props.topics;
this._metadata = new Map();

this.app = express();

// Enable cors for all requests.
this.app.use((req, res, next) => {
const corsHeaders: CorsHeaders = {
defaultResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
},
optionsResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
"Access-Control-Max-Age": "86400",
},
};
const method =
req.method && req.method.toUpperCase && req.method.toUpperCase();

if (method === "OPTIONS") {
for (const [key, value] of Object.entries(
corsHeaders.optionsResponse
)) {
res.setHeader(key, value);
}
res.status(204).send();
} else {
for (const [key, value] of Object.entries(
corsHeaders.defaultResponse
)) {
res.setHeader(key, value);
}
next();
}
});

// Handle signed URL uploads.
this.app.put("*", (req, res) => {
const action = req.query.action;
Chriscbr marked this conversation as resolved.
Show resolved Hide resolved
if (action !== BucketSignedUrlAction.UPLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (!validUntil || Date.now() > parseInt(validUntil)) {
return res.status(403).send("Signed URL has expired");
}

const key = req.path.slice(1); // remove leading slash
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);

const actionType: BucketEventType = this._metadata.has(key)
? BucketEventType.UPDATE
: BucketEventType.CREATE;

const contentType = req.header("content-type");
if (contentType?.startsWith("multipart/form-data")) {
return res.status(400).send("Multipart uploads not supported");
}

const fileStream = fs.createWriteStream(filename);
req.pipe(fileStream);

fileStream.on("error", () => {
res.status(500).send("Failed to save the file.");
});

fileStream.on("finish", () => {
void this.updateMetadataAndNotify(key, actionType, contentType).then(
() => {
res.status(200).send();
}
);
});

return;
});

// Handle signed URL downloads.
this.app.get("*", (req, res) => {
const action = req.query.action;
if (action !== BucketSignedUrlAction.DOWNLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (!validUntil || Date.now() > parseInt(validUntil)) {
return res.status(403).send("Signed URL has expired");
}

const key = req.path.slice(1); // remove leading slash
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);
return res.download(filename);
});
}

private get context(): ISimulatorContext {
Expand Down Expand Up @@ -86,22 +204,79 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
});
}

return {};
// Check for a previous state file to see if there was a port that was previously being used
// if so, try to use it out of convenience
let lastPort: number | undefined;
const state: StateFileContents = await this.loadState();
if (state.lastPort) {
const portAvailable = await isPortAvailable(state.lastPort);
if (portAvailable) {
lastPort = state.lastPort;
}
}

const { server, address } = await listenExpress(this.app, lastPort);
this.server = server;
this.port = address.port;
this.url = `http://${address.address}:${address.port}`;

this.addTrace(`Server listening on ${this.url}`, LogLevel.VERBOSE);

return {
url: this.url,
};
}

public async cleanup(): Promise<void> {}
public async cleanup(): Promise<void> {
this.addTrace(`Closing server on ${this.url}`, LogLevel.VERBOSE);

return new Promise((resolve, reject) => {
this.server?.close((err) => {
if (err) {
return reject(err);
}

this.server?.closeAllConnections();
return resolve();
});
});
}

public async plan() {
return UpdatePlan.AUTO;
}

private async loadState(): Promise<StateFileContents> {
const stateFileExists = await exists(
join(this.context.statedir, STATE_FILENAME)
);
if (stateFileExists) {
const stateFileContents = await fs.promises.readFile(
join(this.context.statedir, STATE_FILENAME),
"utf-8"
);
return JSON.parse(stateFileContents);
} else {
return {};
}
}

private async saveState(state: StateFileContents): Promise<void> {
fs.writeFileSync(
join(this.context.statedir, STATE_FILENAME),
JSON.stringify(state)
);
}

public async save(): Promise<void> {
// no need to save individual files, since they are already persisted in the state dir
// during the bucket's lifecycle
fs.writeFileSync(
join(this.context.statedir, METADATA_FILENAME),
serialize(Array.from(this._metadata.entries())) // metadata contains Datetime values, so we need to serialize it
);

await this.saveState({ lastPort: this.port });
}

private async notifyListeners(
Expand Down Expand Up @@ -291,19 +466,36 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
);
}

return url.pathToFileURL(filePath).href;
return pathToFileURL(filePath).href;
},
});
}

public async signedUrl(key: string, options?: BucketSignedUrlOptions) {
options;
return this.context.withTrace({
message: `Signed URL (key=${key})`,
message: `Signed URL (key=${key}).`,
activity: async () => {
throw new Error(
`signedUrl is not implemented yet for the simulator (key=${key})`
const action = options?.action ?? BucketSignedUrlAction.DOWNLOAD;
// BUG: The `options?.duration` is supposed to be an instance of `Duration` but it is not. It's just
// a POJO with seconds, but TypeScript thinks otherwise.
const duration = options?.duration?.seconds ?? 900;

if (
action === BucketSignedUrlAction.DOWNLOAD &&
!(await this.exists(key))
) {
throw new Error(
`Cannot provide signed url for a non-existent key (key=${key})`
);
}

const url = new URL(key, this.url);
url.searchParams.set("action", action);
url.searchParams.set(
"validUntil",
String(Date.now() + duration * 1000)
);
return url.toString();
},
});
}
Expand Down Expand Up @@ -370,10 +562,19 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
await fs.promises.mkdir(dirName, { recursive: true });
await fs.promises.writeFile(filename, value);

await this.updateMetadataAndNotify(key, actionType, contentType);
}

private async updateMetadataAndNotify(
key: string,
actionType: BucketEventType,
contentType?: string
): Promise<void> {
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);
const filestat = await fs.promises.stat(filename);
const determinedContentType =
(contentType ?? mime.lookup(key)) || "application/octet-stream";

contentType ?? (mime.lookup(key) || "application/octet-stream");
this._metadata.set(key, {
size: filestat.size,
lastModified: Datetime.fromDate(filestat.mtime),
Expand Down
Loading
Loading