Skip to content

Commit

Permalink
Implement parallel ops requesting (#5407)
Browse files Browse the repository at this point in the history
Implements and closes #5393

Implement the ability to do multiple parallel requests to storage to speed up retrieval of ops.
This change implements the ability to do concurrent fetches. It does not change the behaviour of DeltaManager for now, and only enables this new capability in fetch-tool, with 4 parallel requests of 20K ops each.
Future work in this area is tracked by issue #5523.

This new capability will help in the following scenarios:
- Booting from a cached (stale) snapshot (in future)
- Being offline for a day to a week (in future)
- Fetching a lot of ops using fetch-tool (in this PR)

In the future, I'll refactor this approach into an adapter layer that drivers could use to implement parallel requests (or not), as well as their batching strategy.
The API exposed from the driver will be an IReadPipe API that takes a full (including infinite) request, so that DeltaManager would no longer control batching or parallelism.
It's easy to start with no parallelism and add concurrency (workers) later. We need to explore this so that we start with one request and add more parallel requests only when we know we are far behind. That will reduce pressure on storage (no extra empty requests for nothing).
  • Loading branch information
vladsud authored Mar 14, 2021
1 parent 56afaaa commit c9cfcd1
Show file tree
Hide file tree
Showing 8 changed files with 698 additions and 106 deletions.
179 changes: 99 additions & 80 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
createWriteError,
createGenericNetworkError,
getRetryDelayFromError,
ParallelRequests,
} from "@fluidframework/driver-utils";
import {
CreateContainerError,
Expand Down Expand Up @@ -785,98 +786,130 @@ export class DeltaManager

private async getDeltas(
telemetryEventSuffix: string,
fromInitial: number,
to: number | undefined,
callback: (messages: ISequencedDocumentMessage[]) => void) {
let retry: number = 0;
let from: number = fromInitial;
let deltas: ISequencedDocumentMessage[] = [];
let deltasRetrievedTotal = 0;

from: number, // exclusive
to: number | undefined, // exclusive
callback: (messages: ISequencedDocumentMessage[]) => void)
{
const docService = this.serviceProvider();
if (docService === undefined) {
throw new Error("Delta manager is not attached");
}

if (this.deltaStorageP === undefined) {
this.deltaStorageP = docService.connectToDeltaStorage();
}

const telemetryEvent = PerformanceEvent.start(this.logger, {
eventName: `GetDeltas_${telemetryEventSuffix}`,
from,
to,
});

let deltasRetrievedTotal = 0;
let requests = 0;

let lastFetch: number | undefined;

const manager = new ParallelRequests<ISequencedDocumentMessage>(
from + 1, // from is exclusive, but ParallelRequests uses inclusive left
to, // exclusive right
MaxBatchDeltas,
this.logger,
async (request: number, _from: number, _to: number, strongTo: boolean) => {
requests++;
return this.getSingleOpBatch(request, _from, _to, telemetryEvent, strongTo);
},
(deltas: ISequencedDocumentMessage[]) => {
deltasRetrievedTotal += deltas.length;
lastFetch = deltas[deltas.length - 1].sequenceNumber;
PerformanceEvent.timedExec(
this.logger,
{ eventName: "GetDeltas_OpProcessing", count: deltas.length},
() => callback(deltas),
{ end: true, cancel: "error" });
},
);

// Staging: starting with no concurrency, listening for feedback first.
// In future releases we will switch to actual concurrency
await manager.run(1 /* concurrency */);

telemetryEvent.end({
lastFetch,
deltasRetrievedTotal,
requests,
lastQueuedSequenceNumber: this.lastQueuedSequenceNumber,
});
}

/**
* Retrieve single batch of ops
* @param request - request index
* @param from - inclusive boundary
* @param to - exclusive boundary
* @param telemetryEvent - telemetry event used to track consecutive batch of requests
* @param strongTo - tells if ops in range from...to have to be there and have to be retrieved.
* If false, returning less ops would mean we reached end of file.
* @returns - an object with resulting ops and cancellation / partial result flags
*/
async getSingleOpBatch(
request: number,
from: number,
to: number,
telemetryEvent: PerformanceEvent,
strongTo: boolean):
Promise<{ partial: boolean, cancel: boolean, payload: ISequencedDocumentMessage[] }>
{
let deltaStorage: IDocumentDeltaStorageService | undefined;
let lastSuccessTime: number | undefined;

while (!this.closed) {
const maxFetchTo = from + MaxBatchDeltas;
const fetchTo = to === undefined ? maxFetchTo : Math.min(maxFetchTo, to);
let retry: number = 0;
const deltas: ISequencedDocumentMessage[] = [];
let deltasRetrievedTotal = 0;
const nothing = { partial: false, cancel: true, payload: []};

let deltasRetrievedLast = 0;
let canRetry = false;
let retryAfter: number | undefined;
let delay: number;
const start = performance.now();

// Calculate delay for next iteration if request fails or we get no ops.
// If request succeeds and returns some ops, we will reset these variables.
while (!this.closed) {
retry++;
delay = retryAfter ?? Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry));
let delay = Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry));
let canRetry = false;

try {
// Connect to the delta storage endpoint
if (deltaStorage === undefined) {
if (this.deltaStorageP === undefined) {
this.deltaStorageP = docService.connectToDeltaStorage();
}
deltaStorage = await this.deltaStorageP;
}

requests++;

// Issue async request for deltas - limit the number fetched to MaxBatchDeltas
canRetry = true;
const deltasP = deltaStorage.get(from, fetchTo);
assert(deltaStorage !== undefined);
// left is inclusive for ParallelRequests, but exclusive for IDocumentDeltaStorageService
// right is exclusive for both
const deltasP = deltaStorage.get(from - 1, to);

// Return previously fetched deltas, for processing while we are waiting for new request.
if (deltas.length > 0) {
callback(deltas);
}

// Now wait for request to come back
const { messages, partialResult } = await deltasP;
deltas = messages;
deltas.push(...messages);

// Note that server (or driver code) can push here something unexpected, like undefined
// Exception thrown as result of it will result in us retrying
deltasRetrievedLast = deltas.length;
const deltasRetrievedLast = messages.length;
deltasRetrievedTotal += deltasRetrievedLast;
const lastFetch = deltasRetrievedLast > 0 ? deltas[deltasRetrievedLast - 1].sequenceNumber : from;

// If we have no upper bound, then need to check partialResult flag. Different caching layers will
// return whatever ops they have and we need to keep asking until we get to the end.
// Only when partialResult = false, we know we got everything caching/storage layers have to offer,
// and if it's less than what we asked for, we know we reached the end.
// But if we know upper bound, then we have to get all these ops, even of storage says it does not
// have them. That could happen if offering service did not flush them yet to storage, or is in process
// of doing it, and we know we have a gap on our knowledge and can't proceed further without these ops.
// Note #1: we can get more ops than what we asked for - need to account for that!
// Note #2: from & to are exclusive! I.e. we actually expect [from + 1, to - 1] range of ops back!
if (to === undefined ? (!partialResult && lastFetch < maxFetchTo - 1) : to - 1 <= lastFetch) {
callback(deltas);
telemetryEvent.end({ lastFetch, deltasRetrievedTotal, requests });
return;

if (deltasRetrievedLast !== 0 || !strongTo) {
telemetryEvent.reportProgress({
chunkDeltas: deltasRetrievedTotal,
chunkFrom: from,
chunkTo: to,
chunkRequests: retry,
chunkDuration: TelemetryLogger.formatTick(performance.now() - start),
});
return { payload: deltas, cancel: false, partial: partialResult};
}

// Storage does not have ops we need.
// Attempt to fetch more deltas. If we didn't receive any in the previous call we up our retry
// count since something prevented us from seeing those deltas
from = lastFetch;

if (deltasRetrievedLast !== 0) {
// If we are getting some ops, reset all counters.
delay = 0;
retry = 0;
lastSuccessTime = undefined;
} else if (lastSuccessTime === undefined) {

if (lastSuccessTime === undefined) {
lastSuccessTime = Date.now();
} else if (Date.now() - lastSuccessTime > 30000) {
// If we are connected and receiving proper responses from server, but can't get any ops back,
Expand All @@ -887,7 +920,7 @@ export class DeltaManager
category: "error",
error: "too many retries",
retry,
requests,
request,
deltasRetrievedTotal,
replayFrom: from,
to,
Expand All @@ -896,7 +929,7 @@ export class DeltaManager
"Failed to retrieve ops from storage: giving up after too many retries",
false /* canRetry */,
));
return;
return nothing;
}
} catch (origError) {
canRetry = canRetry && canRetryOnError(origError);
Expand All @@ -908,9 +941,9 @@ export class DeltaManager
this.logger,
{
eventName: "GetDeltas_Error",
fetchTo,
fetchTo: to,
from,
requests,
request,
retry,
},
origError);
Expand All @@ -919,42 +952,28 @@ export class DeltaManager
// It's game over scenario.
telemetryEvent.cancel({ category: "error" }, origError);
this.close(error);
return;
return nothing;
}
retryAfter = getRetryDelayFromError(origError);
const retryAfter = getRetryDelayFromError(origError);

if (retryAfter !== undefined && retryAfter >= 0) {
this.emitDelayInfo(this.deltaStorageDelayId, retryAfter, error);
delay = retryAfter;
}
}

if (to !== undefined && this.lastQueuedSequenceNumber >= to) {
// the client caught up while we were trying to fetch ops from storage
// bail out since we no longer need to request these ops
telemetryEvent.end({
deltasRetrievedTotal,
requests,
lastQueuedSequenceNumber: this.lastQueuedSequenceNumber,
});
return;
return nothing;
}

telemetryEvent.reportProgress({
delay, // seconds
deltasRetrievedLast,
deltasRetrievedTotal,
replayFrom: from,
requests,
retry,
success: lastSuccessTime !== undefined,
});

await waitForConnectedState(delay * 1000);
}

// Might need to change to non-error event
this.logger.sendErrorEvent({ eventName: "GetDeltasClosedConnection" });
telemetryEvent.cancel({ error: "container closed" });
return nothing;
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/loader/driver-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export * from "./network";
export * from "./readAndParse";
export * from "./fluidResolvedUrl";
export * from "./summaryForCreateNew";
export * from "./parallelRequests";
Loading

0 comments on commit c9cfcd1

Please sign in to comment.