Skip to content

Commit

Permalink
Shallow clone CatchupOPs and Test Blob Handle On SharedString (#4167)
Browse files Browse the repository at this point in the history
Tried to repo the below issues on main with blobs and map but didn't seem to hit it. looks like things have changed, but wanted to add the test, and the move to shallow clone anyway. Will try to repo on 0.27

related #3956
related #3707
  • Loading branch information
anthony-murphy authored Oct 29, 2020
1 parent 61bd2cc commit 16859ae
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
28 changes: 16 additions & 12 deletions packages/dds/sequence/src/sequence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
[MergeTree.IMergeTreeOp, MergeTree.SegmentGroup | MergeTree.SegmentGroup[]][] = [];
// cache incoming ops that arrive when partial loading
private deferIncomingOps = true;
private readonly loadedDeferredIncommingOps: ISequencedDocumentMessage[] = [];
private readonly loadedDeferredIncomingOps: ISequencedDocumentMessage[] = [];

private messagesSinceMSNChange: ISequencedDocumentMessage[] = [];
private readonly intervalMapKernel: MapKernel;
Expand Down Expand Up @@ -495,7 +495,7 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
try {
// this will load the header, and return a promise
// that will resolve when the body is loaded
// and the catchup ops are avaialible.
// and the catchup ops are available.
const { catchupOpsP } = await this.client.load(
this.runtime,
new ObjectStoragePartition(storage, contentPath),
Expand Down Expand Up @@ -546,7 +546,7 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
// incoming ops to be applied after loading is complete
if (this.deferIncomingOps) {
assert(!local, "Unexpected local op when loading not finished");
this.loadedDeferredIncommingOps.push(message);
this.loadedDeferredIncomingOps.push(message);
} else {
assert(message.type === MessageType.Operation, "Sequence message not operation");

Expand Down Expand Up @@ -605,11 +605,9 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
ops.push(...SharedSegmentSequence.createOpsFromDelta(event));
}
const needsTransformation = message.referenceSequenceNumber !== message.sequenceNumber - 1;
let stashMessage = message;
let stashMessage: Readonly<ISequencedDocumentMessage> = message;
if (this.runtime.options?.newMergeTreeSnapshotFormat !== true) {
if (needsTransformation) {
stashMessage = cloneDeep(message);
stashMessage.referenceSequenceNumber = message.sequenceNumber - 1;
this.on("sequenceDelta", transfromOps);
}
}
Expand All @@ -619,7 +617,13 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
if (this.runtime.options?.newMergeTreeSnapshotFormat !== true) {
if (needsTransformation) {
this.removeListener("sequenceDelta", transfromOps);
stashMessage.contents = ops.length !== 1 ? MergeTree.createGroupOp(...ops) : ops[0];
// shallow clone the message as we only overwrite top level properties,
// like referenceSequenceNumber and content only
stashMessage = {
... message,
referenceSequenceNumber: stashMessage.sequenceNumber - 1,
contents: ops.length !== 1 ? MergeTree.createGroupOp(...ops) : ops[0],
};
}

this.messagesSinceMSNChange.push(stashMessage);
Expand Down Expand Up @@ -656,14 +660,14 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
this.logger.sendErrorEvent({ eventName: "SequenceLoadFailed" }, error);
this.loadedDeferred.reject(error);
} else {
// it is important this series remains sychronous
// first we stop defering incomming ops, and apply then all
// it is important this series remains synchronous
// first we stop defering incoming ops, and apply then all
this.deferIncomingOps = false;
while (this.loadedDeferredIncommingOps.length > 0) {
this.processCore(this.loadedDeferredIncommingOps.shift(), false, undefined);
while (this.loadedDeferredIncomingOps.length > 0) {
this.processCore(this.loadedDeferredIncomingOps.shift(), false, undefined);
}
// then resolve the loaded promise
// and resbumit all the outstanding ops, as the snapshot
// and resubmit all the outstanding ops, as the snapshot
// is fully loaded, and all outstanding ops are applied
this.loadedDeferred.resolve();

Expand Down
70 changes: 59 additions & 11 deletions packages/test/end-to-end-tests/src/test/blobs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import { ContainerMessageType } from "@fluidframework/container-runtime";
import { IFluidHandle } from "@fluidframework/core-interfaces";
import { ISummaryConfiguration } from "@fluidframework/protocol-definitions";
import { requestFluidObject } from "@fluidframework/runtime-utils";
import { SharedString } from "@fluidframework/sequence";
import uuid from "uuid";
import { ReferenceType } from "@fluidframework/merge-tree";
import { generateTest, ICompatLocalTestObjectProvider, ITestContainerConfig, TestDataObject } from "./compatUtils";

const testContainerConfig: ITestContainerConfig = {
runtimeOptions: { initialSummarizerDelayMs: 20 },
registry: [["sharedString", SharedString.getFactory()]],
};

const tests = (args: ICompatLocalTestObjectProvider) => {
Expand All @@ -26,10 +30,10 @@ const tests = (args: ICompatLocalTestObjectProvider) => {
}
}));

const component = await requestFluidObject<TestDataObject>(container, "default");
const blob = await component._runtime.uploadBlob(IsoBuffer.from("some random text"));
const dataStore = await requestFluidObject<TestDataObject>(container, "default");
const blob = await dataStore._runtime.uploadBlob(IsoBuffer.from("some random text"));

component._root.set("my blob", blob);
dataStore._root.set("my blob", blob);

await blobOpP;
});
Expand All @@ -39,25 +43,25 @@ const tests = (args: ICompatLocalTestObjectProvider) => {
const testKey = "a blob";
const container1 = await args.makeTestContainer(testContainerConfig);

const component1 = await requestFluidObject<TestDataObject>(container1, "default");
const dataStore1 = await requestFluidObject<TestDataObject>(container1, "default");

const blob = await component1._runtime.uploadBlob(IsoBuffer.from(testString, "utf-8"));
component1._root.set(testKey, blob);
const blob = await dataStore1._runtime.uploadBlob(IsoBuffer.from(testString, "utf-8"));
dataStore1._root.set(testKey, blob);

const container2 = await args.loadTestContainer(testContainerConfig);
const component2 = await requestFluidObject<TestDataObject>(container2, "default");
const dataStore2 = await requestFluidObject<TestDataObject>(container2, "default");

const blobHandle = await component2._root.wait<IFluidHandle<ArrayBufferLike>>(testKey);
const blobHandle = await dataStore2._root.wait<IFluidHandle<ArrayBufferLike>>(testKey);
assert.strictEqual(IsoBuffer.from(await blobHandle.get()).toString("utf-8"), testString);
});

it("loads from snapshot", async function() {
const container1 = await args.makeTestContainer(testContainerConfig);
const component = await requestFluidObject<TestDataObject>(container1, "default");
const blob = await component._runtime.uploadBlob(IsoBuffer.from("some random text"));
const dataStore = await requestFluidObject<TestDataObject>(container1, "default");
const blob = await dataStore._runtime.uploadBlob(IsoBuffer.from("some random text"));

// attach blob, wait for blob attach op, then take BlobManager snapshot
component._root.set("my blob", blob);
dataStore._root.set("my blob", blob);
await new Promise((res, rej) => container1.on("op", (op) => {
if (op.contents?.type === ContainerMessageType.BlobAttach) {
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
Expand Down Expand Up @@ -87,6 +91,50 @@ const tests = (args: ICompatLocalTestObjectProvider) => {
assert.strictEqual(snapshot2.entries.length, 1);
assert.strictEqual(snapshot1.entries[0].id, snapshot2.entries[0].id);
});

it("round trip blob handle on shared string property", async function() {
const container1 = await args.makeTestContainer(testContainerConfig);
const container2 = await args.loadTestContainer(testContainerConfig);
const testString = "this is a test string";
// setup
{
const dataStore = await requestFluidObject<TestDataObject>(container2, "default");
const sharedString = SharedString.create(dataStore._runtime, uuid());
dataStore._root.set("sharedString", sharedString.handle);

const blob = await dataStore._runtime.uploadBlob(IsoBuffer.from(testString));

sharedString.insertMarker(0, ReferenceType.Simple, { blob });

// wait for summarize, then summary ack so the next container will load from snapshot
await new Promise((resolve, reject) => {
let summarized = false;
container1.on("op", (op) => {
if (op.type === "summaryAck") {
if (summarized) {
resolve();
}
} else if (op.type === "summaryNack") {
reject("summaryNack");
} else if (op.type === "summarize") {
summarized = true;
}
});
});
}

// validate on remote container, local container, and container loaded from summary
for (const container of [container1, container2, await args.loadTestContainer(testContainerConfig)]) {
const dataStore2 = await requestFluidObject<TestDataObject>(container, "default");
const handle: IFluidHandle<SharedString> =
await dataStore2._root.wait("sharedString");
const sharedString2 = await handle.get();

const props = sharedString2.getPropertiesAtPosition(0);

assert.strictEqual(IsoBuffer.from(await props.blob.get()).toString("utf-8"), testString);
}
});
};

describe("blobs", () => {
Expand Down
25 changes: 17 additions & 8 deletions packages/test/end-to-end-tests/src/test/compatUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,21 @@ export class OldTestDataObject extends old.DataObject {
public get _root() { return this.root; }
}

export const createPrimedDataStoreFactory = (): IFluidDataStoreFactory => {
return new DataObjectFactory(TestDataObject.type, TestDataObject, [], {});
export const createPrimedDataStoreFactory = (registry?: ChannelFactoryRegistry): IFluidDataStoreFactory => {
return new DataObjectFactory(
TestDataObject.type,
TestDataObject,
[... registry ?? []].map((r)=>r[1]),
{});
};

export const createOldPrimedDataStoreFactory = (): old.IFluidDataStoreFactory => {
return new old.DataObjectFactory(OldTestDataObject.type, OldTestDataObject, [], {});
export const createOldPrimedDataStoreFactory =
(registry?: ChannelFactoryRegistry): old.IFluidDataStoreFactory => {
return new old.DataObjectFactory(
OldTestDataObject.type,
OldTestDataObject,
[... convertRegistry(registry)].map((r)=>r[1]),
{});
};

export const createTestFluidDataStoreFactory = (registry: ChannelFactoryRegistry = []): IFluidDataStoreFactory => {
Expand Down Expand Up @@ -163,7 +172,7 @@ export const generateTest = (
TestDataObject.type,
containerOptions?.testFluidDataObject
? createTestFluidDataStoreFactory(containerOptions?.registry)
: createPrimedDataStoreFactory(),
: createPrimedDataStoreFactory(containerOptions?.registry),
containerOptions?.runtimeOptions,
);

Expand All @@ -188,7 +197,7 @@ export const generateCompatTest = (
const dataStoreFactory = (containerOptions?: ITestContainerConfig) =>
containerOptions?.testFluidDataObject
? createTestFluidDataStoreFactory(containerOptions?.registry)
: createPrimedDataStoreFactory();
: createPrimedDataStoreFactory(containerOptions?.registry);
const runtimeFactory = (containerOptions?: ITestContainerConfig) =>
createRuntimeFactory(
TestDataObject.type,
Expand All @@ -214,7 +223,7 @@ export const generateCompatTest = (
OldTestDataObject.type,
containerOptions?.testFluidDataObject
? createOldTestFluidDataStoreFactory(containerOptions?.registry)
: createOldPrimedDataStoreFactory(),
: createOldPrimedDataStoreFactory(containerOptions?.registry),
containerOptions?.runtimeOptions,
) as any as IRuntimeFactory;

Expand All @@ -236,7 +245,7 @@ export const generateCompatTest = (
OldTestDataObject.type,
containerOptions?.testFluidDataObject
? createOldTestFluidDataStoreFactory(containerOptions?.registry)
: createOldPrimedDataStoreFactory(),
: createOldPrimedDataStoreFactory(containerOptions?.registry),
containerOptions?.runtimeOptions,
);

Expand Down

0 comments on commit 16859ae

Please sign in to comment.