Skip to content

Commit

Permalink
Address noDeltaStream issues
Browse files Browse the repository at this point in the history
  • Loading branch information
vladsud committed Sep 9, 2021
1 parent 6c9f884 commit 8c34b22
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 27 deletions.
8 changes: 4 additions & 4 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1256,19 +1256,19 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
if (loadMode.deltaConnection !== "none") {
// Start prefetch, but not set opsBeforeReturnP - boot is not blocked by it!
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this._deltaManager.preFetchOps(false);
this._deltaManager.preFetchOps(false /* cacheOnly */);
}
break;
case "cached":
opsBeforeReturnP = this._deltaManager.preFetchOps(true);
opsBeforeReturnP = this._deltaManager.preFetchOps(true /* cacheOnly */);
// Keep going with fetching ops from storage once we have all cached ops in.
// Ops processing will start once cached ops are in and and will stop when queue is empty
// (which in most cases will happen when we are done processing cached ops)
// eslint-disable-next-line @typescript-eslint/no-floating-promises
opsBeforeReturnP.then(async () => this._deltaManager.preFetchOps(false));
opsBeforeReturnP.then(async () => this._deltaManager.preFetchOps(false /* cacheOnly */));
break;
case "all":
opsBeforeReturnP = this._deltaManager.preFetchOps(false);
opsBeforeReturnP = this._deltaManager.preFetchOps(false /* cacheOnly */);
break;
default:
unreachableCase(loadMode.opsBeforeReturn);
Expand Down
50 changes: 29 additions & 21 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,14 @@ export class DeltaManager
}

public async preFetchOps(cacheOnly: boolean) {
// Note that might already got connected to delta stream by now.
// If we did, then we proactively fetch ops at the end of setupNewSuccessfulConnection to ensure
if (this.connection === undefined && !this.closed) {
// There should be never pending fetch!
// This API is called right after attachOpHandler by Container.load().
// We might have connection already and it might have called fetchMissingDeltas() from
// setupNewSuccessfulConnection. But it should do nothing, because there is no way to fetch ops before
// we know snapshot sequence number that is set in attachOpHandler. So all such calls should be noop.
assert(this.fetchReason === undefined, "");

if (!this.closed) {
return this.fetchMissingDeltasCore("DocumentOpen", cacheOnly, this.lastQueuedSequenceNumber, undefined);
}
}
Expand Down Expand Up @@ -1624,25 +1629,28 @@ export class DeltaManager
// actionably to report gaps in this range.
this.enqueueMessages(pendingSorted, `${reason}_pending`, true /* allowGaps */);

// See issue #7312 for more details
// We observe cases where client gets into situation where it is not aware of missing ops
// (i.e. client being behind), and as such, does not attempt to fetch them.
// In some cases client may not have enough signal (example - "read" connection that is silent -
// there is no easy way for client to realize it's behind, see a bit of commentary / logic at the
// end of setupNewSuccessfulConnection). In other cases it should be able to learn that info ("write"
// connection, learn by receiving its own join op), but data suggest it does not happen.
// In 50% of these cases we do know we are behind through checkpointSequenceNumber on connection object
// and thus can leverage that to trigger recovery. But this is not going to solve all the problems
// (the other 50%), and thus these errors below should be looked at even if code below results in
// recovery.
if (this.fetchReason === undefined && this.lastQueuedSequenceNumber < this.lastObservedSeqNumber) {
// connectionMode === "read" case is too noisy, so not log it.
// It happens because fetch in setupNewSuccessfulConnection get cancelled due to other fetch, and we
// never retry (other than here)
if (this.connectionMode === "write") {
this.logConnectionIssue({ eventName: "OpsBehind" });
// Re-entrancy is ignored by fetchMissingDeltas, execution will come here when it's over
if (this.fetchReason === undefined) {
// See issue #7312 for more details
// We observe cases where client gets into situation where it is not aware of missing ops
// (i.e. client being behind), and as such, does not attempt to fetch them.
// In some cases client may not have enough signal (example - "read" connection that is silent -
// there is no easy way for client to realize it's behind, see a bit of commentary / logic at the
// end of setupNewSuccessfulConnection). In other cases it should be able to learn that info ("write"
// connection, learn by receiving its own join op), but data suggest it does not happen.
// In 50% of these cases we do know we are behind through checkpointSequenceNumber on connection object
// and thus can leverage that to trigger recovery. But this is not going to solve all the problems
// (the other 50%), and thus these errors below should be looked at even if code below results in
// recovery.
if (this.lastQueuedSequenceNumber < this.lastObservedSeqNumber) {
// connectionMode === "read" case is too noisy, so not log it.
// It happens because fetch in setupNewSuccessfulConnection get cancelled due to other fetch, and we
// never retry (other than here)
if (this.connectionMode === "write") {
this.logConnectionIssue({ eventName: "OpsBehind" });
}
this.fetchMissingDeltas("OpsBehind", this.lastQueuedSequenceNumber);
}
this.fetchMissingDeltas("OpsBehind", this.lastQueuedSequenceNumber);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { strict as assert } from "assert";
import { IContainer } from "@fluidframework/container-definitions";
import { IContainer, LoaderHeader } from "@fluidframework/container-definitions";
import { IFluidCodeDetails } from "@fluidframework/core-interfaces";
import {
createLocalResolverCreateNewRequest,
Expand Down Expand Up @@ -63,7 +63,12 @@ describe("No Delta Stream", () => {
if (!storageOnly) {
loaderContainerTracker.add(loader);
}
const container = await loader.resolve({ url: documentLoadUrl });

// See issue #7426 - need better long term solution
const container = await loader.resolve({
url: documentLoadUrl,
headers: { [LoaderHeader.loadMode]: { opsBeforeReturn: "all" }},
});
await loaderContainerTracker.ensureSynchronized();
return container;
}
Expand Down

0 comments on commit 8c34b22

Please sign in to comment.