diff --git a/packages/drivers/debugger/src/fluidDebuggerController.ts b/packages/drivers/debugger/src/fluidDebuggerController.ts index 94cbf6beac13..c80e20e31b95 100644 --- a/packages/drivers/debugger/src/fluidDebuggerController.ts +++ b/packages/drivers/debugger/src/fluidDebuggerController.ts @@ -28,8 +28,6 @@ import { Sanitizer } from "./sanitizer"; export type debuggerUIFactory = (controller: IDebuggerController) => IDebuggerUI | null; -const MaxBatchDeltas = 2000; - /** * Replay controller that uses pop-up window to control op playback */ @@ -172,8 +170,8 @@ export class DebugReplayController extends ReplayController implements IDebugger return messages; } - public fetchTo(currentOp: number): number { - return currentOp + MaxBatchDeltas; + public fetchTo(currentOp: number): number | undefined { + return undefined; } // Returns true if version / file / ops selections is made. @@ -345,27 +343,12 @@ export class DebugReplayController extends ReplayController implements IDebugger } async function* generateSequencedMessagesFromDeltaStorage(deltaStorage: IDocumentDeltaStorageService) { - let lastSeq = 0; - const batch = 2000; + const stream = deltaStorage.fetchMessages(1, undefined); while (true) { - const { messages, partialResult } = await loadChunk(lastSeq, lastSeq + batch, deltaStorage); - if (messages.length === 0) { - assert(!partialResult, - 0x087 /* "No messages loaded from chunk, but nonzero number of partial results loaded from chunk!" */); - break; - } - yield messages; - lastSeq = messages[messages.length - 1].sequenceNumber; - } -} - -async function loadChunk(from: number, to: number, deltaStorage: IDocumentDeltaStorageService) { - for (let iter = 0; iter < 3; iter++) { - try { - return await deltaStorage.get(from, to); - } catch (error) { - // Retry + const result = await stream.read(); + if (result.done) { + return; } + yield result.value; } - throw new Error("Giving up after 3 attempts to download chunk of ops."); } diff --git a/packages/drivers/file-driver/src/fileDeltaStorageService.ts b/packages/drivers/file-driver/src/fileDeltaStorageService.ts index af7648969da4..99d8f6d8c17d 100644 --- a/packages/drivers/file-driver/src/fileDeltaStorageService.ts +++ b/packages/drivers/file-driver/src/fileDeltaStorageService.ts @@ -5,7 +5,8 @@ import fs from "fs"; import { assert } from "@fluidframework/common-utils"; -import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions"; +import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions"; +import { emptyMessageStream } from "@fluidframework/driver-utils"; import * as api from "@fluidframework/protocol-definitions"; /** @@ -33,12 +34,12 @@ export class FileDeltaStorageService implements IDocumentDeltaStorageService { } } - public async get( - from?: number, - to?: number, - ): Promise { - // Do not allow container move forward - return { messages: [], partialResult: false }; + public fetchMessages(from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean, + ): IStream { + return emptyMessageStream; } public get ops(): readonly Readonly[] { diff --git a/packages/drivers/iframe-driver/src/innerDocumentService.ts b/packages/drivers/iframe-driver/src/innerDocumentService.ts index ea4347fec0cd..f99e7f99b046 100644 --- a/packages/drivers/iframe-driver/src/innerDocumentService.ts +++ b/packages/drivers/iframe-driver/src/innerDocumentService.ts @@ -69,7 +69,7 @@ export class InnerDocumentService implements IDocumentService { */ public async connectToDeltaStorage(): Promise { return { - get: async (from: number, to: number) => this.outerProxy.deltaStorage.get(from, to), + fetchMessages: (...args) => this.outerProxy.deltaStorage.fetchMessages(...args), }; } diff --git a/packages/drivers/iframe-driver/src/outerDocumentServiceFactory.ts b/packages/drivers/iframe-driver/src/outerDocumentServiceFactory.ts index 160dbc5b6f9f..bf4b1b9f4e16 100644 --- a/packages/drivers/iframe-driver/src/outerDocumentServiceFactory.ts +++ b/packages/drivers/iframe-driver/src/outerDocumentServiceFactory.ts @@ -190,10 +190,10 @@ export class DocumentServiceFactoryProxy implements IDocumentServiceFactoryProxy } private getDeltaStorage(deltaStorage: IDocumentDeltaStorageService): IDocumentDeltaStorageService { - const get = Comlink.proxy(async (from: number, to: number) => deltaStorage.get(from, to)); + const fetchMessages = Comlink.proxy(deltaStorage.fetchMessages.bind(deltaStorage)); return { - get, + fetchMessages, }; } diff --git a/packages/drivers/local-driver/src/localDeltaStorageService.ts b/packages/drivers/local-driver/src/localDeltaStorageService.ts index da25d29f44c4..13eeccb66558 100644 --- a/packages/drivers/local-driver/src/localDeltaStorageService.ts +++ b/packages/drivers/local-driver/src/localDeltaStorageService.ts @@ -4,7 +4,9 @@ */ import * as api from "@fluidframework/driver-definitions"; +import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; import { IDatabaseManager } from "@fluidframework/server-services-core"; +import { streamFromMessages } from "@fluidframework/driver-utils"; export class LocalDeltaStorageService implements api.IDocumentDeltaStorageService { constructor( @@ -13,15 +15,27 @@ export class LocalDeltaStorageService implements api.IDocumentDeltaStorageServic private readonly databaseManager: IDatabaseManager) { } - public async get(from: number, to: number): Promise { + public fetchMessages( + from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean, + ): api.IStream { + return streamFromMessages(this.getCore(from, to)); + } + + private async getCore(from: number, to?: number) { const query = { documentId: this.id, tenantId: this.tenantId }; query["operation.sequenceNumber"] = {}; - query["operation.sequenceNumber"].$gt = from; - query["operation.sequenceNumber"].$lt = to; + query["operation.sequenceNumber"].$gt = from - 1; // from is inclusive + + // This looks like a bug. It used to work without setting $lt key. Now it does not + // Need follow up + query["operation.sequenceNumber"].$lt = to ?? Number.MAX_SAFE_INTEGER; const allDeltas = await this.databaseManager.getDeltaCollection(this.tenantId, this.id); const dbDeltas = await allDeltas.find(query, { "operation.sequenceNumber": 1 }); const messages = dbDeltas.map((delta) => delta.operation); - return { messages, partialResult: false }; + return messages; } } diff --git a/packages/drivers/odsp-driver/src/contracts.ts b/packages/drivers/odsp-driver/src/contracts.ts index 36c2ab7e154d..16bcbb49dbed 100644 --- a/packages/drivers/odsp-driver/src/contracts.ts +++ b/packages/drivers/odsp-driver/src/contracts.ts @@ -274,6 +274,10 @@ export interface HostStoragePolicy { blobDeduping?: boolean; + // Options overwriting default ops fetching from storage. + opsBatchSize?: number; + concurrentOpsBatches?: number; + /** * Policy controlling ops caching (leveraging IPersistedCache passed to driver factory) */ diff --git a/packages/drivers/odsp-driver/src/odspDeltaStorageService.ts b/packages/drivers/odsp-driver/src/odspDeltaStorageService.ts index 46aab13825d2..b7cc73626c8d 100644 --- a/packages/drivers/odsp-driver/src/odspDeltaStorageService.ts +++ b/packages/drivers/odsp-driver/src/odspDeltaStorageService.ts @@ -15,7 +15,7 @@ import { TokenFetchOptions } from "./tokenFetch"; /** * Provides access to the underlying delta storage on the server for sharepoint driver. */ -export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService { +export class OdspDeltaStorageService { constructor( private readonly deltaFeedUrl: string, private readonly getStorageToken: (options: TokenFetchOptions, name?: string) => Promise, @@ -63,10 +63,7 @@ export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService } public async buildUrl(from: number, to: number) { - const fromInclusive = from + 1; - const toInclusive = to - 1; - - const filter = encodeURIComponent(`sequenceNumber ge ${fromInclusive} and sequenceNumber le ${toInclusive}`); + const filter = encodeURIComponent(`sequenceNumber ge ${from} and sequenceNumber le ${to - 1}`); const queryString = `?filter=${filter}`; return `${this.deltaFeedUrl}${queryString}`; } diff --git a/packages/drivers/odsp-driver/src/odspDocumentService.ts b/packages/drivers/odsp-driver/src/odspDocumentService.ts index 831b5b7936a3..2af816f3ae4d 100644 --- a/packages/drivers/odsp-driver/src/odspDocumentService.ts +++ b/packages/drivers/odsp-driver/src/odspDocumentService.ts @@ -17,7 +17,11 @@ import { IDocumentStorageService, IDocumentServicePolicies, } from "@fluidframework/driver-definitions"; -import { canRetryOnError } from "@fluidframework/driver-utils"; +import { + canRetryOnError, + requestOps, + streamObserver, +} from "@fluidframework/driver-utils"; import { fetchTokenErrorCode, throwOdspNetworkError } from "@fluidframework/odsp-doclib-utils"; import { IClient, @@ -223,41 +227,70 @@ export class OdspDocumentService implements IDocumentService { this.logger, ); - let missed = false; - return { - get: async (from: number, to: number) => { - if (snapshotOps !== undefined && snapshotOps.length !== 0) { - const messages = snapshotOps.filter((op) => op.sequenceNumber > from).map((op) => op.op); - snapshotOps = undefined; - if (messages.length > 0 && messages[0].sequenceNumber === from + 1) { - // Consider not caching these ops as they will be cached as part of snapshot cache entry - this.opsReceived(messages); - return { messages, partialResult: true }; - } else { - this.logger.sendErrorEvent({ - eventName: "SnapshotOpsNotUsed", - length: messages.length, - first: messages[0].sequenceNumber, - from, - to, - }); - } - } - - // We always write ops sequentially. Once there is a miss, stop consulting cache. - // This saves a bit of processing time - if (!missed) { - const messagesFromCache = await this.opsCache?.get(from, to); - if (messagesFromCache !== undefined && messagesFromCache.length !== 0) { - return { messages: messagesFromCache as ISequencedDocumentMessage[], partialResult: true }; - } - missed = true; - } + // batch size, please see issue #5211 for data around batch sizing + const batchSize = this.hostPolicy.opsBatchSize ?? 5000; + const concurrency = this.hostPolicy.concurrentOpsBatches ?? 1; - const result = await service.get(from, to); - this.opsReceived(result.messages); - return result; - }, + return { + fetchMessages: ( + from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean) => { + let missed = false; + const stream = requestOps( + async (f: number, t: number) => { + if (snapshotOps !== undefined && snapshotOps.length !== 0) { + const messages = snapshotOps.filter((op) => + op.sequenceNumber > from).map((op) => op.op); + snapshotOps = undefined; + if (messages.length > 0 && messages[0].sequenceNumber === from + 1) { + // Consider not caching these ops as they will be cached as part of + // snapshot cache entry + this.opsReceived(messages); + return { messages, partialResult: true }; + } else { + this.logger.sendErrorEvent({ + eventName: "SnapshotOpsNotUsed", + length: messages.length, + first: messages[0].sequenceNumber, + from, + to, + }); + } + } + // We always write ops sequentially. Once there is a miss, stop consulting cache. + // This saves a bit of processing time + if (!missed) { + const messagesFromCache = await this.opsCache?.get(from, to); + if (messagesFromCache !== undefined && messagesFromCache.length !== 0) { + return { + messages: messagesFromCache as ISequencedDocumentMessage[], + partialResult: true, + }; + } + missed = true; + } + + // Proper implementaiton Coming in future + if (cachedOnly) { + return { messages: [], partialResult: false }; + } + + return service.get(f, t); + }, + // Staging: starting with no concurrency, listening for feedback first. + // In future releases we will switch to actual concurrency + concurrency, + from, // inclusive + to, // exclusive + batchSize, + this.logger, + abortSignal, + ); + + return streamObserver(stream, (ops) => this.opsReceived(ops)); + }, }; } diff --git a/packages/drivers/odsp-driver/src/test/deltaStorageService.spec.ts b/packages/drivers/odsp-driver/src/test/deltaStorageService.spec.ts index 28d52c33e0ad..6e9bc3e81029 100644 --- a/packages/drivers/odsp-driver/src/test/deltaStorageService.spec.ts +++ b/packages/drivers/odsp-driver/src/test/deltaStorageService.spec.ts @@ -32,7 +32,7 @@ describe("DeltaStorageService", () => { async (_refresh) => "?access_token=123", createUtEpochTracker(fileEntry, logger), logger); - const actualDeltaUrl = await deltaStorageService.buildUrl(2, 8); + const actualDeltaUrl = await deltaStorageService.buildUrl(3, 8); // eslint-disable-next-line max-len const expectedDeltaUrl = `${deltaStorageBasePath}/drives/testdrive/items/testitem/opStream?filter=sequenceNumber%20ge%203%20and%20sequenceNumber%20le%207`; assert.equal(actualDeltaUrl, expectedDeltaUrl, "The constructed delta url is invalid"); diff --git a/packages/drivers/replay-driver/src/emptyDeltaStorageService.ts b/packages/drivers/replay-driver/src/emptyDeltaStorageService.ts index 2a0c9e960d9c..c812089406b3 100644 --- a/packages/drivers/replay-driver/src/emptyDeltaStorageService.ts +++ b/packages/drivers/replay-driver/src/emptyDeltaStorageService.ts @@ -3,7 +3,9 @@ * Licensed under the MIT License. */ -import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions"; +import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions"; +import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; +import { emptyMessageStream } from "@fluidframework/driver-utils"; export class EmptyDeltaStorageService implements IDocumentDeltaStorageService { /** @@ -12,7 +14,12 @@ export class EmptyDeltaStorageService implements IDocumentDeltaStorageService { * @param to - Op are returned from to - 1. * @returns Array of ops requested by the user. */ - public async get(from: number, to: number): Promise { - return { messages: [], partialResult: false }; + public fetchMessages( + from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean): IStream + { + return emptyMessageStream; } } diff --git a/packages/drivers/replay-driver/src/replayController.ts b/packages/drivers/replay-driver/src/replayController.ts index 4a73794bd396..1fbbc4d0e059 100644 --- a/packages/drivers/replay-driver/src/replayController.ts +++ b/packages/drivers/replay-driver/src/replayController.ts @@ -64,7 +64,7 @@ export abstract class ReplayController extends ReadDocumentStorageServiceBase { * Note: this API is called while replay() is in progress - next batch of ops is downloaded in parallel * @param currentOp - current op */ - public abstract fetchTo(currentOp: number): number; + public abstract fetchTo(currentOp: number): number | undefined; /** * Returns true if no more ops should be processed (or downloaded for future processing). diff --git a/packages/drivers/replay-driver/src/replayDocumentDeltaConnection.ts b/packages/drivers/replay-driver/src/replayDocumentDeltaConnection.ts index 6de5bf0d3bfd..4b9ccec24faa 100644 --- a/packages/drivers/replay-driver/src/replayDocumentDeltaConnection.ts +++ b/packages/drivers/replay-driver/src/replayDocumentDeltaConnection.ts @@ -21,12 +21,10 @@ import { IVersion, ScopeType, } from "@fluidframework/protocol-definitions"; -import { assert, TypedEventEmitter } from "@fluidframework/common-utils"; +import { TypedEventEmitter } from "@fluidframework/common-utils"; import { debug } from "./debug"; import { ReplayController } from "./replayController"; -const MaxBatchDeltas = 2000; - const ReplayDocumentId = "documentId"; export class ReplayControllerStatic extends ReplayController { @@ -76,9 +74,10 @@ export class ReplayControllerStatic extends ReplayController { } public fetchTo(currentOp: number) { - const useFetchToBatch = !(this.unitIsTime !== true && this.replayTo >= 0); - const fetchToBatch = currentOp + MaxBatchDeltas; - return useFetchToBatch ? fetchToBatch : Math.min(fetchToBatch, this.replayTo); + if (!(this.unitIsTime !== true && this.replayTo >= 0)) { + return undefined; + } + return this.replayTo; } public isDoneFetch(currentOp: number, lastTimeStamp?: number) { @@ -166,7 +165,6 @@ export class ReplayControllerStatic extends ReplayController { } } } - // eslint-disable-next-line @typescript-eslint/no-use-before-define scheduleNext(nextInterval); emitter(playbackOps); }; @@ -312,26 +310,30 @@ export class ReplayDocumentDeltaConnection do { const fetchTo = controller.fetchTo(currentOp); - const { messages, partialResult } = await documentStorageService.get(currentOp, fetchTo); - - if (messages.length === 0) { - // No more ops. But, they can show up later, either because document was just created, - // or because another client keeps submitting new ops. - assert(!partialResult, 0x0af /* "No more ops, but nonzero partial results!" */); - if (controller.isDoneFetch(currentOp, undefined)) { + const abortController = new AbortController(); + const stream = documentStorageService.fetchMessages(currentOp + 1, fetchTo, abortController.signal); + do { + const result = await stream.read(); + + if (result.done) { + // No more ops. But, they can show up later, either because document was just created, + // or because another client keeps submitting new ops. + done = controller.isDoneFetch(currentOp, undefined); + if (!done) { + await delay(2000); + } break; } - await delay(2000); - continue; - } + replayPromiseChain = replayPromiseChain.then( + async () => controller.replay((ops) => this.emit("op", ReplayDocumentId, ops), messages)); - replayPromiseChain = replayPromiseChain.then( - async () => controller.replay((ops) => this.emit("op", ReplayDocumentId, ops), messages)); + const messages = result.value; + currentOp += messages.length; + done = controller.isDoneFetch(currentOp, messages[messages.length - 1].timestamp); + } while (!done); - currentOp += messages.length; - done = controller.isDoneFetch(currentOp, messages[messages.length - 1].timestamp); + abortController.abort(); } while (!done); - return replayPromiseChain; } } diff --git a/packages/drivers/routerlicious-driver/src/deltaStorageService.ts b/packages/drivers/routerlicious-driver/src/deltaStorageService.ts index 8668d5b1548b..6f49f6d7150f 100644 --- a/packages/drivers/routerlicious-driver/src/deltaStorageService.ts +++ b/packages/drivers/routerlicious-driver/src/deltaStorageService.ts @@ -7,9 +7,11 @@ import { IDeltaStorageService, IDocumentDeltaStorageService, IDeltasFetchResult, + IStream, } from "@fluidframework/driver-definitions"; import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; -import { readAndParse } from "@fluidframework/driver-utils"; +import { readAndParse, requestOps, emptyMessageStream } from "@fluidframework/driver-utils"; +import { TelemetryNullLogger } from "@fluidframework/common-utils"; import { ITelemetryLogger } from "@fluidframework/common-definitions"; import { PerformanceEvent } from "@fluidframework/telemetry-utils"; import { ITokenProvider } from "./tokens"; @@ -31,30 +33,43 @@ export class DocumentDeltaStorageService implements IDocumentDeltaStorageService private logtailSha: string | undefined = this.documentStorageService.logTailSha; - public async get(from: number, to: number): Promise { + fetchMessages(from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean, + ): IStream + { + if (cachedOnly) { + return emptyMessageStream; + } + return requestOps( + this.getCore.bind(this), + // Staging: starting with no concurrency, listening for feedback first. + // In future releases we will switch to actual concurrency + 1, // concurrency + from, // inclusive + to, // exclusive + MaxBatchDeltas, + new TelemetryNullLogger(), + abortSignal, + ); + } + + private async getCore(from: number, to: number): Promise { const opsFromLogTail = this.logtailSha ? await readAndParse (this.documentStorageService, this.logtailSha) : []; this.logtailSha = undefined; if (opsFromLogTail.length > 0) { const messages = opsFromLogTail.filter((op) => - op.sequenceNumber > from, + op.sequenceNumber >= from, ); if (messages.length > 0) { return { messages, partialResult: true }; } } - const length = to - from - 1; // to & from are exclusive! - const batchLength = Math.min(MaxBatchDeltas, length); // limit number of ops we retrieve at once - const result = await this.storageService.get(this.tenantId, this.id, from, from + batchLength + 1); - - // if we got full batch, and did not fully satisfy original request, then there is likely more... - // Note that it's not disallowed to return more ops than requested! - if (result.messages.length >= batchLength && result.messages.length !== length) { - result.partialResult = true; - } - return result; + return this.storageService.get(this.tenantId, this.id, from, to); } } @@ -71,8 +86,10 @@ export class DeltaStorageService implements IDeltaStorageService { public async get( tenantId: string, id: string, - from: number, - to: number): Promise { + from: number, // inclusive + to: number, // exclusive + ): Promise + { const ordererRestWrapper = await RouterliciousOrdererRestWrapper.load( tenantId, id, this.tokenProvider, this.logger); const ops = await PerformanceEvent.timedExecAsync( @@ -83,7 +100,9 @@ export class DeltaStorageService implements IDeltaStorageService { to, }, async (event) => { - const response = await ordererRestWrapper.get(this.url, { from, to }); + const response = await ordererRestWrapper.get( + this.url, + { from: from - 1, to }); event.end({ count: response.length, }); diff --git a/packages/loader/container-loader/src/container.ts b/packages/loader/container-loader/src/container.ts index 1488089467d9..a51fdc2f34a2 100644 --- a/packages/loader/container-loader/src/container.ts +++ b/packages/loader/container-loader/src/container.ts @@ -199,16 +199,20 @@ export async function waitContainerToCatchUp(container: Container) { deltaManager.on("op", callbackOps); }; - if (container.connectionState !== ConnectionState.Disconnected) { + // We can leverage DeltaManager's "connect" event here and test for ConnectionState.Disconnected + // But that works only if service provides us checkPointSequenceNumber + // Our internal testing is based on R11S that does not, but almost all tests connect as "write" and + // use this function to catch up, so leveraging our own join op as a fence/barrier + if (container.connectionState === ConnectionState.Connected) { waitForOps(); return; } const callback = () => { - deltaManager.off(connectEventName, callback); + container.off(connectedEventName, callback); waitForOps(); }; - deltaManager.on(connectEventName, callback); + container.on(connectedEventName, callback); container.resume(); }); diff --git a/packages/loader/container-loader/src/deltaManager.ts b/packages/loader/container-loader/src/deltaManager.ts index 01552bc4e271..fc3ee8d347cb 100644 --- a/packages/loader/container-loader/src/deltaManager.ts +++ b/packages/loader/container-loader/src/deltaManager.ts @@ -18,7 +18,7 @@ import { ReadOnlyInfo, } from "@fluidframework/container-definitions"; import { assert, performance, TypedEventEmitter } from "@fluidframework/common-utils"; -import { PerformanceEvent, TelemetryLogger, ChildLogger, safeRaiseEvent } from "@fluidframework/telemetry-utils"; +import { PerformanceEvent, TelemetryLogger, safeRaiseEvent } from "@fluidframework/telemetry-utils"; import { IDocumentDeltaStorageService, IDocumentService, @@ -51,7 +51,6 @@ import { getRetryDelayFromError, logNetworkFailure, waitForConnectedState, - requestOps, } from "@fluidframework/driver-utils"; import { CreateContainerError, @@ -65,7 +64,6 @@ import { PrefetchDocumentStorageService } from "./prefetchDocumentStorageService const MaxReconnectDelaySeconds = 8; const InitialReconnectDelaySeconds = 1; -const MaxBatchDeltas = 5000; // Please see Issue #5211 for data around batch sizing const DefaultChunkSize = 16 * 1024; function getNackReconnectInfo(nackContent: INackContent) { @@ -817,28 +815,22 @@ export class DeltaManager this.deltaStorageP = docService.connectToDeltaStorage(); } - const pipe = requestOps( - await this.deltaStorageP, - // Staging: starting with no concurrency, listening for feedback first. - // In future releases we will switch to actual concurrency - 1, // concurrency - from + 1, // from is exclusive, but ParallelRequests uses inclusive left - to, // exclusive right - MaxBatchDeltas, - ChildLogger.create(this.logger, undefined, { all: {reason: telemetryEventSuffix } }), - this.closeAbortController.signal, - ); + const storage = await this.deltaStorageP; + const stream = storage.fetchMessages( + from, // inclusive + to, // exclusive + this.closeAbortController.signal); // eslint-disable-next-line no-constant-condition while (true) { - const deltas = await pipe.pop(); - if (deltas === undefined) { + const result = await stream.read(); + if (result.done) { break; } PerformanceEvent.timedExec( this.logger, - { eventName: "GetDeltas_OpProcessing", count: deltas.length}, - () => callback(deltas), + { eventName: "GetDeltas_OpProcessing", count: result.value.length}, + () => callback(result.value), { end: true, cancel: "error" }); } } @@ -1342,7 +1334,7 @@ export class DeltaManager /** * Retrieves the missing deltas between the given sequence numbers */ - private fetchMissingDeltas(telemetryEventSuffix: string, fromArg: number, to?: number) { + private fetchMissingDeltas(telemetryEventSuffix: string, lastKnowOp: number, to?: number) { // Exit out early if we're already fetching deltas if (this.fetching) { return; @@ -1353,8 +1345,8 @@ export class DeltaManager return; } - assert(fromArg === this.lastQueuedSequenceNumber, 0x0f1 /* "from arg" */); - let from = fromArg; + assert(lastKnowOp === this.lastQueuedSequenceNumber, 0x0f1 /* "from arg" */); + let from = lastKnowOp + 1; const n = this.previouslyProcessedMessage?.sequenceNumber; if (n !== undefined) { @@ -1363,8 +1355,8 @@ export class DeltaManager // Knowing about this mechanism, we could ask for op we already observed to increase validation. // This is especially useful when coming out of offline mode or loading from // very old cached (by client / driver) snapshot. - assert(n === fromArg, 0x0f2 /* "previouslyProcessedMessage" */); - assert(from > 0, 0x0f3 /* "not positive" */); + assert(n === lastKnowOp, 0x0f2 /* "previouslyProcessedMessage" */); + assert(from > 1, 0x0f3 /* "not positive" */); from--; } diff --git a/packages/loader/driver-definitions/src/storage.ts b/packages/loader/driver-definitions/src/storage.ts index 4668e4cd4fd9..fbc46c27dd7b 100644 --- a/packages/loader/driver-definitions/src/storage.ts +++ b/packages/loader/driver-definitions/src/storage.ts @@ -49,8 +49,18 @@ export interface IDeltaStorageService { get( tenantId: string, id: string, - from: number, - to: number): Promise; + from: number, // inclusive + to: number // exclusive + ): Promise; +} + +export type IStreamResult = { done: true; } | { done: false; value: T; }; + +/** + * Read interface for the Queue + */ + export interface IStream { + read(): Promise>; } /** @@ -59,8 +69,16 @@ export interface IDeltaStorageService { export interface IDocumentDeltaStorageService { /** * Retrieves all the delta operations within the exclusive sequence number range - */ - get(from: number, to: number): Promise; + * @param from - first op to retrieve (inclusive) + * @param to - first op not to retrieve (exclusive end) + * @param abortSignal - signal that aborts operation + * @param cachedOnly - return only cached ops, i.e. ops available locally on client. + */ + fetchMessages(from: number, + to: number | undefined, + abortSignal?: AbortSignal, + cachedOnly?: boolean, + ): IStream; } export interface IDocumentStorageServicePolicies { diff --git a/packages/loader/driver-utils/src/parallelRequests.ts b/packages/loader/driver-utils/src/parallelRequests.ts index e1b2a2c0ad6a..7575bad56775 100644 --- a/packages/loader/driver-utils/src/parallelRequests.ts +++ b/packages/loader/driver-utils/src/parallelRequests.ts @@ -6,7 +6,7 @@ import { assert, Deferred, performance } from "@fluidframework/common-utils"; import { ITelemetryLogger } from "@fluidframework/common-definitions"; import { PerformanceEvent, TelemetryLogger } from "@fluidframework/telemetry-utils"; import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; -import { IDocumentDeltaStorageService } from "@fluidframework/driver-definitions"; +import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions"; import { getRetryDelayFromError, canRetryOnError, createGenericNetworkError } from "./network"; import { waitForConnectedState } from "./networkUtils"; @@ -284,24 +284,17 @@ export class ParallelRequests { } } -/** - * Read interface for the Queue - */ -export interface IReadPipe { - pop(): Promise; -} - /** * Helper queue class to allow async push / pull * It's essentially a pipe allowing multiple writers, and single reader */ -export class Queue implements IReadPipe { - private readonly queue: Promise[] = []; - private deferred: Deferred | undefined; +export class Queue implements IStream { + private readonly queue: Promise>[] = []; + private deferred: Deferred> | undefined; private done = false; public pushValue(value: T) { - this.pushCore(Promise.resolve(value)); + this.pushCore(Promise.resolve({ done: false, value })); } public pushError(error: any) { @@ -310,11 +303,11 @@ export class Queue implements IReadPipe { } public pushDone() { - this.pushCore(Promise.resolve(undefined)); + this.pushCore(Promise.resolve({ done: true })); this.done = true; } - protected pushCore(value: Promise) { + protected pushCore(value: Promise>) { assert(!this.done, 0x112 /* "cannot push onto queue if done" */); if (this.deferred) { assert(this.queue.length === 0, 0x113 /* "deferred queue should be empty" */); @@ -325,14 +318,14 @@ export class Queue implements IReadPipe { } } - public async pop(): Promise { + public async read(): Promise> { assert(this.deferred === undefined, 0x114 /* "cannot pop if deferred" */); - const el = this.queue.shift(); - if (el !== undefined) { - return el; + const value = this.queue.shift(); + if (value !== undefined) { + return value; } assert(!this.done, 0x115 /* "queue should not be done during pop" */); - this.deferred = new Deferred(); + this.deferred = new Deferred>(); return this.deferred.promise; } } @@ -348,7 +341,7 @@ export class Queue implements IReadPipe { * @returns - an object with resulting ops and cancellation / partial result flags */ async function getSingleOpBatch( - deltaStorage: IDocumentDeltaStorageService, + get: (from: number, to: number) => Promise, request: number, from: number, to: number, @@ -374,9 +367,7 @@ async function getSingleOpBatch( try { // Issue async request for deltas - limit the number fetched to MaxBatchDeltas canRetry = true; - // left is inclusive for ParallelRequests, but exclusive for IDocumentDeltaStorageService - // right is exclusive for both - const deltasP = deltaStorage.get(from - 1, to); + const deltasP = get(from, to); const { messages, partialResult } = await deltasP; deltas.push(...messages); @@ -467,14 +458,14 @@ async function getSingleOpBatch( } export function requestOps( - deltaStorage: IDocumentDeltaStorageService, + get: (from: number, to: number) => Promise, concurrency: number, from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, signal?: AbortSignal, -): IReadPipe { +): IStream { let requests = 0; let lastFetch: number | undefined; let deltasRetrievedTotal = 0; @@ -493,7 +484,7 @@ export function requestOps( logger, async (request: number, _from: number, _to: number, strongTo: boolean) => { requests++; - return getSingleOpBatch(deltaStorage, request, _from, _to, telemetryEvent, strongTo, signal); + return getSingleOpBatch(get, request, _from, _to, telemetryEvent, strongTo, signal); }, (deltas: ISequencedDocumentMessage[]) => { lastFetch = deltas[deltas.length - 1].sequenceNumber; @@ -521,3 +512,36 @@ export function requestOps( return queue; } + +export const emptyMessageStream: IStream = { + read: async () => { return { done: true };}, +}; + +export function streamFromMessages(messagesArg: Promise): + IStream +{ + let messages: Promise | undefined = messagesArg; + return { + read: async () => { + if (messages === undefined) { + return { done: true }; + } + const value = await messages; + messages = undefined; + return value.length === 0 ? { done: true } : { done: false, value }; + }, + }; +} + +// eslint-disable-next-line prefer-arrow/prefer-arrow-functions +export function streamObserver(stream: IStream, handler: (value: T) => void): IStream { + return { + read: async () => { + const value = await stream.read(); + if (value.done === false) { + handler(value.value); + } + return value; + }, + }; +} diff --git a/packages/loader/test-loader-utils/package.json b/packages/loader/test-loader-utils/package.json index 042db5d6395b..547661bbf908 100644 --- a/packages/loader/test-loader-utils/package.json +++ b/packages/loader/test-loader-utils/package.json @@ -27,6 +27,7 @@ "dependencies": { "@fluidframework/common-utils": "^0.29.0-0", "@fluidframework/driver-definitions": "^0.38.0", + "@fluidframework/driver-utils": "^0.38.0", "@fluidframework/protocol-definitions": "^0.1022.0-0" }, "devDependencies": { diff --git a/packages/loader/test-loader-utils/src/mockDeltaStorage.ts b/packages/loader/test-loader-utils/src/mockDeltaStorage.ts index 010a4e9f306d..d417d9678a4b 100644 --- a/packages/loader/test-loader-utils/src/mockDeltaStorage.ts +++ b/packages/loader/test-loader-utils/src/mockDeltaStorage.ts @@ -3,7 +3,11 @@ * Licensed under the MIT License. */ -import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions"; +import { + IDocumentDeltaStorageService, + IStream, +} from "@fluidframework/driver-definitions"; +import { streamFromMessages } from "@fluidframework/driver-utils"; import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; /** @@ -14,21 +18,30 @@ export class MockDocumentDeltaStorageService implements IDocumentDeltaStorageSer this.messages = messages.sort((a, b) => b.sequenceNumber - a.sequenceNumber); } - public async get(from: number, to: number): Promise { + public fetchMessages( + from: number, // inclusive + to: number | undefined, // exclusive + abortSignal?: AbortSignal, + cachedOnly?: boolean, + ): IStream { + return streamFromMessages(this.getCore(from, to)); + } + + private async getCore(from: number, to?: number) { const messages: ISequencedDocumentMessage[] = []; let index: number = 0; // Find first - while (index < this.messages.length && this.messages[index].sequenceNumber <= from) { + while (index < this.messages.length && this.messages[index].sequenceNumber < from) { index++; } // start reading - while (index < this.messages.length && this.messages[index].sequenceNumber < to) { + while (index < this.messages.length && (to === undefined || this.messages[index].sequenceNumber < to)) { messages.push(this.messages[index]); index++; } - return { messages, partialResult: false }; + return messages; } } diff --git a/packages/test/local-server-tests/src/test/noDeltaStream.spec.ts b/packages/test/local-server-tests/src/test/noDeltaStream.spec.ts index ee3aaa142339..50a7e9df282b 100644 --- a/packages/test/local-server-tests/src/test/noDeltaStream.spec.ts +++ b/packages/test/local-server-tests/src/test/noDeltaStream.spec.ts @@ -22,7 +22,7 @@ import { TestContainerRuntimeFactory, TestFluidObjectFactory, } from "@fluidframework/test-utils"; -import { Container, DeltaManager } from "@fluidframework/container-loader"; +import { Container, DeltaManager, waitContainerToCatchUp } from "@fluidframework/container-loader"; describe("No Delta Stream", () => { const documentId = "localServerTest"; @@ -52,13 +52,17 @@ describe("No Delta Stream", () => { return container; } - async function loadContainer(storageOnly: boolean): Promise { + async function loadContainer(storageOnly: boolean, track = true): Promise { + const service = new LocalDocumentServiceFactory(deltaConnectionServer, { storageOnly }); const loader = createLoader( [[codeDetails, factory]], - new LocalDocumentServiceFactory(deltaConnectionServer, { storageOnly }), + service, new LocalResolver()); - loaderContainerTracker.add(loader); + if (!storageOnly) { + loaderContainerTracker.add(loader); + } const container = await loader.resolve({ url: documentLoadUrl }); + await loaderContainerTracker.ensureSynchronized(); return container; } @@ -116,7 +120,7 @@ describe("No Delta Stream", () => { }); it("doesn't affect normal containers", async () => { - await loadContainer(true) as Container; + await loadContainer(true, false) as Container; const normalContainer1 = await loadContainer(false) as Container; const normalContainer2 = await loadContainer(false) as Container; const normalDataObject1 = await requestFluidObject(normalContainer1, "default"); @@ -127,6 +131,7 @@ describe("No Delta Stream", () => { assert.strictEqual(await normalDataObject2.root.wait("fluid"), "great"); const storageOnlyContainer = await loadContainer(true); + await waitContainerToCatchUp(storageOnlyContainer as Container); const storageOnlyDataObject = await requestFluidObject(storageOnlyContainer, "default"); assert.strictEqual(await storageOnlyDataObject.root.wait("prague"), "a city in europe"); assert.strictEqual(await storageOnlyDataObject.root.wait("fluid"), "great"); diff --git a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts index e4c3cab0fbd6..d32d0a51b86e 100644 --- a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts +++ b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts @@ -57,7 +57,7 @@ describe("Ops on Reconnect", () => { /** * Waits for the "connected" event from the given container. */ - async function waitForContainerReconnection(container: Container): Promise { + async function waitForContainerReconnection(container: IContainer): Promise { await new Promise((resolve) => container.once("connected", () => resolve())); } @@ -102,6 +102,7 @@ describe("Ops on Reconnect", () => { async function setupSecondContainersDataObject(): Promise { const loader = await createLoader(); const container2 = await loader.resolve({ url: documentLoadUrl }); + await waitForContainerReconnection(container2); container2.on("op", (containerMessage: ISequencedDocumentMessage) => { if (!isRuntimeMessage(containerMessage)) { return; diff --git a/packages/test/test-drivers/src/odspDriverApi.ts b/packages/test/test-drivers/src/odspDriverApi.ts index 25e3128a7cf1..c3c737a11d3d 100644 --- a/packages/test/test-drivers/src/odspDriverApi.ts +++ b/packages/test/test-drivers/src/odspDriverApi.ts @@ -48,6 +48,8 @@ export const odspOpsCaching: OptionsMatrix = { export const odspHostPolicyMatrix = new Lazy> (()=>({ blobDeduping: booleanCases, concurrentSnapshotFetch: booleanCases, + opsBatchSize: numberCases, + concurrentOpsBatches: numberCases, snapshotOptions:[undefined, ...generatePairwiseOptions(odspSnapshotOptions)], opsCaching: [undefined, ...generatePairwiseOptions(odspOpsCaching)], })); diff --git a/packages/test/test-version-utils/src/compatUtils.ts b/packages/test/test-version-utils/src/compatUtils.ts index 3ea03b3b3a52..8978b6ad6077 100644 --- a/packages/test/test-version-utils/src/compatUtils.ts +++ b/packages/test/test-version-utils/src/compatUtils.ts @@ -76,7 +76,7 @@ function createGetDataStoreFactoryFunction(api: ReturnType Promise.resolve(""); const odspDocumentServiceFactory = new odsp.OdspDocumentServiceFactory( getStorageTokenStub, - getWebsocketTokenStub); + getWebsocketTokenStub, + undefined, + { + opsBatchSize: 20000, + concurrentOpsBatches: 4, + }); return odspDocumentServiceFactory.createDocumentService(odspResolvedUrl); } diff --git a/packages/tools/fetch-tool/src/fluidFetchMessages.ts b/packages/tools/fetch-tool/src/fluidFetchMessages.ts index f29434aa33ab..38cce36a8b56 100644 --- a/packages/tools/fetch-tool/src/fluidFetchMessages.ts +++ b/packages/tools/fetch-tool/src/fluidFetchMessages.ts @@ -5,7 +5,6 @@ import fs from "fs"; import { assert} from "@fluidframework/common-utils"; -import { TelemetryUTLogger } from "@fluidframework/telemetry-utils"; import { IDocumentService, } from "@fluidframework/driver-definitions"; @@ -15,7 +14,6 @@ import { MessageType, ScopeType, } from "@fluidframework/protocol-definitions"; -import { requestOps } from "@fluidframework/driver-utils"; import { printMessageStats } from "./fluidAnalyzeMessages"; import { connectToWebSocket, @@ -68,24 +66,18 @@ async function* loadAllSequencedMessages( let requests = 0; let opsStorage = 0; - const concurrency = 4; - const batch = 20000; // see data in issue #5211 on possible sizes we can use. - - const queue = requestOps( - deltaStorage, - concurrency, + const stream = deltaStorage.fetchMessages( lastSeq + 1, // inclusive left undefined, // to - batch, - new TelemetryUTLogger(), ); while (true) { - const messages = await queue.pop(); - if (messages === undefined) { + const result = await stream.read(); + if (result.done) { break; } requests++; + const messages = result.value; // Empty buckets should never be returned assert(messages.length !== 0, 0x1ba /* "should not return empty buckets" */); @@ -108,7 +100,7 @@ async function* loadAllSequencedMessages( } // eslint-disable-next-line max-len - console.log(`\n${Math.floor((Date.now() - timeStart) / 1000)} seconds to retrieve ${opsStorage} ops in ${requests} requests, using ${concurrency} parallel requests`); + console.log(`\n${Math.floor((Date.now() - timeStart) / 1000)} seconds to retrieve ${opsStorage} ops in ${requests} requests`); if (connectToWebSocket) { let logMsg = ""; diff --git a/server/routerlicious/packages/lambdas/src/scribe/interfaces.ts b/server/routerlicious/packages/lambdas/src/scribe/interfaces.ts index 89fcb74126db..ec9243ef563e 100644 --- a/server/routerlicious/packages/lambdas/src/scribe/interfaces.ts +++ b/server/routerlicious/packages/lambdas/src/scribe/interfaces.ts @@ -63,7 +63,7 @@ export interface IPendingMessageReader { * @param from Starting sequence number (inclusive) * @param to End sequence number (inclusive) */ - readMessages(from: number, to: number): Promise; + readMessages(from: number, to: number): Promise; } /** diff --git a/server/routerlicious/packages/routerlicious-base/src/alfred/routes/api/deltas.ts b/server/routerlicious/packages/routerlicious-base/src/alfred/routes/api/deltas.ts index 4c04963f75c5..3ec7ce769f39 100644 --- a/server/routerlicious/packages/routerlicious-base/src/alfred/routes/api/deltas.ts +++ b/server/routerlicious/packages/routerlicious-base/src/alfred/routes/api/deltas.ts @@ -24,7 +24,7 @@ import winston from "winston"; import { IAlfredTenant } from "@fluidframework/server-services-client"; import { Constants } from "../../../utils"; -export async function getDeltas( +async function getDeltas( mongoManager: MongoManager, collectionName: string, tenantId: string, diff --git a/server/tinylicious/src/routes/ordering/deltas.ts b/server/tinylicious/src/routes/ordering/deltas.ts index 1ad2d1da76ae..215d84e53f6b 100644 --- a/server/tinylicious/src/routes/ordering/deltas.ts +++ b/server/tinylicious/src/routes/ordering/deltas.ts @@ -9,7 +9,7 @@ import { Router } from "express"; import { Provider } from "nconf"; import { getParam, queryParamToNumber } from "../../utils"; -export async function getDeltas( +async function getDeltas( mongoManager: MongoManager, collectionName: string, tenantId: string,