Better telemetry for fetching ops (#6947)
Problem statement:

The newly added NoJoinOp telemetry event points to a condition where ops are not processed for a very long time.
Examining telemetry shows that all such cases have one thing in common: there is an outstanding ops request to the service that takes a long time. In pretty much all of these cases the actual network request (as indicated by the OpsFetch event) takes a relatively short time, but the overall process (GetDeltas_end) takes a long time, occasionally minutes.

I believe that in all these cases ops never make it to storage (in reasonable time), but in the majority of cases the client actually receives the missing ops through the websocket (though not in all cases - read on). DeltaManager does cancel the request in such a case (see the ExtraStorageCall event), but the request is not cancelled immediately, which blocks future requests (see fetchMissingDeltasCore - it allows only one outstanding call). As a result, the whole process does not move forward for a long time.
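
The blocking behavior described above can be illustrated with a minimal sketch (hypothetical names, not the actual fetchMissingDeltasCore implementation): a guard allows only one outstanding fetch, so until the in-flight call observes the cancellation and settles, nothing else can make progress.

```ts
// Hypothetical sketch of the "one outstanding call" guard described above.
class OpFetcher {
    private currentFetch: Promise<void> | undefined;

    public async fetchMissingDeltas(from: number, to: number): Promise<void> {
        // Only one outstanding call is allowed; callers arriving while a fetch
        // is in flight simply return and rely on that fetch finishing soon.
        if (this.currentFetch !== undefined) {
            return;
        }
        this.currentFetch = this.fetchCore(from, to).finally(() => {
            this.currentFetch = undefined;
        });
        return this.currentFetch;
    }

    private async fetchCore(from: number, to: number): Promise<void> {
        // Network fetch plus retry waits; if cancellation is not observed
        // promptly here, the guard above stays held and blocks future requests.
    }
}
```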

I do not have an in-depth understanding of where we get stuck in the process, but one obvious case is waitForConnectedState() - it's possible that the browser lies to us or does not react quickly to online/offline changes, which may cause the process to get stuck for up to 30 seconds.
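
For context, here is a hedged sketch of what a waitForConnectedState()-style helper typically looks like; the helper name and the 30-second cap are assumptions for illustration.

```ts
// Sketch: if the browser claims to be offline, wait for the "online" event,
// but never longer than 30 seconds. If navigator.onLine is stale or the event
// never fires, the caller is stuck for the full timeout.
async function waitForConnected(maxWaitMs = 30000): Promise<void> {
    if (typeof navigator === "undefined" || navigator.onLine) {
        return;
    }
    await new Promise<void>((resolve) => {
        const timer = setTimeout(done, maxWaitMs);
        function done() {
            clearTimeout(timer);
            window.removeEventListener("online", done);
            resolve();
        }
        window.addEventListener("online", done);
    });
}
```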

The other, more likely reason is 429s returned by SPO when fetching ops. We have no logging for individual retryable attempts, so this goes unnoticed today.
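
A rough sketch of the kind of per-attempt logging that makes this visible (the helper and error shape below are hypothetical; the actual fix wires logNetworkFailure into getSingleOpBatch, see the diff below):

```ts
// Sketch: log every retryable failure (e.g. a 429 with a retry-after hint)
// instead of only the final outcome, so throttling shows up in telemetry.
async function fetchWithAttemptLogging<T>(
    attempt: () => Promise<T>,
    log: (event: { eventName: string; retry: number }, error: unknown) => void,
): Promise<T> {
    for (let retry = 1; ; retry++) {
        try {
            return await attempt();
        } catch (error: any) {
            log({ eventName: "GetDeltas_Error", retry }, error);
            if (error?.canRetry !== true) {
                throw error; // non-retryable: give up immediately
            }
            // Honor the server-provided delay if present, capped at 10 seconds.
            const delayMs = Math.min((error?.retryAfterSeconds ?? 0.1) * 1000, 10000);
            await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
    }
}
```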

Fix:

1. Make the op fetching process return immediately on cancellation by listening for the cancellation event (see the sketch after this list).
2. Add telemetry for some sub-processes, like fetching ops from the cache, if they take longer than 1 second.
3. Remove the ExtraStorageCall event, as it fires on all successful fetches, and instead have the core op fetching logic raise a GetDeltas_cancel event if cancellation was processed before all ops were fetched.
4. Add telemetry (logNetworkFailure in getSingleOpBatch) for individual failed fetches, so that we get insight into things like 429s that may block the fetching process but are currently not visible in telemetry.
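
A simplified sketch of the catch-all cancellation wiring from fix 1 (the actual change lives in parallelRequests.ts, shown in the diff below):

```ts
// Sketch: cancel the parallel-requests manager as soon as the abort signal
// fires, instead of waiting for the next check between delays and fetches.
function wireFastCancellation(
    manager: { cancel(): void },
    run: () => Promise<void>,
    signal?: AbortSignal,
): Promise<void> {
    const listener = () => manager.cancel();
    signal?.addEventListener("abort", listener);
    return run().finally(() => {
        signal?.removeEventListener("abort", listener);
    });
}
```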

Outcome:

This addresses many, but not all, NoJoinOp issues (the remaining ones need to be investigated more deeply).
But it in turn brings back "too many retries" errors, indicating that one of the reasons we run into the initial problem is the client not being able to find the relevant ops (and, on top of that, not failing sooner but hanging). These errors also need deeper investigation to understand whether the bugs are on the client or the server side.
vladsud authored Aug 2, 2021
1 parent 8ce8baa commit dd6af78
Showing 4 changed files with 51 additions and 24 deletions.
2 changes: 2 additions & 0 deletions api-report/driver-utils.api.md
@@ -247,6 +247,8 @@ export class ParallelRequests<T> {
// (undocumented)
cancel(): void;
// (undocumented)
get canceled(): boolean;
// (undocumented)
run(concurrency: number): Promise<void>;
}

6 changes: 4 additions & 2 deletions packages/drivers/odsp-driver/src/opsCaching.ts
@@ -149,13 +149,15 @@ export class OpsCache {

batchNumber++;
}
if (messages.length > 0) {

const duration = performance.now() - start;
if (messages.length > 0 || duration > 1000) {
this.logger.sendPerformanceEvent({
eventName: "CacheOpsUsed",
from,
to,
length: messages.length,
duration: performance.now() - start,
duration,
});
}
return messages;
7 changes: 0 additions & 7 deletions packages/loader/container-loader/src/deltaManager.ts
@@ -871,13 +871,6 @@ export class DeltaManager
// connection and reconnected (likely to another box), and new socket's initial ops contains these ops.
assert(op.sequenceNumber === this.lastQueuedSequenceNumber, "seq#'s");
if (this.lastQueuedSequenceNumber >= lastExpectedOp) {
this.logger.sendPerformanceEvent({
reason: this.fetchReason,
eventName: "ExtraStorageCall",
from,
to,
...this.connectionStateProps,
});
controller.abort();
this._inbound.off("push", listener);
}
60 changes: 45 additions & 15 deletions packages/loader/driver-utils/src/parallelRequests.ts
@@ -8,11 +8,13 @@ import { PerformanceEvent} from "@fluidframework/telemetry-utils";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions";
import { getRetryDelayFromError, canRetryOnError, createGenericNetworkError } from "./network";
import { waitForConnectedState } from "./networkUtils";
import { waitForConnectedState, logNetworkFailure } from "./networkUtils";

const MaxFetchDelayInMs = 10000;
const MissingFetchDelayInMs = 100;

type WorkingState = "working" | "done" | "canceled";

/**
* Helper class to organize parallel fetching of data
* It can be used to concurrently do many requests, while consuming
@@ -29,12 +31,15 @@ export class ParallelRequests<T> {
private latestRequested: number;
private nextToDeliver: number;
private readonly results: Map<number, T[]> = new Map();
private working = true;
private workingState: WorkingState = "working";
private requestsInFlight = 0;
private readonly endEvent = new Deferred<void>();
private requests = 0;
private readonly knewTo: boolean;

private get working() { return this.workingState === "working"; }
public get canceled() { return this.workingState === "canceled"; }

constructor(
from: number,
private to: number | undefined,
@@ -54,8 +59,11 @@
}

public cancel() {
this.working = false;
this.endEvent.resolve();
if (this.working) {
this.workingState = "canceled";
this.logger.sendTelemetryEvent({ eventName: "GetDeltas_cancel" });
this.endEvent.resolve();
}
}

public async run(concurrency: number) {
@@ -75,13 +83,17 @@
// We should satisfy request fully.
assert(this.to !== undefined, 0x104 /* "undefined end point for parallel fetch" */);
assert(this.nextToDeliver >= this.to, 0x105 /* "unexpected end point for parallel fetch" */);
this.working = false;
this.endEvent.resolve();
if (this.working) {
this.workingState = "done";
this.endEvent.resolve();
}
}

private fail(error) {
this.working = false;
this.endEvent.reject(error);
if (this.working) {
this.workingState = "done";
this.endEvent.reject(error);
}
}

private dispatch() {
@@ -348,6 +360,7 @@ async function getSingleOpBatch(
get: (telemetryProps: ITelemetryProperties) => Promise<IDeltasFetchResult>,
props: ITelemetryProperties,
strongTo: boolean,
logger: ITelemetryLogger,
signal?: AbortSignal):
Promise<{ partial: boolean, cancel: boolean, payload: ISequencedDocumentMessage[] }>
{
@@ -402,16 +415,14 @@

lastSuccessTime = undefined;

/*
logNetworkFailure(
this.logger,
logger,
{
eventName: "GetDeltas_Error",
...props,
retry,
},
error);
*/

if (!canRetry) {
// It's game over scenario.
@@ -450,7 +461,7 @@ export function requestOps(
};

const telemetryEvent = PerformanceEvent.start(logger, {
eventName: `GetDeltas`,
eventName: "GetDeltas",
...propsTotal,
});

@@ -465,6 +476,7 @@
async (propsAll) => get(from, to, propsAll),
{ request, from, to, ...propsTotal, ...propsPerRequest },
strongTo,
logger,
signal,
);
},
@@ -474,13 +486,31 @@
queue.pushValue(deltas);
});

// Implement faster cancellation. getSingleOpBatch() checks signal, but only in between
// waits (up to 10 seconds) and fetches (can take infinite amount of time).
// While every such case should be improved and take into account signal (and thus cancel immediately),
// it is beneficial to have catch-all
const listener = (event: Event) => { manager.cancel(); };
if (signal !== undefined) {
signal.addEventListener("abort", listener);
}

manager.run(concurrency)
.then(() => {
telemetryEvent.end({
.finally(() => {
if (signal !== undefined) {
signal.removeEventListener("abort", listener);
}
}).then(() => {
const props = {
lastFetch,
deltasRetrievedTotal,
requests,
});
};
if (manager.canceled) {
telemetryEvent.cancel(props);
} else {
telemetryEvent.end(props);
}
queue.pushDone();
})
.catch((error) => {
