# ContainerRuntime: Process incoming batches op-by-op instead of waiting for the whole batch (microsoft#22508)

We are concerned that holding batch messages in ContainerRuntime even
while DeltaManager advances its tracked sequence numbers through the
batch could have unintended consequences. So this PR restores the old
behavior of processing each message in a batch one-by-one, rather than
holding until the whole batch arrives.

Note that there's no change in behavior here for Grouped Batches.

### How the change works

PR microsoft#21785 switched the RemoteMessageProcessor from returning ungrouped
batch ops as they arrived to holding them and returning the whole
batch only once the last one arrived. The downstream code was also updated
to take whole batches, whereas before it took individual messages
and used the batch metadata to detect batch start/end.

Too many other changes have landed since that PR to revert it outright.
Logic was added throughout ContainerRuntime (CR) and PendingStateManager (PSM)
that looks at info about the batch, found on the first message in the batch.
So we can reverse the change and process one-at-a-time, but we need a way to carry
that "batch start info" along with the first message in the batch.

So we now model the result that the RemoteMessageProcessor (RMP) yields as one of three cases:

- A full batch of messages (could be from a single-message batch or a
Grouped Batch)
- The first message of a multi-message batch
- The next message in a multi-message batch

The first two cases include the "batch start info" needed for the recent
Offline work. The third case just indicates whether it's the last
message or not.
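In code, consuming that union looks roughly like the sketch below. It mirrors the `locationInBatch` computation this PR adds to ContainerRuntime (visible in the diff further down), but it is simplified; the real code also handles pending state and schedule-manager calls, and the import path shown is an assumption.

```typescript
// Sketch: mapping the three result cases to batch boundaries.
// Import path is illustrative; the type lives with RemoteMessageProcessor.
import type { InboundMessageResult } from "./opLifecycle/remoteMessageProcessor.js";

function getLocationInBatch(result: InboundMessageResult): {
	batchStart: boolean;
	batchEnd: boolean;
} {
	switch (result.type) {
		case "fullBatch":
			// Single-message, Grouped, or empty batch: it starts and ends here.
			return { batchStart: true, batchEnd: true };
		case "batchStartingMessage":
			// First message of a multi-message batch; carries BatchStartInfo.
			return { batchStart: true, batchEnd: false };
		case "nextBatchMessage":
			// Subsequent message; batchEnd is only set on the last one.
			return { batchStart: false, batchEnd: result.batchEnd === true };
	}
}
```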

microsoft#22501 added some of the necessary structure: it introduced the type for
"batch start info" and updated the downstream code to use that instead
of reading it off the old "Inbound Batch" type. This PR now adds the
other two cases to the RMP return type and handles them
throughout CR and PSM.
markfields authored Sep 16, 2024
1 parent 92d35f9 commit 709f085

Showing 7 changed files with 252 additions and 123 deletions.
.changeset/metal-hornets-travel.md (16 additions & 0 deletions)

```diff
@@ -0,0 +1,16 @@
+---
+"@fluidframework/container-runtime": minor
+---
+---
+"section": "fix"
+---
+Restored old op processing behavior around batched ops to avoid potential regression
+
+There's a theoretical risk of indeterminate behavior due to a recent change to how batches of ops are processed.
+This fix reverses that change.
+
+Pull Request #21785 updated the ContainerRuntime to hold onto the messages in an incoming batch until they've all arrived, and only then process the set of messages.
+
+While the batch is being processed, the DeltaManager and ContainerRuntime's view of the latest sequence numbers will be
+out of sync. This may have unintended side effects, so out of an abundance of caution we're reversing this behavior until
+we can add the proper protections to ensure the system stays properly in sync.
```
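To make the out-of-sync concern concrete, here is a minimal sketch, using hypothetical simplified names rather than the real DeltaManager/ContainerRuntime APIs, of how buffering a batch lets the two sequence-number views drift apart:

```typescript
// Hypothetical illustration only; not actual Fluid Framework code.
interface SequencedOp {
	sequenceNumber: number;
	batchEnd: boolean;
}

class BufferingInbox {
	// Advances per op, like DeltaManager's tracked sequence number.
	public lastSequenceNumber = 0;
	private readonly buffered: SequencedOp[] = [];

	public constructor(private readonly processBatch: (ops: SequencedOp[]) => void) {}

	public receive(op: SequencedOp): void {
		this.lastSequenceNumber = op.sequenceNumber; // advances on every op...
		this.buffered.push(op);
		if (op.batchEnd) {
			// ...but the runtime-side consumer only sees the ops here.
			this.processBatch(this.buffered.splice(0));
		}
	}
}

// While ops 5 through 7 of a batch sit in the buffer, lastSequenceNumber is
// already 7, but the consumer's "processed up to" view is still 4. That
// window of disagreement is what this PR eliminates by processing op-by-op.
```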
packages/runtime/container-runtime/src/containerRuntime.ts (42 additions & 28 deletions)

```diff
@@ -2717,29 +2717,31 @@ export class ContainerRuntime
 			return;
 		}
 
-		const batchStart: BatchStartInfo = inboundResult.batchStart;
-		const result = this.duplicateBatchDetector?.processInboundBatch(batchStart);
-		if (result?.duplicate) {
-			const error = new DataCorruptionError(
-				"Duplicate batch - The same batch was sequenced twice",
-				{ batchId: batchStart.batchId },
-			);
-
-			this.mc.logger.sendTelemetryEvent(
-				{
-					eventName: "DuplicateBatch",
-					details: {
-						batchId: batchStart.batchId,
-						clientId: batchStart.clientId,
-						batchStartCsn: batchStart.batchStartCsn,
-						size: inboundResult.length,
-						duplicateBatchSequenceNumber: result.otherSequenceNumber,
-						...extractSafePropertiesFromMessage(batchStart.keyMessage),
-					},
-				},
-				error,
-			);
-			throw error;
-		}
+		if ("batchStart" in inboundResult) {
+			const batchStart: BatchStartInfo = inboundResult.batchStart;
+			const result = this.duplicateBatchDetector?.processInboundBatch(batchStart);
+			if (result?.duplicate) {
+				const error = new DataCorruptionError(
+					"Duplicate batch - The same batch was sequenced twice",
+					{ batchId: batchStart.batchId },
+				);
+
+				this.mc.logger.sendTelemetryEvent(
+					{
+						eventName: "DuplicateBatch",
+						details: {
+							batchId: batchStart.batchId,
+							clientId: batchStart.clientId,
+							batchStartCsn: batchStart.batchStartCsn,
+							size: inboundResult.length,
+							duplicateBatchSequenceNumber: result.otherSequenceNumber,
+							...extractSafePropertiesFromMessage(batchStart.keyMessage),
+						},
+					},
+					error,
+				);
+				throw error;
+			}
+		}
 
 		let runtimeBatch: boolean = true;
@@ -2751,7 +2753,18 @@
 			localOpMetadata?: unknown;
 		}[] = this.pendingStateManager.processInboundMessages(inboundResult, local);
 
-		if (inboundResult.length === 0) {
+		if (inboundResult.type !== "fullBatch") {
+			assert(
+				messagesWithPendingState.length === 1,
+				"Partial batch should have exactly one message",
+			);
+		}
+
+		if (messagesWithPendingState.length === 0) {
+			assert(
+				inboundResult.type === "fullBatch",
+				"Empty batch is always considered a full batch",
+			);
 			/**
 			 * We need to process an empty batch, which will execute expected actions while processing even if there
 			 * are no inner runtime messages.
@@ -2774,11 +2787,12 @@
 			runtimeBatch = false;
 		}
 
-		// This is trivial today, but when support for other types is added, we can quickly update this.
-		const locationInBatch: { batchStart: boolean; batchEnd: boolean } = {
-			batchStart: true,
-			batchEnd: true,
-		};
+		const locationInBatch: { batchStart: boolean; batchEnd: boolean } =
+			inboundResult.type === "fullBatch"
+				? { batchStart: true, batchEnd: true }
+				: inboundResult.type === "batchStartingMessage"
+					? { batchStart: true, batchEnd: false }
+					: { batchStart: false, batchEnd: inboundResult.batchEnd === true };
 
 		this.processInboundMessages(
 			messagesWithPendingState,
@@ -2808,7 +2822,7 @@
 	private _processedClientSequenceNumber: number | undefined;
 
 	/**
-	 * Processes inbound batch message(s). It calls schedule manager according to the location in the batch of the message(s).
+	 * Processes inbound message(s). It calls schedule manager according to the messages' location in the batch.
	 * @param messages - messages to process.
 	 * @param locationInBatch - Are we processing the start and/or end of a batch?
 	 * @param local - true if the messages were originally generated by the client receiving it.
```
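A note on the `"batchStart" in inboundResult` guard above: of the three union cases, only `fullBatch` and `batchStartingMessage` declare a `batchStart` property, so TypeScript's `in`-operator narrowing makes the access type-safe. A minimal sketch (assuming `InboundMessageResult` is in scope):

```typescript
declare const inboundResult: InboundMessageResult;

if ("batchStart" in inboundResult) {
	// Narrowed to the "fullBatch" | "batchStartingMessage" cases.
	const { batchId, batchStartCsn } = inboundResult.batchStart;
} else {
	// Narrowed to "nextBatchMessage"; there is no BatchStartInfo to inspect.
	const message = inboundResult.nextMessage;
}
```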
packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts

```diff
@@ -46,15 +46,30 @@ export interface BatchStartInfo {
 
 /**
  * Result of processing the next inbound message.
- * Right now we only return full batches, but soon we will add support for individual messages within the batch too.
+ * Depending on the message and configuration of RemoteMessageProcessor, the result may be:
+ * - A full batch of messages (including a single-message batch)
+ * - The first message of a multi-message batch
+ * - The next message in a multi-message batch
  */
-// eslint-disable-next-line @typescript-eslint/consistent-type-definitions -- Preparing to add other union cases
-export type InboundMessageResult = {
-	type: "fullBatch";
-	messages: InboundSequencedContainerRuntimeMessage[];
-	batchStart: BatchStartInfo;
-	length: number;
-};
+export type InboundMessageResult =
+	| {
+			type: "fullBatch";
+			messages: InboundSequencedContainerRuntimeMessage[];
+			batchStart: BatchStartInfo;
+			length: number;
+	  }
+	| {
+			type: "batchStartingMessage";
+			batchStart: BatchStartInfo;
+			nextMessage: InboundSequencedContainerRuntimeMessage;
+			length?: never;
+	  }
+	| {
+			type: "nextBatchMessage";
+			batchEnd?: boolean;
+			nextMessage: InboundSequencedContainerRuntimeMessage;
+			length?: never;
+	  };
 
 function assertHasClientId(
 	message: ISequencedDocumentMessage,
@@ -72,14 +87,7 @@
 	 * @internal
 	 */
 export class RemoteMessageProcessor {
-	/**
-	 * The current batch being received, with details needed to process it.
-	 *
-	 * @remarks If undefined, we are expecting the next message to start a new batch.
-	 */
-	private batchInProgress:
-		| (BatchStartInfo & { messages: InboundSequencedContainerRuntimeMessage[] })
-		| undefined;
+	private batchInProgress: boolean = false;
 
 	constructor(
 		private readonly opSplitter: OpSplitter,
@@ -146,11 +154,8 @@
 		}
 
 		if (isGroupedBatch(message)) {
-			// We should be awaiting a new batch (batchInProgress undefined)
-			assert(
-				this.batchInProgress === undefined,
-				0x9d3 /* Grouped batch interrupting another batch */,
-			);
+			// We should be awaiting a new batch (batchInProgress false)
+			assert(!this.batchInProgress, 0x9d3 /* Grouped batch interrupting another batch */);
 			const batchId = asBatchMetadata(message.metadata)?.batchId;
 			const groupedMessages = this.opGroupingManager.ungroupOp(message).map(unpack);
 
@@ -170,67 +175,63 @@
 		// Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked
 		unpackRuntimeMessage(message, logLegacyCase);
 
-		const { batchEnded } = this.addMessageToBatch(
+		return this.getResultBasedOnBatchMetadata(
 			message as InboundSequencedContainerRuntimeMessage & { clientId: string },
 		);
-
-		if (!batchEnded) {
-			// batch not yet complete
-			return undefined;
-		}
-
-		assert(this.batchInProgress !== undefined, "Completed batch should be non-empty");
-		const { messages, ...batchStart } = this.batchInProgress;
-		this.batchInProgress = undefined;
-		return {
-			type: "fullBatch",
-			messages,
-			batchStart,
-			length: messages.length,
-		};
 	}
 
 	/**
-	 * Add the given message to the current batch, and indicate whether the batch is now complete.
-	 *
-	 * @returns batchEnded: true if the batch is now complete, batchEnded: false if more messages are expected
+	 * Now that the message has been "unwrapped" as to any virtualization (grouping, compression, chunking),
+	 * inspect the batch metadata flag and determine what kind of result to return.
 	 */
-	private addMessageToBatch(
+	private getResultBasedOnBatchMetadata(
 		message: InboundSequencedContainerRuntimeMessage & { clientId: string },
-	): { batchEnded: boolean } {
+	): InboundMessageResult {
 		const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch;
-		if (this.batchInProgress === undefined) {
+		if (!this.batchInProgress) {
 			// We are waiting for a new batch
 			assert(batchMetadataFlag !== false, 0x9d5 /* Unexpected batch end marker */);
 
 			// Start of a new multi-message batch
 			if (batchMetadataFlag === true) {
-				this.batchInProgress = {
-					messages: [message],
-					batchId: asBatchMetadata(message.metadata)?.batchId,
-					clientId: message.clientId,
-					batchStartCsn: message.clientSequenceNumber,
-					keyMessage: message,
+				this.batchInProgress = true;
+				return {
+					type: "batchStartingMessage",
+					batchStart: {
+						batchId: asBatchMetadata(message.metadata)?.batchId,
+						clientId: message.clientId,
+						batchStartCsn: message.clientSequenceNumber,
+						keyMessage: message,
+					},
+					nextMessage: message,
 				};
-
-				return { batchEnded: false };
 			}
 
 			// Single-message batch (Since metadata flag is undefined)
-			this.batchInProgress = {
+			return {
+				type: "fullBatch",
 				messages: [message],
-				batchStartCsn: message.clientSequenceNumber,
-				clientId: message.clientId,
-				batchId: asBatchMetadata(message.metadata)?.batchId,
-				keyMessage: message,
+				batchStart: {
+					batchStartCsn: message.clientSequenceNumber,
+					clientId: message.clientId,
+					batchId: asBatchMetadata(message.metadata)?.batchId,
+					keyMessage: message,
+				},
+				length: 1,
 			};
-			return { batchEnded: true };
 		}
 		assert(batchMetadataFlag !== true, 0x9d6 /* Unexpected batch start marker */);
 
-		this.batchInProgress.messages.push(message);
+		// Clear batchInProgress state if the batch is ending
+		if (batchMetadataFlag === false) {
+			this.batchInProgress = false;
+		}
 
-		return { batchEnded: batchMetadataFlag === false };
+		return {
+			type: "nextBatchMessage",
+			nextMessage: message,
+			batchEnd: batchMetadataFlag === false,
+		};
	}
 }
```
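For a concrete picture of the new behavior, here is the sequence of results the processor's `process` method now yields for a hypothetical three-message ungrouped batch (batch metadata flags `true`, `undefined`, `false`), per the logic above:

```typescript
// Hypothetical trace; op1..op3 are the unpacked inbound messages.
// process(op1) -> { type: "batchStartingMessage", batchStart: { ... }, nextMessage: op1 }
// process(op2) -> { type: "nextBatchMessage", nextMessage: op2, batchEnd: false }
// process(op3) -> { type: "nextBatchMessage", nextMessage: op3, batchEnd: true }
// Previously, op1 and op2 returned undefined and op3 returned the whole batch.
```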
packages/runtime/container-runtime/src/pendingStateManager.ts (6 additions & 4 deletions)

```diff
@@ -366,7 +366,7 @@
 
 		// An inbound remote batch should not match the pending batch ID for this client.
 		// That would indicate the container forked (two instances trying to submit the same local state)
-		if (this.remoteBatchMatchesPendingBatch(inbound.batchStart)) {
+		if ("batchStart" in inbound && this.remoteBatchMatchesPendingBatch(inbound.batchStart)) {
 			throw DataProcessingError.create(
 				"Forked Container Error! Matching batchIds but mismatched clientId",
 				"PendingStateManager.processInboundMessages",
@@ -375,7 +375,7 @@
 		}
 
 		// No localOpMetadata for remote messages
-		const messages = inbound.messages;
+		const messages = inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage];
 		return messages.map((message) => ({ message }));
 	}
 
@@ -390,7 +390,9 @@
 		message: InboundSequencedContainerRuntimeMessage;
 		localOpMetadata: unknown;
 	}[] {
-		this.onLocalBatchBegin(inbound.batchStart, inbound.length);
+		if ("batchStart" in inbound) {
+			this.onLocalBatchBegin(inbound.batchStart, inbound.length);
+		}
 
 		// Empty batch
 		if (inbound.length === 0) {
@@ -404,7 +406,7 @@
 			return [];
 		}
 
-		const messages = inbound.messages;
+		const messages = inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage];
 
 		return messages.map((message) => ({
 			message,
```
ContainerRuntime unit tests (mocked PendingStateManager)

```diff
@@ -855,7 +855,8 @@ describe("Runtime", () => {
 			replayPendingStates: () => {},
 			hasPendingMessages: (): boolean => pendingMessages > 0,
 			processInboundMessages: (inbound: InboundMessageResult, _local: boolean) => {
-				const messages = inbound.messages;
+				const messages =
+					inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage];
 				return messages.map<{
 					message: InboundSequencedContainerRuntimeMessage;
 					localOpMetadata?: unknown;
```