diff --git a/.changeset/metal-hornets-travel.md b/.changeset/metal-hornets-travel.md new file mode 100644 index 000000000000..9afb0ade5f96 --- /dev/null +++ b/.changeset/metal-hornets-travel.md @@ -0,0 +1,16 @@ +--- +"@fluidframework/container-runtime": minor +--- +--- +"section": "fix" +--- +Restored old op processing behavior around batched ops to avoid potential regression + +There's a theoretical risk of indeterminate behavior due to a recent change to how batches of ops are processed. +This fix reverses that change. + +Pull Request #21785 updated the ContainerRuntime to hold onto the messages in an incoming batch until they've all arrived, and only then process the set of messages. + +While the batch is being processed, the DeltaManager and ContainerRuntime's view of the latest sequence numbers will be +out of sync. This may have unintended side effects, so out of an abundance of caution we're reversing this behavior until +we can add the proper protections to ensure the system stays properly in sync. diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index b337e78b741c..7b0aceda7c76 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -171,6 +171,7 @@ import { IBatchMetadata, ISavedOpMetadata } from "./metadata.js"; import { BatchId, BatchMessage, + BatchStartInfo, ensureContentsDeserialized, IBatch, IBatchCheckpoint, @@ -180,7 +181,6 @@ import { OpSplitter, Outbox, RemoteMessageProcessor, - type InboundBatch, } from "./opLifecycle/index.js"; import { pkgVersion } from "./packageVersion.js"; import { @@ -2666,18 +2666,24 @@ export class ContainerRuntime if (hasModernRuntimeMessageEnvelope) { // If the message has the modern message envelope, then process it here. // Here we unpack the message (decompress, unchunk, and/or ungroup) into a batch of messages with ContainerMessageType - const inboundBatch = this.remoteMessageProcessor.process(messageCopy, logLegacyCase); - if (inboundBatch === undefined) { + const inboundResult = this.remoteMessageProcessor.process(messageCopy, logLegacyCase); + if (inboundResult === undefined) { // This means the incoming message is an incomplete part of a message or batch // and we need to process more messages before the rest of the system can understand it. return; } // Reach out to PendingStateManager to zip localOpMetadata into the message list if it's a local batch - const messagesWithPendingState = this.pendingStateManager.processInboundBatch( - inboundBatch, + const messagesWithPendingState = this.pendingStateManager.processInboundMessages( + inboundResult, local, ); + if (inboundResult.type !== "fullBatch") { + assert( + messagesWithPendingState.length === 1, + "Partial batch should have exactly one message", + ); + } if (messagesWithPendingState.length > 0) { messagesWithPendingState.forEach(({ message, localOpMetadata }) => { const msg: MessageWithContext = { @@ -2690,7 +2696,13 @@ export class ContainerRuntime this.ensureNoDataModelChanges(() => this.processRuntimeMessage(msg)); }); } else { - this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch, local)); + assert( + inboundResult.type === "fullBatch", + "Empty batch is always considered a full batch", + ); + this.ensureNoDataModelChanges(() => + this.processEmptyBatch(inboundResult.batchStart, local), + ); } } else { // Check if message.type is one of values in ContainerMessageType @@ -2781,19 +2793,27 @@ export class ContainerRuntime } /** - * Process an empty batch, which will execute expected actions while processing even if there are no messages. - * This is a separate function because the processCore function expects at least one message to process. - * It is expected to happen only when the outbox produces an empty batch due to a resubmit flow. + * Process an empty batch, which will execute expected actions while processing even if there are no inner runtime messages. + * + * @remarks - Empty batches are produced by the outbox on resubmit when the resubmit flow resulted in no runtime messages. + * This can happen if changes from a remote client "cancel out" the pending changes being resubmited by this client. + * We submit an empty batch if "offline load" (aka rehydrating from stashed state) is enabled, + * to ensure we account for this batch when comparing batchIds, checking for a forked container. + * Otherwise, we would not realize this container has forked in the case where it did fork, and a batch became empty but wasn't submitted as such. */ - private processEmptyBatch(emptyBatch: InboundBatch, local: boolean) { - const { emptyBatchSequenceNumber: sequenceNumber, batchStartCsn } = emptyBatch; - assert(sequenceNumber !== undefined, 0x9fa /* emptyBatchSequenceNumber must be defined */); - this.emit("batchBegin", { sequenceNumber }); + private processEmptyBatch(emptyBatch: BatchStartInfo, local: boolean) { + const { keyMessage, batchStartCsn } = emptyBatch; + this.scheduleManager.beforeOpProcessing(keyMessage); + this._processedClientSequenceNumber = batchStartCsn; if (!this.hasPendingMessages()) { this.updateDocumentDirtyState(false); } - this.emit("batchEnd", undefined, { sequenceNumber }); + + // We emit this event but say isRuntimeMessage is false, because there are no actual runtime messages here being processed. + // But someone listening to this event expecting to be notified whenever a message arrives would want to know about this. + this.emit("op", keyMessage, false /* isRuntimeMessage */); + this.scheduleManager.afterOpProcessing(undefined /* error */, keyMessage); if (local) { this.resetReconnectCount(); } diff --git a/packages/runtime/container-runtime/src/opLifecycle/index.ts b/packages/runtime/container-runtime/src/opLifecycle/index.ts index 9edb01bfcbeb..df55eeade131 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/index.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/index.ts @@ -18,7 +18,8 @@ export { OpDecompressor } from "./opDecompressor.js"; export { OpSplitter, splitOp, isChunkedMessage } from "./opSplitter.js"; export { ensureContentsDeserialized, - InboundBatch, + InboundMessageResult, + BatchStartInfo, RemoteMessageProcessor, unpackRuntimeMessage, } from "./remoteMessageProcessor.js"; diff --git a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts index 99bd163aa7cc..49f10bf2d59f 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts @@ -21,26 +21,56 @@ import { OpDecompressor } from "./opDecompressor.js"; import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js"; import { OpSplitter, isChunkedMessage } from "./opSplitter.js"; -/** Messages being received as a batch, with details needed to process the batch */ -export interface InboundBatch { - /** Messages in this batch */ - readonly messages: InboundSequencedContainerRuntimeMessage[]; +/** Info about the batch we learn when we process the first message */ +export interface BatchStartInfo { /** Batch ID, if present */ readonly batchId: string | undefined; /** clientId that sent this batch. Used to compute Batch ID if needed */ readonly clientId: string; /** - * Client Sequence Number of the first message in the batch. + * Client Sequence Number of the Grouped Batch message, or the first message in the ungrouped batch. * Used to compute Batch ID if needed * * @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk). * For grouped batches, clientSequenceNumber on messages is overwritten, so we track this original value here. */ readonly batchStartCsn: number; - /** For an empty batch (with no messages), we need to remember the empty grouped batch's sequence number */ - readonly emptyBatchSequenceNumber?: number; + /** + * The first message in the batch, or if the batch is empty, the empty grouped batch message + * Used for accessing the sequence numbers for the (start of the) batch. + * + * @remarks Do not use clientSequenceNumber here, use batchStartCsn instead. + */ + readonly keyMessage: ISequencedDocumentMessage; } +/** + * Result of processing the next inbound message. + * Depending on the message and configuration of RemoteMessageProcessor, the result may be: + * - A full batch of messages (including a single-message batch) + * - The first message of a multi-message batch + * - The next message in a multi-message batch + */ +export type InboundMessageResult = + | { + type: "fullBatch"; + messages: InboundSequencedContainerRuntimeMessage[]; + batchStart: BatchStartInfo; + length: number; + } + | { + type: "batchStartingMessage"; + batchStart: BatchStartInfo; + nextMessage: InboundSequencedContainerRuntimeMessage; + length?: never; + } + | { + type: "nextBatchMessage"; + batchEnd?: boolean; + nextMessage: InboundSequencedContainerRuntimeMessage; + length?: never; + }; + function assertHasClientId( message: ISequencedDocumentMessage, ): asserts message is ISequencedDocumentMessage & { clientId: string } { @@ -57,12 +87,7 @@ function assertHasClientId( * @internal */ export class RemoteMessageProcessor { - /** - * The current batch being received, with details needed to process it. - * - * @remarks If undefined, we are expecting the next message to start a new batch. - */ - private batchInProgress: InboundBatch | undefined; + private batchInProgress: boolean = false; constructor( private readonly opSplitter: OpSplitter, @@ -100,7 +125,7 @@ export class RemoteMessageProcessor { public process( remoteMessageCopy: ISequencedDocumentMessage, logLegacyCase: (codePath: string) => void, - ): InboundBatch | undefined { + ): InboundMessageResult | undefined { let message = remoteMessageCopy; assertHasClientId(message); @@ -129,80 +154,84 @@ export class RemoteMessageProcessor { } if (isGroupedBatch(message)) { - // We should be awaiting a new batch (batchInProgress undefined) - assert( - this.batchInProgress === undefined, - 0x9d3 /* Grouped batch interrupting another batch */, - ); + // We should be awaiting a new batch (batchInProgress false) + assert(!this.batchInProgress, 0x9d3 /* Grouped batch interrupting another batch */); const batchId = asBatchMetadata(message.metadata)?.batchId; const groupedMessages = this.opGroupingManager.ungroupOp(message).map(unpack); + return { + type: "fullBatch", messages: groupedMessages, // Will be [] for an empty batch - batchStartCsn: message.clientSequenceNumber, - clientId, - batchId, - // If the batch is empty, we need to return the sequence number aside - emptyBatchSequenceNumber: - groupedMessages.length === 0 ? message.sequenceNumber : undefined, + batchStart: { + batchStartCsn: message.clientSequenceNumber, + clientId, + batchId, + keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch + }, + length: groupedMessages.length, // Will be 0 for an empty batch }; } // Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked unpackRuntimeMessage(message, logLegacyCase); - const { batchEnded } = this.addMessageToBatch( + return this.getResultBasedOnBatchMetadata( message as InboundSequencedContainerRuntimeMessage & { clientId: string }, ); - - if (!batchEnded) { - // batch not yet complete - return undefined; - } - - const completedBatch = this.batchInProgress; - this.batchInProgress = undefined; - return completedBatch; } /** - * Add the given message to the current batch, and indicate whether the batch is now complete. - * - * @returns batchEnded: true if the batch is now complete, batchEnded: false if more messages are expected + * Now that the message has been "unwrapped" as to any virtualization (grouping, compression, chunking), + * inspect the batch metadata flag and determine what kind of result to return. */ - private addMessageToBatch( + private getResultBasedOnBatchMetadata( message: InboundSequencedContainerRuntimeMessage & { clientId: string }, - ): { batchEnded: boolean } { + ): InboundMessageResult { const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch; - if (this.batchInProgress === undefined) { + if (!this.batchInProgress) { // We are waiting for a new batch assert(batchMetadataFlag !== false, 0x9d5 /* Unexpected batch end marker */); // Start of a new multi-message batch if (batchMetadataFlag === true) { - this.batchInProgress = { - messages: [message], - batchId: asBatchMetadata(message.metadata)?.batchId, - clientId: message.clientId, - batchStartCsn: message.clientSequenceNumber, + this.batchInProgress = true; + return { + type: "batchStartingMessage", + batchStart: { + batchId: asBatchMetadata(message.metadata)?.batchId, + clientId: message.clientId, + batchStartCsn: message.clientSequenceNumber, + keyMessage: message, + }, + nextMessage: message, }; - - return { batchEnded: false }; } // Single-message batch (Since metadata flag is undefined) - this.batchInProgress = { + return { + type: "fullBatch", messages: [message], - batchStartCsn: message.clientSequenceNumber, - clientId: message.clientId, - batchId: asBatchMetadata(message.metadata)?.batchId, + batchStart: { + batchStartCsn: message.clientSequenceNumber, + clientId: message.clientId, + batchId: asBatchMetadata(message.metadata)?.batchId, + keyMessage: message, + }, + length: 1, }; - return { batchEnded: true }; } assert(batchMetadataFlag !== true, 0x9d6 /* Unexpected batch start marker */); - this.batchInProgress.messages.push(message); + // Clear batchInProgress state if the batch is ending + if (batchMetadataFlag === false) { + this.batchInProgress = false; + } - return { batchEnded: batchMetadataFlag === false }; + return { + type: "nextBatchMessage", + nextMessage: message, + batchEnd: batchMetadataFlag === false, + }; } } diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index cfb6918e9740..5dc096037cd2 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -20,7 +20,13 @@ import { type LocalContainerRuntimeMessage, } from "./messageTypes.js"; import { asBatchMetadata, asEmptyBatchLocalOpMetadata } from "./metadata.js"; -import { BatchId, BatchMessage, generateBatchId, InboundBatch } from "./opLifecycle/index.js"; +import { + BatchId, + BatchMessage, + BatchStartInfo, + generateBatchId, + InboundMessageResult, +} from "./opLifecycle/index.js"; /** * This represents a message that has been submitted and is added to the pending queue when `submit` is called on the @@ -326,49 +332,50 @@ export class PendingStateManager implements IDisposable { } /** - * Processes an inbound batch of messages - May be local or remote. + * Processes an inbound message or batch of messages - May be local or remote. * - * @param batch - The inbound batch of messages to process. Could be local or remote. - * @param local - true if we submitted this batch and expect corresponding pending messages - * @returns The inbound batch's messages with localOpMetadata "zipped" in. + * @param inbound - The inbound message(s) to process, with extra info (e.g. about the start of a batch). Could be local or remote. + * @param local - true if we submitted these messages and expect corresponding pending messages + * @returns The inbound messages with localOpMetadata "zipped" in. * - * @remarks Closes the container if: - * - The batchStartCsn doesn't match for local batches + * @throws a DataProcessingError if the pending message content doesn't match the incoming message content (for local messages) */ - public processInboundBatch( - batch: InboundBatch, + public processInboundMessages( + inbound: InboundMessageResult, local: boolean, ): { message: InboundSequencedContainerRuntimeMessage; localOpMetadata?: unknown; }[] { if (local) { - return this.processPendingLocalBatch(batch); + return this.processPendingLocalMessages(inbound); } // No localOpMetadata for remote messages - return batch.messages.map((message) => ({ message })); + const messages = inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage]; + return messages.map((message) => ({ message })); } /** - * Processes the incoming batch from the server that was submitted by this client. - * It verifies that messages are received in the right order and that the batch information is correct. - * @param batch - The inbound batch (originating from this client) to correlate with the pending local state - * @returns The inbound batch's messages with localOpMetadata "zipped" in. + * Processes the incoming message(s) from the server that were submitted by this client. + * It verifies that messages are received in the right order and that any batch information is correct. + * @param inbound - The inbound message(s) (originating from this client) to correlate with the pending local state + * @throws DataProcessingError if the pending message content doesn't match the incoming message content for any message here + * @returns The inbound messages with localOpMetadata "zipped" in. */ - private processPendingLocalBatch(batch: InboundBatch): { + private processPendingLocalMessages(inbound: InboundMessageResult): { message: InboundSequencedContainerRuntimeMessage; localOpMetadata: unknown; }[] { - this.onLocalBatchBegin(batch); + if ("batchStart" in inbound) { + this.onLocalBatchBegin(inbound.batchStart, inbound.length); + } // Empty batch - if (batch.messages.length === 0) { - assert( - batch.emptyBatchSequenceNumber !== undefined, - 0x9fb /* Expected sequence number for empty batch */, + if (inbound.length === 0) { + const localOpMetadata = this.processNextPendingMessage( + inbound.batchStart.keyMessage.sequenceNumber, ); - const localOpMetadata = this.processNextPendingMessage(batch.emptyBatchSequenceNumber); assert( asEmptyBatchLocalOpMetadata(localOpMetadata)?.emptyBatch === true, 0xa20 /* Expected empty batch marker */, @@ -376,7 +383,9 @@ export class PendingStateManager implements IDisposable { return []; } - return batch.messages.map((message) => ({ + const messages = inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage]; + + return messages.map((message) => ({ message, localOpMetadata: this.processNextPendingMessage(message.sequenceNumber, message), })); @@ -448,7 +457,7 @@ export class PendingStateManager implements IDisposable { /** * Check if the incoming batch matches the batch info for the next pending message. */ - private onLocalBatchBegin(batch: InboundBatch) { + private onLocalBatchBegin(batchStart: BatchStartInfo, batchLength?: number) { // Get the next message from the pending queue. Verify a message exists. const pendingMessage = this.pendingMessages.peekFront(); assert( @@ -460,25 +469,27 @@ export class PendingStateManager implements IDisposable { // In this case the next pending message is an empty batch marker. // Empty batches became empty on Resubmit, and submit them and track them in case // a different fork of this container also submitted the same batch (and it may not be empty for that fork). - const firstMessage = batch.messages.length > 0 ? batch.messages[0] : undefined; - const expectedPendingBatchLength = batch.messages.length === 0 ? 1 : batch.messages.length; + const firstMessage = batchStart.keyMessage; + // -1 length is for back compat, undefined length means we actually don't know it + const skipLengthCheck = + pendingMessage.batchInfo.length === -1 || batchLength === undefined; + const expectedPendingBatchLength = batchLength === 0 ? 1 : batchLength; // We expect the incoming batch to be of the same length, starting at the same clientSequenceNumber, // as the batch we originally submitted. // We have another later check to compare the message contents, which we'd expect to fail if this check does, // so we don't throw here, merely log. In a later release this check may replace that one. if ( - pendingMessage.batchInfo.batchStartCsn !== batch.batchStartCsn || - (pendingMessage.batchInfo.length >= 0 && // -1 length is back compat and isn't suitable for this check - pendingMessage.batchInfo.length !== expectedPendingBatchLength) + pendingMessage.batchInfo.batchStartCsn !== batchStart.batchStartCsn || + (!skipLengthCheck && pendingMessage.batchInfo.length !== expectedPendingBatchLength) ) { this.logger?.sendErrorEvent({ eventName: "BatchInfoMismatch", details: { pendingBatchCsn: pendingMessage.batchInfo.batchStartCsn, - batchStartCsn: batch.batchStartCsn, + batchStartCsn: batchStart.batchStartCsn, pendingBatchLength: pendingMessage.batchInfo.length, - batchLength: batch.messages.length, + batchLength, pendingMessageBatchMetadata: asBatchMetadata(pendingMessage.opMetadata)?.batch, messageBatchMetadata: asBatchMetadata(firstMessage?.metadata)?.batch, }, diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 863873cffcf5..f1053112a162 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -68,11 +68,12 @@ import { import { ContainerMessageType, type ContainerRuntimeGCMessage, + type InboundSequencedContainerRuntimeMessage, type OutboundContainerRuntimeMessage, type RecentlyAddedContainerRuntimeMessageDetails, type UnknownContainerRuntimeMessage, } from "../messageTypes.js"; -import type { BatchMessage, InboundBatch } from "../opLifecycle/index.js"; +import type { BatchMessage, InboundMessageResult } from "../opLifecycle/index.js"; import { IPendingLocalState, IPendingMessage, @@ -787,11 +788,13 @@ describe("Runtime", () => { return { replayPendingStates: () => {}, hasPendingMessages: (): boolean => pendingMessages > 0, - processMessage: (_message: ISequencedDocumentMessage, _local: boolean) => { - return { localAck: false, localOpMetadata: undefined }; - }, - processInboundBatch: (batch: InboundBatch, _local: boolean) => { - return batch.messages.map((message) => ({ + processInboundMessages: (inbound: InboundMessageResult, _local: boolean) => { + const messages = + inbound.type === "fullBatch" ? inbound.messages : [inbound.nextMessage]; + return messages.map<{ + message: InboundSequencedContainerRuntimeMessage; + localOpMetadata?: unknown; + }>((message) => ({ message, localOpMetadata: undefined, })); @@ -801,7 +804,7 @@ describe("Runtime", () => { }, onFlushBatch: (batch: BatchMessage[], _csn?: number) => (pendingMessages += batch.length), - } as unknown as PendingStateManager; + } satisfies Partial as unknown as PendingStateManager; }; const getMockChannelCollection = (): ChannelCollection => { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts index 6101f30764f9..5a25d3f39684 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts @@ -14,12 +14,14 @@ import { import { MockLogger } from "@fluidframework/telemetry-utils/internal"; import { ContainerMessageType } from "../../index.js"; +import type { InboundSequencedContainerRuntimeMessage } from "../../messageTypes.js"; import { BatchManager, type BatchMessage, + type BatchStartInfo, ensureContentsDeserialized, type IBatch, - type InboundBatch, + type InboundMessageResult, OpCompressor, OpDecompressor, OpGroupingManager, @@ -160,13 +162,10 @@ describe("RemoteMessageProcessor", () => { outboundMessages.push(...batch.messages); const messageProcessor = getMessageProcessor(); - let actualBatch: InboundBatch | undefined; + let batchStart: BatchStartInfo | undefined; + const inboundMessages: InboundSequencedContainerRuntimeMessage[] = []; let seqNum = 1; for (const message of outboundMessages) { - assert( - actualBatch === undefined, - "actualBatch only should be set when we're done looping", - ); // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const inboundMessage = { type: MessageType.Operation, @@ -179,8 +178,36 @@ describe("RemoteMessageProcessor", () => { } as ISequencedDocumentMessage; ensureContentsDeserialized(inboundMessage, true, () => {}); - // actualBatch will remain undefined every time except the last time through the loop - actualBatch = messageProcessor.process(inboundMessage, () => {}); + const result = messageProcessor.process(inboundMessage, () => {}); + switch (result?.type) { + case "fullBatch": + assert( + option.compressionAndChunking.chunking || outboundMessages.length === 1, + "Apart from chunking, expected fullBatch for single-message batch only (includes Grouped Batches)", + ); + batchStart = result.batchStart; + inboundMessages.push(...result.messages); + break; + case "batchStartingMessage": + batchStart = result.batchStart; + inboundMessages.push(result.nextMessage); + break; + case "nextBatchMessage": + assert( + batchStart !== undefined, + "batchStart should have been set from a prior message", + ); + inboundMessages.push(result.nextMessage); + break; + default: + // These are leading chunks + assert(result === undefined, "unexpected result type"); + assert( + option.compressionAndChunking.chunking, + "undefined result only expected with chunking", + ); + break; + } } const expected = option.grouping @@ -199,17 +226,19 @@ describe("RemoteMessageProcessor", () => { getProcessedMessage("e", startSeqNum, startSeqNum, false), ]; - assert.deepStrictEqual(actualBatch?.messages, expected, "unexpected output"); + assert.deepStrictEqual(inboundMessages, expected, "unexpected output"); assert.equal( - actualBatch?.batchStartCsn, + batchStart?.batchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn", ); }); }); - it("Processes multiple batches", () => { + it("Processes multiple batches (No Grouped Batching)", () => { let csn = 1; + + // Use BatchManager.popBatch to get the right batch metadata included const batchManager = new BatchManager({ canRebase: false, hardLimit: Number.MAX_VALUE, @@ -244,92 +273,130 @@ describe("RemoteMessageProcessor", () => { processor.process(message, () => {}), ); - const expectedResults = [ + // Expected results + const messagesA = [ + { + "contents": "A1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 1, + "metadata": { "batch": true }, + "clientId": "CLIENT_ID", + }, + { + "contents": "A2", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 2, + "clientId": "CLIENT_ID", + }, + { + "contents": "A3", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 3, + "metadata": { "batch": false }, + "clientId": "CLIENT_ID", + }, + ]; + const messagesB = [ + { + "contents": "B1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 4, + "clientId": "CLIENT_ID", + }, + ]; + const messagesC = [ + { + "contents": "C1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 5, + "metadata": { "batch": true, "batchId": "C" }, + "clientId": "CLIENT_ID", + }, + { + "contents": "C2", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 6, + "metadata": { "batch": false }, + "clientId": "CLIENT_ID", + }, + ]; + const messagesD = [ + { + "contents": "D1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 7, + "metadata": { "batchId": "D" }, + "clientId": "CLIENT_ID", + }, + ]; + const expectedInfo: Partial[] = [ // A - undefined, - undefined, { - messages: [ - { - "contents": "A1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 1, - "metadata": { "batch": true }, - "clientId": "CLIENT_ID", - }, - { - "contents": "A2", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 2, - "clientId": "CLIENT_ID", - }, - { - "contents": "A3", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 3, - "metadata": { "batch": false }, - "clientId": "CLIENT_ID", - }, - ], - clientId: "CLIENT_ID", - batchId: undefined, - batchStartCsn: 1, + type: "batchStartingMessage", + batchStart: { + batchId: undefined, + clientId: "CLIENT_ID", + keyMessage: messagesA[0] as ISequencedDocumentMessage, + batchStartCsn: 1, + }, }, + { type: "nextBatchMessage", batchEnd: false }, + { type: "nextBatchMessage", batchEnd: true }, // B { - messages: [ - { - "contents": "B1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 4, - "clientId": "CLIENT_ID", - }, - ], - clientId: "CLIENT_ID", - batchId: undefined, - batchStartCsn: 4, + type: "fullBatch", + batchStart: { + clientId: "CLIENT_ID", + batchId: undefined, + batchStartCsn: 4, + keyMessage: messagesB[0] as ISequencedDocumentMessage, + }, + length: 1, }, // C - undefined, { - messages: [ - { - "contents": "C1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 5, - "metadata": { "batch": true, "batchId": "C" }, - "clientId": "CLIENT_ID", - }, - { - "contents": "C2", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 6, - "metadata": { "batch": false }, - "clientId": "CLIENT_ID", - }, - ], - batchId: "C", - clientId: "CLIENT_ID", - batchStartCsn: 5, + type: "batchStartingMessage", + batchStart: { + batchId: "C", + clientId: "CLIENT_ID", + batchStartCsn: 5, + keyMessage: messagesC[0] as ISequencedDocumentMessage, + }, }, + { type: "nextBatchMessage", batchEnd: true }, // D { - messages: [ - { - "contents": "D1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 7, - "metadata": { "batchId": "D" }, - "clientId": "CLIENT_ID", - }, - ], - clientId: "CLIENT_ID", - batchId: "D", - batchStartCsn: 7, + type: "fullBatch", + batchStart: { + clientId: "CLIENT_ID", + batchId: "D", + batchStartCsn: 7, + keyMessage: messagesD[0] as ISequencedDocumentMessage, + }, + length: 1, }, ]; + const expectedMessages = [...messagesA, ...messagesB, ...messagesC, ...messagesD]; - assert.deepStrictEqual(processResults, expectedResults, "unexpected output from process"); + assert.deepStrictEqual( + processResults.flatMap((result) => + result?.type === "fullBatch" ? [...result.messages] : [result?.nextMessage], + ), + expectedMessages, + "unexpected output from process", + ); + + // We checked messages in the previous assert, now clear them since they're not included in expectedInfo + const clearMessages = (result: any) => { + delete result.messages; + delete result.nextMessage; + return result as InboundMessageResult; + }; + assert.deepStrictEqual( + processResults.map(clearMessages), + expectedInfo, + "unexpected result info", + ); }); describe("Throws on invalid batches", () => { @@ -420,13 +487,18 @@ describe("RemoteMessageProcessor", () => { }; const documentMessage = message as ISequencedDocumentMessage; ensureContentsDeserialized(documentMessage, true, () => {}); - const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? []; + const processResult = messageProcessor.process(documentMessage, () => {}); + assert.equal( + processResult?.type, + "fullBatch", + "Single message should yield a 'fullBatch' result", + ); assert.strictEqual(processResult.length, 1, "only expected a single processed message"); - const result = processResult[0]; + const [inboundMessage] = processResult.messages; - assert.deepStrictEqual(result.contents, contents.contents); - assert.deepStrictEqual(result.type, contents.type); + assert.deepStrictEqual(inboundMessage.contents, contents.contents); + assert.deepStrictEqual(inboundMessage.type, contents.type); }); it("Don't unpack non-datastore messages", () => { @@ -438,13 +510,18 @@ describe("RemoteMessageProcessor", () => { metadata: { meta: "data" }, }; const documentMessage = message as ISequencedDocumentMessage; - const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? []; + const processResult = messageProcessor.process(documentMessage, () => {}); + assert.equal( + processResult?.type, + "fullBatch", + "Single message should yield a 'fullBatch' result", + ); assert.strictEqual(processResult.length, 1, "only expected a single processed message"); - const result = processResult[0]; + const [inboundMessage] = processResult.messages; - assert.deepStrictEqual(result.contents, message.contents); - assert.deepStrictEqual(result.type, message.type); + assert.deepStrictEqual(inboundMessage.contents, message.contents); + assert.deepStrictEqual(inboundMessage.type, message.type); }); it("Processing groupedBatch works as expected", () => { @@ -481,7 +558,7 @@ describe("RemoteMessageProcessor", () => { }, }; const messageProcessor = getMessageProcessor(); - const inboundBatch = messageProcessor.process( + const processResult = messageProcessor.process( groupedBatch as ISequencedDocumentMessage, () => {}, ); @@ -511,13 +588,17 @@ describe("RemoteMessageProcessor", () => { }, ]; assert.deepStrictEqual( - inboundBatch, + processResult, { + type: "fullBatch", messages: expected, - batchStartCsn: 12, - clientId: "CLIENT_ID", - batchId: "BATCH_ID", - emptyBatchSequenceNumber: undefined, + batchStart: { + batchStartCsn: 12, + clientId: "CLIENT_ID", + batchId: "BATCH_ID", + keyMessage: expected[0], + }, + length: 2, }, "unexpected processing of groupedBatch", ); @@ -545,11 +626,15 @@ describe("RemoteMessageProcessor", () => { assert.deepStrictEqual( processResult, { + type: "fullBatch", messages: [], - batchStartCsn: 8, - clientId: "CLIENT_ID", - batchId: "BATCH_ID", - emptyBatchSequenceNumber: 10, + batchStart: { + batchStartCsn: 8, + clientId: "CLIENT_ID", + batchId: "BATCH_ID", + keyMessage: groupedBatch, + }, + length: 0, }, "unexpected processing of empty groupedBatch", ); diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index d81c39420410..c3987d263260 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -18,7 +18,12 @@ import type { RecentlyAddedContainerRuntimeMessageDetails, UnknownContainerRuntimeMessage, } from "../messageTypes.js"; -import { BatchManager, BatchMessage, generateBatchId } from "../opLifecycle/index.js"; +import { + BatchManager, + BatchMessage, + generateBatchId, + type InboundMessageResult, +} from "../opLifecycle/index.js"; import { IPendingMessage, PendingStateManager } from "../pendingStateManager.js"; type PendingStateManager_WithPrivates = Omit & { @@ -157,23 +162,57 @@ describe("Pending State Manager", () => { ); }; - const process = ( + const processFullBatch = ( messages: Partial[], batchStartCsn: number, emptyBatchSequenceNumber?: number, ) => - pendingStateManager.processInboundBatch( + pendingStateManager.processInboundMessages( { + type: "fullBatch", messages: messages as InboundSequencedContainerRuntimeMessage[], - batchStartCsn, - emptyBatchSequenceNumber, - clientId, - batchId: generateBatchId(clientId, batchStartCsn), + batchStart: { + batchStartCsn, + keyMessage: { + sequenceNumber: emptyBatchSequenceNumber, + } satisfies Partial as ISequencedDocumentMessage, + clientId, + batchId: generateBatchId(clientId, batchStartCsn), + }, + length: messages.length, }, true /* local */, ); - it("proper batch is processed correctly", () => { + it("Grouped batch is processed correctly", () => { + const messages: Partial[] = [ + { + clientId, + type: MessageType.Operation, + clientSequenceNumber: 0, + referenceSequenceNumber: 0, + metadata: { batch: true }, + }, + { + clientId, + type: MessageType.Operation, + clientSequenceNumber: 1, + referenceSequenceNumber: 0, + }, + { + clientId, + type: MessageType.Operation, + metadata: { batch: false }, + clientSequenceNumber: 2, + referenceSequenceNumber: 0, + }, + ]; + + submitBatch(messages); + processFullBatch(messages, 0 /* batchStartCsn */); + }); + + it("Ungrouped batch is processed correctly", () => { const messages: Partial[] = [ { clientId, @@ -198,7 +237,34 @@ describe("Pending State Manager", () => { ]; submitBatch(messages); - process(messages, 0 /* batchStartCsn */); + pendingStateManager.processInboundMessages( + { + type: "batchStartingMessage", + nextMessage: messages[0] as InboundSequencedContainerRuntimeMessage, + batchStart: { + batchStartCsn: 0, + keyMessage: messages[0] as ISequencedDocumentMessage, + clientId, + batchId: undefined, + }, + } satisfies InboundMessageResult, + true /* local */, + ); + pendingStateManager.processInboundMessages( + { + type: "nextBatchMessage", + nextMessage: messages[1] as InboundSequencedContainerRuntimeMessage, + } satisfies InboundMessageResult, + true /* local */, + ); + pendingStateManager.processInboundMessages( + { + type: "nextBatchMessage", + nextMessage: messages[2] as InboundSequencedContainerRuntimeMessage, + batchEnd: true, + } satisfies InboundMessageResult, + true /* local */, + ); }); it("empty batch is processed correctly", () => { @@ -218,7 +284,7 @@ describe("Pending State Manager", () => { // A groupedBatch is supposed to have nested messages inside its contents, // but an empty batch has no nested messages. When processing en empty grouped batch, // the psm will expect the next pending message to be an "empty" message as portrayed above. - process([], 1 /* batchStartCsn */, 3 /* emptyBatchSequenceNumber */); + processFullBatch([], 1 /* batchStartCsn */, 3 /* emptyBatchSequenceNumber */); }); describe("processing out of sync messages will throw and log", () => { @@ -235,7 +301,7 @@ describe("Pending State Manager", () => { submitBatch(messages); assert.throws( () => - process( + processFullBatch( messages.map((message) => ({ ...message, type: "otherType", @@ -273,7 +339,7 @@ describe("Pending State Manager", () => { submitBatch(messages); assert.throws( () => - process( + processFullBatch( messages.map((message) => ({ ...message, contents: undefined, @@ -311,7 +377,7 @@ describe("Pending State Manager", () => { submitBatch(messages); assert.throws( () => - process( + processFullBatch( messages.map((message) => ({ ...message, contents: { prop1: true }, @@ -359,7 +425,7 @@ describe("Pending State Manager", () => { ); assert.throws( - () => process([message], 0 /* batchStartCsn */), + () => processFullBatch([message], 0 /* batchStartCsn */), (closeError: any) => closeError.errorType === ContainerErrorTypes.dataProcessingError, ); @@ -403,7 +469,7 @@ describe("Pending State Manager", () => { ); assert.throws( - () => process([message], 0 /* batchStartCsn */), + () => processFullBatch([message], 0 /* batchStartCsn */), (closeError: any) => closeError.errorType === ContainerErrorTypes.dataProcessingError, ); mockLogger.assertMatch( @@ -436,7 +502,7 @@ describe("Pending State Manager", () => { ]; submitBatch(messages); - process( + processFullBatch( messages.map((message) => ({ ...message, contents: { prop1: true }, @@ -455,7 +521,7 @@ describe("Pending State Manager", () => { sequenceNumber: i + 1, // starting with sequence number 1 so first assert does not filter any op })); submitBatch(messages); - process(messages, 0 /* batchStartCsn */); + processFullBatch(messages, 0 /* batchStartCsn */); let pendingState = pendingStateManager.getLocalState(0).pendingStates; assert.strictEqual(pendingState.length, 10); pendingState = pendingStateManager.getLocalState(5).pendingStates; @@ -546,15 +612,19 @@ describe("Pending State Manager", () => { ], 1, ); - pendingStateManager.processInboundBatch( + const inboundMessage = futureRuntimeMessage as ISequencedDocumentMessage & + UnknownContainerRuntimeMessage; + pendingStateManager.processInboundMessages( { - messages: [ - futureRuntimeMessage as ISequencedDocumentMessage & - UnknownContainerRuntimeMessage, - ], - batchStartCsn: 1 /* batchStartCsn */, - batchId: "batchId", - clientId: "clientId", + type: "fullBatch", + messages: [inboundMessage], + batchStart: { + batchStartCsn: 1 /* batchStartCsn */, + batchId: "batchId", + clientId: "clientId", + keyMessage: inboundMessage, + }, + length: 1, }, true /* local */, );