Skip to content

Commit

Permalink
Renames, remove cancel(), move closer to iterator shape of result to …
Browse files Browse the repository at this point in the history
…switch to async iterator in the future
  • Loading branch information
vladsud committed Apr 2, 2021
1 parent 1b4fb33 commit 2f1834d
Show file tree
Hide file tree
Showing 15 changed files with 49 additions and 53 deletions.
8 changes: 4 additions & 4 deletions packages/drivers/debugger/src/fluidDebuggerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,12 @@ export class DebugReplayController extends ReplayController implements IDebugger
}

async function* generateSequencedMessagesFromDeltaStorage(deltaStorage: IDocumentDeltaStorageService) {
const pipe = deltaStorage.readMessages(1, undefined);
const stream = deltaStorage.fetchMessages(1, undefined);
while (true) {
const messages = await pipe.pop();
if (messages === undefined) {
const result = await stream.read();
if (result.done) {
return;
}
yield messages;
yield result.value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class FileDeltaStorageService implements IDocumentDeltaStorageService {
}
}

public readMessages(from: number,
public fetchMessages(from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
Expand Down
2 changes: 1 addition & 1 deletion packages/drivers/iframe-driver/src/innerDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class InnerDocumentService implements IDocumentService {
*/
public async connectToDeltaStorage(): Promise<IDocumentDeltaStorageService> {
return {
readMessages: (...args) => this.outerProxy.deltaStorage.readMessages(...args),
fetchMessages: (...args) => this.outerProxy.deltaStorage.fetchMessages(...args),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ export class DocumentServiceFactoryProxy implements IDocumentServiceFactoryProxy
}

private getDeltaStorage(deltaStorage: IDocumentDeltaStorageService): IDocumentDeltaStorageService {
const readMessages = Comlink.proxy(deltaStorage.readMessages.bind(deltaStorage));
const fetchMessages = Comlink.proxy(deltaStorage.fetchMessages.bind(deltaStorage));

return {
readMessages,
fetchMessages,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class LocalDeltaStorageService implements api.IDocumentDeltaStorageServic
private readonly databaseManager: IDatabaseManager) {
}

public readMessages(
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
Expand Down
2 changes: 1 addition & 1 deletion packages/drivers/odsp-driver/src/odspDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ export class OdspDocumentService implements IDocumentService {
const concurrency = this.hostPolicy.concurrentOpsBatches ?? 1;

return {
readMessages: (
fetchMessages: (
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class EmptyDeltaStorageService implements IDocumentDeltaStorageService {
* @param to - Op are returned from to - 1.
* @returns Array of ops requested by the user.
*/
public readMessages(
public fetchMessages(
from: number,
to: number | undefined,
abortSignal?: AbortSignal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,12 @@ export class ReplayDocumentDeltaConnection
do {
const fetchTo = controller.fetchTo(currentOp);

const pipe = documentStorageService.readMessages(currentOp + 1, fetchTo);
const abortController = new AbortController();
const stream = documentStorageService.fetchMessages(currentOp + 1, fetchTo, abortController.signal);
do {
const messages = await pipe.pop();
const result = await stream.read();

if (messages === undefined) {
if (result.done) {
// No more ops. But, they can show up later, either because document was just created,
// or because another client keeps submitting new ops.
done = controller.isDoneFetch(currentOp, undefined);
Expand All @@ -326,11 +327,12 @@ export class ReplayDocumentDeltaConnection
replayPromiseChain = replayPromiseChain.then(
async () => controller.replay((ops) => this.emit("op", ReplayDocumentId, ops), messages));

const messages = result.value;
currentOp += messages.length;
done = controller.isDoneFetch(currentOp, messages[messages.length - 1].timestamp);
} while (!done);

pipe.cancel();
abortController.abort();
} while (!done);
return replayPromiseChain;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class DocumentDeltaStorageService implements IDocumentDeltaStorageService

private logtailSha: string | undefined = this.documentStorageService.logTailSha;

readMessages(from: number,
fetchMessages(from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
Expand Down
10 changes: 5 additions & 5 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -807,21 +807,21 @@ export class DeltaManager
}

const storage = await this.deltaStorageP;
const pipe = storage.readMessages(
const stream = storage.fetchMessages(
from, // inclusive
to, // exclusive
this.closeAbortController.signal);

// eslint-disable-next-line no-constant-condition
while (true) {
const deltas = await pipe.pop();
if (deltas === undefined) {
const result = await stream.read();
if (result.done) {
break;
}
PerformanceEvent.timedExec(
this.logger,
{ eventName: "GetDeltas_OpProcessing", count: deltas.length},
() => callback(deltas),
{ eventName: "GetDeltas_OpProcessing", count: result.value.length},
() => callback(result.value),
{ end: true, cancel: "error" });
}
}
Expand Down
10 changes: 4 additions & 6 deletions packages/loader/driver-definitions/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ export interface IDeltaStorageService {
): Promise<IDeltasFetchResult>;
}

export type IStreamResult<T> = { done: true; } | { done: false; value: T; };

/**
* Read interface for the Queue
*/
export interface IStream<T> {
pop(): Promise<T | undefined>;
/**
* aborts processing and releases resources
*/
cancel(): void;
read(): Promise<IStreamResult<T>>;
}

/**
Expand All @@ -76,7 +74,7 @@ export interface IDocumentDeltaStorageService {
* @param abortSignal - signal that aborts operation
* @param cachedOnly - return only cached ops, i.e. ops available locally on client.
*/
readMessages(from: number,
fetchMessages(from: number,
to: number | undefined,
abortSignal?: AbortSignal,
cachedOnly?: boolean,
Expand Down
37 changes: 16 additions & 21 deletions packages/loader/driver-utils/src/parallelRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { assert, Deferred, performance } from "@fluidframework/common-utils";
import { ITelemetryLogger } from "@fluidframework/common-definitions";
import { PerformanceEvent, TelemetryLogger } from "@fluidframework/telemetry-utils";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IDeltasFetchResult, IStream } from "@fluidframework/driver-definitions";
import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions";
import { getRetryDelayFromError, canRetryOnError, createGenericNetworkError } from "./network";
import { waitForConnectedState } from "./networkUtils";

Expand Down Expand Up @@ -289,12 +289,12 @@ export class ParallelRequests<T> {
* It's essentially a pipe allowing multiple writers, and single reader
*/
export class Queue<T> implements IStream<T> {
private readonly queue: Promise<T | undefined>[] = [];
private deferred: Deferred<T | undefined> | undefined;
private readonly queue: Promise<IStreamResult<T>>[] = [];
private deferred: Deferred<IStreamResult<T>> | undefined;
private done = false;

public pushValue(value: T) {
this.pushCore(Promise.resolve(value));
this.pushCore(Promise.resolve({ done: false, value }));
}

public pushError(error: any) {
Expand All @@ -303,11 +303,11 @@ export class Queue<T> implements IStream<T> {
}

public pushDone() {
this.pushCore(Promise.resolve(undefined));
this.pushCore(Promise.resolve({ done: true }));
this.done = true;
}

protected pushCore(value: Promise<T | undefined>) {
protected pushCore(value: Promise<IStreamResult<T>>) {
assert(!this.done, 0x112 /* "cannot push onto queue if done" */);
if (this.deferred) {
assert(this.queue.length === 0, 0x113 /* "deferred queue should be empty" */);
Expand All @@ -318,19 +318,16 @@ export class Queue<T> implements IStream<T> {
}
}

public async pop(): Promise<T | undefined> {
public async read(): Promise<IStreamResult<T>> {
assert(this.deferred === undefined, 0x114 /* "cannot pop if deferred" */);
const el = this.queue.shift();
if (el !== undefined) {
return el;
const value = this.queue.shift();
if (value !== undefined) {
return value;
}
assert(!this.done, 0x115 /* "queue should not be done during pop" */);
this.deferred = new Deferred<T>();
this.deferred = new Deferred<IStreamResult<T>>();
return this.deferred.promise;
}

public cancel() {
}
}

/**
Expand Down Expand Up @@ -517,23 +514,21 @@ export function requestOps(
}

export const emptyMessageStream: IStream<ISequencedDocumentMessage[]> = {
pop: async () => undefined,
cancel: () => {},
read: async () => { return { done: true };},
};

export function streamFromMessages(messagesArg: Promise<ISequencedDocumentMessage[]>):
IStream<ISequencedDocumentMessage[]>
{
let messages: Promise<ISequencedDocumentMessage[]> | undefined = messagesArg;
return {
pop: async () => {
read: async () => {
if (messages === undefined) {
return undefined;
return { done: true };
}
const result = await messages;
const value = await messages;
messages = undefined;
return result.length === 0 ? undefined : result;
return value.length === 0 ? { done: true } : { done: false, value };
},
cancel: () => {},
};
}
2 changes: 1 addition & 1 deletion packages/loader/test-loader-utils/src/mockDeltaStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class MockDocumentDeltaStorageService implements IDocumentDeltaStorageSer
this.messages = messages.sort((a, b) => b.sequenceNumber - a.sequenceNumber);
}

public readMessages(
public fetchMessages(
from: number, // inclusive
to: number | undefined, // exclusive
abortSignal?: AbortSignal,
Expand Down
7 changes: 4 additions & 3 deletions packages/tools/fetch-tool/src/fluidFetchMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,18 @@ async function* loadAllSequencedMessages(
let requests = 0;
let opsStorage = 0;

const queue = deltaStorage.readMessages(
const stream = deltaStorage.fetchMessages(
lastSeq + 1, // inclusive left
undefined, // to
);

while (true) {
const messages = await queue.pop();
if (messages === undefined) {
const result = await stream.read();
if (result.done) {
break;
}
requests++;
const messages = result.value;

// Empty buckets should never be returned
assert(messages.length !== 0, 0x1ba /* "should not return empty buckets" */);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export interface IPendingMessageReader {
* @param from Starting sequence number (inclusive)
* @param to End sequence number (inclusive)
*/
readMessages(from: number, to: number): Promise<ISequencedDocumentMessage[]>;
fetchMessages(from: number, to: number): Promise<ISequencedDocumentMessage[]>;
}

/**
Expand Down

0 comments on commit 2f1834d

Please sign in to comment.