Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move drivers to streaming API for op retrieval #5703

Merged
merged 13 commits into from
Apr 8, 2021
31 changes: 7 additions & 24 deletions packages/drivers/debugger/src/fluidDebuggerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import { Sanitizer } from "./sanitizer";

export type debuggerUIFactory = (controller: IDebuggerController) => IDebuggerUI | null;

const MaxBatchDeltas = 2000;

/**
* Replay controller that uses pop-up window to control op playback
*/
Expand Down Expand Up @@ -172,8 +170,8 @@ export class DebugReplayController extends ReplayController implements IDebugger
return messages;
}

public fetchTo(currentOp: number): number {
return currentOp + MaxBatchDeltas;
public fetchTo(currentOp: number): number | undefined {
return undefined;
}

// Returns true if version / file / ops selections is made.
Expand Down Expand Up @@ -345,27 +343,12 @@ export class DebugReplayController extends ReplayController implements IDebugger
}

async function* generateSequencedMessagesFromDeltaStorage(deltaStorage: IDocumentDeltaStorageService) {
let lastSeq = 0;
const batch = 2000;
const stream = deltaStorage.fetchMessages(1, undefined);
while (true) {
const { messages, partialResult } = await loadChunk(lastSeq, lastSeq + batch, deltaStorage);
if (messages.length === 0) {
assert(!partialResult,
0x087 /* "No messages loaded from chunk, but nonzero number of partial results loaded from chunk!" */);
break;
}
yield messages;
lastSeq = messages[messages.length - 1].sequenceNumber;
}
}

async function loadChunk(from: number, to: number, deltaStorage: IDocumentDeltaStorageService) {
for (let iter = 0; iter < 3; iter++) {
try {
return await deltaStorage.get(from, to);
} catch (error) {
// Retry
const result = await stream.read();
if (result.done) {
return;
}
yield result.value;
}
throw new Error("Giving up after 3 attempts to download chunk of ops.");
}
15 changes: 8 additions & 7 deletions packages/drivers/file-driver/src/fileDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import fs from "fs";
import { assert } from "@fluidframework/common-utils";
import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions";
import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions";
import { emptyMessageStream } from "@fluidframework/driver-utils";
import * as api from "@fluidframework/protocol-definitions";

/**
Expand Down Expand Up @@ -33,12 +34,12 @@ export class FileDeltaStorageService implements IDocumentDeltaStorageService {
}
}

public async get(
from?: number,
to?: number,
): Promise<IDeltasFetchResult> {
// Do not allow container move forward
return { messages: [], partialResult: false };
public fetchMessages(from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
): IStream<api.ISequencedDocumentMessage[]> {
return emptyMessageStream;
}

public get ops(): readonly Readonly<api.ISequencedDocumentMessage>[] {
Expand Down
2 changes: 1 addition & 1 deletion packages/drivers/iframe-driver/src/innerDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class InnerDocumentService implements IDocumentService {
*/
public async connectToDeltaStorage(): Promise<IDocumentDeltaStorageService> {
return {
get: async (from: number, to: number) => this.outerProxy.deltaStorage.get(from, to),
fetchMessages: (...args) => this.outerProxy.deltaStorage.fetchMessages(...args),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ export class DocumentServiceFactoryProxy implements IDocumentServiceFactoryProxy
}

private getDeltaStorage(deltaStorage: IDocumentDeltaStorageService): IDocumentDeltaStorageService {
const get = Comlink.proxy(async (from: number, to: number) => deltaStorage.get(from, to));
const fetchMessages = Comlink.proxy(deltaStorage.fetchMessages.bind(deltaStorage));

return {
get,
fetchMessages,
};
}

Expand Down
22 changes: 18 additions & 4 deletions packages/drivers/local-driver/src/localDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
*/

import * as api from "@fluidframework/driver-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IDatabaseManager } from "@fluidframework/server-services-core";
import { streamFromMessages } from "@fluidframework/driver-utils";

export class LocalDeltaStorageService implements api.IDocumentDeltaStorageService {
constructor(
Expand All @@ -13,15 +15,27 @@ export class LocalDeltaStorageService implements api.IDocumentDeltaStorageServic
private readonly databaseManager: IDatabaseManager) {
}

public async get(from: number, to: number): Promise<api.IDeltasFetchResult> {
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
): api.IStream<ISequencedDocumentMessage[]> {
return streamFromMessages(this.getCore(from, to));
}

private async getCore(from: number, to?: number) {
const query = { documentId: this.id, tenantId: this.tenantId };
query["operation.sequenceNumber"] = {};
query["operation.sequenceNumber"].$gt = from;
query["operation.sequenceNumber"].$lt = to;
query["operation.sequenceNumber"].$gt = from - 1; // from is inclusive

// This looks like a bug. It used to work without setting $lt key. Now it does not
// Need follow up
query["operation.sequenceNumber"].$lt = to ?? Number.MAX_SAFE_INTEGER;

const allDeltas = await this.databaseManager.getDeltaCollection(this.tenantId, this.id);
const dbDeltas = await allDeltas.find(query, { "operation.sequenceNumber": 1 });
const messages = dbDeltas.map((delta) => delta.operation);
return { messages, partialResult: false };
return messages;
}
}
4 changes: 4 additions & 0 deletions packages/drivers/odsp-driver/src/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ export interface HostStoragePolicy {
concurrentSnapshotFetch?: boolean;

blobDeduping?: boolean;

// Options overwriting default ops fetching from storage.
opsBatchSize?: number;
concurrentOpsBatches?: number;
}

/**
Expand Down
9 changes: 3 additions & 6 deletions packages/drivers/odsp-driver/src/odspDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { TokenFetchOptions } from "./tokenFetch";
/**
* Provides access to the underlying delta storage on the server for sharepoint driver.
*/
export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService {
vladsud marked this conversation as resolved.
Show resolved Hide resolved
export class OdspDeltaStorageService {
constructor(
private readonly deltaFeedUrlProvider: () => Promise<string>,
private ops: ISequencedDeltaOpMessage[] | undefined,
Expand All @@ -32,7 +32,7 @@ export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService
const ops = this.ops;
this.ops = undefined;
if (ops !== undefined) {
const messages = ops.filter((op) => op.sequenceNumber > from).map((op) => op.op);
const messages = ops.filter((op) => op.sequenceNumber >= from).map((op) => op.op);
if (messages.length > 0) {
return { messages, partialResult: true };
}
Expand Down Expand Up @@ -74,10 +74,7 @@ export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService
}

public async buildUrl(from: number, to: number) {
const fromInclusive = from + 1;
const toInclusive = to - 1;

const filter = encodeURIComponent(`sequenceNumber ge ${fromInclusive} and sequenceNumber le ${toInclusive}`);
const filter = encodeURIComponent(`sequenceNumber ge ${from} and sequenceNumber le ${to - 1}`);
const queryString = `?filter=${filter}`;
return `${await this.deltaFeedUrlProvider()}${queryString}`;
}
Expand Down
32 changes: 26 additions & 6 deletions packages/drivers/odsp-driver/src/odspDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
IDocumentStorageService,
IDocumentServicePolicies,
} from "@fluidframework/driver-definitions";
import { canRetryOnError } from "@fluidframework/driver-utils";
import { canRetryOnError, requestOps, emptyMessageStream } from "@fluidframework/driver-utils";
import { fetchTokenErrorCode, throwOdspNetworkError } from "@fluidframework/odsp-doclib-utils";
import {
IClient,
Expand Down Expand Up @@ -235,12 +235,32 @@ export class OdspDocumentService implements IDocumentService {
this.logger,
);

// batch size, please see issue #5211 for data around batch sizing
const batchSize = this.hostPolicy.opsBatchSize ?? 5000;
const concurrency = this.hostPolicy.concurrentOpsBatches ?? 1;

return {
get: async (from: number, to: number) => {
const { messages, partialResult } = await service.get(from, to);
this.opsReceived(messages);
return { messages, partialResult };
},
fetchMessages: (
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean) => {
// Proper implementaiton Coming in future
if (cachedOnly) {
return emptyMessageStream;
}
return requestOps(
service.get.bind(service),
// Staging: starting with no concurrency, listening for feedback first.
// In future releases we will switch to actual concurrency
concurrency,
from, // inclusive
to, // exclusive
batchSize,
this.logger,
abortSignal,
);
},
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("DeltaStorageService", () => {
async (_refresh) => "?access_token=123",
new EpochTracker(new LocalPersistentCacheAdapter(new LocalPersistentCache()), logger),
logger);
const actualDeltaUrl = await deltaStorageService.buildUrl(2, 8);
const actualDeltaUrl = await deltaStorageService.buildUrl(3, 8);
// eslint-disable-next-line max-len
const expectedDeltaUrl = `${deltaStorageBasePath}/drives/testdrive/items/testitem/opStream?filter=sequenceNumber%20ge%203%20and%20sequenceNumber%20le%207`;
assert.equal(actualDeltaUrl, expectedDeltaUrl, "The constructed delta url is invalid");
Expand Down
13 changes: 10 additions & 3 deletions packages/drivers/replay-driver/src/emptyDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* Licensed under the MIT License.
*/

import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions";
import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { emptyMessageStream } from "@fluidframework/driver-utils";

export class EmptyDeltaStorageService implements IDocumentDeltaStorageService {
/**
Expand All @@ -12,7 +14,12 @@ export class EmptyDeltaStorageService implements IDocumentDeltaStorageService {
* @param to - Op are returned from to - 1.
* @returns Array of ops requested by the user.
*/
public async get(from: number, to: number): Promise<IDeltasFetchResult> {
return { messages: [], partialResult: false };
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean): IStream<ISequencedDocumentMessage[]>
{
return emptyMessageStream;
}
}
2 changes: 1 addition & 1 deletion packages/drivers/replay-driver/src/replayController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export abstract class ReplayController extends ReadDocumentStorageServiceBase {
* Note: this API is called while replay() is in progress - next batch of ops is downloaded in parallel
* @param currentOp - current op
*/
public abstract fetchTo(currentOp: number): number;
public abstract fetchTo(currentOp: number): number | undefined;

/**
* Returns true if no more ops should be processed (or downloaded for future processing).
Expand Down
46 changes: 24 additions & 22 deletions packages/drivers/replay-driver/src/replayDocumentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ import {
IVersion,
ScopeType,
} from "@fluidframework/protocol-definitions";
import { assert, TypedEventEmitter } from "@fluidframework/common-utils";
import { TypedEventEmitter } from "@fluidframework/common-utils";
import { debug } from "./debug";
import { ReplayController } from "./replayController";

const MaxBatchDeltas = 2000;

const ReplayDocumentId = "documentId";

export class ReplayControllerStatic extends ReplayController {
Expand Down Expand Up @@ -76,9 +74,10 @@ export class ReplayControllerStatic extends ReplayController {
}

public fetchTo(currentOp: number) {
const useFetchToBatch = !(this.unitIsTime !== true && this.replayTo >= 0);
const fetchToBatch = currentOp + MaxBatchDeltas;
return useFetchToBatch ? fetchToBatch : Math.min(fetchToBatch, this.replayTo);
if (!(this.unitIsTime !== true && this.replayTo >= 0)) {
return undefined;
}
return this.replayTo;
}

public isDoneFetch(currentOp: number, lastTimeStamp?: number) {
Expand Down Expand Up @@ -166,7 +165,6 @@ export class ReplayControllerStatic extends ReplayController {
}
}
}
// eslint-disable-next-line @typescript-eslint/no-use-before-define
scheduleNext(nextInterval);
emitter(playbackOps);
};
Expand Down Expand Up @@ -312,26 +310,30 @@ export class ReplayDocumentDeltaConnection
do {
const fetchTo = controller.fetchTo(currentOp);

const { messages, partialResult } = await documentStorageService.get(currentOp, fetchTo);

if (messages.length === 0) {
// No more ops. But, they can show up later, either because document was just created,
// or because another client keeps submitting new ops.
assert(!partialResult, 0x0af /* "No more ops, but nonzero partial results!" */);
if (controller.isDoneFetch(currentOp, undefined)) {
const abortController = new AbortController();
const stream = documentStorageService.fetchMessages(currentOp + 1, fetchTo, abortController.signal);
do {
const result = await stream.read();

if (result.done) {
// No more ops. But, they can show up later, either because document was just created,
// or because another client keeps submitting new ops.
done = controller.isDoneFetch(currentOp, undefined);
if (!done) {
await delay(2000);
}
break;
}
await delay(2000);
continue;
}
replayPromiseChain = replayPromiseChain.then(
async () => controller.replay((ops) => this.emit("op", ReplayDocumentId, ops), messages));

replayPromiseChain = replayPromiseChain.then(
async () => controller.replay((ops) => this.emit("op", ReplayDocumentId, ops), messages));
const messages = result.value;
currentOp += messages.length;
done = controller.isDoneFetch(currentOp, messages[messages.length - 1].timestamp);
} while (!done);

currentOp += messages.length;
done = controller.isDoneFetch(currentOp, messages[messages.length - 1].timestamp);
abortController.abort();
} while (!done);

return replayPromiseChain;
}
}
Loading