Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging in DeltaScheduler #9788

Merged
merged 10 commits into from
Apr 11, 2022
6 changes: 3 additions & 3 deletions api-report/container-runtime.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ export interface ContainerRuntimeMessage {
export class DeltaScheduler {
constructor(deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>, logger: ITelemetryLogger);
// (undocumented)
batchBegin(): void;
batchBegin(message: ISequencedDocumentMessage): void;
// (undocumented)
batchEnd(): void;
batchEnd(message: ISequencedDocumentMessage): void;
// (undocumented)
static readonly processingTime = 20;
static readonly processingTime = 50;
}

// @public (undocumented)
Expand Down
6 changes: 3 additions & 3 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ export class ScheduleManager {

// This could be the beginning of a new batch or an individual message.
this.emitter.emit("batchBegin", message);
this.deltaScheduler.batchBegin();
this.deltaScheduler.batchBegin(message);

const batch = (message?.metadata as IRuntimeMessageMetadata)?.batch;
if (batch) {
Expand All @@ -653,7 +653,7 @@ export class ScheduleManager {
this.hitError = true;
this.batchClientId = undefined;
this.emitter.emit("batchEnd", error, message);
this.deltaScheduler.batchEnd();
this.deltaScheduler.batchEnd(message);
return;
}

Expand All @@ -663,7 +663,7 @@ export class ScheduleManager {
if (this.batchClientId === undefined || batch === false) {
this.batchClientId = undefined;
this.emitter.emit("batchEnd", undefined, message);
this.deltaScheduler.batchEnd();
this.deltaScheduler.batchEnd(message);
return;
}
}
Expand Down
104 changes: 65 additions & 39 deletions packages/runtime/container-runtime/src/deltaScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import {
ISequencedDocumentMessage,
} from "@fluidframework/protocol-definitions";

import {
TelemetryLogger,
} from "@fluidframework/telemetry-utils";
/**
* DeltaScheduler is responsible for the scheduling of inbound delta queue in cases where there
* is more than one op a particular run of the queue. It does not schedule if there is just one
Expand All @@ -25,28 +28,27 @@ import {
export class DeltaScheduler {
private readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
// The time for processing ops in a single turn.
public static readonly processingTime = 20;
public static readonly processingTime = 50;
NicholasCouri marked this conversation as resolved.
Show resolved Hide resolved

// The increase in time for processing ops after each turn.
private readonly processingTimeIncrement = 10;

private processingStartTime: number | undefined;
private totalProcessingTime: number = DeltaScheduler.processingTime;

// This keeps track of whether the delta scheduler is scheduling a particular run of the
// the inbound delta queue. Basically, every time the delta queue starts processing with
// more than one op, this will be set to true until the run completes.
private isScheduling: boolean = false;
private currentAllowedProcessingTimeForTurn: number = DeltaScheduler.processingTime;

// This keeps track of the number of times inbound queue has been scheduled. After a particular
// count, we log telemetry for the number of ops processed, the time and number of turns it took
// to process the ops.
private schedulingCount: number = 0;

private schedulingLog: {
numberOfOps: number;
opsRemainingToProcess: number;
totalProcessingTime: number;
numberOfTurns: number;
numberOfBatchesProcessed: number;
lastSequenceNumber: number;
firstSequenceNumber: number;
startTime: number;
} | undefined;

constructor(
Expand All @@ -57,50 +59,72 @@ export class DeltaScheduler {
this.deltaManager.inbound.on("idle", () => { this.inboundQueueIdle(); });
}

public batchBegin() {
public batchBegin(message: ISequencedDocumentMessage) {
if (!this.processingStartTime) {
this.processingStartTime = performance.now();
}
if (this.schedulingLog === undefined && this.schedulingCount % 500 === 0) {
// Every 2000th time we are scheduling the inbound queue, we log telemetry for the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this should say 500th time now.

// number of ops processed, the time and number of turns it took to process the ops.
this.schedulingLog = {
opsRemainingToProcess: 0,
numberOfTurns: 1,
totalProcessingTime: 0,
numberOfBatchesProcessed: 0,
firstSequenceNumber: message.sequenceNumber,
lastSequenceNumber: message.sequenceNumber,
startTime: performance.now(),
};
}
}

public batchEnd() {
if (this.shouldRunScheduler()) {
if (!this.isScheduling) {
this.isScheduling = true;
// Every 2000th time we are scheduling the inbound queue, we log telemetry for the
// number of ops processed, the time and number of turns it took to process the ops.
if (this.schedulingCount % 2000 === 0) {
this.schedulingLog = {
numberOfOps: this.deltaManager.inbound.length,
numberOfTurns: 1,
totalProcessingTime: 0,
};
}
}
public batchEnd(message: ISequencedDocumentMessage) {
if (this.schedulingLog) {
this.schedulingLog.numberOfBatchesProcessed++;
this.schedulingLog.lastSequenceNumber = message.sequenceNumber;
this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length;
}

if (this.shouldRunScheduler()) {
const currentTime = performance.now();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const elapsedTime = performance.now() - this.processingStartTime!;
if (elapsedTime > this.totalProcessingTime) {
const elapsedTime = currentTime - this.processingStartTime!;
if (elapsedTime > this.currentAllowedProcessingTimeForTurn) {
// We have processed ops for more than the total processing time. So, pause the
// queue, yield the thread and schedule a resume.

// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.deltaManager.inbound.pause();
setTimeout(() => {
this.deltaManager.inbound.resume();
});

this.processingStartTime = undefined;
// Increase the total processing time. Keep doing this after each turn until all the ops have
// Increase the total processing time. Keep doing this after each turn until all the ops have
// been processed. This way we keep the responsiveness at the beginning while also making sure
// that all the ops process fairly quickly.
this.totalProcessingTime += this.processingTimeIncrement;
this.currentAllowedProcessingTimeForTurn += this.processingTimeIncrement;

// If we are logging the telemetry this time, update the telemetry log object.
if (this.schedulingLog) {
this.schedulingLog.numberOfTurns++;
this.schedulingLog.totalProcessingTime += elapsedTime;
Copy link
Contributor

@vladsud vladsud Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend naming these two variables differently, i.e. find better name (fort first one) - it's confusing to have exactly same names with quite different meaning / semantics:
this.totalProcessingTime
this.schedulingLog.totalProcessingTime

#Resolved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point - changed the this.totalProcessingTime to currentAllowedProcessingTimeForTurn

}

setTimeout(() => {
if (this.schedulingLog) {
NicholasCouri marked this conversation as resolved.
Show resolved Hide resolved
this.logger.sendTelemetryEvent({
eventName: "InboundOpsPartialProcessingTime",
duration: TelemetryLogger.formatTick(elapsedTime),
opsProcessed: this.schedulingLog.lastSequenceNumber -
this.schedulingLog.firstSequenceNumber + 1,
opsRemainingToProcess: this.deltaManager.inbound.length,
Copy link
Contributor

@vladsud vladsud Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opsRemainingToProcess should be always zero here, as "idle" event is fired by queue only when there are no more elements in the queue. I'd remove it from logging here. #Resolved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not on idle event but before we resume the inbound queue. I was assuming this could be non-zero here. Isn't it the case ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fine here, comment should have been for InboundOpsProcessingTime event :) There it should be always zero

processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime),
numberOfTurns: this.schedulingLog.numberOfTurns,
batchesProcessed: this.schedulingLog.numberOfBatchesProcessed,
timeToResume: TelemetryLogger.formatTick(performance.now() - currentTime),
});
}
this.deltaManager.inbound.resume();
});

this.processingStartTime = undefined;
}
}
}
Expand All @@ -109,29 +133,31 @@ export class DeltaScheduler {
if (this.schedulingLog) {
// Add the time taken for processing the final ops to the total processing time in the
// telemetry log object.
const currentTime = performance.now();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.schedulingLog.totalProcessingTime += performance.now() - this.processingStartTime!;
this.schedulingLog.totalProcessingTime += currentTime - this.processingStartTime!;

this.logger.sendTelemetryEvent({
eventName: "InboundOpsProcessingTime",
numberOfOps: this.schedulingLog.numberOfOps,
Copy link
Contributor

@vladsud vladsud Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to capture here this.schedulingCount. This allows us to see various skews, i.e. see if start of the session sees more pausing than middle of the session. #Resolved

opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess,
numberOfTurns: this.schedulingLog.numberOfTurns,
processingTime: this.schedulingLog.totalProcessingTime,
processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime),
opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber + 1,
batchesProcessed: this.schedulingLog.numberOfBatchesProcessed,
duration: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime),
schedulingCount: this.schedulingCount,
});

this.schedulingLog = undefined;
}

// If we scheduled this batch of the inbound queue, increment the counter that tracks the
// number of times we have done this.
if (this.isScheduling) {
this.isScheduling = false;
this.schedulingCount++;
}
this.schedulingCount++;

// Reset the processing times.
this.processingStartTime = undefined;
this.totalProcessingTime = DeltaScheduler.processingTime;
this.currentAllowedProcessingTimeForTurn = DeltaScheduler.processingTime;
}

/**
Expand Down