diff --git a/packages/drivers/odsp-driver/src/odspDocumentStorageManager.ts b/packages/drivers/odsp-driver/src/odspDocumentStorageManager.ts index f0c479dfd32f..511ce131905e 100644 --- a/packages/drivers/odsp-driver/src/odspDocumentStorageManager.ts +++ b/packages/drivers/odsp-driver/src/odspDocumentStorageManager.ts @@ -99,16 +99,109 @@ async function promiseRaceWithWinner(promises: Promise[]): Promise<{ index }); } -export class OdspDocumentStorageService implements IDocumentStorageService { - private readonly blobCache: Map = new Map(); - private readonly treesCache: Map = new Map(); - +class BlobCache { // Save the timeout so we can cancel and reschedule it as needed - // private blobCacheTimeout: ReturnType | undefined; + private blobCacheTimeout: ReturnType | undefined; // If the defer flag is set when the timeout fires, we'll reschedule rather than clear immediately // This deferral approach is used (rather than clearing/resetting the timer) as current calling patterns trigger // too many calls to setTimeout/clearTimeout. - // private deferBlobCacheClear: boolean = false; + private deferBlobCacheClear: boolean = false; + + private readonly _blobCache: Map = new Map(); + + // Tracks all blob IDs evicted from cache + private readonly blobsEvicted: Set = new Set(); + + // Initial time-out to purge data from cache + // If this time out is very small, then we purge blobs from cache too soon and that results in a lot of + // requests to storage, which brings down perf and may trip protection limits causing 429s + // Also we need to ensure that buildCachesForDedup() is called with full cache for summarizer client to build + // its SHA cache for blobs (currently that happens as result of requesting snapshot tree) + private blobCacheTimeoutDuration = 2 * 60 * 1000; + + // SPO does not keep old snapshots around for long, so we are running chances of not + // being able to rehydrate data store / DDS in the future if we purge anything (and with blob de-duping, + // even if blob read by runtime, it could be read again in the future) + // So for now, purging is disabled. + private readonly purgeEnabled = false; + + public get value() { + return this._blobCache; + } + + public addBlobs(blobs: IBlob[]) { + blobs.forEach((blob) => { + assert(blob.encoding === "base64" || blob.encoding === undefined); + this._blobCache.set(blob.id, blob); + }); + // Reset the timer on cache set + this.scheduleClearBlobsCache(); + } + + /** + * Schedule a timer for clearing the blob cache or defer the current one. + */ + private scheduleClearBlobsCache() { + if (this.blobCacheTimeout !== undefined) { + // If we already have an outstanding timer, just signal that we should defer the clear + this.deferBlobCacheClear = true; + } else { + // If we don't have an outstanding timer, set a timer + // When the timer runs out, we'll decide whether to proceed with the cache clear or reset the timer + const clearCacheOrDefer = () => { + this.blobCacheTimeout = undefined; + if (this.deferBlobCacheClear) { + this.deferBlobCacheClear = false; + this.scheduleClearBlobsCache(); + } else { + // NOTE: Slightly better algorithm here would be to purge either only big blobs, + // or sort them by size and purge enough big blobs to leave only 256Kb of small blobs in cache + // Purging is optimizing memory footprint. But count controls potential number of storage requests + // We want to optimize both - memory footprint and number of future requests to storage. + // Note that Container can realize data store or DDS on-demand at any point in time, so we do not + // control when blobs will be used. + if (this.purgeEnabled) { + this._blobCache.forEach((_, blobId) => this.blobsEvicted.add(blobId)); + this._blobCache.clear(); + } + } + }; + this.blobCacheTimeout = setTimeout(clearCacheOrDefer, this.blobCacheTimeoutDuration); + // any future storage reads that get into the cache should be cleared from cache rather quickly - + // there is not much value in keeping them longer + this.blobCacheTimeoutDuration = 10 * 1000; + } + } + + public getBlob(blobId: string) { + // Reset the timer on attempted cache read + this.scheduleClearBlobsCache(); + const blobContent = this._blobCache.get(blobId); + const evicted = this.blobsEvicted.has(blobId); + return { blobContent, evicted }; + } + + public setBlob(blobId: string, blob: IBlob | ArrayBuffer) { + // This API is called as result of cache miss and reading blob from storage. + // Runtime never reads same blob twice. + // The only reason we may get read request for same blob is blob de-duping in summaries. + // Note that the bigger the size, the less likely blobs are the same, so there is very little benefit of caching big blobs. + // Images are the only exception - user may insert same image twice. But we currently do not de-dup them - only snapshot + // blobs are de-duped. + const size = blob instanceof ArrayBuffer ? blob.byteLength : blob.size; + if (size < 256 * 1024) { + // Reset the timer on cache set + this.scheduleClearBlobsCache(); + return this._blobCache.set(blobId, blob); + } else { + // we evicted it here by not caching. + this.blobsEvicted.add(blobId); + } + } +} + +export class OdspDocumentStorageService implements IDocumentStorageService { + private readonly treesCache: Map = new Map(); private readonly attributesBlobHandles: Set = new Set(); @@ -134,6 +227,8 @@ export class OdspDocumentStorageService implements IDocumentStorageService { private readonly maxSnapshotSizeLimit = 500000000; // 500 MB private readonly maxSnapshotFetchTimeout = 120000; // 2 min + private readonly blobCache = new BlobCache(); + public set ops(ops: ISequencedDeltaOpMessage[] | undefined) { assert(this._ops === undefined); assert(ops !== undefined); @@ -168,7 +263,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService { docId: this.documentId, }; - this.odspSummaryUploadManager = new OdspSummaryUploadManager(this.snapshotUrl, getStorageToken, logger, epochTracker, this.blobCache); + this.odspSummaryUploadManager = new OdspSummaryUploadManager(this.snapshotUrl, getStorageToken, logger, epochTracker); } public get repositoryUrl(): string { @@ -208,10 +303,9 @@ export class OdspDocumentStorageService implements IDocumentStorageService { return response.content; } - public async readBlobCore(blobId: string): Promise { - let blob = this.blobCache.get(blobId); - // Reset the timer on attempted cache read - this.scheduleClearBlobsCache(); + private async readBlobCore(blobId: string): Promise { + const { blobContent, evicted } = this.blobCache.getBlob(blobId); + let blob = blobContent; if (blob === undefined) { this.checkAttachmentGETUrl(); @@ -226,21 +320,22 @@ export class OdspDocumentStorageService implements IDocumentStorageService { { eventName: "readDataBlob", blobId, + evicted, headers: Object.keys(headers).length !== 0 ? true : undefined, waitQueueLength: this.epochTracker.rateLimiter.waitQueueLength, }, async (event) => { const res = await this.epochTracker.fetchResponse(url, { headers }, "blob"); - const blobContent = await res.arrayBuffer(); + blob = await res.arrayBuffer(); event.end({ - size: blobContent.byteLength, + size: blob.byteLength, waitQueueLength: this.epochTracker.rateLimiter.waitQueueLength, }); - return blobContent; + return blob; }, ); }); - this.blobCache.set(blobId, blob); + this.blobCache.setBlob(blobId, blob); } if (!this.attributesBlobHandles.has(blobId)) { @@ -265,10 +360,6 @@ export class OdspDocumentStorageService implements IDocumentStorageService { size: content.length, encoding: undefined, // string }; - this.blobCache.set(blobId, blobPatched); - - // No need to patch it again - this.attributesBlobHandles.delete(blobId); return blobPatched; } @@ -357,7 +448,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService { } if (this.hostPolicy.summarizerClient) { - await this.odspSummaryUploadManager.buildCachesForDedup(finalTree); + await this.odspSummaryUploadManager.buildCachesForDedup(finalTree, this.blobCache.value); } return finalTree; } @@ -724,37 +815,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService { } private initBlobsCache(blobs: IBlob[]) { - blobs.forEach((blob) => { - assert(blob.encoding === "base64" || blob.encoding === undefined); - this.blobCache.set(blob.id, blob); - }); - this.scheduleClearBlobsCache(); - } - - /** - * Schedule a timer for clearing the blob cache or defer the current one. - */ - private scheduleClearBlobsCache() { - /* - if (this.blobCacheTimeout !== undefined) { - // If we already have an outstanding timer, just signal that we should defer the clear - this.deferBlobCacheClear = true; - } else { - // If we don't have an outstanding timer, set a timer - // When the timer runs out, we'll decide whether to proceed with the cache clear or reset the timer - const clearCacheOrDefer = () => { - this.blobCacheTimeout = undefined; - if (this.deferBlobCacheClear) { - this.deferBlobCacheClear = false; - this.scheduleClearBlobsCache(); - } else { - this.blobCache.clear(); - } - }; - const blobCacheTimeoutDuration = 10000; - this.blobCacheTimeout = setTimeout(clearCacheOrDefer, blobCacheTimeoutDuration); - } - */ + this.blobCache.addBlobs(blobs); } private checkSnapshotUrl() { diff --git a/packages/drivers/odsp-driver/src/odspSummaryUploadManager.ts b/packages/drivers/odsp-driver/src/odspSummaryUploadManager.ts index 230248bcc978..f2e675d3f664 100644 --- a/packages/drivers/odsp-driver/src/odspSummaryUploadManager.ts +++ b/packages/drivers/odsp-driver/src/odspSummaryUploadManager.ts @@ -72,7 +72,6 @@ export class OdspSummaryUploadManager { private readonly getStorageToken: (options: TokenFetchOptions, name?: string) => Promise, private readonly logger: ITelemetryLogger, private readonly epochTracker: EpochTracker, - private readonly blobCache: Map = new Map(), ) { } @@ -80,9 +79,9 @@ export class OdspSummaryUploadManager { * Builts the caches which will be used for blob deduping. * @param snapshotTree - snapshot tree from which the dedup caches are built. */ - public async buildCachesForDedup(snapshotTree: api.ISnapshotTree) { + public async buildCachesForDedup(snapshotTree: api.ISnapshotTree, blobCache: Map) { const prefixedSnapshotTree = this.addAppPrefixToSnapshotTree(snapshotTree); - await this.buildCachesForDedupCore(prefixedSnapshotTree); + await this.buildCachesForDedupCore(prefixedSnapshotTree, blobCache); this.previousBlobTreeDedupCaches = { ...this.blobTreeDedupCaches }; } @@ -91,7 +90,11 @@ export class OdspSummaryUploadManager { * @param snapshotTree - snapshot tree from which the dedup caches are built. * @param path - path of the current node evaluated. */ - private async buildCachesForDedupCore(snapshotTree: api.ISnapshotTree, path: string = ""): Promise { + private async buildCachesForDedupCore( + snapshotTree: api.ISnapshotTree, + blobCache: Map, + path: string = ""): Promise + { assert(Object.keys(snapshotTree.commits).length === 0, "There should not be commit tree entries in snapshot"); const summaryTree: api.ISummaryTree = { @@ -102,7 +105,7 @@ export class OdspSummaryUploadManager { // fullBlobPath does not start with "/" const fullBlobPath = path === "" ? key : `${path}/${key}`; let hash: string | undefined; - const blobValue = this.blobCache.get(value); + const blobValue = blobCache.get(value); if (blobValue !== undefined) { hash = await hashFile( blobValue instanceof ArrayBuffer ? @@ -124,7 +127,7 @@ export class OdspSummaryUploadManager { for (const [key, tree] of Object.entries(snapshotTree.trees)) { // fullTreePath does not start with "/" const fullTreePath = path === "" ? key : `${path}/${key}`; - const subtree = await this.buildCachesForDedupCore(tree, fullTreePath); + const subtree = await this.buildCachesForDedupCore(tree, blobCache, fullTreePath); this.blobTreeDedupCaches.treesPathToTree.set(fullTreePath, subtree); summaryTree.tree[key] = subtree; } diff --git a/packages/drivers/odsp-driver/src/test/odspSummaryUploadManagerTests.spec.ts b/packages/drivers/odsp-driver/src/test/odspSummaryUploadManagerTests.spec.ts index ba750ea8d91c..0ba56b9c02c0 100644 --- a/packages/drivers/odsp-driver/src/test/odspSummaryUploadManagerTests.spec.ts +++ b/packages/drivers/odsp-driver/src/test/odspSummaryUploadManagerTests.spec.ts @@ -14,6 +14,7 @@ import { ISummaryContext } from "@fluidframework/driver-definitions"; import { EpochTracker } from "../epochTracker"; import { LocalPersistentCache, LocalPersistentCacheAdapter } from "../odspCache"; import { IDedupCaches, OdspSummaryUploadManager } from "../odspSummaryUploadManager"; +import { IBlob } from "../contracts"; import { TokenFetchOptions } from "../tokenFetch"; import { mockFetch } from "./mockFetch"; @@ -30,21 +31,17 @@ describe("Odsp Summary Upload Manager Tests", () => { async (options: TokenFetchOptions, name?: string) => "token", logger, epochTracker, - new Map(), ); }); it("Should populate caches properly", async () => { - odspSummaryUploadManager["blobCache"].set("blob1", - { content: "blob1", id: "blob1", size: 5, byteLength: 1, encoding: undefined }); - odspSummaryUploadManager["blobCache"].set("blob2", - { content: "blob2", id: "blob2", size: 5, byteLength: 1, encoding: undefined }); - odspSummaryUploadManager["blobCache"].set("blob3", - { content: "blob2", id: "blob2", size: 5, byteLength: 1, encoding: undefined }); - odspSummaryUploadManager["blobCache"].set("blob4", - { content: "blob4", id: "blob4", size: 5, byteLength: 1, encoding: undefined }); - odspSummaryUploadManager["blobCache"].set("blob5", - { content: "blob5", id: "blob5", size: 5, byteLength: 1, encoding: undefined }); + const blobCache = new Map(); + + blobCache.set("blob1", { content: "blob1", id: "blob1", size: 5, encoding: undefined }); + blobCache.set("blob2", { content: "blob2", id: "blob2", size: 5, encoding: undefined }); + blobCache.set("blob3", { content: "blob2", id: "blob2", size: 5, encoding: undefined }); + blobCache.set("blob4", { content: "blob4", id: "blob4", size: 5, encoding: undefined }); + blobCache.set("blob5", { content: "blob5", id: "blob5", size: 5, encoding: undefined }); const protocolTree: api.ISnapshotTree = { blobs: { blob1: "blob1", @@ -78,7 +75,7 @@ describe("Odsp Summary Upload Manager Tests", () => { }, }; - await odspSummaryUploadManager.buildCachesForDedup(snapshotTree); + await odspSummaryUploadManager.buildCachesForDedup(snapshotTree, blobCache); assert.strictEqual(odspSummaryUploadManager["blobTreeDedupCaches"].blobShaToPath.size, 4, "4 blobs should be in cache as 4 blobs with different content");