From c4d5751f3502ca084aaa11fa1e08ff2a04837bec Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Thu, 7 Apr 2022 19:27:06 -0700 Subject: [PATCH 01/10] Improve logging in DeltaScheduler --- .../container-runtime/src/deltaScheduler.ts | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index ef44cb3cbb18..2c47f7cf892f 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -25,7 +25,7 @@ import { export class DeltaScheduler { private readonly deltaManager: IDeltaManager; // The time for processing ops in a single turn. - public static readonly processingTime = 20; + public static readonly processingTime = 50; // The increase in time for processing ops after each turn. private readonly processingTimeIncrement = 10; @@ -43,6 +43,9 @@ export class DeltaScheduler { // to process the ops. private schedulingCount: number = 0; + // Keeps track of the Queue length before processing a batch. + private queueLengthBeforeBatchBegin: number = 0; + private schedulingLog: { numberOfOps: number; totalProcessingTime: number; @@ -60,6 +63,7 @@ export class DeltaScheduler { public batchBegin() { if (!this.processingStartTime) { this.processingStartTime = performance.now(); + this.queueLengthBeforeBatchBegin = this.deltaManager.inbound.length; } } @@ -86,7 +90,18 @@ export class DeltaScheduler { // eslint-disable-next-line @typescript-eslint/no-floating-promises this.deltaManager.inbound.pause(); + setTimeout(() => { + if (this.schedulingLog) { + this.logger.sendTelemetryEvent({ + eventName: "DeltaManagerPaused", + duration: elapsedTime, + numberOfOps: this.deltaManager.inbound.length, + numberofOpsProcessed: this.queueLengthBeforeBatchBegin - this.deltaManager.inbound.length, + processingTime: this.schedulingLog.totalProcessingTime, + numberOfTurns:this.schedulingLog.numberOfTurns, + }); + } this.deltaManager.inbound.resume(); }); @@ -117,6 +132,8 @@ export class DeltaScheduler { numberOfOps: this.schedulingLog.numberOfOps, numberOfTurns: this.schedulingLog.numberOfTurns, processingTime: this.schedulingLog.totalProcessingTime, + numberOfOpsBefore: this.queueLengthBeforeBatchBegin, + numberOfOpsRemaining: this.deltaManager.inbound.length, }); this.schedulingLog = undefined; @@ -129,6 +146,8 @@ export class DeltaScheduler { this.schedulingCount++; } + this.queueLengthBeforeBatchBegin = 0; + // Reset the processing times. this.processingStartTime = undefined; this.totalProcessingTime = DeltaScheduler.processingTime; From 77c17a428ec18d8ab2a0fd673f715f36a58e7736 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Fri, 8 Apr 2022 08:44:41 -0700 Subject: [PATCH 02/10] adding missing file. --- api-report/container-runtime.api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-report/container-runtime.api.md b/api-report/container-runtime.api.md index 85eadf3658f7..329809d19548 100644 --- a/api-report/container-runtime.api.md +++ b/api-report/container-runtime.api.md @@ -202,7 +202,7 @@ export class DeltaScheduler { // (undocumented) batchEnd(): void; // (undocumented) - static readonly processingTime = 20; + static readonly processingTime = 50; } // @public (undocumented) From a137d40eb8f23216004757443044c7f5cba81314 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Fri, 8 Apr 2022 13:54:58 -0700 Subject: [PATCH 03/10] Addressing review comments + hooking up with the deltamanager.inbound.push Ops to count sequence numbers --- .../container-runtime/src/deltaScheduler.ts | 132 ++++++++++-------- 1 file changed, 76 insertions(+), 56 deletions(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 2c47f7cf892f..c965a0036e12 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -9,8 +9,12 @@ import { IDeltaManager } from "@fluidframework/container-definitions"; import { IDocumentMessage, ISequencedDocumentMessage, + MessageType, } from "@fluidframework/protocol-definitions"; +import { + TelemetryLogger, +} from "@fluidframework/telemetry-utils"; /** * DeltaScheduler is responsible for the scheduling of inbound delta queue in cases where there * is more than one op a particular run of the queue. It does not schedule if there is just one @@ -33,88 +37,103 @@ export class DeltaScheduler { private processingStartTime: number | undefined; private totalProcessingTime: number = DeltaScheduler.processingTime; - // This keeps track of whether the delta scheduler is scheduling a particular run of the - // the inbound delta queue. Basically, every time the delta queue starts processing with - // more than one op, this will be set to true until the run completes. - private isScheduling: boolean = false; - // This keeps track of the number of times inbound queue has been scheduled. After a particular // count, we log telemetry for the number of ops processed, the time and number of turns it took // to process the ops. private schedulingCount: number = 0; - // Keeps track of the Queue length before processing a batch. - private queueLengthBeforeBatchBegin: number = 0; - private schedulingLog: { - numberOfOps: number; + opsRemainingToProcess: number; totalProcessingTime: number; numberOfTurns: number; } | undefined; + // Keeps track of the batching numbers so we can have an idea on how many ops were processed. + private batchProcessBegin: boolean = false; + private numberOfBatchesProcessed: number = 0; + private lastSequenceNumber: number = 0; + private firstSequenceNumber: number = 0; + constructor( deltaManager: IDeltaManager, private readonly logger: ITelemetryLogger, ) { this.deltaManager = deltaManager; this.deltaManager.inbound.on("idle", () => { this.inboundQueueIdle(); }); + this.deltaManager.inbound.on("op", (op: ISequencedDocumentMessage) => { + if (op.type === MessageType.Operation) { + if (this.batchProcessBegin) { + this.batchProcessBegin = false; + this.firstSequenceNumber = this.lastSequenceNumber = op.sequenceNumber; + } + else { // Last Sequence Number will always have the latest op sequenceNumber. + this.lastSequenceNumber = op.sequenceNumber; + } + } + }); } public batchBegin() { if (!this.processingStartTime) { this.processingStartTime = performance.now(); - this.queueLengthBeforeBatchBegin = this.deltaManager.inbound.length; + this.batchProcessBegin = true; + this.numberOfBatchesProcessed = 0; } + this.numberOfBatchesProcessed++; } public batchEnd() { - if (this.shouldRunScheduler()) { - if (!this.isScheduling) { - this.isScheduling = true; - // Every 2000th time we are scheduling the inbound queue, we log telemetry for the - // number of ops processed, the time and number of turns it took to process the ops. + if (this.schedulingLog === undefined) { if (this.schedulingCount % 2000 === 0) { + // Every 2000th time we are scheduling the inbound queue, we log telemetry for the + // number of ops processed, the time and number of turns it took to process the ops. this.schedulingLog = { - numberOfOps: this.deltaManager.inbound.length, + opsRemainingToProcess: this.deltaManager.inbound.length, numberOfTurns: 1, totalProcessingTime: 0, }; } } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const elapsedTime = performance.now() - this.processingStartTime!; - if (elapsedTime > this.totalProcessingTime) { - // We have processed ops for more than the total processing time. So, pause the - // queue, yield the thread and schedule a resume. - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.deltaManager.inbound.pause(); - - setTimeout(() => { + if (this.shouldRunScheduler()) { + { + const currentTime = performance.now(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const elapsedTime = currentTime - this.processingStartTime!; + if (elapsedTime > this.totalProcessingTime) { + // We have processed ops for more than the total processing time. So, pause the + // queue, yield the thread and schedule a resume. + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.deltaManager.inbound.pause(); + + setTimeout(() => { + if (this.schedulingLog) { + this.logger.sendTelemetryEvent({ + eventName: "DeltaManagerPaused", + duration: TelemetryLogger.formatTick(elapsedTime), + opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, + opsRemainingToProcess: this.deltaManager.inbound.length, + processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), + numberOfTurns:this.schedulingLog.numberOfTurns, + batchesProcessed: this.numberOfBatchesProcessed, + waitingToResume: TelemetryLogger.formatTick(performance.now() - currentTime), + }); + this.deltaManager.inbound.resume(); + } + }); + + this.processingStartTime = undefined; + // Increase the total processing time. Keep doing this after each turn until all the ops have + // been processed. This way we keep the responsiveness at the beginning while also making sure + // that all the ops process fairly quickly. + this.totalProcessingTime += this.processingTimeIncrement; + + // If we are logging the telemetry this time, update the telemetry log object. if (this.schedulingLog) { - this.logger.sendTelemetryEvent({ - eventName: "DeltaManagerPaused", - duration: elapsedTime, - numberOfOps: this.deltaManager.inbound.length, - numberofOpsProcessed: this.queueLengthBeforeBatchBegin - this.deltaManager.inbound.length, - processingTime: this.schedulingLog.totalProcessingTime, - numberOfTurns:this.schedulingLog.numberOfTurns, - }); + this.schedulingLog.numberOfTurns++; + this.schedulingLog.totalProcessingTime += elapsedTime; } - this.deltaManager.inbound.resume(); - }); - - this.processingStartTime = undefined; - // Increase the total processing time. Keep doing this after each turn until all the ops have - // been processed. This way we keep the responsiveness at the beginning while also making sure - // that all the ops process fairly quickly. - this.totalProcessingTime += this.processingTimeIncrement; - - // If we are logging the telemetry this time, update the telemetry log object. - if (this.schedulingLog) { - this.schedulingLog.numberOfTurns++; - this.schedulingLog.totalProcessingTime += elapsedTime; } } } @@ -129,11 +148,12 @@ export class DeltaScheduler { this.logger.sendTelemetryEvent({ eventName: "InboundOpsProcessingTime", - numberOfOps: this.schedulingLog.numberOfOps, + opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess, numberOfTurns: this.schedulingLog.numberOfTurns, - processingTime: this.schedulingLog.totalProcessingTime, - numberOfOpsBefore: this.queueLengthBeforeBatchBegin, - numberOfOpsRemaining: this.deltaManager.inbound.length, + processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), + opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, + batchesProcessed: this.numberOfBatchesProcessed, + }); this.schedulingLog = undefined; @@ -141,12 +161,12 @@ export class DeltaScheduler { // If we scheduled this batch of the inbound queue, increment the counter that tracks the // number of times we have done this. - if (this.isScheduling) { - this.isScheduling = false; - this.schedulingCount++; - } + this.schedulingCount++; - this.queueLengthBeforeBatchBegin = 0; + // Batching data needs to be reset. + this.lastSequenceNumber = 0; + this.firstSequenceNumber = 0; + this.numberOfBatchesProcessed = 0; // Reset the processing times. this.processingStartTime = undefined; From e5062930312accec290b714a6ee850571c0524b3 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Fri, 8 Apr 2022 16:42:09 -0700 Subject: [PATCH 04/10] Passing Message as argument to batchstart and batchend --- api-report/container-runtime.api.md | 4 +- .../container-runtime/src/containerRuntime.ts | 6 +- .../container-runtime/src/deltaScheduler.ts | 115 ++++++++---------- 3 files changed, 56 insertions(+), 69 deletions(-) diff --git a/api-report/container-runtime.api.md b/api-report/container-runtime.api.md index 329809d19548..bc94eefd48d4 100644 --- a/api-report/container-runtime.api.md +++ b/api-report/container-runtime.api.md @@ -198,9 +198,9 @@ export interface ContainerRuntimeMessage { export class DeltaScheduler { constructor(deltaManager: IDeltaManager, logger: ITelemetryLogger); // (undocumented) - batchBegin(): void; + batchBegin(message: ISequencedDocumentMessage): void; // (undocumented) - batchEnd(): void; + batchEnd(message: ISequencedDocumentMessage): void; // (undocumented) static readonly processingTime = 50; } diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 5bc32e63058a..16cfda4defca 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -632,7 +632,7 @@ export class ScheduleManager { // This could be the beginning of a new batch or an individual message. this.emitter.emit("batchBegin", message); - this.deltaScheduler.batchBegin(); + this.deltaScheduler.batchBegin(message); const batch = (message?.metadata as IRuntimeMessageMetadata)?.batch; if (batch) { @@ -653,7 +653,7 @@ export class ScheduleManager { this.hitError = true; this.batchClientId = undefined; this.emitter.emit("batchEnd", error, message); - this.deltaScheduler.batchEnd(); + this.deltaScheduler.batchEnd(message); return; } @@ -663,7 +663,7 @@ export class ScheduleManager { if (this.batchClientId === undefined || batch === false) { this.batchClientId = undefined; this.emitter.emit("batchEnd", undefined, message); - this.deltaScheduler.batchEnd(); + this.deltaScheduler.batchEnd(message); return; } } diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index c965a0036e12..3293090feedc 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -9,7 +9,6 @@ import { IDeltaManager } from "@fluidframework/container-definitions"; import { IDocumentMessage, ISequencedDocumentMessage, - MessageType, } from "@fluidframework/protocol-definitions"; import { @@ -49,7 +48,6 @@ export class DeltaScheduler { } | undefined; // Keeps track of the batching numbers so we can have an idea on how many ops were processed. - private batchProcessBegin: boolean = false; private numberOfBatchesProcessed: number = 0; private lastSequenceNumber: number = 0; private firstSequenceNumber: number = 0; @@ -60,81 +58,70 @@ export class DeltaScheduler { ) { this.deltaManager = deltaManager; this.deltaManager.inbound.on("idle", () => { this.inboundQueueIdle(); }); - this.deltaManager.inbound.on("op", (op: ISequencedDocumentMessage) => { - if (op.type === MessageType.Operation) { - if (this.batchProcessBegin) { - this.batchProcessBegin = false; - this.firstSequenceNumber = this.lastSequenceNumber = op.sequenceNumber; - } - else { // Last Sequence Number will always have the latest op sequenceNumber. - this.lastSequenceNumber = op.sequenceNumber; - } - } - }); } - public batchBegin() { + public batchBegin(message: ISequencedDocumentMessage) { if (!this.processingStartTime) { this.processingStartTime = performance.now(); - this.batchProcessBegin = true; this.numberOfBatchesProcessed = 0; + this.firstSequenceNumber = this.lastSequenceNumber = message.sequenceNumber; } this.numberOfBatchesProcessed++; } - public batchEnd() { - if (this.schedulingLog === undefined) { - if (this.schedulingCount % 2000 === 0) { - // Every 2000th time we are scheduling the inbound queue, we log telemetry for the - // number of ops processed, the time and number of turns it took to process the ops. - this.schedulingLog = { - opsRemainingToProcess: this.deltaManager.inbound.length, - numberOfTurns: 1, - totalProcessingTime: 0, - }; - } + public batchEnd(message: ISequencedDocumentMessage) { + this.lastSequenceNumber = message.sequenceNumber; + if (this.schedulingLog === undefined) { + if (this.schedulingCount % 2000 === 0) { + // Every 2000th time we are scheduling the inbound queue, we log telemetry for the + // number of ops processed, the time and number of turns it took to process the ops. + this.schedulingLog = { + opsRemainingToProcess: this.deltaManager.inbound.length, + numberOfTurns: 1, + totalProcessingTime: 0, + }; } + } - if (this.shouldRunScheduler()) { - { - const currentTime = performance.now(); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const elapsedTime = currentTime - this.processingStartTime!; - if (elapsedTime > this.totalProcessingTime) { - // We have processed ops for more than the total processing time. So, pause the - // queue, yield the thread and schedule a resume. - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.deltaManager.inbound.pause(); - - setTimeout(() => { - if (this.schedulingLog) { - this.logger.sendTelemetryEvent({ - eventName: "DeltaManagerPaused", - duration: TelemetryLogger.formatTick(elapsedTime), - opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, - opsRemainingToProcess: this.deltaManager.inbound.length, - processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), - numberOfTurns:this.schedulingLog.numberOfTurns, - batchesProcessed: this.numberOfBatchesProcessed, - waitingToResume: TelemetryLogger.formatTick(performance.now() - currentTime), - }); - this.deltaManager.inbound.resume(); - } - }); - - this.processingStartTime = undefined; - // Increase the total processing time. Keep doing this after each turn until all the ops have - // been processed. This way we keep the responsiveness at the beginning while also making sure - // that all the ops process fairly quickly. - this.totalProcessingTime += this.processingTimeIncrement; - - // If we are logging the telemetry this time, update the telemetry log object. + if (this.shouldRunScheduler()) { + const currentTime = performance.now(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const elapsedTime = currentTime - this.processingStartTime!; + if (elapsedTime > this.totalProcessingTime) { + // We have processed ops for more than the total processing time. So, pause the + // queue, yield the thread and schedule a resume. + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.deltaManager.inbound.pause(); + + // Increase the total processing time. Keep doing this after each turn until all the ops have + // been processed. This way we keep the responsiveness at the beginning while also making sure + // that all the ops process fairly quickly. + this.totalProcessingTime += this.processingTimeIncrement; + + // If we are logging the telemetry this time, update the telemetry log object. + if (this.schedulingLog) { + this.schedulingLog.numberOfTurns++; + this.schedulingLog.totalProcessingTime += elapsedTime; + } + + setTimeout(() => { if (this.schedulingLog) { - this.schedulingLog.numberOfTurns++; - this.schedulingLog.totalProcessingTime += elapsedTime; + this.logger.sendTelemetryEvent({ + eventName: "InboundOpsPartialProcessingTime ", + duration: TelemetryLogger.formatTick(elapsedTime), + opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, + opsRemainingToProcess: this.deltaManager.inbound.length, + processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), + numberOfTurns: this.schedulingLog.numberOfTurns, + batchesProcessed: this.numberOfBatchesProcessed, + waitingToResume: TelemetryLogger.formatTick(performance.now() - currentTime), + }); } - } + this.deltaManager.inbound.resume(); + }); + + this.processingStartTime = undefined; } } } From a45e7e6aedc3d3ce3c1a3ceff2bb083cf7b86cbf Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Sat, 9 Apr 2022 01:43:44 -0700 Subject: [PATCH 05/10] Moving every variable into the schedulingLog --- .../container-runtime/src/deltaScheduler.ts | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 3293090feedc..59c288b95a34 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -45,13 +45,11 @@ export class DeltaScheduler { opsRemainingToProcess: number; totalProcessingTime: number; numberOfTurns: number; + numberOfBatchesProcessed: number; + lastSequenceNumber: number; + firstSequenceNumber: number; } | undefined; - // Keeps track of the batching numbers so we can have an idea on how many ops were processed. - private numberOfBatchesProcessed: number = 0; - private lastSequenceNumber: number = 0; - private firstSequenceNumber: number = 0; - constructor( deltaManager: IDeltaManager, private readonly logger: ITelemetryLogger, @@ -63,25 +61,31 @@ export class DeltaScheduler { public batchBegin(message: ISequencedDocumentMessage) { if (!this.processingStartTime) { this.processingStartTime = performance.now(); - this.numberOfBatchesProcessed = 0; - this.firstSequenceNumber = this.lastSequenceNumber = message.sequenceNumber; - } - this.numberOfBatchesProcessed++; - } - - public batchEnd(message: ISequencedDocumentMessage) { - this.lastSequenceNumber = message.sequenceNumber; - if (this.schedulingLog === undefined) { - if (this.schedulingCount % 2000 === 0) { + if (this.schedulingLog === undefined && this.schedulingCount % 2000 === 0) { // Every 2000th time we are scheduling the inbound queue, we log telemetry for the // number of ops processed, the time and number of turns it took to process the ops. this.schedulingLog = { - opsRemainingToProcess: this.deltaManager.inbound.length, + opsRemainingToProcess: 0, numberOfTurns: 1, totalProcessingTime: 0, + numberOfBatchesProcessed: 0, + firstSequenceNumber: message.sequenceNumber, + lastSequenceNumber: message.sequenceNumber, }; } } + if (this.schedulingLog) { + this.schedulingLog.numberOfBatchesProcessed++; + } + } + + public batchEnd(message: ISequencedDocumentMessage) { + if (this.schedulingLog) { + this.schedulingLog.lastSequenceNumber = message.sequenceNumber; + if (this.schedulingCount % 2000 === 0) { + this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length; + } + } if (this.shouldRunScheduler()) { const currentTime = performance.now(); @@ -108,14 +112,15 @@ export class DeltaScheduler { setTimeout(() => { if (this.schedulingLog) { this.logger.sendTelemetryEvent({ - eventName: "InboundOpsPartialProcessingTime ", + eventName: "InboundOpsPartialProcessingTime", duration: TelemetryLogger.formatTick(elapsedTime), - opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, + opsProcessed: this.schedulingLog.lastSequenceNumber - + this.schedulingLog.firstSequenceNumber, opsRemainingToProcess: this.deltaManager.inbound.length, processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), numberOfTurns: this.schedulingLog.numberOfTurns, - batchesProcessed: this.numberOfBatchesProcessed, - waitingToResume: TelemetryLogger.formatTick(performance.now() - currentTime), + batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, + timeToResume: TelemetryLogger.formatTick(performance.now() - currentTime), }); } this.deltaManager.inbound.resume(); @@ -138,8 +143,8 @@ export class DeltaScheduler { opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess, numberOfTurns: this.schedulingLog.numberOfTurns, processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), - opsProcessed: this.lastSequenceNumber - this.firstSequenceNumber, - batchesProcessed: this.numberOfBatchesProcessed, + opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber, + batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, }); @@ -150,11 +155,6 @@ export class DeltaScheduler { // number of times we have done this. this.schedulingCount++; - // Batching data needs to be reset. - this.lastSequenceNumber = 0; - this.firstSequenceNumber = 0; - this.numberOfBatchesProcessed = 0; - // Reset the processing times. this.processingStartTime = undefined; this.totalProcessingTime = DeltaScheduler.processingTime; From a7c97d420a63735538d629ce6e588fb72826ea88 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Sat, 9 Apr 2022 05:28:11 -0700 Subject: [PATCH 06/10] small adjustment --- packages/runtime/container-runtime/src/deltaScheduler.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 59c288b95a34..3a2e4ab92fb1 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -82,9 +82,7 @@ export class DeltaScheduler { public batchEnd(message: ISequencedDocumentMessage) { if (this.schedulingLog) { this.schedulingLog.lastSequenceNumber = message.sequenceNumber; - if (this.schedulingCount % 2000 === 0) { - this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length; - } + this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length; } if (this.shouldRunScheduler()) { From 5ace08661e82eb9fe2b4e7ae1e87145a2454aa40 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Sat, 9 Apr 2022 06:11:45 -0700 Subject: [PATCH 07/10] small adjustment --- .../container-runtime/src/deltaScheduler.ts | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 3a2e4ab92fb1..5f4fbebbbf79 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -61,18 +61,18 @@ export class DeltaScheduler { public batchBegin(message: ISequencedDocumentMessage) { if (!this.processingStartTime) { this.processingStartTime = performance.now(); - if (this.schedulingLog === undefined && this.schedulingCount % 2000 === 0) { - // Every 2000th time we are scheduling the inbound queue, we log telemetry for the - // number of ops processed, the time and number of turns it took to process the ops. - this.schedulingLog = { - opsRemainingToProcess: 0, - numberOfTurns: 1, - totalProcessingTime: 0, - numberOfBatchesProcessed: 0, - firstSequenceNumber: message.sequenceNumber, - lastSequenceNumber: message.sequenceNumber, - }; - } + } + if (this.schedulingLog === undefined && this.schedulingCount % 2000 === 0) { + // Every 2000th time we are scheduling the inbound queue, we log telemetry for the + // number of ops processed, the time and number of turns it took to process the ops. + this.schedulingLog = { + opsRemainingToProcess: 0, + numberOfTurns: 1, + totalProcessingTime: 0, + numberOfBatchesProcessed: 0, + firstSequenceNumber: message.sequenceNumber, + lastSequenceNumber: message.sequenceNumber, + }; } if (this.schedulingLog) { this.schedulingLog.numberOfBatchesProcessed++; From b135f345ddf976e11456e188d9de429e5ee7d444 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Sat, 9 Apr 2022 06:30:25 -0700 Subject: [PATCH 08/10] Measure total End to End time --- packages/runtime/container-runtime/src/deltaScheduler.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 5f4fbebbbf79..4e9c406703e9 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -48,6 +48,7 @@ export class DeltaScheduler { numberOfBatchesProcessed: number; lastSequenceNumber: number; firstSequenceNumber: number; + startTime: number; } | undefined; constructor( @@ -72,6 +73,7 @@ export class DeltaScheduler { numberOfBatchesProcessed: 0, firstSequenceNumber: message.sequenceNumber, lastSequenceNumber: message.sequenceNumber, + startTime: performance.now(), }; } if (this.schedulingLog) { @@ -133,8 +135,9 @@ export class DeltaScheduler { if (this.schedulingLog) { // Add the time taken for processing the final ops to the total processing time in the // telemetry log object. + const currentTime = performance.now(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.schedulingLog.totalProcessingTime += performance.now() - this.processingStartTime!; + this.schedulingLog.totalProcessingTime += currentTime - this.processingStartTime!; this.logger.sendTelemetryEvent({ eventName: "InboundOpsProcessingTime", @@ -143,6 +146,7 @@ export class DeltaScheduler { processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber, batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, + totalTime: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime), }); From 9da1a08ca131caaa637d1143112642e18d019674 Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Sat, 9 Apr 2022 19:48:53 -0700 Subject: [PATCH 09/10] change total time to duration --- packages/runtime/container-runtime/src/deltaScheduler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 4e9c406703e9..4b4eec87f34c 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -146,7 +146,7 @@ export class DeltaScheduler { processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber, batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, - totalTime: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime), + duration: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime), }); From ce43901fb799b0a6fb9b5c9f20bf9d762f91ddfd Mon Sep 17 00:00:00 2001 From: Nicholas Couri Date: Mon, 11 Apr 2022 11:00:53 -0700 Subject: [PATCH 10/10] Addressing review comments. --- .../container-runtime/src/deltaScheduler.ts | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/packages/runtime/container-runtime/src/deltaScheduler.ts b/packages/runtime/container-runtime/src/deltaScheduler.ts index 4b4eec87f34c..2d357aa0f922 100644 --- a/packages/runtime/container-runtime/src/deltaScheduler.ts +++ b/packages/runtime/container-runtime/src/deltaScheduler.ts @@ -34,7 +34,7 @@ export class DeltaScheduler { private readonly processingTimeIncrement = 10; private processingStartTime: number | undefined; - private totalProcessingTime: number = DeltaScheduler.processingTime; + private currentAllowedProcessingTimeForTurn: number = DeltaScheduler.processingTime; // This keeps track of the number of times inbound queue has been scheduled. After a particular // count, we log telemetry for the number of ops processed, the time and number of turns it took @@ -63,7 +63,7 @@ export class DeltaScheduler { if (!this.processingStartTime) { this.processingStartTime = performance.now(); } - if (this.schedulingLog === undefined && this.schedulingCount % 2000 === 0) { + if (this.schedulingLog === undefined && this.schedulingCount % 500 === 0) { // Every 2000th time we are scheduling the inbound queue, we log telemetry for the // number of ops processed, the time and number of turns it took to process the ops. this.schedulingLog = { @@ -76,13 +76,11 @@ export class DeltaScheduler { startTime: performance.now(), }; } - if (this.schedulingLog) { - this.schedulingLog.numberOfBatchesProcessed++; - } } public batchEnd(message: ISequencedDocumentMessage) { if (this.schedulingLog) { + this.schedulingLog.numberOfBatchesProcessed++; this.schedulingLog.lastSequenceNumber = message.sequenceNumber; this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length; } @@ -91,7 +89,7 @@ export class DeltaScheduler { const currentTime = performance.now(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const elapsedTime = currentTime - this.processingStartTime!; - if (elapsedTime > this.totalProcessingTime) { + if (elapsedTime > this.currentAllowedProcessingTimeForTurn) { // We have processed ops for more than the total processing time. So, pause the // queue, yield the thread and schedule a resume. @@ -101,7 +99,7 @@ export class DeltaScheduler { // Increase the total processing time. Keep doing this after each turn until all the ops have // been processed. This way we keep the responsiveness at the beginning while also making sure // that all the ops process fairly quickly. - this.totalProcessingTime += this.processingTimeIncrement; + this.currentAllowedProcessingTimeForTurn += this.processingTimeIncrement; // If we are logging the telemetry this time, update the telemetry log object. if (this.schedulingLog) { @@ -115,7 +113,7 @@ export class DeltaScheduler { eventName: "InboundOpsPartialProcessingTime", duration: TelemetryLogger.formatTick(elapsedTime), opsProcessed: this.schedulingLog.lastSequenceNumber - - this.schedulingLog.firstSequenceNumber, + this.schedulingLog.firstSequenceNumber + 1, opsRemainingToProcess: this.deltaManager.inbound.length, processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), numberOfTurns: this.schedulingLog.numberOfTurns, @@ -144,10 +142,10 @@ export class DeltaScheduler { opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess, numberOfTurns: this.schedulingLog.numberOfTurns, processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime), - opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber, + opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber + 1, batchesProcessed: this.schedulingLog.numberOfBatchesProcessed, duration: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime), - + schedulingCount: this.schedulingCount, }); this.schedulingLog = undefined; @@ -159,7 +157,7 @@ export class DeltaScheduler { // Reset the processing times. this.processingStartTime = undefined; - this.totalProcessingTime = DeltaScheduler.processingTime; + this.currentAllowedProcessingTimeForTurn = DeltaScheduler.processingTime; } /**