diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f20aa8dc7bc0..e75e0de746d7 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -171,6 +171,7 @@ import { IBatchMetadata, ISavedOpMetadata } from "./metadata.js"; import { BatchId, BatchMessage, + BatchStartInfo, DuplicateBatchDetector, ensureContentsDeserialized, IBatch, @@ -2707,30 +2708,31 @@ export class ContainerRuntime if (hasModernRuntimeMessageEnvelope) { // If the message has the modern message envelope, then process it here. // Here we unpack the message (decompress, unchunk, and/or ungroup) into a batch of messages with ContainerMessageType - const inboundBatch = this.remoteMessageProcessor.process(messageCopy, logLegacyCase); - if (inboundBatch === undefined) { + const inboundResult = this.remoteMessageProcessor.process(messageCopy, logLegacyCase); + if (inboundResult === undefined) { // This means the incoming message is an incomplete part of a message or batch // and we need to process more messages before the rest of the system can understand it. return; } - const result = this.duplicateBatchDetector?.processInboundBatch(inboundBatch); + const batchStart: BatchStartInfo = inboundResult.batchStart; + const result = this.duplicateBatchDetector?.processInboundBatch(batchStart); if (result?.duplicate) { const error = new DataCorruptionError( "Duplicate batch - The same batch was sequenced twice", - { batchId: inboundBatch.batchId }, + { batchId: batchStart.batchId }, ); this.mc.logger.sendTelemetryEvent( { eventName: "DuplicateBatch", details: { - batchId: inboundBatch.batchId, - clientId: inboundBatch.clientId, - batchStartCsn: inboundBatch.batchStartCsn, - size: inboundBatch.messages.length, + batchId: batchStart.batchId, + clientId: batchStart.clientId, + batchStartCsn: batchStart.batchStartCsn, + size: inboundResult.length, duplicateBatchSequenceNumber: result.otherSequenceNumber, - ...extractSafePropertiesFromMessage(inboundBatch.keyMessage), + ...extractSafePropertiesFromMessage(batchStart.keyMessage), }, }, error, @@ -2745,8 +2747,9 @@ export class ContainerRuntime let messagesWithPendingState: { message: ISequencedDocumentMessage; localOpMetadata?: unknown; - }[] = this.pendingStateManager.processInboundBatch(inboundBatch, local); - if (inboundBatch.messages.length === 0) { + }[] = this.pendingStateManager.processInboundMessages(inboundResult, local); + + if (inboundResult.length === 0) { /** * We need to process an empty batch, which will execute expected actions while processing even if there * are no inner runtime messages. @@ -2761,17 +2764,31 @@ export class ContainerRuntime */ messagesWithPendingState = [ { - message: inboundBatch.keyMessage, + message: inboundResult.batchStart.keyMessage, localOpMetadata: undefined, }, ]; // Empty batch message is a non-runtime message as it was generated by the op grouping manager. runtimeBatch = false; } - this.processBatch(messagesWithPendingState, local, savedOp, runtimeBatch); + + // This is trivial today, but when support for other types is added, we can quickly update this. + const locationInBatch: { batchStart: boolean; batchEnd: boolean } = { + batchStart: true, + batchEnd: true, + }; + + this.processInboundMessages( + messagesWithPendingState, + locationInBatch, + local, + savedOp, + runtimeBatch, + ); } else { - this.processBatch( + this.processInboundMessages( [{ message: messageCopy, localOpMetadata: undefined }], + { batchStart: true, batchEnd: true }, // Single message local, savedOp, isRuntimeMessage(messageCopy) /* runtimeBatch */, @@ -2789,28 +2806,32 @@ export class ContainerRuntime private _processedClientSequenceNumber: number | undefined; /** - * Processes a batch of messages. It calls schedule manager before and after processing the batch. - * @param batch - Batch of messages to process. + * Processes inbound batch message(s). It calls schedule manager according to the location in the batch of the message(s). + * @param messages - messages to process. + * @param locationInBatch - Are we processing the start and/or end of a batch? * @param local - true if the messages were originally generated by the client receiving it. * @param savedOp - true if the message is a replayed saved op. - * @param runtimeBatch - true if this is a batch of runtime messages. + * @param runtimeBatch - true if these are runtime messages. */ - private processBatch( - batch: { + private processInboundMessages( + messages: { message: ISequencedDocumentMessage; localOpMetadata?: unknown; }[], + locationInBatch: { batchStart: boolean; batchEnd: boolean }, local: boolean, savedOp: boolean | undefined, runtimeBatch: boolean, ) { - const firstMessage = batch[0]?.message; - assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */); - this.scheduleManager.batchBegin(firstMessage); + if (locationInBatch.batchStart) { + const firstMessage = messages[0]?.message; + assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */); + this.scheduleManager.batchBegin(firstMessage); + } let error: unknown; try { - batch.forEach(({ message, localOpMetadata }) => { + messages.forEach(({ message, localOpMetadata }) => { this.ensureNoDataModelChanges(() => { if (runtimeBatch) { this.validateAndProcessRuntimeMessage({ @@ -2828,9 +2849,11 @@ export class ContainerRuntime error = e; throw error; } finally { - const lastMessage = batch[batch.length - 1]?.message; - assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */); - this.scheduleManager.batchEnd(error, lastMessage); + if (locationInBatch.batchEnd) { + const lastMessage = messages[messages.length - 1]?.message; + assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */); + this.scheduleManager.batchEnd(error, lastMessage); + } } } diff --git a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts index c69fcb333904..197289b8d6f6 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts @@ -10,7 +10,7 @@ import { asBatchMetadata, type IBatchMetadata } from "../metadata.js"; import type { IPendingMessage } from "../pendingStateManager.js"; import { BatchMessage, IBatch, IBatchCheckpoint } from "./definitions.js"; -import type { InboundBatch } from "./remoteMessageProcessor.js"; +import type { BatchStartInfo } from "./remoteMessageProcessor.js"; export interface IBatchManagerOptions { readonly hardLimit: number; @@ -37,15 +37,15 @@ export function generateBatchId(originalClientId: string, batchStartCsn: number) /** * Get the effective batch ID for the input argument. - * Supports either an IPendingMessage or an InboundBatch. + * Supports either an IPendingMessage or BatchStartInfo. * If the batch ID is explicitly present, return it. * Otherwise, generate a new batch ID using the client ID and batch start CSN. */ export function getEffectiveBatchId( - pendingMessageOrInboundBatch: IPendingMessage | InboundBatch, + pendingMessageOrBatchStartInfo: IPendingMessage | BatchStartInfo, ): string { - if ("localOpMetadata" in pendingMessageOrInboundBatch) { - const pendingMessage: IPendingMessage = pendingMessageOrInboundBatch; + if ("localOpMetadata" in pendingMessageOrBatchStartInfo) { + const pendingMessage: IPendingMessage = pendingMessageOrBatchStartInfo; return ( asBatchMetadata(pendingMessage.opMetadata)?.batchId ?? generateBatchId( @@ -55,10 +55,8 @@ export function getEffectiveBatchId( ); } - const inboundBatch: InboundBatch = pendingMessageOrInboundBatch; - return ( - inboundBatch.batchId ?? generateBatchId(inboundBatch.clientId, inboundBatch.batchStartCsn) - ); + const batchStart: BatchStartInfo = pendingMessageOrBatchStartInfo; + return batchStart.batchId ?? generateBatchId(batchStart.clientId, batchStart.batchStartCsn); } /** diff --git a/packages/runtime/container-runtime/src/opLifecycle/duplicateBatchDetector.ts b/packages/runtime/container-runtime/src/opLifecycle/duplicateBatchDetector.ts index 2283980492d8..cd5b5110bd45 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/duplicateBatchDetector.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/duplicateBatchDetector.ts @@ -6,7 +6,7 @@ import { assert } from "@fluidframework/core-utils/internal"; import { getEffectiveBatchId } from "./batchManager.js"; -import { type InboundBatch } from "./remoteMessageProcessor.js"; +import { type BatchStartInfo } from "./remoteMessageProcessor.js"; /** * This class tracks recent batchIds we've seen, and checks incoming batches for duplicates. @@ -25,9 +25,9 @@ export class DuplicateBatchDetector { * @remarks - We also use the minimumSequenceNumber to clear out old batchIds that are no longer at risk for duplicates. */ public processInboundBatch( - inboundBatch: InboundBatch, + batchStart: BatchStartInfo, ): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } { - const { sequenceNumber, minimumSequenceNumber } = inboundBatch.keyMessage; + const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage; // Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget. // Why? Because any other client holding the same batch locally would have seen the earlier batch and closed before submitting its duplicate. @@ -37,7 +37,7 @@ export class DuplicateBatchDetector { // the original batch (not resubmitted, so no batchId) arrives in parallel with a resubmitted batch. // In the presence of typical network conditions, this would not be possible // (the original batch should roundtrip WAY before another container could rehydrate, connect, and resubmit) - const batchId = getEffectiveBatchId(inboundBatch); + const batchId = getEffectiveBatchId(batchStart); // Check this batch against the tracked batchIds to see if it's a duplicate if (this.batchIdsAll.has(batchId)) { diff --git a/packages/runtime/container-runtime/src/opLifecycle/index.ts b/packages/runtime/container-runtime/src/opLifecycle/index.ts index e5b5be0e0e4e..ea08e63f1c83 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/index.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/index.ts @@ -20,7 +20,8 @@ export { OpDecompressor } from "./opDecompressor.js"; export { OpSplitter, splitOp, isChunkedMessage } from "./opSplitter.js"; export { ensureContentsDeserialized, - InboundBatch, + InboundMessageResult, + BatchStartInfo, RemoteMessageProcessor, unpackRuntimeMessage, } from "./remoteMessageProcessor.js"; diff --git a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts index b1ac224366e1..01b6ac1f42e5 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts @@ -21,10 +21,8 @@ import { OpDecompressor } from "./opDecompressor.js"; import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js"; import { OpSplitter, isChunkedMessage } from "./opSplitter.js"; -/** Messages being received as a batch, with details needed to process the batch */ -export interface InboundBatch { - /** Messages in this batch */ - readonly messages: InboundSequencedContainerRuntimeMessage[]; +/** Info about the batch we learn when we process the first message */ +export interface BatchStartInfo { /** Batch ID, if present */ readonly batchId: string | undefined; /** clientId that sent this batch. Used to compute Batch ID if needed */ @@ -46,6 +44,18 @@ export interface InboundBatch { readonly keyMessage: ISequencedDocumentMessage; } +/** + * Result of processing the next inbound message. + * Right now we only return full batches, but soon we will add support for individual messages within the batch too. + */ +// eslint-disable-next-line @typescript-eslint/consistent-type-definitions -- Preparing to add other union cases +export type InboundMessageResult = { + type: "fullBatch"; + messages: InboundSequencedContainerRuntimeMessage[]; + batchStart: BatchStartInfo; + length: number; +}; + function assertHasClientId( message: ISequencedDocumentMessage, ): asserts message is ISequencedDocumentMessage & { clientId: string } { @@ -67,7 +77,9 @@ export class RemoteMessageProcessor { * * @remarks If undefined, we are expecting the next message to start a new batch. */ - private batchInProgress: InboundBatch | undefined; + private batchInProgress: + | (BatchStartInfo & { messages: InboundSequencedContainerRuntimeMessage[] }) + | undefined; constructor( private readonly opSplitter: OpSplitter, @@ -105,7 +117,7 @@ export class RemoteMessageProcessor { public process( remoteMessageCopy: ISequencedDocumentMessage, logLegacyCase: (codePath: string) => void, - ): InboundBatch | undefined { + ): InboundMessageResult | undefined { let message = remoteMessageCopy; assertHasClientId(message); @@ -141,12 +153,17 @@ export class RemoteMessageProcessor { ); const batchId = asBatchMetadata(message.metadata)?.batchId; const groupedMessages = this.opGroupingManager.ungroupOp(message).map(unpack); + return { + type: "fullBatch", messages: groupedMessages, // Will be [] for an empty batch - batchStartCsn: message.clientSequenceNumber, - clientId, - batchId, - keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch + batchStart: { + batchStartCsn: message.clientSequenceNumber, + clientId, + batchId, + keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch + }, + length: groupedMessages.length, // Will be 0 for an empty batch }; } @@ -162,9 +179,15 @@ export class RemoteMessageProcessor { return undefined; } - const completedBatch = this.batchInProgress; + assert(this.batchInProgress !== undefined, "Completed batch should be non-empty"); + const { messages, ...batchStart } = this.batchInProgress; this.batchInProgress = undefined; - return completedBatch; + return { + type: "fullBatch", + messages, + batchStart, + length: messages.length, + }; } /** diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 45b40176829d..921fc6878382 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -24,7 +24,8 @@ import { BatchId, BatchMessage, getEffectiveBatchId, - InboundBatch, + BatchStartInfo, + InboundMessageResult, } from "./opLifecycle/index.js"; /** @@ -323,10 +324,10 @@ export class PendingStateManager implements IDisposable { /** * Compares the batch ID of the incoming batch with the pending batch ID for this client. * They should not match, as that would indicate a forked container. - * @param remoteBatch - An incoming batch *NOT* submitted by this client + * @param remoteBatchStart - BatchStartInfo for an incoming batch *NOT* submitted by this client * @returns whether the batch IDs match */ - private remoteBatchMatchesPendingBatch(remoteBatch: InboundBatch): boolean { + private remoteBatchMatchesPendingBatch(remoteBatchStart: BatchStartInfo): boolean { // We may have no pending changes - if so, no match, no problem. const pendingMessage = this.pendingMessages.peekFront(); if (pendingMessage === undefined) { @@ -336,63 +337,66 @@ export class PendingStateManager implements IDisposable { // We must compare the effective batch IDs, since one of these ops // may have been the original, not resubmitted, so wouldn't have its batch ID stamped yet. const pendingBatchId = getEffectiveBatchId(pendingMessage); - const inboundBatchId = getEffectiveBatchId(remoteBatch); + const inboundBatchId = getEffectiveBatchId(remoteBatchStart); return pendingBatchId === inboundBatchId; } /** - * Processes an inbound batch of messages - May be local or remote. + * Processes an inbound message or batch of messages - May be local or remote. * - * @param batch - The inbound batch of messages to process. Could be local or remote. - * @param local - true if we submitted this batch and expect corresponding pending messages - * @returns The inbound batch's messages with localOpMetadata "zipped" in. + * @param inbound - The inbound message(s) to process, with extra info (e.g. about the start of a batch). Could be local or remote. + * @param local - true if we submitted these messages and expect corresponding pending messages + * @returns The inbound messages with localOpMetadata "zipped" in. * * @throws a DataProcessingError in either of these cases: - * - The pending message content doesn't match the incoming message content for any message in the batch - * - The batch IDs *do match* but it's not a local batch (indicates Container forking). + * - The pending message content doesn't match the incoming message content for any message here + * - The batch IDs *do match* but it's not local (indicates Container forking). */ - public processInboundBatch( - batch: InboundBatch, + public processInboundMessages( + inbound: InboundMessageResult, local: boolean, ): { message: InboundSequencedContainerRuntimeMessage; localOpMetadata?: unknown; }[] { if (local) { - return this.processPendingLocalBatch(batch); + return this.processPendingLocalMessages(inbound); } // An inbound remote batch should not match the pending batch ID for this client. // That would indicate the container forked (two instances trying to submit the same local state) - if (this.remoteBatchMatchesPendingBatch(batch)) { + if (this.remoteBatchMatchesPendingBatch(inbound.batchStart)) { throw DataProcessingError.create( "Forked Container Error! Matching batchIds but mismatched clientId", - "PendingStateManager.processInboundBatch", - batch.keyMessage, + "PendingStateManager.processInboundMessages", + inbound.batchStart.keyMessage, ); } // No localOpMetadata for remote messages - return batch.messages.map((message) => ({ message })); + const messages = inbound.messages; + return messages.map((message) => ({ message })); } /** - * Processes the incoming batch from the server that was submitted by this client. - * It verifies that messages are received in the right order and that the batch information is correct. - * @param batch - The inbound batch (originating from this client) to correlate with the pending local state - * @throws DataProcessingError if the pending message content doesn't match the incoming message content for any message in the batch. - * @returns The inbound batch's messages with localOpMetadata "zipped" in. + * Processes the incoming message(s) from the server that were submitted by this client. + * It verifies that messages are received in the right order and that any batch information is correct. + * @param inbound - The inbound message(s) (originating from this client) to correlate with the pending local state + * @throws DataProcessingError if the pending message content doesn't match the incoming message content for any message here + * @returns The inbound messages with localOpMetadata "zipped" in. */ - private processPendingLocalBatch(batch: InboundBatch): { + private processPendingLocalMessages(inbound: InboundMessageResult): { message: InboundSequencedContainerRuntimeMessage; localOpMetadata: unknown; }[] { - this.onLocalBatchBegin(batch); + this.onLocalBatchBegin(inbound.batchStart, inbound.length); // Empty batch - if (batch.messages.length === 0) { - const localOpMetadata = this.processNextPendingMessage(batch.keyMessage.sequenceNumber); + if (inbound.length === 0) { + const localOpMetadata = this.processNextPendingMessage( + inbound.batchStart.keyMessage.sequenceNumber, + ); assert( asEmptyBatchLocalOpMetadata(localOpMetadata)?.emptyBatch === true, 0xa20 /* Expected empty batch marker */, @@ -400,7 +404,9 @@ export class PendingStateManager implements IDisposable { return []; } - return batch.messages.map((message) => ({ + const messages = inbound.messages; + + return messages.map((message) => ({ message, localOpMetadata: this.processNextPendingMessage(message.sequenceNumber, message), })); @@ -472,7 +478,7 @@ export class PendingStateManager implements IDisposable { /** * Check if the incoming batch matches the batch info for the next pending message. */ - private onLocalBatchBegin(batch: InboundBatch) { + private onLocalBatchBegin(batchStart: BatchStartInfo, batchLength?: number) { // Get the next message from the pending queue. Verify a message exists. const pendingMessage = this.pendingMessages.peekFront(); assert( @@ -484,37 +490,38 @@ export class PendingStateManager implements IDisposable { // and the next pending message should be an empty batch marker. // More Info: We must submit empty batches and track them in case a different fork // of this container also submitted the same batch (and it may not be empty for that fork). - const firstMessage = batch.messages[0]; + const firstMessage = batchStart.keyMessage; + // -1 length is for back compat, undefined length means we actually don't know it + const skipLengthCheck = + pendingMessage.batchInfo.length === -1 || batchLength === undefined; const expectedPendingBatchLength = - pendingMessage.batchInfo.length === -1 - ? -1 // Ignore the actual incoming length; -1 length is for back compat so force this to match - : batch.messages.length === 0 - ? 1 // For an empty batch, expect a singleton array with the empty batch marker - : batch.messages.length; // Otherwise, the lengths should actually match + batchLength === 0 + ? 1 // For an empty batch, expect a singleton array with the empty batch marker + : batchLength; // Otherwise, the lengths should actually match // Note: We don't need to use getEffectiveBatchId here, just check the explicit stamped batchID // That logic is needed only when comparing across potential container forks. // Furthermore, we also are comparing the batch IDs constituent data - clientId (it's local) and batchStartCsn. const pendingBatchId = asBatchMetadata(pendingMessage.opMetadata)?.batchId; - const inboundBatchId = batch.batchId; + const inboundBatchId = batchStart.batchId; // We expect the incoming batch to be of the same length, starting at the same clientSequenceNumber, // as the batch we originally submitted. The batchIds should match as well, if set (or neither should be set) // We have another later check to compare the message contents, which we'd expect to fail if this check does, // so we don't throw here, merely log. In a later release this check may replace that one since it's cheaper. if ( - pendingMessage.batchInfo.batchStartCsn !== batch.batchStartCsn || - pendingMessage.batchInfo.length !== expectedPendingBatchLength || + pendingMessage.batchInfo.batchStartCsn !== batchStart.batchStartCsn || + (!skipLengthCheck && pendingMessage.batchInfo.length !== expectedPendingBatchLength) || pendingBatchId !== inboundBatchId ) { this.logger?.sendErrorEvent({ eventName: "BatchInfoMismatch", details: { pendingBatchCsn: pendingMessage.batchInfo.batchStartCsn, - batchStartCsn: batch.batchStartCsn, + batchStartCsn: batchStart.batchStartCsn, pendingBatchLength: pendingMessage.batchInfo.length, expectedPendingBatchLength, - batchLength: batch.messages.length, + batchLength, pendingBatchId, inboundBatchId, pendingMessageBatchMetadata: asBatchMetadata(pendingMessage.opMetadata)?.batch, diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 241d3bf4c367..6bf5ed91e19d 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -69,11 +69,12 @@ import { import { ContainerMessageType, type ContainerRuntimeGCMessage, + type InboundSequencedContainerRuntimeMessage, type OutboundContainerRuntimeMessage, type RecentlyAddedContainerRuntimeMessageDetails, type UnknownContainerRuntimeMessage, } from "../messageTypes.js"; -import type { BatchMessage, InboundBatch } from "../opLifecycle/index.js"; +import type { BatchMessage, InboundMessageResult } from "../opLifecycle/index.js"; import { IPendingLocalState, IPendingMessage, @@ -823,11 +824,12 @@ describe("Runtime", () => { return { replayPendingStates: () => {}, hasPendingMessages: (): boolean => pendingMessages > 0, - processMessage: (_message: ISequencedDocumentMessage, _local: boolean) => { - return { localAck: false, localOpMetadata: undefined }; - }, - processInboundBatch: (batch: InboundBatch, _local: boolean) => { - return batch.messages.map((message) => ({ + processInboundMessages: (inbound: InboundMessageResult, _local: boolean) => { + const messages = inbound.messages; + return messages.map<{ + message: InboundSequencedContainerRuntimeMessage; + localOpMetadata?: unknown; + }>((message) => ({ message, localOpMetadata: undefined, })); @@ -837,7 +839,7 @@ describe("Runtime", () => { }, onFlushBatch: (batch: BatchMessage[], _csn?: number) => (pendingMessages += batch.length), - } as unknown as PendingStateManager; + } satisfies Partial as any as PendingStateManager; }; const getMockChannelCollection = (): ChannelCollection => { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts index 2fe4df988d00..9eaeabd5ab6f 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts @@ -9,10 +9,10 @@ import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/in // eslint-disable-next-line import/no-internal-modules import { DuplicateBatchDetector } from "../../opLifecycle/duplicateBatchDetector.js"; -import type { InboundBatch } from "../../opLifecycle/index.js"; +import type { BatchStartInfo } from "../../opLifecycle/index.js"; /** - * Helper function to create (enough of) an InboundBatch for testing. + * Helper function to create (enough of) a BatchStartInfo for testing. * Inbound batch may have explicit batchId, or merely clientId and batchStartCsn and batchId must be computed - allow either as inputs */ function makeBatch({ @@ -24,7 +24,7 @@ function makeBatch({ }: { sequenceNumber: number; minimumSequenceNumber: number } & ( | { batchId: string; clientId?: undefined; batchStartCsn?: undefined } | { batchId?: undefined; clientId: string; batchStartCsn: number } -)): InboundBatch { +)): BatchStartInfo { return { keyMessage: { sequenceNumber, @@ -33,7 +33,7 @@ function makeBatch({ batchId, clientId, batchStartCsn, - } satisfies Partial as InboundBatch; + } satisfies Partial as BatchStartInfo; } type Patch = Omit & U; diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts index 8c8ef8814dd9..e224d57e0d4a 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts @@ -14,12 +14,13 @@ import { import { MockLogger } from "@fluidframework/telemetry-utils/internal"; import { ContainerMessageType } from "../../index.js"; +import type { InboundSequencedContainerRuntimeMessage } from "../../messageTypes.js"; import { BatchManager, type BatchMessage, + type BatchStartInfo, ensureContentsDeserialized, type IBatch, - type InboundBatch, OpCompressor, OpDecompressor, OpGroupingManager, @@ -160,13 +161,10 @@ describe("RemoteMessageProcessor", () => { outboundMessages.push(...batch.messages); const messageProcessor = getMessageProcessor(); - let actualBatch: InboundBatch | undefined; + let batchStart: BatchStartInfo | undefined; + const inboundMessages: InboundSequencedContainerRuntimeMessage[] = []; let seqNum = 1; for (const message of outboundMessages) { - assert( - actualBatch === undefined, - "actualBatch only should be set when we're done looping", - ); // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const inboundMessage = { type: MessageType.Operation, @@ -179,8 +177,16 @@ describe("RemoteMessageProcessor", () => { } as ISequencedDocumentMessage; ensureContentsDeserialized(inboundMessage, true, () => {}); - // actualBatch will remain undefined every time except the last time through the loop - actualBatch = messageProcessor.process(inboundMessage, () => {}); + const result = messageProcessor.process(inboundMessage, () => {}); + switch (result?.type) { + case "fullBatch": + batchStart = result.batchStart; + inboundMessages.push(...result.messages); + break; + default: + assert(result === undefined, "unexpected result type"); + break; + } } const expected = option.grouping @@ -199,9 +205,9 @@ describe("RemoteMessageProcessor", () => { getProcessedMessage("e", startSeqNum, startSeqNum, false), ]; - assert.deepStrictEqual(actualBatch?.messages, expected, "unexpected output"); + assert.deepStrictEqual(inboundMessages, expected, "unexpected output"); assert.equal( - actualBatch?.batchStartCsn, + batchStart?.batchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn", ); @@ -305,36 +311,52 @@ describe("RemoteMessageProcessor", () => { undefined, undefined, { + type: "fullBatch", messages: messagesA, - clientId: "CLIENT_ID", - batchId: undefined, - batchStartCsn: 1, - keyMessage: messagesA[0], + batchStart: { + clientId: "CLIENT_ID", + batchId: undefined, + batchStartCsn: 1, + keyMessage: messagesA[0], + }, + length: 3, }, // B { + type: "fullBatch", messages: messagesB, - clientId: "CLIENT_ID", - batchId: undefined, - batchStartCsn: 4, - keyMessage: messagesB[0], + batchStart: { + clientId: "CLIENT_ID", + batchId: undefined, + batchStartCsn: 4, + keyMessage: messagesB[0], + }, + length: 1, }, // C undefined, { + type: "fullBatch", messages: messagesC, - batchId: "C", - clientId: "CLIENT_ID", - batchStartCsn: 5, - keyMessage: messagesC[0], + batchStart: { + batchId: "C", + clientId: "CLIENT_ID", + batchStartCsn: 5, + keyMessage: messagesC[0], + }, + length: 2, }, // D { + type: "fullBatch", messages: messagesD, - clientId: "CLIENT_ID", - batchId: "D", - batchStartCsn: 7, - keyMessage: messagesD[0], + batchStart: { + clientId: "CLIENT_ID", + batchId: "D", + batchStartCsn: 7, + keyMessage: messagesD[0], + }, + length: 1, }, ]; @@ -429,13 +451,18 @@ describe("RemoteMessageProcessor", () => { }; const documentMessage = message as ISequencedDocumentMessage; ensureContentsDeserialized(documentMessage, true, () => {}); - const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? []; + const processResult = messageProcessor.process(documentMessage, () => {}); + assert.equal( + processResult?.type, + "fullBatch", + "Single message should yield a 'fullBatch' result", + ); assert.strictEqual(processResult.length, 1, "only expected a single processed message"); - const result = processResult[0]; + const [inboundMessage] = processResult.messages; - assert.deepStrictEqual(result.contents, contents.contents); - assert.deepStrictEqual(result.type, contents.type); + assert.deepStrictEqual(inboundMessage.contents, contents.contents); + assert.deepStrictEqual(inboundMessage.type, contents.type); }); it("Don't unpack non-datastore messages", () => { @@ -447,13 +474,18 @@ describe("RemoteMessageProcessor", () => { metadata: { meta: "data" }, }; const documentMessage = message as ISequencedDocumentMessage; - const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? []; + const processResult = messageProcessor.process(documentMessage, () => {}); + assert.equal( + processResult?.type, + "fullBatch", + "Single message should yield a 'fullBatch' result", + ); assert.strictEqual(processResult.length, 1, "only expected a single processed message"); - const result = processResult[0]; + const [inboundMessage] = processResult.messages; - assert.deepStrictEqual(result.contents, message.contents); - assert.deepStrictEqual(result.type, message.type); + assert.deepStrictEqual(inboundMessage.contents, message.contents); + assert.deepStrictEqual(inboundMessage.type, message.type); }); it("Processing groupedBatch works as expected", () => { @@ -490,7 +522,7 @@ describe("RemoteMessageProcessor", () => { }, }; const messageProcessor = getMessageProcessor(); - const inboundBatch = messageProcessor.process( + const processResult = messageProcessor.process( groupedBatch as ISequencedDocumentMessage, () => {}, ); @@ -520,13 +552,17 @@ describe("RemoteMessageProcessor", () => { }, ]; assert.deepStrictEqual( - inboundBatch, + processResult, { + type: "fullBatch", messages: expected, - batchStartCsn: 12, - clientId: "CLIENT_ID", - batchId: "BATCH_ID", - keyMessage: expected[0], + batchStart: { + batchStartCsn: 12, + clientId: "CLIENT_ID", + batchId: "BATCH_ID", + keyMessage: expected[0], + }, + length: 2, }, "unexpected processing of groupedBatch", ); @@ -554,11 +590,15 @@ describe("RemoteMessageProcessor", () => { assert.deepStrictEqual( processResult, { + type: "fullBatch", messages: [], - batchStartCsn: 8, - clientId: "CLIENT_ID", - batchId: "BATCH_ID", - keyMessage: groupedBatch, + batchStart: { + batchStartCsn: 8, + clientId: "CLIENT_ID", + batchId: "BATCH_ID", + keyMessage: groupedBatch, + }, + length: 0, }, "unexpected processing of empty groupedBatch", ); diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 0bd1fa2ad4a6..9f1157437b31 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -163,15 +163,19 @@ describe("Pending State Manager", () => { emptyBatchSequenceNumber?: number, resubmittedBatchId?: string, ) => - pendingStateManager.processInboundBatch( + pendingStateManager.processInboundMessages( { + type: "fullBatch", messages: messages as InboundSequencedContainerRuntimeMessage[], - batchStartCsn, - keyMessage: { - sequenceNumber: emptyBatchSequenceNumber, - } satisfies Partial as ISequencedDocumentMessage, - clientId, - batchId: resubmittedBatchId, + batchStart: { + batchStartCsn, + keyMessage: { + sequenceNumber: emptyBatchSequenceNumber, + } satisfies Partial as ISequencedDocumentMessage, + clientId, + batchId: resubmittedBatchId, + }, + length: messages.length, }, true /* local */, ); @@ -556,13 +560,17 @@ describe("Pending State Manager", () => { ); const inboundMessage = futureRuntimeMessage as ISequencedDocumentMessage & UnknownContainerRuntimeMessage; - pendingStateManager.processInboundBatch( + pendingStateManager.processInboundMessages( { + type: "fullBatch", messages: [inboundMessage], - batchStartCsn: 1 /* batchStartCsn */, - batchId: undefined, - clientId: "clientId", - keyMessage: inboundMessage, + batchStart: { + batchStartCsn: 1 /* batchStartCsn */, + batchId: undefined, + clientId: "clientId", + keyMessage: inboundMessage, + }, + length: 1, }, true /* local */, );