Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move drivers to streaming API for op retrieval #5703

Merged
merged 13 commits into from
Apr 8, 2021
31 changes: 7 additions & 24 deletions packages/drivers/debugger/src/fluidDebuggerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import { Sanitizer } from "./sanitizer";

export type debuggerUIFactory = (controller: IDebuggerController) => IDebuggerUI | null;

const MaxBatchDeltas = 2000;

/**
* Replay controller that uses pop-up window to control op playback
*/
Expand Down Expand Up @@ -172,8 +170,8 @@ export class DebugReplayController extends ReplayController implements IDebugger
return messages;
}

public fetchTo(currentOp: number): number {
return currentOp + MaxBatchDeltas;
public fetchTo(currentOp: number): number | undefined {
return undefined;
}

// Returns true if version / file / ops selections is made.
Expand Down Expand Up @@ -345,27 +343,12 @@ export class DebugReplayController extends ReplayController implements IDebugger
}

async function* generateSequencedMessagesFromDeltaStorage(deltaStorage: IDocumentDeltaStorageService) {
let lastSeq = 0;
const batch = 2000;
const stream = deltaStorage.fetchMessages(1, undefined);
while (true) {
const { messages, partialResult } = await loadChunk(lastSeq, lastSeq + batch, deltaStorage);
if (messages.length === 0) {
assert(!partialResult,
0x087 /* "No messages loaded from chunk, but nonzero number of partial results loaded from chunk!" */);
break;
}
yield messages;
lastSeq = messages[messages.length - 1].sequenceNumber;
}
}

async function loadChunk(from: number, to: number, deltaStorage: IDocumentDeltaStorageService) {
for (let iter = 0; iter < 3; iter++) {
try {
return await deltaStorage.get(from, to);
} catch (error) {
// Retry
const result = await stream.read();
if (result.done) {
return;
}
yield result.value;
}
throw new Error("Giving up after 3 attempts to download chunk of ops.");
}
15 changes: 8 additions & 7 deletions packages/drivers/file-driver/src/fileDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import fs from "fs";
import { assert } from "@fluidframework/common-utils";
import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions";
import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions";
import { emptyMessageStream } from "@fluidframework/driver-utils";
import * as api from "@fluidframework/protocol-definitions";

/**
Expand Down Expand Up @@ -33,12 +34,12 @@ export class FileDeltaStorageService implements IDocumentDeltaStorageService {
}
}

public async get(
from?: number,
to?: number,
): Promise<IDeltasFetchResult> {
// Do not allow container move forward
return { messages: [], partialResult: false };
public fetchMessages(from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
): IStream<api.ISequencedDocumentMessage[]> {
return emptyMessageStream;
}

public get ops(): readonly Readonly<api.ISequencedDocumentMessage>[] {
Expand Down
2 changes: 1 addition & 1 deletion packages/drivers/iframe-driver/src/innerDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class InnerDocumentService implements IDocumentService {
*/
public async connectToDeltaStorage(): Promise<IDocumentDeltaStorageService> {
return {
get: async (from: number, to: number) => this.outerProxy.deltaStorage.get(from, to),
fetchMessages: (...args) => this.outerProxy.deltaStorage.fetchMessages(...args),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ export class DocumentServiceFactoryProxy implements IDocumentServiceFactoryProxy
}

private getDeltaStorage(deltaStorage: IDocumentDeltaStorageService): IDocumentDeltaStorageService {
const get = Comlink.proxy(async (from: number, to: number) => deltaStorage.get(from, to));
const fetchMessages = Comlink.proxy(deltaStorage.fetchMessages.bind(deltaStorage));

return {
get,
fetchMessages,
};
}

Expand Down
22 changes: 18 additions & 4 deletions packages/drivers/local-driver/src/localDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
*/

import * as api from "@fluidframework/driver-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IDatabaseManager } from "@fluidframework/server-services-core";
import { streamFromMessages } from "@fluidframework/driver-utils";

export class LocalDeltaStorageService implements api.IDocumentDeltaStorageService {
constructor(
Expand All @@ -13,15 +15,27 @@ export class LocalDeltaStorageService implements api.IDocumentDeltaStorageServic
private readonly databaseManager: IDatabaseManager) {
}

public async get(from: number, to: number): Promise<api.IDeltasFetchResult> {
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
): api.IStream<ISequencedDocumentMessage[]> {
return streamFromMessages(this.getCore(from, to));
}

private async getCore(from: number, to?: number) {
const query = { documentId: this.id, tenantId: this.tenantId };
query["operation.sequenceNumber"] = {};
query["operation.sequenceNumber"].$gt = from;
query["operation.sequenceNumber"].$lt = to;
query["operation.sequenceNumber"].$gt = from - 1; // from is inclusive

// This looks like a bug. It used to work without setting $lt key. Now it does not
// Need follow up
query["operation.sequenceNumber"].$lt = to ?? Number.MAX_SAFE_INTEGER;

const allDeltas = await this.databaseManager.getDeltaCollection(this.tenantId, this.id);
const dbDeltas = await allDeltas.find(query, { "operation.sequenceNumber": 1 });
const messages = dbDeltas.map((delta) => delta.operation);
return { messages, partialResult: false };
return messages;
}
}
4 changes: 4 additions & 0 deletions packages/drivers/odsp-driver/src/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ export interface HostStoragePolicy {

blobDeduping?: boolean;

// Options overwriting default ops fetching from storage.
opsBatchSize?: number;
concurrentOpsBatches?: number;

/**
* Policy controlling ops caching (leveraging IPersistedCache passed to driver factory)
*/
Expand Down
7 changes: 2 additions & 5 deletions packages/drivers/odsp-driver/src/odspDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { TokenFetchOptions } from "./tokenFetch";
/**
* Provides access to the underlying delta storage on the server for sharepoint driver.
*/
export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService {
vladsud marked this conversation as resolved.
Show resolved Hide resolved
export class OdspDeltaStorageService {
constructor(
private readonly deltaFeedUrl: string,
private readonly getStorageToken: (options: TokenFetchOptions, name?: string) => Promise<string | null>,
Expand Down Expand Up @@ -63,10 +63,7 @@ export class OdspDeltaStorageService implements api.IDocumentDeltaStorageService
}

public async buildUrl(from: number, to: number) {
const fromInclusive = from + 1;
const toInclusive = to - 1;

const filter = encodeURIComponent(`sequenceNumber ge ${fromInclusive} and sequenceNumber le ${toInclusive}`);
const filter = encodeURIComponent(`sequenceNumber ge ${from} and sequenceNumber le ${to - 1}`);
const queryString = `?filter=${filter}`;
return `${this.deltaFeedUrl}${queryString}`;
}
Expand Down
103 changes: 68 additions & 35 deletions packages/drivers/odsp-driver/src/odspDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import {
IDocumentStorageService,
IDocumentServicePolicies,
} from "@fluidframework/driver-definitions";
import { canRetryOnError } from "@fluidframework/driver-utils";
import {
canRetryOnError,
requestOps,
streamObserver,
} from "@fluidframework/driver-utils";
import { fetchTokenErrorCode, throwOdspNetworkError } from "@fluidframework/odsp-doclib-utils";
import {
IClient,
Expand Down Expand Up @@ -223,41 +227,70 @@ export class OdspDocumentService implements IDocumentService {
this.logger,
);

let missed = false;
return {
get: async (from: number, to: number) => {
if (snapshotOps !== undefined && snapshotOps.length !== 0) {
const messages = snapshotOps.filter((op) => op.sequenceNumber > from).map((op) => op.op);
snapshotOps = undefined;
if (messages.length > 0 && messages[0].sequenceNumber === from + 1) {
// Consider not caching these ops as they will be cached as part of snapshot cache entry
this.opsReceived(messages);
return { messages, partialResult: true };
} else {
this.logger.sendErrorEvent({
eventName: "SnapshotOpsNotUsed",
length: messages.length,
first: messages[0].sequenceNumber,
from,
to,
});
}
}

// We always write ops sequentially. Once there is a miss, stop consulting cache.
// This saves a bit of processing time
if (!missed) {
const messagesFromCache = await this.opsCache?.get(from, to);
if (messagesFromCache !== undefined && messagesFromCache.length !== 0) {
return { messages: messagesFromCache as ISequencedDocumentMessage[], partialResult: true };
}
missed = true;
}
// batch size, please see issue #5211 for data around batch sizing
const batchSize = this.hostPolicy.opsBatchSize ?? 5000;
const concurrency = this.hostPolicy.concurrentOpsBatches ?? 1;

const result = await service.get(from, to);
this.opsReceived(result.messages);
return result;
},
return {
fetchMessages: (
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean) => {
let missed = false;
const stream = requestOps(
async (f: number, t: number) => {
if (snapshotOps !== undefined && snapshotOps.length !== 0) {
const messages = snapshotOps.filter((op) =>
op.sequenceNumber > from).map((op) => op.op);
snapshotOps = undefined;
if (messages.length > 0 && messages[0].sequenceNumber === from + 1) {
// Consider not caching these ops as they will be cached as part of
// snapshot cache entry
this.opsReceived(messages);
return { messages, partialResult: true };
} else {
this.logger.sendErrorEvent({
eventName: "SnapshotOpsNotUsed",
length: messages.length,
first: messages[0].sequenceNumber,
from,
to,
});
}
}
// We always write ops sequentially. Once there is a miss, stop consulting cache.
// This saves a bit of processing time
if (!missed) {
const messagesFromCache = await this.opsCache?.get(from, to);
if (messagesFromCache !== undefined && messagesFromCache.length !== 0) {
return {
messages: messagesFromCache as ISequencedDocumentMessage[],
partialResult: true,
};
}
missed = true;
}

// Proper implementaiton Coming in future
if (cachedOnly) {
return { messages: [], partialResult: false };
}

return service.get(f, t);
},
// Staging: starting with no concurrency, listening for feedback first.
// In future releases we will switch to actual concurrency
concurrency,
from, // inclusive
to, // exclusive
batchSize,
this.logger,
abortSignal,
);

return streamObserver(stream, (ops) => this.opsReceived(ops));
},
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe("DeltaStorageService", () => {
async (_refresh) => "?access_token=123",
createUtEpochTracker(fileEntry, logger),
logger);
const actualDeltaUrl = await deltaStorageService.buildUrl(2, 8);
const actualDeltaUrl = await deltaStorageService.buildUrl(3, 8);
// eslint-disable-next-line max-len
const expectedDeltaUrl = `${deltaStorageBasePath}/drives/testdrive/items/testitem/opStream?filter=sequenceNumber%20ge%203%20and%20sequenceNumber%20le%207`;
assert.equal(actualDeltaUrl, expectedDeltaUrl, "The constructed delta url is invalid");
Expand Down
13 changes: 10 additions & 3 deletions packages/drivers/replay-driver/src/emptyDeltaStorageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* Licensed under the MIT License.
*/

import { IDocumentDeltaStorageService, IDeltasFetchResult } from "@fluidframework/driver-definitions";
import { IDocumentDeltaStorageService, IStream } from "@fluidframework/driver-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { emptyMessageStream } from "@fluidframework/driver-utils";

export class EmptyDeltaStorageService implements IDocumentDeltaStorageService {
/**
Expand All @@ -12,7 +14,12 @@ export class EmptyDeltaStorageService implements IDocumentDeltaStorageService {
* @param to - Op are returned from to - 1.
* @returns Array of ops requested by the user.
*/
public async get(from: number, to: number): Promise<IDeltasFetchResult> {
return { messages: [], partialResult: false };
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean): IStream<ISequencedDocumentMessage[]>
{
return emptyMessageStream;
}
}
2 changes: 1 addition & 1 deletion packages/drivers/replay-driver/src/replayController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export abstract class ReplayController extends ReadDocumentStorageServiceBase {
* Note: this API is called while replay() is in progress - next batch of ops is downloaded in parallel
* @param currentOp - current op
*/
public abstract fetchTo(currentOp: number): number;
public abstract fetchTo(currentOp: number): number | undefined;

/**
* Returns true if no more ops should be processed (or downloaded for future processing).
Expand Down
Loading