Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry reads for blobs and trees if we can retry because of skeleton snapshots. #4427

Merged
merged 23 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ import {
IRuntimeState,
ICriticalContainerError,
ContainerWarning,
IThrottlingWarning,
AttachState,
IThrottlingWarning,
} from "@fluidframework/container-definitions";
import { CreateContainerError, GenericError } from "@fluidframework/container-utils";
import {
LoaderCachingPolicy,
IDocumentService,
IDocumentStorageService,
IFluidResolvedUrl,
Expand Down Expand Up @@ -92,7 +91,6 @@ import { IConnectionArgs, DeltaManager, ReconnectMode } from "./deltaManager";
import { DeltaManagerProxy } from "./deltaManagerProxy";
import { Loader, RelativeLoader } from "./loader";
import { pkgVersion } from "./packageVersion";
import { PrefetchDocumentStorageService } from "./prefetchDocumentStorageService";
import { parseUrl, convertProtocolAndAppSummaryToSnapshotTree } from "./utils";

const detachedContainerRefSeqNumber = 0;
Expand Down Expand Up @@ -1134,16 +1132,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}

private async getDocumentStorageService(): Promise<IDocumentStorageService> {
if (this.service === undefined) {
throw new Error("Not attached");
}
let service = await this.service.connectToStorage();

// Enable prefetching for the service unless it has a caching policy set otherwise:
if (this.service.policies?.caching !== LoaderCachingPolicy.NoCaching) {
service = new PrefetchDocumentStorageService(service);
}
return service;
return this._deltaManager.connectToStorage();
}

private async getDocumentAttributes(
Expand Down
81 changes: 49 additions & 32 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Licensed under the MIT License.
*/

import { v4 as uuid } from "uuid";
import { ITelemetryLogger, IEventProvider } from "@fluidframework/common-definitions";
import {
IConnectionDetails,
Expand All @@ -11,15 +12,17 @@ import {
IDeltaManagerEvents,
IDeltaQueue,
ICriticalContainerError,
IThrottlingWarning,
ContainerErrorType,
IThrottlingWarning,
} from "@fluidframework/container-definitions";
import { assert, performance, TypedEventEmitter } from "@fluidframework/common-utils";
import { PerformanceEvent, TelemetryLogger, safeRaiseEvent } from "@fluidframework/telemetry-utils";
import {
IDocumentDeltaStorageService,
IDocumentService,
IDocumentDeltaConnection,
IDocumentStorageService,
LoaderCachingPolicy,
} from "@fluidframework/driver-definitions";
import { isSystemType, isSystemMessage } from "@fluidframework/protocol-base";
import {
Expand All @@ -46,6 +49,8 @@ import { CreateContainerError } from "@fluidframework/container-utils";
import { debug } from "./debug";
import { DeltaQueue } from "./deltaQueue";
import { logNetworkFailure, waitForConnectedState } from "./networkUtils";
import { RetriableDocumentStorageService } from "./retriableDocumentStorageService";
import { PrefetchDocumentStorageService } from "./prefetchDocumentStorageService";

const MaxReconnectDelaySeconds = 8;
const InitialReconnectDelaySeconds = 1;
Expand All @@ -58,7 +63,7 @@ const DefaultChunkSize = 16 * 1024;
const ImmediateNoOpResponse = "";

// eslint-disable-next-line @typescript-eslint/no-unsafe-return
const getRetryDelayFromError = (error: any): number | undefined => error?.retryAfterSeconds;
export const getRetryDelayFromError = (error: any): number | undefined => error?.retryAfterSeconds;

function getNackReconnectInfo(nackContent: INackContent) {
const reason = `Nack: ${nackContent.message}`;
Expand All @@ -75,11 +80,6 @@ function createReconnectError(prefix: string, err: any) {
return error2;
}

enum RetryFor {
DeltaStream,
DeltaStorage,
}

export interface IConnectionArgs {
mode?: ConnectionMode;
fetchOpsFromStorage?: boolean;
Expand Down Expand Up @@ -167,6 +167,9 @@ export class DeltaManager
private clientSequenceNumber = 0;
private clientSequenceNumberObserved = 0;
private closed = false;
private storageService: RetriableDocumentStorageService | undefined;
private readonly deltaStreamDelayId = uuid();
private readonly deltaStorageDelayId = uuid();

// track clientId used last time when we sent any ops
private lastSubmittedClientId: string | undefined;
Expand All @@ -177,9 +180,8 @@ export class DeltaManager
private messageBuffer: IDocumentMessage[] = [];

private connectFirstConnection = true;

private deltaStorageDelay: number = 0;
private deltaStreamDelay: number = 0;
private readonly idToDelayMap = new Map<string, number>();
private maxThrottlingDelay: number = 0;

// True if current connection has checkpoint information
// I.e. we know how far behind the client was at the time of establishing connection
Expand Down Expand Up @@ -305,6 +307,25 @@ export class DeltaManager
return this._reconnectMode;
}

public async connectToStorage(): Promise<IDocumentStorageService> {
if (this.storageService !== undefined) {
return this.storageService;
}
const service = this.serviceProvider();
if (service === undefined) {
throw new Error("Not attached");
}

let storageService = await service.connectToStorage();
// Enable prefetching for the service unless it has a caching policy set otherwise:
if (service.policies?.caching !== LoaderCachingPolicy.NoCaching) {
storageService = new PrefetchDocumentStorageService(storageService);
}

this.storageService = new RetriableDocumentStorageService(storageService, this);
return this.storageService;
}

/**
* Enables or disables automatic reconnecting.
* Will throw an error if reconnectMode set to Never.
Expand Down Expand Up @@ -588,7 +609,7 @@ export class DeltaManager
delay = retryDelayFromError ?? Math.min(delay * 2, MaxReconnectDelaySeconds);

if (retryDelayFromError !== undefined) {
this.emitDelayInfo(RetryFor.DeltaStream, retryDelayFromError, error);
this.emitDelayInfo(this.deltaStreamDelayId, retryDelayFromError, error);
}
await waitForConnectedState(delay * 1000);
}
Expand Down Expand Up @@ -812,7 +833,7 @@ export class DeltaManager
retryAfter = getRetryDelayFromError(origError);

if (retryAfter !== undefined && retryAfter >= 0) {
this.emitDelayInfo(RetryFor.DeltaStorage, retryAfter, error);
this.emitDelayInfo(this.deltaStorageDelayId, retryAfter, error);
}
}

Expand Down Expand Up @@ -874,7 +895,7 @@ export class DeltaManager
return;
}
this.closed = true;

this.storageService?.dispose();
this.stopSequenceNumberUpdate();

// This raises "disconnect" event if we have active connection.
Expand Down Expand Up @@ -923,27 +944,23 @@ export class DeltaManager
}
}

private cancelDelayInfo(retryEndpoint: number) {
if (retryEndpoint === RetryFor.DeltaStorage) {
this.deltaStorageDelay = 0;
} else if (retryEndpoint === RetryFor.DeltaStream) {
this.deltaStreamDelay = 0;
}
public cancelDelayInfo(id: string) {
this.idToDelayMap.delete(id);
this.maxThrottlingDelay = Math.max(...this.idToDelayMap.values());
}

private emitDelayInfo(retryEndpoint: number, delaySeconds: number, error: ICriticalContainerError) {
if (retryEndpoint === RetryFor.DeltaStorage) {
this.deltaStorageDelay = delaySeconds;
} else if (retryEndpoint === RetryFor.DeltaStream) {
this.deltaStreamDelay = delaySeconds;
}

const delayTime = Math.max(this.deltaStorageDelay, this.deltaStreamDelay);
if (delayTime > 0) {
public emitDelayInfo(
id: string,
delaySeconds: number,
error: ICriticalContainerError,
) {
this.idToDelayMap.set(id, delaySeconds);
if (delaySeconds > 0 && delaySeconds > this.maxThrottlingDelay) {
this.maxThrottlingDelay = delaySeconds;
const throttlingError: IThrottlingWarning = {
errorType: ContainerErrorType.throttlingError,
message: `Service busy/throttled: ${error.message}`,
retryAfterSeconds: delayTime,
retryAfterSeconds: delaySeconds,
};
this.emit("throttled", throttlingError);
}
Expand Down Expand Up @@ -1036,7 +1053,7 @@ export class DeltaManager
assert(!readonly || this.connectionMode === "read", "readonly perf with write connection");
this.set_readonlyPermissions(readonly);

this.cancelDelayInfo(RetryFor.DeltaStream);
this.cancelDelayInfo(this.deltaStreamDelayId);

if (this.closed) {
// Raise proper events, Log telemetry event and close connection.
Expand Down Expand Up @@ -1175,7 +1192,7 @@ export class DeltaManager
if (this.reconnectMode === ReconnectMode.Enabled) {
const delay = getRetryDelayFromError(error);
if (delay !== undefined) {
this.emitDelayInfo(RetryFor.DeltaStream, delay, error);
this.emitDelayInfo(this.deltaStreamDelayId, delay, error);
await waitForConnectedState(delay * 1000);
}

Expand Down Expand Up @@ -1371,7 +1388,7 @@ export class DeltaManager
this.fetching = true;

await this.getDeltas(telemetryEventSuffix, from, to, (messages) => {
this.cancelDelayInfo(RetryFor.DeltaStorage);
this.cancelDelayInfo(this.deltaStorageDelayId);
this.catchUpCore(messages, telemetryEventSuffix);
});

Expand Down
1 change: 1 addition & 0 deletions packages/loader/container-loader/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./deltaManager";
export * from "./loader";
export * from "./networkUtils";
export * from "./utils";
export * from "./retriableDocumentStorageService";
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { v4 as uuid } from "uuid";
import { CreateContainerError } from "@fluidframework/container-utils";
import { IDocumentStorageService } from "@fluidframework/driver-definitions";
import { canRetryOnError, DocumentStorageServiceProxy } from "@fluidframework/driver-utils";
import { ISnapshotTree, IVersion } from "@fluidframework/protocol-definitions";
import { DeltaManager, getRetryDelayFromError } from "./deltaManager";

export class RetriableDocumentStorageService extends DocumentStorageServiceProxy {
private disposed = false;
constructor(
internalStorageService: IDocumentStorageService,
private readonly deltaManager: Pick<DeltaManager, "emitDelayInfo" | "cancelDelayInfo">,
) {
super(internalStorageService);
}

public dispose() {
this.disposed = true;
}

public async getSnapshotTree(version?: IVersion): Promise<ISnapshotTree | null> {
return this.readWithRetry(async () => this.internalStorageService.getSnapshotTree(version));
}

public async read(blobId: string): Promise<string> {
return this.readWithRetry(async () => this.internalStorageService.read(blobId));
}

public async readBlob(id: string): Promise<ArrayBufferLike> {
return this.readWithRetry(async () => this.internalStorageService.readBlob(id));
}

public async readString(id: string): Promise<string> {
return this.readWithRetry(async () => this.internalStorageService.readString(id));
}

private async delay(timeMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(() => resolve(), timeMs));
}

private async readWithRetry<T>(api: () => Promise<T>): Promise<T> {
let result: T | undefined;
let success = false;
let retryAfter = 0;
let id: string | undefined;
do {
try {
result = await api();
if (id !== undefined) {
this.deltaManager.cancelDelayInfo(id);
}
success = true;
} catch (err) {
if (this.disposed) {
throw CreateContainerError("Storage service disposed!!");
}
// If it is not retriable, then just throw the error.
if (!canRetryOnError(err)) {
throw err;
}
// If the error is throttling error, then wait for the specified time before retrying.
// If the waitTime is not specified, then we start with retrying immediately to max of 8s.
retryAfter = getRetryDelayFromError(err) ?? Math.min(retryAfter * 2, 8000);
if (id === undefined) {
id = uuid();
}
this.deltaManager.emitDelayInfo(id, retryAfter, CreateContainerError(err));
await this.delay(retryAfter);
}
} while (!success);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return result!;
}
}
Loading