From c9cfcd1faeb6e5b6e834f2c784b431110670cd75 Mon Sep 17 00:00:00 2001 From: Vlad Sudzilouski Date: Sat, 13 Mar 2021 20:07:13 -0800 Subject: [PATCH] Implement parallel ops requesting (#5407) Implements and closes #5393 Implement ability to do multiple parallel requests to storage to speed up retrieval of ops. This change implements ability to do concurrent fetches. It does not change behaviour of DeltaManager for now, and only enables this new capability in fetch-tool with 4 parallel requests by 20K ops. Future work in this area is tracked by issue #5523. This new capability will help in the following scenarios: - Boot from cached (stale) snapshot (in future) - Being offline for a day - week (in future) - fetching a lot of ops using fetch-tool (in this PR) In future, I'll refactor this approach into an adapter layer that drivers could use to implement parallel requests (or not), as well as their batching strategy. An API that will be exposed from driver will be an IReadPipe API and full (including infinite) request, where DeltaManager no longer would control batching or parallelism. It's easy to start with no parallelism and add concurrency (workers) later. We need to explore that such that we start with one request and add more parallel requests only when we know we are far behind. That will reduce pressure on storage (extra empty requests for nothing). 
--- .../container-loader/src/deltaManager.ts | 179 +++++---- packages/loader/driver-utils/src/index.ts | 1 + .../driver-utils/src/parallelRequests.ts | 341 ++++++++++++++++++ .../src/test/parallelRequests.spec.ts | 170 +++++++++ packages/tools/fetch-tool/package.json | 1 + packages/tools/fetch-tool/src/fluidFetch.ts | 9 +- .../fetch-tool/src/fluidFetchMessages.ts | 65 ++-- packages/utils/telemetry-utils/src/logger.ts | 38 ++ 8 files changed, 698 insertions(+), 106 deletions(-) create mode 100644 packages/loader/driver-utils/src/parallelRequests.ts create mode 100644 packages/loader/driver-utils/src/test/parallelRequests.spec.ts diff --git a/packages/loader/container-loader/src/deltaManager.ts b/packages/loader/container-loader/src/deltaManager.ts index d857f886b2c6..481a8621bf2f 100644 --- a/packages/loader/container-loader/src/deltaManager.ts +++ b/packages/loader/container-loader/src/deltaManager.ts @@ -49,6 +49,7 @@ import { createWriteError, createGenericNetworkError, getRetryDelayFromError, + ParallelRequests, } from "@fluidframework/driver-utils"; import { CreateContainerError, @@ -785,98 +786,130 @@ export class DeltaManager private async getDeltas( telemetryEventSuffix: string, - fromInitial: number, - to: number | undefined, - callback: (messages: ISequencedDocumentMessage[]) => void) { - let retry: number = 0; - let from: number = fromInitial; - let deltas: ISequencedDocumentMessage[] = []; - let deltasRetrievedTotal = 0; - + from: number, // exclusive + to: number | undefined, // exclusive + callback: (messages: ISequencedDocumentMessage[]) => void) + { const docService = this.serviceProvider(); if (docService === undefined) { throw new Error("Delta manager is not attached"); } + if (this.deltaStorageP === undefined) { + this.deltaStorageP = docService.connectToDeltaStorage(); + } + const telemetryEvent = PerformanceEvent.start(this.logger, { eventName: `GetDeltas_${telemetryEventSuffix}`, from, to, }); + let deltasRetrievedTotal = 0; let requests = 0; + 
+ let lastFetch: number | undefined; + + const manager = new ParallelRequests( + from + 1, // from is exclusive, but ParallelRequests uses inclusive left + to, // exclusive right + MaxBatchDeltas, + this.logger, + async (request: number, _from: number, _to: number, strongTo: boolean) => { + requests++; + return this.getSingleOpBatch(request, _from, _to, telemetryEvent, strongTo); + }, + (deltas: ISequencedDocumentMessage[]) => { + deltasRetrievedTotal += deltas.length; + lastFetch = deltas[deltas.length - 1].sequenceNumber; + PerformanceEvent.timedExec( + this.logger, + { eventName: "GetDeltas_OpProcessing", count: deltas.length}, + () => callback(deltas), + { end: true, cancel: "error" }); + }, + ); + + // Staging: starting with no concurrency, listening for feedback first. + // In future releases we will switch to actual concurrency + await manager.run(1 /* concurrency */); + + telemetryEvent.end({ + lastFetch, + deltasRetrievedTotal, + requests, + lastQueuedSequenceNumber: this.lastQueuedSequenceNumber, + }); + } + + /** + * Retrieve single batch of ops + * @param request - request index + * @param from - inclusive boundary + * @param to - exclusive boundary + * @param telemetryEvent - telemetry event used to track consecutive batch of requests + * @param strongTo - tells if ops in range from...to have to be there and have to be retrieved. + * If false, returning less ops would mean we reached end of file. + * @returns - an object with resulting ops and cancellation / partial result flags + */ + async getSingleOpBatch( + request: number, + from: number, + to: number, + telemetryEvent: PerformanceEvent, + strongTo: boolean): + Promise<{ partial: boolean, cancel: boolean, payload: ISequencedDocumentMessage[] }> + { let deltaStorage: IDocumentDeltaStorageService | undefined; let lastSuccessTime: number | undefined; - while (!this.closed) { - const maxFetchTo = from + MaxBatchDeltas; - const fetchTo = to === undefined ? 
maxFetchTo : Math.min(maxFetchTo, to); + let retry: number = 0; + const deltas: ISequencedDocumentMessage[] = []; + let deltasRetrievedTotal = 0; + const nothing = { partial: false, cancel: true, payload: []}; - let deltasRetrievedLast = 0; - let canRetry = false; - let retryAfter: number | undefined; - let delay: number; + const start = performance.now(); - // Calculate delay for next iteration if request fails or we get no ops. - // If request succeeds and returns some ops, we will reset these variables. + while (!this.closed) { retry++; - delay = retryAfter ?? Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry)); + let delay = Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry)); + let canRetry = false; try { // Connect to the delta storage endpoint if (deltaStorage === undefined) { - if (this.deltaStorageP === undefined) { - this.deltaStorageP = docService.connectToDeltaStorage(); - } deltaStorage = await this.deltaStorageP; } - requests++; - // Issue async request for deltas - limit the number fetched to MaxBatchDeltas canRetry = true; - const deltasP = deltaStorage.get(from, fetchTo); + assert(deltaStorage !== undefined); + // left is inclusive for ParallelRequests, but exclusive for IDocumentDeltaStorageService + // right is exclusive for both + const deltasP = deltaStorage.get(from - 1, to); - // Return previously fetched deltas, for processing while we are waiting for new request. - if (deltas.length > 0) { - callback(deltas); - } - - // Now wait for request to come back const { messages, partialResult } = await deltasP; - deltas = messages; + deltas.push(...messages); - // Note that server (or driver code) can push here something unexpected, like undefined - // Exception thrown as result of it will result in us retrying - deltasRetrievedLast = deltas.length; + const deltasRetrievedLast = messages.length; deltasRetrievedTotal += deltasRetrievedLast; - const lastFetch = deltasRetrievedLast > 0 ? 
deltas[deltasRetrievedLast - 1].sequenceNumber : from; - - // If we have no upper bound, then need to check partialResult flag. Different caching layers will - // return whatever ops they have and we need to keep asking until we get to the end. - // Only when partialResult = false, we know we got everything caching/storage layers have to offer, - // and if it's less than what we asked for, we know we reached the end. - // But if we know upper bound, then we have to get all these ops, even of storage says it does not - // have them. That could happen if offering service did not flush them yet to storage, or is in process - // of doing it, and we know we have a gap on our knowledge and can't proceed further without these ops. - // Note #1: we can get more ops than what we asked for - need to account for that! - // Note #2: from & to are exclusive! I.e. we actually expect [from + 1, to - 1] range of ops back! - if (to === undefined ? (!partialResult && lastFetch < maxFetchTo - 1) : to - 1 <= lastFetch) { - callback(deltas); - telemetryEvent.end({ lastFetch, deltasRetrievedTotal, requests }); - return; + + if (deltasRetrievedLast !== 0 || !strongTo) { + telemetryEvent.reportProgress({ + chunkDeltas: deltasRetrievedTotal, + chunkFrom: from, + chunkTo: to, + chunkRequests: retry, + chunkDuration: TelemetryLogger.formatTick(performance.now() - start), + }); + return { payload: deltas, cancel: false, partial: partialResult}; } + // Storage does not have ops we need. // Attempt to fetch more deltas. If we didn't receive any in the previous call we up our retry // count since something prevented us from seeing those deltas - from = lastFetch; - - if (deltasRetrievedLast !== 0) { - // If we are getting some ops, reset all counters. 
- delay = 0; - retry = 0; - lastSuccessTime = undefined; - } else if (lastSuccessTime === undefined) { + + if (lastSuccessTime === undefined) { lastSuccessTime = Date.now(); } else if (Date.now() - lastSuccessTime > 30000) { // If we are connected and receiving proper responses from server, but can't get any ops back, @@ -887,7 +920,7 @@ export class DeltaManager category: "error", error: "too many retries", retry, - requests, + request, deltasRetrievedTotal, replayFrom: from, to, @@ -896,7 +929,7 @@ export class DeltaManager "Failed to retrieve ops from storage: giving up after too many retries", false /* canRetry */, )); - return; + return nothing; } } catch (origError) { canRetry = canRetry && canRetryOnError(origError); @@ -908,9 +941,9 @@ export class DeltaManager this.logger, { eventName: "GetDeltas_Error", - fetchTo, + fetchTo: to, from, - requests, + request, retry, }, origError); @@ -919,42 +952,28 @@ export class DeltaManager // It's game over scenario. telemetryEvent.cancel({ category: "error" }, origError); this.close(error); - return; + return nothing; } - retryAfter = getRetryDelayFromError(origError); + const retryAfter = getRetryDelayFromError(origError); if (retryAfter !== undefined && retryAfter >= 0) { this.emitDelayInfo(this.deltaStorageDelayId, retryAfter, error); + delay = retryAfter; } } if (to !== undefined && this.lastQueuedSequenceNumber >= to) { // the client caught up while we were trying to fetch ops from storage // bail out since we no longer need to request these ops - telemetryEvent.end({ - deltasRetrievedTotal, - requests, - lastQueuedSequenceNumber: this.lastQueuedSequenceNumber, - }); - return; + return nothing; } - telemetryEvent.reportProgress({ - delay, // seconds - deltasRetrievedLast, - deltasRetrievedTotal, - replayFrom: from, - requests, - retry, - success: lastSuccessTime !== undefined, - }); - await waitForConnectedState(delay * 1000); } // Might need to change to non-error event - this.logger.sendErrorEvent({ eventName: 
"GetDeltasClosedConnection" }); telemetryEvent.cancel({ error: "container closed" }); + return nothing; } /** diff --git a/packages/loader/driver-utils/src/index.ts b/packages/loader/driver-utils/src/index.ts index 377ca4f48315..631e05555b38 100644 --- a/packages/loader/driver-utils/src/index.ts +++ b/packages/loader/driver-utils/src/index.ts @@ -13,3 +13,4 @@ export * from "./network"; export * from "./readAndParse"; export * from "./fluidResolvedUrl"; export * from "./summaryForCreateNew"; +export * from "./parallelRequests"; diff --git a/packages/loader/driver-utils/src/parallelRequests.ts b/packages/loader/driver-utils/src/parallelRequests.ts new file mode 100644 index 000000000000..78b03b75dfb2 --- /dev/null +++ b/packages/loader/driver-utils/src/parallelRequests.ts @@ -0,0 +1,341 @@ +/*! + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ +import { assert, Deferred } from "@fluidframework/common-utils"; +import { ITelemetryLogger } from "@fluidframework/common-definitions"; + +/** + * Helper class to organize parallel fetching of data + * It can be used to concurrently do many requests, while consuming + * data in the right order. Take a look at UT for examples. + * @param concurrency - level of concurrency + * @param from - starting point of fetching data (inclusive) + * @param to - ending point of fetching data. 
exclusive, or undefined if unknown + * @param payloadSize - batch size + * @param logger - logger to use + * @param requestCallback - callback to request batches + * @returns - Queue that can be used to retrieve data + */ +export class ParallelRequests { + private latestRequested: number; + private nextToDeliver: number; + private readonly results: Map = new Map(); + private working = true; + private requestsInFlight = 0; + private readonly endEvent = new Deferred(); + private requests = 0; + private readonly knewTo: boolean; + + constructor( + from: number, + private to: number | undefined, + private readonly payloadSize: number, + private readonly logger: ITelemetryLogger, + private readonly requestCallback: (request: number, from: number, to: number, strongTo: boolean) => + Promise<{ partial: boolean, cancel: boolean, payload: T[] }>, + private readonly responseCallback: (payload: T[]) => void) + { + this.latestRequested = from; + this.nextToDeliver = from; + this.knewTo = (to !== undefined); + } + + public cancel() { + this.working = false; + this.endEvent.resolve(); + } + + public async run(concurrency: number) { + assert(concurrency > 0); + assert(this.working); + + let c = concurrency; + while (c > 0) { + c--; + this.addRequest(); + } + this.dispatch();// will recalculate and trigger this.endEvent if needed + return this.endEvent.promise; + } + + private done() { + // We should satisfy request fully. 
+ assert(this.to !== undefined); + assert(this.nextToDeliver === this.to); + this.working = false; + this.endEvent.resolve(); + } + + private fail(error) { + this.working = false; + this.endEvent.reject(error); + } + + private dispatch() { + while (this.working) { + const value = this.results.get(this.nextToDeliver); + if (value === undefined) { + break; + } + this.results.delete(this.nextToDeliver); + this.nextToDeliver += value.length; + this.responseCallback(value); + } + + // Account for cancellation - state might be not in consistent state on cancelling operation + if (this.working) { + assert(this.requestsInFlight !== 0 || this.results.size === 0); + + if (this.requestsInFlight === 0) { + // we should have dispatched everything, no matter whether we knew about the end or not. + // see comment in addRequestCore() around throwing away chunk if it's above this.to + assert(this.results.size === 0); + this.done(); + } else if (this.to !== undefined && this.nextToDeliver >= this.to) { + // Learned about the end and dispatched all the ops up to it. + // Ignore all the in-flight requests above boundary - unblock caller sooner. + assert(!this.knewTo); + this.done(); + } + } + } + + private getNextChunk() { + if (!this.working) { + return undefined; + } + + const from = this.latestRequested; + if (this.to !== undefined) { + if (this.to <= from) { + return undefined; + } + } + + // this.latestRequested + // inclusive on the right side! Exclusive on the left. 
+ this.latestRequested += this.payloadSize; + + if (this.to !== undefined) { + this.latestRequested = Math.min(this.to, this.latestRequested); + } + + assert(from < this.latestRequested); + + return { from, to: this.latestRequested}; + } + + private addRequest() { + const chunk = this.getNextChunk(); + if (chunk === undefined) { + return; + } + this.addRequestCore(chunk.from, chunk.to).catch(this.fail.bind(this)); + } + + private async addRequestCore(fromArg: number, toArg: number) { + assert(this.working); + + let from = fromArg; + let to = toArg; + + // to & from are exclusive + this.requestsInFlight++; + while (this.working) { + const requestedLength = to - from; + assert(requestedLength > 0); + + // We should not be wasting time asking for something useless. + if (this.to !== undefined) { + assert(from < this.to); + assert(to <= this.to); + } + + this.requests++; + + const promise = this.requestCallback(this.requests, from, to, this.to !== undefined); + + // dispatch any prior received data + this.dispatch(); + + const { payload, cancel, partial } = await promise; + + if (cancel) { + this.cancel(); + } + + if (this.to !== undefined && from >= this.to) { + // while we were waiting for response, we learned on what is the boundary + // We can get here (with actual result!) if situation changed while this request was in + // flight, i.e. the end was extended over what we learn in some other request + // While it's useful not to throw this result, this is very corner cases and makes logic + // (including consistency checks) much harder to write correctly. + // So for now, we are throwing this result out the window. + assert(!this.knewTo); + // Learn how often it happens and if it's too wasteful to throw these chunks. + // If it pops into our view a lot, we would need to reconsider how we approach it. + // Note that this is not visible to user other than potentially not hitting 100% of + // what we can in perf domain. 
+ if (payload.length !== 0) { + this.logger.sendErrorEvent({ + eventName: "ParallelRequests_GotExtra", + from, + to, + end: this.to, + length: payload.length, + }); + } + + break; + } + + if (this.working) { + if (payload.length !== 0) { + this.results.set(from, payload); + } else { + // 1. empty (partial) chunks should not be returned by various caching / adapter layers - + // they should fall back to next layer. This might be important invariant to hold to ensure + // that we are less likely have bugs where such layer would keep returning empty partial + // result on each call. + // 2. Current invariant is that callback does retries until it gets something, + // with the goal of failing if zero data is retrieved in given amount of time. + // This is very specific property of storage / ops, so this logic is not here, but given only + // one user of this class, we assert that to catch issues earlier. + // These invariant can be relaxed if needed. + assert(!partial); + assert(!this.knewTo); + } + + let fullChunk = (requestedLength <= payload.length); // we can possible get more than we asked. + from += payload.length; + + if (!partial && !fullChunk) { + if (!this.knewTo) { + if (this.to === undefined || this.to > from) { + // The END + assert(!this.knewTo); + this.to = from; + } + break; + } + // We know that there are more items to be retrieved + // Can we get partial chunk? Ideally storage indicates that's not a full chunk + // Note that it's possible that not all ops hit storage yet. + // We will come back to request more, and if we can't get any more ops soon, it's + // catastrophic failure (see comment above on responsibility of callback to return something) + // This layer will just keep trying until it gets full set. 
+ this.logger.sendErrorEvent({ + eventName: "ParallelRequestsPartial", + from, + to, + length: payload.length, + }); + } + + if (to === this.latestRequested) { + // we can go after full chunk at the end if we received partial chunk, or more than asked + this.latestRequested = from; + fullChunk = true; + } + + if (fullChunk) { + const chunk = this.getNextChunk(); + if (chunk === undefined) { break; } + from = chunk.from; + to = chunk.to; + } + } + } + this.requestsInFlight--; + this.dispatch(); + } +} + +/** + * Read interface for the Queue + */ +export interface IReadPipe { + pop(): Promise; +} + +/** + * Helper queue class to allow async push / pull + * It's essentially a pipe allowing multiple writers, and single reader + */ +export class Queue implements IReadPipe { + private readonly queue: Promise[] = []; + private deferred: Deferred | undefined; + private done = false; + + public pushValue(value: T) { + this.pushCore(Promise.resolve(value)); + } + + public pushError(error: any) { + this.pushCore(Promise.reject(error)); + this.done = true; + } + + public pushDone() { + this.pushCore(Promise.resolve(undefined)); + this.done = true; + } + + protected pushCore(value: Promise) { + assert(!this.done); + if (this.deferred) { + assert(this.queue.length === 0); + this.deferred.resolve(value); + this.deferred = undefined; + } else { + this.queue.push(value); + } + } + + public async pop(): Promise { + assert(this.deferred === undefined); + const el = this.queue.shift(); + if (el !== undefined) { + return el; + } + assert(!this.done); + this.deferred = new Deferred(); + return this.deferred.promise; + } +} + +/** + * Helper function to expose ParallelRequests through IReadPipe interface + * @param concurrency - level of concurrency + * @param from - starting point of fetching data (inclusive) + * @param to - ending point of fetching data. 
exclusive, or undefined if unknown + * @param payloadSize - batch size + * @param logger - logger to use + * @param requestCallback - callback to request batches + * @returns - Queue that can be used to retrieve data + */ +export function parallel( + concurrency: number, + from: number, + to: number | undefined, + payloadSize: number, + logger: ITelemetryLogger, + requestCallback: (request: number, from: number, to: number, strongTo: boolean) => + Promise<{ partial: boolean, cancel: boolean, payload: T[] }>, +): IReadPipe { + const queue = new Queue(); + const manager = new ParallelRequests( + from, + to, + payloadSize, + logger, + requestCallback, + (messages: T[]) => queue.pushValue(messages)); + + manager.run(concurrency) + .then(() => queue.pushDone()) + .catch((error) => queue.pushError(error)); + + return queue; +} diff --git a/packages/loader/driver-utils/src/test/parallelRequests.spec.ts b/packages/loader/driver-utils/src/test/parallelRequests.spec.ts new file mode 100644 index 000000000000..24bc9228fd58 --- /dev/null +++ b/packages/loader/driver-utils/src/test/parallelRequests.spec.ts @@ -0,0 +1,170 @@ +/*! + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { TelemetryUTLogger } from "@fluidframework/telemetry-utils"; +import { ParallelRequests } from "../parallelRequests"; + +describe("Parallel Requests", () => { + async function test( + concurrency: number, + payloadSize: number, + from: number, + to: number, + expectedRequests: number, + knownTo: boolean, + partial = false) + { + let nextElement = from; + let requests = 0; + let dispatches = 0; + + const manager = new ParallelRequests( + from, + knownTo ? 
to : undefined, + payloadSize, + new TelemetryUTLogger(), + async (request: number, _from: number, _to: number) => { + let length = _to - _from; + requests++; + + assert(_from >= from); + assert(length <= payloadSize); + assert(requests <= request); + assert(!knownTo || _to <= to); + + if (partial) { + length = Math.min(length, payloadSize / 2 + 1); + } + // covering knownTo === false case + const actualTo = Math.min(_from + length, to); + + const payload: number[] = []; + for (let i = _from; i < actualTo; i++) { + payload.push(i); + } + + return { partial: _from === to ? false : partial, cancel: false, payload}; + }, + (deltas: number[]) => { + dispatches++; + assert(dispatches <= requests); + for (const el of deltas) { + assert(el === nextElement); + nextElement++; + } + }, + ); + + await manager.run(concurrency); + + assert(nextElement === to); + assert(dispatches <= requests); + assert(!knownTo || dispatches === requests); + assert(requests === expectedRequests); + } + + async function testCancel( + from: number, + to: number | undefined, + cancelAt: number, + payloadSize, + expectedRequests: number) + { + let nextElement = from; + let requests = 0; + let dispatches = 0; + + const manager = new ParallelRequests( + from, + to, + payloadSize, + new TelemetryUTLogger(), + async (request: number, _from: number, _to: number) => { + const length = _to - _from; + requests++; + + assert(_from >= from); + assert(length <= payloadSize); + assert(requests <= request); + assert(to === undefined || _to <= to); + + if (_to > cancelAt) { + return { partial: false, cancel: true, payload: []}; + } + + const payload: number[] = []; + for (let i = _from; i < _to; i++) { + payload.push(i); + } + + return { partial: false, cancel: false, payload}; + }, + (deltas: number[]) => { + dispatches++; + assert(dispatches <= requests); + for (const el of deltas) { + assert(el === nextElement); + nextElement++; + } + }, + ); + + await manager.run(10); + + assert(dispatches <= requests); + 
assert(requests === expectedRequests); + } + + it("no concurrency, single request, over", async () => { + await test(1, 100, 123, 156, 1, true); + await test(1, 100, 123, 156, 1, false); + await test(1, 100, 123, 156, 1, true, true); + }); + + it("no concurrency, single request, exact", async () => { + await test(1, 156 - 123, 123, 156, 1, true); + await test(1, 156 - 123, 123, 156, 2, false); + await test(1, 156 - 123, 123, 156, 2, true, true); + await test(1, 156 - 123, 123, 156, 3, false, true); + }); + + it("concurrency, single request, exact", async () => { + await test(2, 156 - 123, 123, 156, 1, true); + await test(2, 156 - 123, 123, 156, 2, true, true); + // here, the number of actual requests is Ok to be 2..3 + await test(2, 156 - 123, 123, 156, 3, false); + await test(2, 156 - 123, 123, 156, 3, false, true); + }); + + it("no concurrency, multiple requests", async () => { + await test(1, 10, 123, 156, 4, true); + await test(1, 10, 123, 156, 4, false); + }); + + it("two concurrent requests exact", async () => { + await test(2, 10, 123, 153, 3, true); + await test(2, 10, 123, 153, 6, true, true); + await test(2, 10, 123, 153, 5, false); + await test(2, 10, 123, 153, 8, false, true); + }); + + it("two concurrent requests one over", async () => { + await test(2, 10, 123, 154, 4, true); + // here, the number of actual requests is Ok to be 4..5 + await test(2, 10, 123, 154, 5, false); + }); + + it("four concurrent requests", async () => { + await test(4, 10, 123, 156, 4, true); + // here, the number of actual requests is Ok to be 4..7 + await test(4, 10, 123, 156, 7, false); + }); + + it("cancellation", async () => { + await testCancel(1, 1000, 502, 10, 60); + await testCancel(1, undefined, 502, 10, 60); + }); +}); diff --git a/packages/tools/fetch-tool/package.json b/packages/tools/fetch-tool/package.json index 821190c741ae..2c19990d0b09 100644 --- a/packages/tools/fetch-tool/package.json +++ b/packages/tools/fetch-tool/package.json @@ -39,6 +39,7 @@ 
"@fluidframework/routerlicious-driver": "^0.37.0", "@fluidframework/routerlicious-urlresolver": "^0.37.0", "@fluidframework/runtime-definitions": "^0.37.0", + "@fluidframework/telemetry-utils": "^0.37.0", "@fluidframework/tool-utils": "^0.37.0", "assert": "^2.0.0" }, diff --git a/packages/tools/fetch-tool/src/fluidFetch.ts b/packages/tools/fetch-tool/src/fluidFetch.ts index 1e62dcafc001..b613d505ccdd 100644 --- a/packages/tools/fetch-tool/src/fluidFetch.ts +++ b/packages/tools/fetch-tool/src/fluidFetch.ts @@ -106,9 +106,12 @@ fluidFetchMain() if (error instanceof Error) { let extraMsg = ""; for (const key of Object.keys(error)) { - if (key !== "message" && key !== "stack") { - extraMsg += `\n${key}: ${JSON.stringify(error[key], undefined, 2)}`; - } + // error[key] might have circular structure + try { + if (key !== "message" && key !== "stack") { + extraMsg += `\n${key}: ${JSON.stringify(error[key], undefined, 2)}`; + } + } catch (_) {} } console.error(`ERROR: ${error.stack}${extraMsg}`); } else if (typeof error === "object") { diff --git a/packages/tools/fetch-tool/src/fluidFetchMessages.ts b/packages/tools/fetch-tool/src/fluidFetchMessages.ts index cbe4b5530d05..4a44e6c4f1f7 100644 --- a/packages/tools/fetch-tool/src/fluidFetchMessages.ts +++ b/packages/tools/fetch-tool/src/fluidFetchMessages.ts @@ -4,7 +4,8 @@ */ import fs from "fs"; -import { assert } from "@fluidframework/common-utils"; +import { assert} from "@fluidframework/common-utils"; +import { TelemetryUTLogger } from "@fluidframework/telemetry-utils"; import { IDocumentDeltaStorageService, IDocumentService, @@ -15,6 +16,7 @@ import { MessageType, ScopeType, } from "@fluidframework/protocol-definitions"; +import { parallel } from "@fluidframework/driver-utils"; import { printMessageStats } from "./fluidAnalyzeMessages"; import { connectToWebSocket, @@ -32,18 +34,7 @@ async function loadChunk(from: number, to: number, deltaStorage: IDocumentDeltaS console.log(`Loading ops at ${from}`); for (let iter = 0; 
iter < 3; iter++) { try { - const { messages, partialResult } = await deltaStorage.get(from, to); - // This parsing of message contents happens in delta manager. But when we analyze messages - // for message stats, we skip that path. So parsing of json contents needs to happen here. - for (const message of messages) { - if (typeof message.contents === "string" - && message.contents !== "" - && message.type !== MessageType.ClientLeave - ) { - message.contents = JSON.parse(message.contents); - } - } - return { messages, partialResult }; + return await deltaStorage.get(from - 1, to); // from is exclusive for get() } catch (error) { console.error("Hit error while downloading ops. Retrying"); console.error(error); @@ -56,7 +47,6 @@ async function* loadAllSequencedMessages( documentService?: IDocumentService, dir?: string, files?: string[]) { - const batch = 20000; // see data in issue #5211 on possible sizes we can use. let lastSeq = 0; // If we have local save, read ops from there first @@ -91,22 +81,51 @@ async function* loadAllSequencedMessages( let timeStart = Date.now(); let requests = 0; let opsStorage = 0; + + const concurrency = 4; + const batch = 20000; // see data in issue #5211 on possible sizes we can use. 
+ + const queue = parallel( + concurrency, + lastSeq + 1, // inclusive left + undefined, // to + batch, + new TelemetryUTLogger(), + async (_request: number, from: number, to: number) => { + const { messages, partialResult } = await loadChunk(from, to, deltaStorage); + return {partial: partialResult, cancel: false, payload: messages}; + }, + ); + while (true) { - requests++; - const { messages, partialResult } = await loadChunk(lastSeq, lastSeq + batch, deltaStorage); - if (messages.length === 0) { - assert(!partialResult, "No messages to load, but nonzero partial result"); + const messages = await queue.pop(); + if (messages === undefined) { break; } - yield messages; + requests++; + + // Empty buckets should never be returned + assert(messages.length !== 0); + // console.log(`Loaded ops at ${messages[0].sequenceNumber}`); + + // This parsing of message contents happens in delta manager. But when we analyze messages + // for message stats, we skip that path. So parsing of json contents needs to happen here. 
+ for (const message of messages) { + if (typeof message.contents === "string" + && message.contents !== "" + && message.type !== MessageType.ClientLeave + ) { + message.contents = JSON.parse(message.contents); + } + } + opsStorage += messages.length; lastSeq = messages[messages.length - 1].sequenceNumber; + yield messages; } - if (requests > 0) { - // eslint-disable-next-line max-len - console.log(`\n${Math.floor((Date.now() - timeStart) / 1000)} seconds to retrieve ${opsStorage} ops in ${requests} requests`); - } + // eslint-disable-next-line max-len + console.log(`\n${Math.floor((Date.now() - timeStart) / 1000)} seconds to retrieve ${opsStorage} ops in ${requests} requests, using ${concurrency} parallel requests`); if (connectToWebSocket) { let logMsg = ""; diff --git a/packages/utils/telemetry-utils/src/logger.ts b/packages/utils/telemetry-utils/src/logger.ts index 54898778aa45..48c61cce26ce 100644 --- a/packages/utils/telemetry-utils/src/logger.ts +++ b/packages/utils/telemetry-utils/src/logger.ts @@ -511,3 +511,41 @@ export class LoggingError extends Error { return props; } } + +/** + * Logger that is useful for UT + * It can be used in places where logger instance is required, but events should be not send over. 
+ */ + export class TelemetryUTLogger implements ITelemetryLogger { + public send(event: ITelemetryBaseEvent): void { + } + public sendTelemetryEvent(event: ITelemetryGenericEvent, error?: any) { + } + public sendErrorEvent(event: ITelemetryErrorEvent, error?: any) { + this.reportError("errorEvent in UT logger!", event, error); + } + public sendPerformanceEvent(event: ITelemetryPerformanceEvent, error?: any): void { + } + public logGenericError(eventName: string, error: any) { + this.reportError(`genericError in UT logger!`, { eventName }, error); + } + public logException(event: ITelemetryErrorEvent, exception: any): void { + this.reportError("exception in UT logger!", event, exception); + } + public debugAssert(condition: boolean, event?: ITelemetryErrorEvent): void { + this.reportError("debugAssert in UT logger!"); + } + public shipAssert(condition: boolean, event?: ITelemetryErrorEvent): void { + this.reportError("shipAssert in UT logger!"); + } + + private reportError(message: string, event?: ITelemetryErrorEvent, err?: any) { + const error = new Error(message); + (error as any).error = error; + (error as any).event = event; + // report to console as exception can be eaten + console.error(message); + console.error(error); + throw error; + } +}