Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/curly-beans-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

Improve composable cache performance
101 changes: 45 additions & 56 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,24 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import type { CacheValue } from "types/overrides";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<
string,
Promise<CacheValue<"composable">>
>();
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) {
return stored.then((entry) => ({
...entry,
value: toReadableStream(entry.value),
}));
}
}
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) return stored;

const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
);
if (!result?.value?.value) {
return undefined;
}
if (!result?.value?.value) return undefined;

debug("composable cache result", result);

// We need to check if the tags associated with this entry has been revalidated
if (
globalThis.tagCache.mode === "nextMode" &&
result.value.tags.length > 0
Expand Down Expand Up @@ -69,73 +54,77 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const promiseEntry = pendingEntry.then(async (entry) => ({
...entry,
value: await fromReadableStream(entry.value),
}));
pendingWritePromiseMap.set(cacheKey, promiseEntry);
const teedPromise = pendingEntry.then((entry) => {
// Optimization: We avoid consuming and stringifying the stream here,
// because it creates double copies just to be discarded when this function
// ends. This avoids unnecessary memory usage, and reduces GC pressure.
const [stream1, stream2] = entry.value.tee();
return [
{ ...entry, value: stream1 },
{ ...entry, value: stream2 },
] as const;
});

const entry = await promiseEntry.finally(() => {
pendingWritePromiseMap.set(
cacheKey,
teedPromise.then(([entry]) => entry),
);

const [, entryForStorage] = await teedPromise.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});

await globalThis.incrementalCache.set(
cacheKey,
{
...entry,
value: entry.value,
...entryForStorage,
value: await fromReadableStream(entryForStorage.value),
},
"composable",
);

if (globalThis.tagCache.mode === "original") {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
const tagsToWrite = [];
for (const tag of entryForStorage.tags) {
if (!storedTags.includes(tag)) {
tagsToWrite.push({ tag, path: cacheKey });
}
}
if (tagsToWrite.length > 0) {
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
await writeTags(tagsToWrite);
}
}
},

async refreshTags() {
// We don't do anything for now, do we want to do something here ???
return;
},
async refreshTags() {},

async getExpiration(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return globalThis.tagCache.getLastRevalidated(tags);
}
// We always return 0 here, original tag cache are handled directly in the get part
// TODO: We need to test this more, i'm not entirely sure that this is working as expected
return 0;
return globalThis.tagCache.mode === "nextMode"
? globalThis.tagCache.getLastRevalidated(tags)
: 0;
},

async expireTags(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return writeTags(tags);
}

const tagCache = globalThis.tagCache;
const revalidatedAt = Date.now();
// For the original mode, we have more work to do here.
// We need to find all paths linked to to these tags
const pathsToUpdate = await Promise.all(
tags.map(async (tag) => {
const paths = await tagCache.getByTag(tag);
return paths.map((path) => ({
path,
tag,
revalidatedAt,
}));
return paths.map((path) => ({ path, tag, revalidatedAt }));
}),
);
// We need to deduplicate paths, we use a set for that
const setToWrite = new Set<{ path: string; tag: string }>();

const dedupeMap = new Map();
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
dedupeMap.set(`${entry.path}|${entry.tag}`, entry);
}
await writeTags(Array.from(setToWrite));
await writeTags(Array.from(dedupeMap.values()));
},

// This one is necessary for older versions of next
async receiveExpiredTags(...tags: string[]) {
// This function does absolutely nothing
return;
},
async receiveExpiredTags() {},
} satisfies ComposableCacheHandler;
10 changes: 3 additions & 7 deletions packages/open-next/src/utils/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ export async function fromReadableStream(
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
}

// Pre-allocate buffer with exact size to avoid reallocation
const buffer = Buffer.alloc(totalLength);
let offset = 0;
for (const chunk of chunks) {
buffer.set(chunk, offset);
offset += chunk.length;
}
// Use Buffer.concat which is more efficient than manual allocation and copy
// It handles the allocation and copy in optimized native code
const buffer = Buffer.concat(chunks, totalLength);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once I look at this with a different perspective, I think this might be faster, but I didn't run a separate benchmark for this. @jasnell @vicb


return buffer.toString(base64 ? "base64" : "utf8");
}
Expand Down
Loading