Skip to content

Commit

Permalink
One last summary before closing [0.15] (#1729)
Browse files Browse the repository at this point in the history
* Bypass delay on spawning summarizer if excessive ops before join

* Add waitStop to RunningSummarizer to let it summarize one last time before closing

* Remove unused line

* Slight refactor and add min ops

* Add comment

* Make sure originalRequest is set in Container

* Do not spawn a summarizer if any exists

* Silently ignore timeout cancels

* Add back trySummarizeCore to eliminate immediately invoked async function

* Remove mysterious trailing spaces

* Change flushPromises to use process.nextTick

Co-authored-by: Arin <arinwt@outlook.com>
  • Loading branch information
Arin Taylor and Arin authored Apr 13, 2020
1 parent f31f1f4 commit 8fa78b8
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 30 deletions.
77 changes: 52 additions & 25 deletions packages/runtime/container-runtime/src/summarizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import { IClientSummaryWatcher, SummaryCollection } from "./summaryCollection";
const maxSummarizeTimeoutTime = 20000; // 20 sec
const maxSummarizeTimeoutCount = 5; // Double and resend 5 times

const minOpsForLastSummary = 50;

declare module "@microsoft/fluid-component-core-interfaces" {
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface IComponent extends Readonly<Partial<IProvideSummarizer>> { }
Expand Down Expand Up @@ -62,6 +64,7 @@ export class Summarizer implements ISummarizer {
private opListener?: (error: any, op: ISequencedDocumentMessage) => void;
private immediateSummary: boolean = false;
public readonly summaryCollection: SummaryCollection;
private stopReason?: string;

constructor(
public readonly url: string,
Expand Down Expand Up @@ -91,10 +94,14 @@ export class Summarizer implements ISummarizer {
await this.runCore(onBehalfOf);
} finally {
// Cleanup after running
this.dispose();
if (this.runtime.connected) {
this.stop("runEnded");
if (this.runningSummarizer) {
// let running summarizer finish
await this.runningSummarizer.waitStop();
}
this.runtime.closeFn(`Summarizer: ${this.stopReason ?? "runEnded"}`);
}
this.dispose();
}
}

Expand All @@ -104,13 +111,17 @@ export class Summarizer implements ISummarizer {
* @param reason - reason code for stopping
*/
public stop(reason?: string) {
if (this.stopReason) {
// already stopping
return;
}
this.stopReason = reason;
this.logger.sendTelemetryEvent({
eventName: "StoppingSummarizer",
onBehalfOf: this.onBehalfOfClientId,
reason,
});
this.runCoordinator.stop();
this.runtime.closeFn(`Summarizer: ${reason}`);
}

public async request(request: IRequest): Promise<IResponse> {
Expand Down Expand Up @@ -218,8 +229,9 @@ export class Summarizer implements ISummarizer {
}

private async generateSummary(full: boolean, safe: boolean): Promise<GenerateSummaryData | undefined> {
if (this.onBehalfOfClientId !== this.runtime.summarizerClientId) {
// We are no longer the summarizer, we should stop ourself
if (this.onBehalfOfClientId !== this.runtime.summarizerClientId
&& this.runtime.clientId !== this.runtime.summarizerClientId) {
// We are no longer the summarizer; a different client is, so we should stop ourself
this.stop("parentNoLongerSummarizer");
return undefined;
}
Expand Down Expand Up @@ -310,7 +322,7 @@ export class RunningSummarizer implements IDisposable {
public get disposed() { return this._disposed; }

private _disposed = false;
private summarizing = false;
private summarizing: Deferred<void> | undefined;
private summarizeCount: number = 0;
private tryWhileSummarizing = false;
private readonly summarizeTimer: Timer;
Expand Down Expand Up @@ -398,6 +410,18 @@ export class RunningSummarizer implements IDisposable {
}
}

public async waitStop(): Promise<void> {
if (this.disposed) {
return;
}
const outstandingOps = this.heuristics.lastOpSeqNumber - this.heuristics.lastAcked.refSequenceNumber;
if (outstandingOps > minOpsForLastSummary) {
// This resolves when the current pending summary is broadcast.
// We don't stick around and wait to see if it is acked or not.
await this.trySummarize("lastSummary").broadcastP;
}
}

private async waitStart() {
// Wait no longer than ack timeout for all pending
const maybeLastAck = await Promise.race([
Expand All @@ -416,35 +440,37 @@ export class RunningSummarizer implements IDisposable {
}
}

private trySummarize(reason: string) {
this.trySummarizeCore(reason).catch((error) => {
this.logger.sendErrorEvent({ eventName: "UnexpectedSummarizeError" }, error);
});
}

private async trySummarizeCore(reason: string) {
private trySummarize(reason: string): { broadcastP: Promise<void> } {
if (this.summarizing) {
// We can't summarize if we are already
this.tryWhileSummarizing = true;
return;
return { broadcastP: Promise.resolve() };
}

// GenerateSummary could take some time
// mark that we are currently summarizing to prevent concurrent summarizing
this.summarizing = true;
this.summarizing = new Deferred();

try {
const result = await this.summarize(reason, false);
if (result === false) {
// On nack, try again in safe mode
await this.summarize(reason, true);
}
} finally {
this.summarizing = false;
if (this.tryWhileSummarizing) {
this.trySummarizeCore(reason).finally(() => {
// Make sure to always exit summarizing state
this.summarizing.resolve();
this.summarizing = undefined;
if (this.tryWhileSummarizing && !this.disposed) {
this.tryWhileSummarizing = false;
this.heuristics.run();
}
}).catch((error) => {
this.logger.sendErrorEvent({ eventName: "UnexpectedSummarizeError" }, error);
});

return { broadcastP: this.summarizing.promise };
}

private async trySummarizeCore(reason: string): Promise<void> {
const result = await this.summarize(reason, false);
if (result === false) {
// On nack, try again in safe mode
await this.summarize(reason, true);
}
}

Expand Down Expand Up @@ -478,11 +504,12 @@ export class RunningSummarizer implements IDisposable {
summaryTime: Date.now(),
};

const pendingTimeoutP = this.pendingAckTimer.start();
const pendingTimeoutP = this.pendingAckTimer.start().catch(() => undefined);
const summary = this.summaryWatcher.watchSummary(summaryData.clientSequenceNumber);

// Wait for broadcast
const summaryOp = await Promise.race([summary.waitBroadcast(), pendingTimeoutP]);
this.summarizing.resolve(); // broadcast means client is free to close
if (!summaryOp) {
return undefined;
}
Expand Down
29 changes: 27 additions & 2 deletions packages/runtime/container-runtime/src/summaryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import { ISummarizer, Summarizer } from "./summarizer";
interface ITrackedClient {
clientId: string;
sequenceNumber: number;
isSummarizer: boolean;
}

class ClientComparer implements IComparer<ITrackedClient> {
public readonly min: ITrackedClient = {
clientId: "",
sequenceNumber: -1,
isSummarizer: false,
};

public compare(a: ITrackedClient, b: ITrackedClient): number {
Expand All @@ -30,23 +32,35 @@ class ClientComparer implements IComparer<ITrackedClient> {
class QuorumHeap {
private readonly heap = new Heap<ITrackedClient>((new ClientComparer()));
private readonly heapMembers = new Map<string, IHeapNode<ITrackedClient>>();
private summarizerCount = 0;

public addClient(clientId: string, client: ISequencedClient) {
const heapNode = this.heap.add({ clientId, sequenceNumber: client.sequenceNumber });
const isSummarizer = client.client.details.type === "summarizer";
const heapNode = this.heap.add({ clientId, sequenceNumber: client.sequenceNumber, isSummarizer });
this.heapMembers.set(clientId, heapNode);
if (isSummarizer) {
this.summarizerCount++;
}
}

public removeClient(clientId: string) {
const member = this.heapMembers.get(clientId);
if (member) {
this.heap.remove(member);
this.heapMembers.delete(clientId);
if (member.value.isSummarizer) {
this.summarizerCount--;
}
}
}

public getFirstClientId(): string | undefined {
return this.heap.count() > 0 ? this.heap.peek().value.clientId : undefined;
}

public getSummarizerCount(): number {
return this.summarizerCount;
}
}

enum SummaryManagerState {
Expand All @@ -57,6 +71,7 @@ enum SummaryManagerState {

const defaultMaxRestarts = 5;
const defaultInitialDelayMs = 5000;
const opsToBypassInitialDelay = 4000;

export class SummaryManager extends EventEmitter implements IDisposable {
private readonly logger: ITelemetryLogger;
Expand All @@ -69,6 +84,7 @@ export class SummaryManager extends EventEmitter implements IDisposable {
private state = SummaryManagerState.Off;
private runningSummarizer?: IComponentRunnable;
private _disposed = false;
private opsUntilFirstConnect: number | undefined;

public get summarizer() {
return this.summarizerClientId;
Expand Down Expand Up @@ -108,6 +124,9 @@ export class SummaryManager extends EventEmitter implements IDisposable {
}

context.quorum.on("addMember", (clientId: string, details: ISequencedClient) => {
if (this.opsUntilFirstConnect === undefined && clientId === this.clientId) {
this.opsUntilFirstConnect = details.sequenceNumber - this.context.deltaManager.initialSequenceNumber;
}
this.quorumHeap.addClient(clientId, details);
this.refreshSummarizer();
});
Expand Down Expand Up @@ -203,6 +222,12 @@ export class SummaryManager extends EventEmitter implements IDisposable {
}

private start(attempt: number = 1) {
if (this.quorumHeap.getSummarizerCount() > 0) {
// Need to wait for any other existing summarizer clients to close,
// because they can live longer than their parent container.
return;
}

if (attempt > this.maxRestarts) {
this.logger.sendErrorEvent({ eventName: "MaxRestarts", maxRestarts: this.maxRestarts });
this.state = SummaryManagerState.Off;
Expand Down Expand Up @@ -272,7 +297,7 @@ export class SummaryManager extends EventEmitter implements IDisposable {

private async createSummarizer(delayMs: number): Promise<ISummarizer | undefined> {
await Promise.all([
this.initialDelayP,
this.opsUntilFirstConnect >= opsToBypassInitialDelay ? Promise.resolve() : this.initialDelayP,
delayMs > 0 ? new Promise((resolve) => setTimeout(resolve, delayMs)) : Promise.resolve(),
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ describe("Runtime", () => {
}

async function flushPromises() {
const p = new Promise((resolve) => setTimeout(() => { resolve(); }, 0));
clock.tick(0);
return p;
return new Promise((resolve) => process.nextTick(resolve));
}

async function tickAndFlushPromises(ms: number) {
Expand Down

0 comments on commit 8fa78b8

Please sign in to comment.