Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.32: Improve out of order op processing #4754

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ export class DeltaManager
// If so, it's time to process any accumulated ops
// Or request OPs from snapshot / or point zero (if we have no ops at all)
if (this.pending.length > 0) {
this.catchUp([], "DocumentOpen");
this.processPendingOps("DocumentOpen");
} else if (this.connection !== undefined || this.connectionP !== undefined) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.fetchMissingDeltas("DocumentOpen", this.lastQueuedSequenceNumber);
Expand Down Expand Up @@ -835,6 +835,17 @@ export class DeltaManager
}
}

if (to !== undefined && this.lastQueuedSequenceNumber >= to) {
// the client caught up while we were trying to fetch ops from storage
// bail out since we no longer need to request these ops
telemetryEvent.end({
deltasRetrievedTotal,
requests,
lastQueuedSequenceNumber: this.lastQueuedSequenceNumber,
});
return;
}

let delay: number;
if (deltasRetrievedLast !== 0) {
delay = 0;
Expand Down Expand Up @@ -1283,6 +1294,12 @@ export class DeltaManager
this.lastQueuedSequenceNumber = message.sequenceNumber;
this.previouslyProcessedMessage = message;
this._inbound.push(message);

if (this.pending.length > 0) {
// we processed a correctly sequenced inbound op while some are pending
// pending might include ops after the current sequence number, so process them now
this.processPendingOps(`EnqueueMessages_${telemetryEventSuffix}`);
}
}
}

Expand Down Expand Up @@ -1391,9 +1408,11 @@ export class DeltaManager

await this.getDeltas(telemetryEventSuffix, from, to, (messages) => {
this.refreshDelayInfo(this.deltaStorageDelayId);
this.catchUpCore(messages, telemetryEventSuffix);
this.enqueueMessages(messages, telemetryEventSuffix);
});

this.refreshDelayInfo(this.deltaStorageDelayId);

this.fetching = false;
}

Expand All @@ -1417,17 +1436,13 @@ export class DeltaManager
}
this.logger.sendPerformanceEvent(props);

this.catchUpCore(messages, telemetryEventSuffix);
}

private catchUpCore(messages: ISequencedDocumentMessage[], telemetryEventSuffix?: string): void {
// Apply current operations
this.enqueueMessages(messages, telemetryEventSuffix);
}

// Then sort pending operations and attempt to apply them again.
// This could be optimized to stop handling messages once we realize we need to fetch missing values.
// But for simplicity, and because catching up should be rare, we just process all of them.
// Optimize for case of no handler - we put ops back into this.pending in such case
/**
* Sorts pending ops and attempts to apply them
*/
private processPendingOps(telemetryEventSuffix?: string): void {
if (this.handler !== undefined) {
const pendingSorted = this.pending.sort((a, b) => a.sequenceNumber - b.sequenceNumber);
this.pending = [];
Expand Down