Skip to content

Commit

Permalink
SharedInterval Throw on Reconnect Attempt (#9776)
Browse files Browse the repository at this point in the history
The current code is not safe. It doesn't do any rebasing of the operation it just naively resubmits it. This can lead to data corruption, so better to just fail in this case.

related to #8739
  • Loading branch information
anthony-murphy authored Apr 7, 2022
1 parent d900211 commit 867e337
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions packages/dds/sequence/src/mapKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,25 @@ interface IMapMessageHandler {
op: IMapOperation,
local: boolean,
message: ISequencedDocumentMessage | undefined,
localOpMetadata: unknown,
localOpMetadata: IMapMessageLocalMetadata,
): void;

/**
* Communicate the operation to remote clients.
* @param op - The map operation to submit
* @param localOpMetadata - The metadata to be submitted with the message.
*/
submit(op: IMapOperation, localOpMetadata: unknown): void;
submit(op: IMapOperation): void;

getStashedOpLocalMetadata(op: IMapOperation): unknown;
}

interface IMapMessageLocalMetadata{
pendingClearMessageId?: number,
pendingMessageId?: number,
lastProcessedSeq: number
}

/**
* Describes an operation specific to a value type.
*/
Expand Down Expand Up @@ -183,6 +189,8 @@ export class MapKernel implements IValueTypeCreator {
*/
private readonly localValueMaker: LocalValueMaker;

private lastProcessedSeq: number = -1;

/**
* Create a new shared map kernel.
* @param serializer - The serializer to serialize / parse handles
Expand All @@ -195,7 +203,7 @@ export class MapKernel implements IValueTypeCreator {
constructor(
private readonly serializer: IFluidSerializer,
private readonly handle: IFluidHandle,
private readonly submitMessage: (op: any, localOpMetadata: unknown) => void,
private readonly submitMessage: (op: any, localOpMetadata: IMapMessageLocalMetadata) => void,
private readonly isAttached: () => boolean,
valueTypes: Readonly<IValueType<any>[]>,
public readonly eventEmitter = new TypedEventEmitter<ISharedMapEvents>(),
Expand Down Expand Up @@ -499,8 +507,14 @@ export class MapKernel implements IValueTypeCreator {
public trySubmitMessage(op: any, localOpMetadata: unknown): boolean {
const type: string = op.type;
if (this.messageHandlers.has(type)) {
const mapLocalMetadata: Partial<IMapMessageLocalMetadata> = localOpMetadata;
// we don't know how to rebase these operations, so if any other op has come in
// we will fail.
if(this.lastProcessedSeq !== mapLocalMetadata?.lastProcessedSeq) {
throw new Error("SharedInterval does not support reconnect in presence of external changes");
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.messageHandlers.get(type)!.submit(op as IMapOperation, localOpMetadata);
this.messageHandlers.get(type)!.submit(op as IMapOperation);
return true;
}
return false;
Expand Down Expand Up @@ -529,11 +543,14 @@ export class MapKernel implements IValueTypeCreator {
message: ISequencedDocumentMessage | undefined,
localOpMetadata: unknown,
): boolean {
// track the seq of every incoming message, so we can detect if any
// changes happened during a resubmit
this.lastProcessedSeq = message.sequenceNumber;
if (this.messageHandlers.has(op.type)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.messageHandlers
.get(op.type)!
.process(op, local, message, localOpMetadata);
.process(op, local, message, localOpMetadata as IMapMessageLocalMetadata);
return true;
}
return false;
Expand Down Expand Up @@ -631,11 +648,12 @@ export class MapKernel implements IValueTypeCreator {
private needProcessKeyOperation(
op: IMapKeyOperation,
local: boolean,
localOpMetadata: unknown,
localOpMetadata: IMapMessageLocalMetadata,
): boolean {
if (this.pendingClearMessageId !== -1) {
if (local) {
assert(localOpMetadata !== undefined && localOpMetadata as number < this.pendingClearMessageId,
assert(localOpMetadata?.pendingClearMessageId !== undefined
&& localOpMetadata.pendingClearMessageId < this.pendingClearMessageId,
0x1f1 /* "Received out of order op when there is an unacked clear message" */);
}
// If we have an unacked clear, we can ignore all ops.
Expand All @@ -648,7 +666,7 @@ export class MapKernel implements IValueTypeCreator {
if (local) {
assert(localOpMetadata !== undefined,
0x1f2 /* `pendingMessageId is missing from the local client's ${op.type} operation` */);
const pendingMessageId = localOpMetadata as number;
const pendingMessageId = localOpMetadata.pendingMessageId;
const pendingKeyMessageId = this.pendingKeys.get(op.key);
if (pendingKeyMessageId === pendingMessageId) {
this.pendingKeys.delete(op.key);
Expand All @@ -674,7 +692,7 @@ export class MapKernel implements IValueTypeCreator {
if (local) {
assert(localOpMetadata !== undefined,
0x1f3 /* "pendingMessageId is missing from the local client's clear operation" */);
const pendingMessageId = localOpMetadata as number;
const pendingMessageId = localOpMetadata?.pendingMessageId;
if (this.pendingClearMessageId === pendingMessageId) {
this.pendingClearMessageId = -1;
}
Expand All @@ -686,7 +704,7 @@ export class MapKernel implements IValueTypeCreator {
}
this.clearCore(local, message);
},
submit: (op: IMapClearOperation, localOpMetadata: unknown) => {
submit: (op: IMapClearOperation) => {
// We don't reuse the metadata but send a new one on each submit.
this.submitMapClearMessage(op);
},
Expand All @@ -704,7 +722,7 @@ export class MapKernel implements IValueTypeCreator {
}
this.deleteCore(op.key, local, message);
},
submit: (op: IMapDeleteOperation, localOpMetadata: unknown) => {
submit: (op: IMapDeleteOperation) => {
// We don't reuse the metadata but send a new one on each submit.
this.submitMapKeyMessage(op);
},
Expand All @@ -725,7 +743,7 @@ export class MapKernel implements IValueTypeCreator {
const context = this.makeLocal(op.key, op.value);
this.setCore(op.key, context, local, message);
},
submit: (op: IMapSetOperation, localOpMetadata: unknown) => {
submit: (op: IMapSetOperation) => {
// We don't reuse the metadata but send a new one on each submit.
this.submitMapKeyMessage(op);
},
Expand Down Expand Up @@ -758,8 +776,8 @@ export class MapKernel implements IValueTypeCreator {
const event: IValueChanged = { key: op.key, previousValue };
this.eventEmitter.emit("valueChanged", event, local, message, this.eventEmitter);
},
submit: (op: IMapValueTypeOperation, localOpMetadata: unknown) => {
this.submitMessage(op, localOpMetadata);
submit: (op: IMapValueTypeOperation) => {
this.submitMessage(op, {lastProcessedSeq: this.lastProcessedSeq});
},
getStashedOpLocalMetadata: (op: IMapValueTypeOperation) => {
assert(false, 0x016 /* "apply stashed op not implemented for custom value type ops" */);
Expand All @@ -780,8 +798,8 @@ export class MapKernel implements IValueTypeCreator {
* @param op - The clear message
*/
private submitMapClearMessage(op: IMapClearOperation): void {
const pendingMessageId = this.getMapClearMessageLocalMetadata(op);
this.submitMessage(op, pendingMessageId);
const pendingClearMessageId = this.getMapClearMessageLocalMetadata(op);
this.submitMessage(op, {pendingClearMessageId, lastProcessedSeq: this.lastProcessedSeq});
}

private getMapKeyMessageLocalMetadata(op: IMapKeyOperation): number {
Expand All @@ -796,7 +814,7 @@ export class MapKernel implements IValueTypeCreator {
*/
private submitMapKeyMessage(op: IMapKeyOperation): void {
const pendingMessageId = this.getMapKeyMessageLocalMetadata(op);
this.submitMessage(op, pendingMessageId);
this.submitMessage(op, {pendingMessageId, lastProcessedSeq: this.lastProcessedSeq});
}

/**
Expand All @@ -821,7 +839,7 @@ export class MapKernel implements IValueTypeCreator {
},
};
// Send the localOpMetadata as undefined because we don't care about the ack.
this.submitMessage(op, undefined /* localOpMetadata */);
this.submitMessage(op, {lastProcessedSeq: this.lastProcessedSeq});

const event: IValueChanged = { key, previousValue };
this.eventEmitter.emit("valueChanged", event, true, null, this.eventEmitter);
Expand Down

0 comments on commit 867e337

Please sign in to comment.