ContainerRuntime: Refactor batch processing code to support either op-by-op or batch-all-at-once semantics #22501

Merged: 14 commits, Sep 13, 2024
75 changes: 49 additions & 26 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -171,6 +171,7 @@ import { IBatchMetadata, ISavedOpMetadata } from "./metadata.js";
import {
BatchId,
BatchMessage,
BatchStartInfo,
DuplicateBatchDetector,
ensureContentsDeserialized,
IBatch,
@@ -2707,30 +2708,31 @@ export class ContainerRuntime
if (hasModernRuntimeMessageEnvelope) {
// If the message has the modern message envelope, then process it here.
// Here we unpack the message (decompress, unchunk, and/or ungroup) into a batch of messages with ContainerMessageType
const inboundBatch = this.remoteMessageProcessor.process(messageCopy, logLegacyCase);
if (inboundBatch === undefined) {
const inboundResult = this.remoteMessageProcessor.process(messageCopy, logLegacyCase);
if (inboundResult === undefined) {
// This means the incoming message is an incomplete part of a message or batch
// and we need to process more messages before the rest of the system can understand it.
return;
}

const result = this.duplicateBatchDetector?.processInboundBatch(inboundBatch);
const batchStart: BatchStartInfo = inboundResult.batchStart;
const result = this.duplicateBatchDetector?.processInboundBatch(batchStart);
if (result?.duplicate) {
const error = new DataCorruptionError(
"Duplicate batch - The same batch was sequenced twice",
{ batchId: inboundBatch.batchId },
{ batchId: batchStart.batchId },
);

this.mc.logger.sendTelemetryEvent(
{
eventName: "DuplicateBatch",
details: {
batchId: inboundBatch.batchId,
clientId: inboundBatch.clientId,
batchStartCsn: inboundBatch.batchStartCsn,
size: inboundBatch.messages.length,
batchId: batchStart.batchId,
clientId: batchStart.clientId,
batchStartCsn: batchStart.batchStartCsn,
size: inboundResult.length,
duplicateBatchSequenceNumber: result.otherSequenceNumber,
...extractSafePropertiesFromMessage(inboundBatch.keyMessage),
...extractSafePropertiesFromMessage(batchStart.keyMessage),
},
},
error,
@@ -2745,8 +2747,9 @@
let messagesWithPendingState: {
message: ISequencedDocumentMessage;
localOpMetadata?: unknown;
}[] = this.pendingStateManager.processInboundBatch(inboundBatch, local);
if (inboundBatch.messages.length === 0) {
}[] = this.pendingStateManager.processInboundMessages(inboundResult, local);

if (inboundResult.length === 0) {
/**
* We need to process an empty batch, which will execute expected actions while processing even if there
* are no inner runtime messages.
@@ -2761,17 +2764,31 @@
*/
messagesWithPendingState = [
{
message: inboundBatch.keyMessage,
message: inboundResult.batchStart.keyMessage,
localOpMetadata: undefined,
},
];
// Empty batch message is a non-runtime message as it was generated by the op grouping manager.
runtimeBatch = false;
}
this.processBatch(messagesWithPendingState, local, savedOp, runtimeBatch);

// This is trivial today, but when support for other types is added, we can quickly update this.
const locationInBatch: { batchStart: boolean; batchEnd: boolean } = {
batchStart: true,
batchEnd: true,
};

this.processInboundMessages(
messagesWithPendingState,
locationInBatch,
local,
savedOp,
runtimeBatch,
);
} else {
this.processBatch(
this.processInboundMessages(
[{ message: messageCopy, localOpMetadata: undefined }],
{ batchStart: true, batchEnd: true }, // Single message
local,
savedOp,
isRuntimeMessage(messageCopy) /* runtimeBatch */,
@@ -2789,28 +2806,32 @@
private _processedClientSequenceNumber: number | undefined;

/**
* Processes a batch of messages. It calls schedule manager before and after processing the batch.
* @param batch - Batch of messages to process.
* Processes inbound batch message(s). It calls schedule manager according to the location in the batch of the message(s).
* @param messages - messages to process.
* @param locationInBatch - Are we processing the start and/or end of a batch?
* @param local - true if the messages were originally generated by the client receiving it.
* @param savedOp - true if the message is a replayed saved op.
* @param runtimeBatch - true if this is a batch of runtime messages.
* @param runtimeBatch - true if these are runtime messages.
*/
private processBatch(
batch: {
private processInboundMessages(
messages: {
message: ISequencedDocumentMessage;
localOpMetadata?: unknown;
}[],
locationInBatch: { batchStart: boolean; batchEnd: boolean },
local: boolean,
savedOp: boolean | undefined,
runtimeBatch: boolean,
) {
const firstMessage = batch[0]?.message;
assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */);
this.scheduleManager.batchBegin(firstMessage);
if (locationInBatch.batchStart) {
const firstMessage = messages[0]?.message;
assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */);
this.scheduleManager.batchBegin(firstMessage);
}

let error: unknown;
try {
batch.forEach(({ message, localOpMetadata }) => {
messages.forEach(({ message, localOpMetadata }) => {
this.ensureNoDataModelChanges(() => {
if (runtimeBatch) {
this.validateAndProcessRuntimeMessage({
@@ -2828,9 +2849,11 @@
error = e;
throw error;
} finally {
const lastMessage = batch[batch.length - 1]?.message;
assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */);
this.scheduleManager.batchEnd(error, lastMessage);
if (locationInBatch.batchEnd) {
const lastMessage = messages[messages.length - 1]?.message;
assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */);
this.scheduleManager.batchEnd(error, lastMessage);
}
}
}
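
Since batchBegin/batchEnd now fire only when locationInBatch says so, a caller could in principle deliver a batch one message at a time. The sketch below is purely illustrative (processInboundMessages is private and this helper does not exist in the PR); it only shows the intended call pattern, with types taken from the surrounding module.

// Illustrative only: delivering a batch op-by-op while still producing exactly one
// batchBegin (on the first message) and one batchEnd (on the last message).
function deliverBatchOpByOp(
  deliver: (
    messages: { message: ISequencedDocumentMessage; localOpMetadata?: unknown }[],
    locationInBatch: { batchStart: boolean; batchEnd: boolean },
  ) => void,
  batchMessages: ISequencedDocumentMessage[],
): void {
  batchMessages.forEach((message, index) => {
    deliver(
      [{ message, localOpMetadata: undefined }],
      {
        batchStart: index === 0,
        batchEnd: index === batchMessages.length - 1,
      },
    );
  });
}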

@@ -10,7 +10,7 @@ import { asBatchMetadata, type IBatchMetadata } from "../metadata.js";
import type { IPendingMessage } from "../pendingStateManager.js";

import { BatchMessage, IBatch, IBatchCheckpoint } from "./definitions.js";
import type { InboundBatch } from "./remoteMessageProcessor.js";
import type { BatchStartInfo } from "./remoteMessageProcessor.js";

export interface IBatchManagerOptions {
readonly hardLimit: number;
@@ -37,15 +37,15 @@ export function generateBatchId(originalClientId: string, batchStartCsn: number)

/**
* Get the effective batch ID for the input argument.
* Supports either an IPendingMessage or an InboundBatch.
* Supports either an IPendingMessage or BatchStartInfo.
* If the batch ID is explicitly present, return it.
* Otherwise, generate a new batch ID using the client ID and batch start CSN.
*/
export function getEffectiveBatchId(
pendingMessageOrInboundBatch: IPendingMessage | InboundBatch,
pendingMessageOrBatchStartInfo: IPendingMessage | BatchStartInfo,
): string {
if ("localOpMetadata" in pendingMessageOrInboundBatch) {
const pendingMessage: IPendingMessage = pendingMessageOrInboundBatch;
if ("localOpMetadata" in pendingMessageOrBatchStartInfo) {
const pendingMessage: IPendingMessage = pendingMessageOrBatchStartInfo;
return (
asBatchMetadata(pendingMessage.opMetadata)?.batchId ??
generateBatchId(
@@ -55,10 +55,8 @@
);
}

const inboundBatch: InboundBatch = pendingMessageOrInboundBatch;
return (
inboundBatch.batchId ?? generateBatchId(inboundBatch.clientId, inboundBatch.batchStartCsn)
);
const batchStart: BatchStartInfo = pendingMessageOrBatchStartInfo;
return batchStart.batchId ?? generateBatchId(batchStart.clientId, batchStart.batchStartCsn);
}
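
As a hedged usage sketch (this helper is not part of the PR): the union is discriminated by the presence of localOpMetadata, so both call shapes resolve through the same function.

// Illustrative only: both union members flow through the same helper; the
// "localOpMetadata" property check above selects the IPendingMessage branch.
function exampleEffectiveIds(
  pendingMessage: IPendingMessage,
  batchStart: BatchStartInfo,
): { outbound: string; inbound: string } {
  return {
    // Pending (outbound) message: batchId from opMetadata if present, else generated.
    outbound: getEffectiveBatchId(pendingMessage),
    // Inbound batch start: batchStart.batchId if present, else generated from
    // batchStart.clientId and batchStart.batchStartCsn.
    inbound: getEffectiveBatchId(batchStart),
  };
}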

/**
@@ -6,7 +6,7 @@
import { assert } from "@fluidframework/core-utils/internal";

import { getEffectiveBatchId } from "./batchManager.js";
import { type InboundBatch } from "./remoteMessageProcessor.js";
import { type BatchStartInfo } from "./remoteMessageProcessor.js";

/**
* This class tracks recent batchIds we've seen, and checks incoming batches for duplicates.
@@ -25,9 +25,9 @@ export class DuplicateBatchDetector {
* @remarks - We also use the minimumSequenceNumber to clear out old batchIds that are no longer at risk for duplicates.
*/
public processInboundBatch(
inboundBatch: InboundBatch,
batchStart: BatchStartInfo,
): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
const { sequenceNumber, minimumSequenceNumber } = inboundBatch.keyMessage;
const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage;

// Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget.
// Why? Because any other client holding the same batch locally would have seen the earlier batch and closed before submitting its duplicate.
@@ -37,7 +37,7 @@
// the original batch (not resubmitted, so no batchId) arrives in parallel with a resubmitted batch.
// In the presence of typical network conditions, this would not be possible
// (the original batch should roundtrip WAY before another container could rehydrate, connect, and resubmit)
const batchId = getEffectiveBatchId(inboundBatch);
const batchId = getEffectiveBatchId(batchStart);

// Check this batch against the tracked batchIds to see if it's a duplicate
if (this.batchIdsAll.has(batchId)) {
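
A simplified, purely illustrative sketch of the dedupe-and-cleanup idea described in the comments above; the real class's fields and bookkeeping differ, so this is not the PR's implementation.

// Illustrative sketch only: track (batchId, sequenceNumber) pairs and forget any
// entry whose sequenceNumber is below the latest minimumSequenceNumber, since no
// other client could still resubmit a duplicate of such an old batch.
class DuplicateBatchDetectorSketch {
  private readonly seqNumberByBatchId = new Map<string, number>();

  public processInboundBatch(
    batchId: string,
    sequenceNumber: number,
    minimumSequenceNumber: number,
  ): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
    // Forget batchIds that are no longer at risk of duplication.
    for (const [id, seq] of this.seqNumberByBatchId) {
      if (seq < minimumSequenceNumber) {
        this.seqNumberByBatchId.delete(id);
      }
    }
    // Check this batch against the tracked batchIds to see if it's a duplicate.
    const otherSequenceNumber = this.seqNumberByBatchId.get(batchId);
    if (otherSequenceNumber !== undefined) {
      return { duplicate: true, otherSequenceNumber };
    }
    this.seqNumberByBatchId.set(batchId, sequenceNumber);
    return { duplicate: false };
  }
}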
3 changes: 2 additions & 1 deletion packages/runtime/container-runtime/src/opLifecycle/index.ts
@@ -20,7 +20,8 @@ export { OpDecompressor } from "./opDecompressor.js";
export { OpSplitter, splitOp, isChunkedMessage } from "./opSplitter.js";
export {
ensureContentsDeserialized,
InboundBatch,
InboundMessageResult,
BatchStartInfo,
RemoteMessageProcessor,
unpackRuntimeMessage,
} from "./remoteMessageProcessor.js";
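
For reference, a hedged one-liner showing how a sibling module inside container-runtime might pick up these re-exports through this barrel (relative path assumed for the example):

// Illustration only: consuming the re-exports from a sibling module (path assumed).
import { RemoteMessageProcessor, type BatchStartInfo, type InboundMessageResult } from "./opLifecycle/index.js";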
@@ -21,10 +21,8 @@ import { OpDecompressor } from "./opDecompressor.js";
import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js";
import { OpSplitter, isChunkedMessage } from "./opSplitter.js";

/** Messages being received as a batch, with details needed to process the batch */
export interface InboundBatch {
/** Messages in this batch */
readonly messages: InboundSequencedContainerRuntimeMessage[];
/** Info about the batch we learn when we process the first message */
export interface BatchStartInfo {
/** Batch ID, if present */
readonly batchId: string | undefined;
/** clientId that sent this batch. Used to compute Batch ID if needed */
@@ -46,6 +44,18 @@
readonly keyMessage: ISequencedDocumentMessage;
}

/**
* Result of processing the next inbound message.
* Right now we only return full batches, but soon we will add support for individual messages within the batch too.
*/
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions -- Preparing to add other union cases
export type InboundMessageResult = {
type: "fullBatch";
messages: InboundSequencedContainerRuntimeMessage[];
batchStart: BatchStartInfo;
length: number;
};
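
Per the comment, this union is expected to grow as op-by-op support lands. The sketch below is a hedged guess at what consuming code could look like once an additional case exists; the extra case name and shape are assumptions, not part of this PR.

// Hypothetical future shape, shown only to illustrate why the result carries a
// discriminating "type" field; the "nextBatchMessage" case does not exist yet.
type HypotheticalInboundMessageResult =
  | InboundMessageResult
  | {
      type: "nextBatchMessage";
      nextMessage: InboundSequencedContainerRuntimeMessage;
      batchEnd: boolean;
    };

function describeResult(result: HypotheticalInboundMessageResult): string {
  switch (result.type) {
    case "fullBatch":
      // Whole batch available at once: batchStart plus all messages.
      return `full batch of ${result.length} message(s)`;
    case "nextBatchMessage":
      // Op-by-op delivery: one message at a time until batchEnd is true.
      return result.batchEnd ? "final message of batch" : "message within batch";
    default:
      return "unknown";
  }
}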

function assertHasClientId(
message: ISequencedDocumentMessage,
): asserts message is ISequencedDocumentMessage & { clientId: string } {
@@ -67,7 +77,9 @@ export class RemoteMessageProcessor {
*
* @remarks If undefined, we are expecting the next message to start a new batch.
*/
private batchInProgress: InboundBatch | undefined;
private batchInProgress:
| (BatchStartInfo & { messages: InboundSequencedContainerRuntimeMessage[] })
| undefined;

constructor(
private readonly opSplitter: OpSplitter,
@@ -105,7 +117,7 @@
public process(
remoteMessageCopy: ISequencedDocumentMessage,
logLegacyCase: (codePath: string) => void,
): InboundBatch | undefined {
): InboundMessageResult | undefined {
let message = remoteMessageCopy;

assertHasClientId(message);
@@ -141,12 +153,17 @@
);
const batchId = asBatchMetadata(message.metadata)?.batchId;
const groupedMessages = this.opGroupingManager.ungroupOp(message).map(unpack);

return {
type: "fullBatch",
messages: groupedMessages, // Will be [] for an empty batch
batchStartCsn: message.clientSequenceNumber,
clientId,
batchId,
keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch
batchStart: {
batchStartCsn: message.clientSequenceNumber,
clientId,
batchId,
keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch
},
length: groupedMessages.length, // Will be 0 for an empty batch
};
}

@@ -162,9 +179,15 @@
return undefined;
}

const completedBatch = this.batchInProgress;
assert(this.batchInProgress !== undefined, "Completed batch should be non-empty");
const { messages, ...batchStart } = this.batchInProgress;
this.batchInProgress = undefined;
return completedBatch;
return {
type: "fullBatch",
messages,
batchStart,
length: messages.length,
};
}
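
To tie it together, a hedged sketch of the caller-side contract exercised in containerRuntime.ts above: keep feeding inbound wire messages to process(), and only act once it returns a result rather than undefined. In the real runtime each message is handled as it arrives; the loop below merely illustrates the undefined-versus-result behavior.

// Illustration only: process() returns undefined for partial chunks / mid-batch
// messages and a complete InboundMessageResult once the whole batch is received.
function collectFullBatches(
  processor: RemoteMessageProcessor,
  inbound: ISequencedDocumentMessage[],
  logLegacyCase: (codePath: string) => void = () => {},
): InboundMessageResult[] {
  const results: InboundMessageResult[] = [];
  for (const message of inbound) {
    const result = processor.process(message, logLegacyCase);
    if (result !== undefined) {
      results.push(result);
    }
  }
  return results;
}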

/**