Rework caching layers in ODSP driver (#4885)
Follow up to #4872

Evicting blobs from the cache forces the summarizer to read most blobs from storage without hitting the driver cache.
As a result, SPO starts throttling the app's requests, i.e. it amounts to a self-made DDoS attack.

This change reworks the cache layer and disables cache eviction for now.
vladsud authored Feb 1, 2021
1 parent 6c0f2d4 commit 8b3c5a1
Showing 3 changed files with 130 additions and 69 deletions.
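For context, a minimal sketch of the deferral pattern the new BlobCache in the diff below uses (names mirror the diff; this is an illustration, not the shipped code). Instead of clearing and resetting the timer on every cache access, one timer stays pending and a boolean asks it to reschedule when it fires:

class DeferredClear {
    private timeout: ReturnType<typeof setTimeout> | undefined;
    private deferred = false;

    constructor(
        private readonly durationMs: number,
        private readonly onClear: () => void,
    ) {}

    // Called on every cache read/write; at most it flips a boolean.
    public touch() {
        if (this.timeout !== undefined) {
            // A timer is already pending; ask it to reschedule instead of firing.
            this.deferred = true;
            return;
        }
        this.timeout = setTimeout(() => {
            this.timeout = undefined;
            if (this.deferred) {
                this.deferred = false;
                this.touch(); // wait one more full interval
            } else {
                this.onClear();
            }
        }, this.durationMs);
    }
}

This keeps setTimeout/clearTimeout churn constant no matter how hot the cache is, at the cost of clearing up to one interval late.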
163 changes: 112 additions & 51 deletions packages/drivers/odsp-driver/src/odspDocumentStorageManager.ts
@@ -99,16 +99,109 @@ async function promiseRaceWithWinner<T>(promises: Promise<T>[]): Promise<{ index
});
}

export class OdspDocumentStorageService implements IDocumentStorageService {
private readonly blobCache: Map<string, IBlob | ArrayBuffer> = new Map();
private readonly treesCache: Map<string, ITree> = new Map();

class BlobCache {
// Save the timeout so we can cancel and reschedule it as needed
private blobCacheTimeout: ReturnType<typeof setTimeout> | undefined;
// If the defer flag is set when the timeout fires, we'll reschedule rather than clear immediately
// This deferral approach is used (rather than clearing/resetting the timer) as current calling patterns trigger
// too many calls to setTimeout/clearTimeout.
private deferBlobCacheClear: boolean = false;

private readonly _blobCache: Map<string, IBlob | ArrayBuffer> = new Map();

// Tracks all blob IDs evicted from cache
private readonly blobsEvicted: Set<string> = new Set();

// Initial timeout before purging data from the cache.
// If this timeout is too small, we purge blobs from the cache too soon, which results in many
// requests to storage; that degrades perf and may trip protection limits, causing 429s.
// We also need to ensure that buildCachesForDedup() is called while the cache is still full, so the
// summarizer client can build its SHA cache for blobs (currently that happens as a result of
// requesting the snapshot tree).
private blobCacheTimeoutDuration = 2 * 60 * 1000;

// SPO does not keep old snapshots around for long, so if we purge anything we run the risk of not
// being able to rehydrate a data store / DDS in the future (and with blob de-duping, even a blob
// already read by the runtime could be read again later).
// So for now, purging is disabled.
private readonly purgeEnabled = false;

public get value() {
return this._blobCache;
}

public addBlobs(blobs: IBlob[]) {
blobs.forEach((blob) => {
assert(blob.encoding === "base64" || blob.encoding === undefined);
this._blobCache.set(blob.id, blob);
});
// Reset the timer on cache set
this.scheduleClearBlobsCache();
}

/**
* Schedule a timer for clearing the blob cache or defer the current one.
*/
private scheduleClearBlobsCache() {
if (this.blobCacheTimeout !== undefined) {
// If we already have an outstanding timer, just signal that we should defer the clear
this.deferBlobCacheClear = true;
} else {
// If we don't have an outstanding timer, set a timer
// When the timer runs out, we'll decide whether to proceed with the cache clear or reset the timer
const clearCacheOrDefer = () => {
this.blobCacheTimeout = undefined;
if (this.deferBlobCacheClear) {
this.deferBlobCacheClear = false;
this.scheduleClearBlobsCache();
} else {
// NOTE: A slightly better algorithm here would be to purge only big blobs, or to sort blobs
// by size and purge enough big ones to leave only 256 KB of small blobs in the cache.
// Purging optimizes memory footprint, but the blob count controls the potential number of
// storage requests; we want to optimize both.
// Note that the Container can realize a data store or DDS on demand at any point in time,
// so we do not control when blobs will be used.
if (this.purgeEnabled) {
this._blobCache.forEach((_, blobId) => this.blobsEvicted.add(blobId));
this._blobCache.clear();
}
}
};
this.blobCacheTimeout = setTimeout(clearCacheOrDefer, this.blobCacheTimeoutDuration);
// Any future storage reads that get into the cache should be cleared from the cache rather
// quickly; there is not much value in keeping them longer.
this.blobCacheTimeoutDuration = 10 * 1000;
}
}

public getBlob(blobId: string) {
// Reset the timer on attempted cache read
this.scheduleClearBlobsCache();
const blobContent = this._blobCache.get(blobId);
const evicted = this.blobsEvicted.has(blobId);
return { blobContent, evicted };
}

public setBlob(blobId: string, blob: IBlob | ArrayBuffer) {
// This API is called as a result of a cache miss and a read of the blob from storage.
// The runtime never reads the same blob twice; the only reason we may get a read request for
// the same blob is blob de-duping in summaries.
// Note that the bigger the blob, the less likely two blobs are identical, so there is very
// little benefit in caching big blobs. Images are the only exception (a user may insert the
// same image twice), but we currently de-dup only snapshot blobs, not images.
const size = blob instanceof ArrayBuffer ? blob.byteLength : blob.size;
if (size < 256 * 1024) {
// Reset the timer on cache set
this.scheduleClearBlobsCache();
return this._blobCache.set(blobId, blob);
} else {
// We evict it here by not caching it.
this.blobsEvicted.add(blobId);
}
}
}

export class OdspDocumentStorageService implements IDocumentStorageService {
private readonly treesCache: Map<string, ITree> = new Map();

private readonly attributesBlobHandles: Set<string> = new Set();

@@ -134,6 +227,8 @@ export class OdspDocumentStorageService implements IDocumentStorageService {
private readonly maxSnapshotSizeLimit = 500000000; // 500 MB
private readonly maxSnapshotFetchTimeout = 120000; // 2 min

private readonly blobCache = new BlobCache();

public set ops(ops: ISequencedDeltaOpMessage[] | undefined) {
assert(this._ops === undefined);
assert(ops !== undefined);
@@ -168,7 +263,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService {
docId: this.documentId,
};

this.odspSummaryUploadManager = new OdspSummaryUploadManager(this.snapshotUrl, getStorageToken, logger, epochTracker, this.blobCache);
this.odspSummaryUploadManager = new OdspSummaryUploadManager(this.snapshotUrl, getStorageToken, logger, epochTracker);
}

public get repositoryUrl(): string {
@@ -208,10 +303,9 @@ export class OdspDocumentStorageService implements IDocumentStorageService {
return response.content;
}

public async readBlobCore(blobId: string): Promise<IBlob | ArrayBuffer> {
let blob = this.blobCache.get(blobId);
// Reset the timer on attempted cache read
this.scheduleClearBlobsCache();
private async readBlobCore(blobId: string): Promise<IBlob | ArrayBuffer> {
const { blobContent, evicted } = this.blobCache.getBlob(blobId);
let blob = blobContent;

if (blob === undefined) {
this.checkAttachmentGETUrl();
@@ -226,21 +320,22 @@
{
eventName: "readDataBlob",
blobId,
evicted,
headers: Object.keys(headers).length !== 0 ? true : undefined,
waitQueueLength: this.epochTracker.rateLimiter.waitQueueLength,
},
async (event) => {
const res = await this.epochTracker.fetchResponse(url, { headers }, "blob");
const blobContent = await res.arrayBuffer();
blob = await res.arrayBuffer();
event.end({
size: blobContent.byteLength,
size: blob.byteLength,
waitQueueLength: this.epochTracker.rateLimiter.waitQueueLength,
});
return blobContent;
return blob;
},
);
});
this.blobCache.set(blobId, blob);
this.blobCache.setBlob(blobId, blob);
}

if (!this.attributesBlobHandles.has(blobId)) {
@@ -265,10 +360,6 @@
size: content.length,
encoding: undefined, // string
};
this.blobCache.set(blobId, blobPatched);

// No need to patch it again
this.attributesBlobHandles.delete(blobId);

return blobPatched;
}
@@ -357,7 +448,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService {
}

if (this.hostPolicy.summarizerClient) {
await this.odspSummaryUploadManager.buildCachesForDedup(finalTree);
await this.odspSummaryUploadManager.buildCachesForDedup(finalTree, this.blobCache.value);
}
return finalTree;
}
@@ -724,37 +815,7 @@ export class OdspDocumentStorageService implements IDocumentStorageService {
}

private initBlobsCache(blobs: IBlob[]) {
blobs.forEach((blob) => {
assert(blob.encoding === "base64" || blob.encoding === undefined);
this.blobCache.set(blob.id, blob);
});
this.scheduleClearBlobsCache();
}

/**
* Schedule a timer for clearing the blob cache or defer the current one.
*/
private scheduleClearBlobsCache() {
/*
if (this.blobCacheTimeout !== undefined) {
// If we already have an outstanding timer, just signal that we should defer the clear
this.deferBlobCacheClear = true;
} else {
// If we don't have an outstanding timer, set a timer
// When the timer runs out, we'll decide whether to proceed with the cache clear or reset the timer
const clearCacheOrDefer = () => {
this.blobCacheTimeout = undefined;
if (this.deferBlobCacheClear) {
this.deferBlobCacheClear = false;
this.scheduleClearBlobsCache();
} else {
this.blobCache.clear();
}
};
const blobCacheTimeoutDuration = 10000;
this.blobCacheTimeout = setTimeout(clearCacheOrDefer, blobCacheTimeoutDuration);
}
*/
this.blobCache.addBlobs(blobs);
}

private checkSnapshotUrl() {
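Taken together, the new BlobCache surface is small. A hypothetical caller (narrowed to ArrayBuffer content for brevity; the real cache also holds IBlob, and fetchFromStorage is a stand-in, not a real driver API) mirrors what readBlobCore above does:

interface BlobCacheLike {
    getBlob(id: string): { blobContent: ArrayBuffer | undefined; evicted: boolean };
    setBlob(id: string, blob: ArrayBuffer): void;
}

async function readWithCache(
    cache: BlobCacheLike,
    blobId: string,
    fetchFromStorage: (id: string) => Promise<ArrayBuffer>,
): Promise<ArrayBuffer> {
    const { blobContent, evicted } = cache.getBlob(blobId);
    if (blobContent !== undefined) {
        return blobContent;
    }
    // Cache miss: `evicted` tells telemetry whether this is a blob we previously
    // dropped (and now must re-fetch) or one we never cached.
    console.log(`blob cache miss: ${blobId}, evicted=${evicted}`);
    const blob = await fetchFromStorage(blobId);
    cache.setBlob(blobId, blob); // cached only if under 256 KB; otherwise marked evicted
    return blob;
}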
15 changes: 9 additions & 6 deletions packages/drivers/odsp-driver/src/odspSummaryUploadManager.ts
@@ -72,17 +72,16 @@
private readonly getStorageToken: (options: TokenFetchOptions, name?: string) => Promise<string | null>,
private readonly logger: ITelemetryLogger,
private readonly epochTracker: EpochTracker,
private readonly blobCache: Map<string, IBlob | ArrayBuffer> = new Map(),
) {
}

/**
* Builds the caches which will be used for blob deduping.
* @param snapshotTree - snapshot tree from which the dedup caches are built.
*/
public async buildCachesForDedup(snapshotTree: api.ISnapshotTree) {
public async buildCachesForDedup(snapshotTree: api.ISnapshotTree, blobCache: Map<string, IBlob | ArrayBuffer>) {
const prefixedSnapshotTree = this.addAppPrefixToSnapshotTree(snapshotTree);
await this.buildCachesForDedupCore(prefixedSnapshotTree);
await this.buildCachesForDedupCore(prefixedSnapshotTree, blobCache);
this.previousBlobTreeDedupCaches = { ...this.blobTreeDedupCaches };
}

@@ -91,7 +90,11 @@
* @param snapshotTree - snapshot tree from which the dedup caches are built.
* @param path - path of the current node evaluated.
*/
private async buildCachesForDedupCore(snapshotTree: api.ISnapshotTree, path: string = ""): Promise<api.ISummaryTree> {
private async buildCachesForDedupCore(
snapshotTree: api.ISnapshotTree,
blobCache: Map<string, IBlob | ArrayBuffer>,
path: string = ""): Promise<api.ISummaryTree>
{
assert(Object.keys(snapshotTree.commits).length === 0, "There should not be commit tree entries in snapshot");

const summaryTree: api.ISummaryTree = {
@@ -102,7 +105,7 @@
// fullBlobPath does not start with "/"
const fullBlobPath = path === "" ? key : `${path}/${key}`;
let hash: string | undefined;
const blobValue = this.blobCache.get(value);
const blobValue = blobCache.get(value);
if (blobValue !== undefined) {
hash = await hashFile(
blobValue instanceof ArrayBuffer ?
@@ -124,7 +127,7 @@
for (const [key, tree] of Object.entries(snapshotTree.trees)) {
// fullTreePath does not start with "/"
const fullTreePath = path === "" ? key : `${path}/${key}`;
const subtree = await this.buildCachesForDedupCore(tree, fullTreePath);
const subtree = await this.buildCachesForDedupCore(tree, blobCache, fullTreePath);
this.blobTreeDedupCaches.treesPathToTree.set(fullTreePath, subtree);
summaryTree.tree[key] = subtree;
}
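The upload manager no longer owns a blob map; the caller passes the driver's cache in at dedup time. For reference, de-duping keys blobs by content hash. A self-contained sketch of that idea using Node's crypto module (a hypothetical helper; the driver itself uses its hashFile utility, and the real hash format may differ):

import { createHash } from "crypto";

// Map content hash -> snapshot path, so a later summary can reference an existing
// blob by path instead of re-uploading identical bytes.
function buildShaToPath(
    blobCache: Map<string, ArrayBuffer>,
    blobIdToPath: Map<string, string>,
): Map<string, string> {
    const shaToPath = new Map<string, string>();
    for (const [blobId, content] of blobCache) {
        const path = blobIdToPath.get(blobId);
        if (path === undefined) {
            continue; // blob is not part of the snapshot tree being indexed
        }
        const sha = createHash("sha256").update(Buffer.from(content)).digest("hex");
        shaToPath.set(sha, path);
    }
    return shaToPath;
}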
@@ -14,6 +14,7 @@ import { ISummaryContext } from "@fluidframework/driver-definitions";
import { EpochTracker } from "../epochTracker";
import { LocalPersistentCache, LocalPersistentCacheAdapter } from "../odspCache";
import { IDedupCaches, OdspSummaryUploadManager } from "../odspSummaryUploadManager";
import { IBlob } from "../contracts";
import { TokenFetchOptions } from "../tokenFetch";
import { mockFetch } from "./mockFetch";

@@ -30,21 +31,17 @@ describe("Odsp Summary Upload Manager Tests", () => {
async (options: TokenFetchOptions, name?: string) => "token",
logger,
epochTracker,
new Map(),
);
});

it("Should populate caches properly", async () => {
odspSummaryUploadManager["blobCache"].set("blob1",
{ content: "blob1", id: "blob1", size: 5, byteLength: 1, encoding: undefined });
odspSummaryUploadManager["blobCache"].set("blob2",
{ content: "blob2", id: "blob2", size: 5, byteLength: 1, encoding: undefined });
odspSummaryUploadManager["blobCache"].set("blob3",
{ content: "blob2", id: "blob2", size: 5, byteLength: 1, encoding: undefined });
odspSummaryUploadManager["blobCache"].set("blob4",
{ content: "blob4", id: "blob4", size: 5, byteLength: 1, encoding: undefined });
odspSummaryUploadManager["blobCache"].set("blob5",
{ content: "blob5", id: "blob5", size: 5, byteLength: 1, encoding: undefined });
const blobCache = new Map<string, IBlob>();

blobCache.set("blob1", { content: "blob1", id: "blob1", size: 5, encoding: undefined });
blobCache.set("blob2", { content: "blob2", id: "blob2", size: 5, encoding: undefined });
blobCache.set("blob3", { content: "blob2", id: "blob2", size: 5, encoding: undefined });
blobCache.set("blob4", { content: "blob4", id: "blob4", size: 5, encoding: undefined });
blobCache.set("blob5", { content: "blob5", id: "blob5", size: 5, encoding: undefined });
const protocolTree: api.ISnapshotTree = {
blobs: {
blob1: "blob1",
@@ -78,7 +75,7 @@
},
};

await odspSummaryUploadManager.buildCachesForDedup(snapshotTree);
await odspSummaryUploadManager.buildCachesForDedup(snapshotTree, blobCache);

assert.strictEqual(odspSummaryUploadManager["blobTreeDedupCaches"].blobShaToPath.size, 4,
"4 blobs should be in cache as 4 blobs with different content");
