diff --git a/api-report/container-runtime.api.md b/api-report/container-runtime.api.md index 85eadf3658f7..bc94eefd48d4 100644 --- a/api-report/container-runtime.api.md +++ b/api-report/container-runtime.api.md @@ -198,11 +198,11 @@ export interface ContainerRuntimeMessage { export class DeltaScheduler { constructor(deltaManager: IDeltaManager, logger: ITelemetryLogger); // (undocumented) - batchBegin(): void; + batchBegin(message: ISequencedDocumentMessage): void; // (undocumented) - batchEnd(): void; + batchEnd(message: ISequencedDocumentMessage): void; // (undocumented) - static readonly processingTime = 20; + static readonly processingTime = 50; } // @public (undocumented) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 5bc32e63058a..16cfda4defca 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -632,7 +632,7 @@ export class ScheduleManager { // This could be the beginning of a new batch or an individual message. 
this.emitter.emit("batchBegin", message); - this.deltaScheduler.batchBegin(); + this.deltaScheduler.batchBegin(message); const batch = (message?.metadata as IRuntimeMessageMetadata)?.batch; if (batch) { @@ -653,7 +653,7 @@ export class ScheduleManager { this.hitError = true; this.batchClientId = undefined; this.emitter.emit("batchEnd", error, message); - this.deltaScheduler.batchEnd(); + this.deltaScheduler.batchEnd(message); return; } @@ -663,7 +663,7 @@ export class ScheduleManager { if (this.batchClientId === undefined || batch === false) { this.batchClientId = undefined; this.emitter.emit("batchEnd", undefined, message); - this.deltaScheduler.batchEnd(); + this.deltaScheduler.batchEnd(message); return; } } diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index ef44cb3cbb18..2d357aa0f922 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -11,6 +11,9 @@ import { ISequencedDocumentMessage, } from "@fluidframework/protocol-definitions"; +import { + TelemetryLogger, +} from "@fluidframework/telemetry-utils"; /** * DeltaScheduler is responsible for the scheduling of inbound delta queue in cases where there * is more than one op a particular run of the queue. It does not schedule if there is just one @@ -25,18 +28,13 @@ import { export class DeltaScheduler { private readonly deltaManager: IDeltaManager; // The time for processing ops in a single turn. - public static readonly processingTime = 20; + public static readonly processingTime = 50; // The increase in time for processing ops after each turn. private readonly processingTimeIncrement = 10; private processingStartTime: number | undefined; - private totalProcessingTime: number = DeltaScheduler.processingTime; - - // This keeps track of whether the delta scheduler is scheduling a particular run of the - // the inbound delta queue. 
Basically, every time the delta queue starts processing with - // more than one op, this will be set to true until the run completes. - private isScheduling: boolean = false; + private currentAllowedProcessingTimeForTurn: number = DeltaScheduler.processingTime; // This keeps track of the number of times inbound queue has been scheduled. After a particular // count, we log telemetry for the number of ops processed, the time and number of turns it took @@ -44,9 +42,13 @@ export class DeltaScheduler { private schedulingCount: number = 0; private schedulingLog: { - numberOfOps: number; + opsRemainingToProcess: number; totalProcessingTime: number; numberOfTurns: number; + numberOfBatchesProcessed: number; + lastSequenceNumber: number; + firstSequenceNumber: number; + startTime: number; } | undefined; constructor( @@ -57,50 +59,72 @@ export class DeltaScheduler { this.deltaManager.inbound.on("idle", () => { this.inboundQueueIdle(); }); } - public batchBegin() { + public batchBegin(message: ISequencedDocumentMessage) { if (!this.processingStartTime) { this.processingStartTime = performance.now(); } + if (this.schedulingLog === undefined && this.schedulingCount % 500 === 0) { + // Every 500th time we are scheduling the inbound queue, we log telemetry for the + // number of ops processed, the time and number of turns it took to process the ops. + this.schedulingLog = { + opsRemainingToProcess: 0, + numberOfTurns: 1, + totalProcessingTime: 0, + numberOfBatchesProcessed: 0, + firstSequenceNumber: message.sequenceNumber, + lastSequenceNumber: message.sequenceNumber, + startTime: performance.now(), + }; + } } - public batchEnd() { - if (this.shouldRunScheduler()) { - if (!this.isScheduling) { - this.isScheduling = true; - // Every 2000th time we are scheduling the inbound queue, we log telemetry for the - // number of ops processed, the time and number of turns it took to process the ops. 
- if (this.schedulingCount % 2000 === 0) { - this.schedulingLog = { - numberOfOps: this.deltaManager.inbound.length, - numberOfTurns: 1, - totalProcessingTime: 0, - }; - } - } + public batchEnd(message: ISequencedDocumentMessage) { + if (this.schedulingLog) { + this.schedulingLog.numberOfBatchesProcessed++; + this.schedulingLog.lastSequenceNumber = message.sequenceNumber; + this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length; + } + if (this.shouldRunScheduler()) { + const currentTime = performance.now(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const elapsedTime = performance.now() - this.processingStartTime!; - if (elapsedTime > this.totalProcessingTime) { + const elapsedTime = currentTime - this.processingStartTime!; + if (elapsedTime > this.currentAllowedProcessingTimeForTurn) { // We have processed ops for more than the total processing time. So, pause the // queue, yield the thread and schedule a resume. // eslint-disable-next-line @typescript-eslint/no-floating-promises this.deltaManager.inbound.pause(); - setTimeout(() => { - this.deltaManager.inbound.resume(); - }); - this.processingStartTime = undefined; - // Increase the total processing time. Keep doing this after each turn until all the ops have + // Increase the total processing time. Keep doing this after each turn until all the ops have // been processed. This way we keep the responsiveness at the beginning while also making sure // that all the ops process fairly quickly. - this.totalProcessingTime += this.processingTimeIncrement; + this.currentAllowedProcessingTimeForTurn += this.processingTimeIncrement; // If we are logging the telemetry this time, update the telemetry log object. 
if (this.schedulingLog) { this.schedulingLog.numberOfTurns++; this.schedulingLog.totalProcessingTime += elapsedTime; } + + setTimeout(() => { + if (this.schedulingLog) { + this.logger.sendTelemetryEvent({ + eventName: "InboundOpsPartialProcessingTime", + duration: TelemetryLogger.formatTick(elapsedTime), + opsProcessed: this.schedulingLog.lastSequenceNumber - + this.schedulingLog.firstSequenceNumber + 1, + opsRemainingToProcess: this.deltaManager.inbound.length, + processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), + numberOfTurns: this.schedulingLog.numberOfTurns, + batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, + timeToResume: TelemetryLogger.formatTick(performance.now() - currentTime), + }); + } + this.deltaManager.inbound.resume(); + }); + + this.processingStartTime = undefined; } } } @@ -109,14 +133,19 @@ export class DeltaScheduler { if (this.schedulingLog) { // Add the time taken for processing the final ops to the total processing time in the // telemetry log object. 
+ const currentTime = performance.now(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.schedulingLog.totalProcessingTime += performance.now() - this.processingStartTime!; + this.schedulingLog.totalProcessingTime += currentTime - this.processingStartTime!; this.logger.sendTelemetryEvent({ eventName: "InboundOpsProcessingTime", - numberOfOps: this.schedulingLog.numberOfOps, + opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess, numberOfTurns: this.schedulingLog.numberOfTurns, - processingTime: this.schedulingLog.totalProcessingTime, + processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), + opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber + 1, + batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, + duration: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime), + schedulingCount: this.schedulingCount, }); this.schedulingLog = undefined; @@ -124,14 +153,11 @@ export class DeltaScheduler { // If we scheduled this batch of the inbound queue, increment the counter that tracks the // number of times we have done this. - if (this.isScheduling) { - this.isScheduling = false; - this.schedulingCount++; - } + this.schedulingCount++; // Reset the processing times. this.processingStartTime = undefined; - this.totalProcessingTime = DeltaScheduler.processingTime; + this.currentAllowedProcessingTimeForTurn = DeltaScheduler.processingTime; } /**