Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container Rehydration from a snapshot taken from detached container. #3061

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
59f1e16
final changes for rehydration
jatgarg Aug 4, 2020
4bffb77
final changes for rehydration
jatgarg Aug 5, 2020
5573ffa
merge conflict
jatgarg Aug 5, 2020
59d66b5
mertge conflict
jatgarg Aug 5, 2020
d240a3a
remove
jatgarg Aug 5, 2020
f99f68e
Merge branch 'master' of https://github.com/microsoft/FluidFramework …
jatgarg Aug 5, 2020
217aacc
fix tiny
jatgarg Aug 5, 2020
1bb9da0
remove unloaded map
jatgarg Aug 5, 2020
6fcce12
remove unrealized map
jatgarg Aug 5, 2020
3f903b7
add comment
jatgarg Aug 5, 2020
e255f54
remove loadlocal
jatgarg Aug 5, 2020
59a4ff2
pr sugg
jatgarg Aug 5, 2020
737111d
pr sugg
jatgarg Aug 5, 2020
cfde78c
pr sugg
jatgarg Aug 5, 2020
0a38dff
merge conflict
jatgarg Aug 5, 2020
acc535a
fix name
jatgarg Aug 5, 2020
918a5c4
merge conflict
jatgarg Aug 6, 2020
84bc42e
pr sugg
jatgarg Aug 6, 2020
808d530
merge conflict
jatgarg Aug 12, 2020
7150af4
separate the 2 apis
jatgarg Aug 12, 2020
60f977f
resolve m,erge
jatgarg Aug 12, 2020
4bd5d7e
merge conflict
jatgarg Aug 13, 2020
f93b1df
set existing as true if loaded from rehydration snapshopt
jatgarg Aug 17, 2020
79592b1
set existing as true if loaded from rehydration snapshopt
jatgarg Aug 17, 2020
2876424
merge conflict
jatgarg Aug 17, 2020
acd860d
remove
jatgarg Aug 17, 2020
e52d2ac
remove
jatgarg Aug 17, 2020
3f46d22
localdatastore
jatgarg Aug 17, 2020
a19116f
rename comp toi datta store
jatgarg Aug 19, 2020
eae3ec6
merge conflict
jatgarg Aug 19, 2020
562b29b
merge conflict
jatgarg Aug 19, 2020
55d76e2
cast
jatgarg Aug 20, 2020
d18fd5f
cast
jatgarg Aug 20, 2020
0525797
merge conflict
jatgarg Aug 24, 2020
7c3e821
merge conflict
jatgarg Aug 28, 2020
c395773
pr sugg
jatgarg Aug 28, 2020
cf97acd
merge conflict
jatgarg Sep 4, 2020
a1a07a8
pr sugg
jatgarg Sep 8, 2020
545f304
merge conflict
jatgarg Sep 8, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/dds/cell/src/cell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export class SharedCell<T extends Serializable = any> extends SharedObject<IShar
* @returns - promise that resolved when the load is completed
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
storage: IChannelStorageService): Promise<void> {
const rawContent = await storage.read(snapshotFileName);

Expand Down
2 changes: 1 addition & 1 deletion packages/dds/cell/src/cellFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class CellFactory implements IChannelFactory {
runtime: IFluidDataStoreRuntime,
id: string,
services: IChannelServices,
branchId: string,
branchId: string | undefined,
attributes: IChannelAttributes): Promise<ISharedCell> {
const cell = new SharedCell(id, runtime, attributes);
await cell.load(branchId, services);
Expand Down
2 changes: 1 addition & 1 deletion packages/dds/counter/src/counter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export class SharedCounter extends SharedObject<ISharedCounterEvents> implements
* @returns - promise that resolved when the load is completed
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService): Promise<void> {
const rawContent = await storage.read(snapshotFileName);

Expand Down
2 changes: 1 addition & 1 deletion packages/dds/ink/src/ink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export class Ink extends SharedObject<IInkEvents> implements IInk {
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.loadCore}
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService,
): Promise<void> {
const header = await storage.read(snapshotFileName);
Expand Down
2 changes: 1 addition & 1 deletion packages/dds/map/src/directory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ export class SharedDirectory extends SharedObject<ISharedDirectoryEvents> implem
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.loadCore}
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService) {
const header = await storage.read(snapshotFileName);
const data = JSON.parse(fromBase64ToUtf8(header));
Expand Down
2 changes: 1 addition & 1 deletion packages/dds/map/src/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ export class SharedMap extends SharedObject<ISharedMapEvents> implements IShared
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.loadCore}
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService) {
const header = await storage.read(snapshotFileName);

Expand Down
2 changes: 1 addition & 1 deletion packages/dds/matrix/src/matrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ export class SharedMatrix<T extends Serializable = Serializable>
debug(`${this.id} is now disconnected`);
}

protected async loadCore(branchId: string, storage: IChannelStorageService) {
protected async loadCore(branchId: string | undefined, storage: IChannelStorageService) {
try {
await this.rows.load(this.runtime, new ObjectStoragePartition(storage, SnapshotPath.rows), branchId);
await this.cols.load(this.runtime, new ObjectStoragePartition(storage, SnapshotPath.cols), branchId);
Expand Down
1 change: 1 addition & 0 deletions packages/dds/merge-tree/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"dependencies": {
"@fluidframework/common-definitions": "^0.18.1",
"@fluidframework/common-utils": "^0.21.0",
"@fluidframework/container-definitions": "^0.25.1",
"@fluidframework/core-interfaces": "^0.25.1",
"@fluidframework/datastore-definitions": "^0.25.1",
"@fluidframework/protocol-definitions": "^0.1011.1-0",
Expand Down
38 changes: 20 additions & 18 deletions packages/dds/merge-tree/src/snapshotLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ChildLogger } from "@fluidframework/telemetry-utils";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IFluidDataStoreRuntime, IChannelStorageService } from "@fluidframework/datastore-definitions";
import { ITelemetryLogger } from "@fluidframework/common-definitions";
import { AttachState } from "@fluidframework/container-definitions";
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
import { Client } from "./client";
import { NonCollabClient, UniversalSequenceNumber } from "./constants";
import { ISegment, MergeTree } from "./mergeTree";
Expand Down Expand Up @@ -134,24 +135,25 @@ export class SnapshotLoader {
if (chunk.headerMetadata === undefined) {
throw new Error("header metadata not available");
}
// specify a default client id, "snapshot" here as we
// should enter collaboration/op sending mode if we load
// a snapshot in any case (summary or attach message)
// once we get a client id this will be called with that
// clientId in the connected event
// TODO: this won't support rehydrating a detached container
// we need to think more holistically about the dds state machine
// now that we differentiate attached vs local
this.client.startOrUpdateCollaboration(
this.runtime.clientId ?? "snapshot",

// TODO: Make 'minSeq' non-optional once the new snapshot format becomes the default?
// (See https://github.com/microsoft/FluidFramework/issues/84)
/* minSeq: */ chunk.headerMetadata.minSequenceNumber !== undefined
? chunk.headerMetadata.minSequenceNumber
: chunk.headerMetadata.sequenceNumber,
/* currentSeq: */ chunk.headerMetadata.sequenceNumber,
branching);
// If we load a detached container from snapshot, then we don't supply a default clientId
// because we don't want to start collaboration.
if (this.runtime.attachState !== AttachState.Detached) {
// specify a default client id, "snapshot" here as we
// should enter collaboration/op sending mode if we load
// a snapshot in any case (summary or attach message)
// once we get a client id this will be called with that
// clientId in the connected event
this.client.startOrUpdateCollaboration(
this.runtime.clientId ?? "snapshot",

// TODO: Make 'minSeq' non-optional once the new snapshot format becomes the default?
// (See https://github.com/microsoft/FluidFramework/issues/84)
/* minSeq: */ chunk.headerMetadata.minSequenceNumber !== undefined
? chunk.headerMetadata.minSequenceNumber
: chunk.headerMetadata.sequenceNumber,
/* currentSeq: */ chunk.headerMetadata.sequenceNumber,
branching);
}

return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export class ConsensusOrderedCollection<T = any>
}

protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService): Promise<void> {
assert(this.jobTracking.size === 0);
const rawContentTracking = await storage.read(snapshotFileNameTracking);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export class ConsensusRegisterCollection<T>
}

protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService,
): Promise<void> {
const header = await storage.read(snapshotFileName);
Expand Down
2 changes: 1 addition & 1 deletion packages/dds/sequence/src/sequence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ export abstract class SharedSegmentSequence<T extends MergeTree.ISegment>
}

protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService) {
const header = await storage.read(snapshotFileName);

Expand Down
2 changes: 1 addition & 1 deletion packages/dds/sequence/src/sharedIntervalCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class SharedIntervalCollection<TInterval extends ISerializableInterval =
}

protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService) {
const header = await storage.read(snapshotFileName);

Expand Down
16 changes: 10 additions & 6 deletions packages/dds/shared-object-base/src/sharedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,18 @@ export abstract class SharedObject<TEvent extends ISharedObjectEvents = ISharedO
* @param services - Services used by the shared object
*/
public async load(
branchId: string,
services: IChannelServices): Promise<void> {
this.services = services;

branchId: string | undefined,
services: IChannelServices,
): Promise<void> {
if (this.runtime.attachState !== AttachState.Detached) {
this.services = services;
}
await this.loadCore(
branchId,
services.objectStorage);
this.attachDeltaHandler();
if (this.runtime.attachState !== AttachState.Detached) {
this.attachDeltaHandler();
}
}

/**
Expand Down Expand Up @@ -200,7 +204,7 @@ export abstract class SharedObject<TEvent extends ISharedObjectEvents = ISharedO
* @param services - Storage used by the shared object
*/
protected abstract loadCore(
branchId: string,
branchId: string | undefined,
services: IChannelStorageService): Promise<void>;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export class SharedSummaryBlock extends SharedObject implements ISharedSummaryBl
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.loadCore}
*/
protected async loadCore(
branchId: string,
branchId: string | undefined,
storage: IChannelStorageService): Promise<void> {
const rawContent = await storage.read(snapshotFileName);
const contents = JSON.parse(fromBase64ToUtf8(rawContent)) as ISharedSummaryBlockDataSerializable;
Expand Down
7 changes: 7 additions & 0 deletions packages/loader/container-definitions/src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
IDocumentMessage,
IQuorum,
ISequencedDocumentMessage,
ISnapshotTree,
} from "@fluidframework/protocol-definitions";
import { IResolvedUrl } from "@fluidframework/driver-definitions";
import { IEvent, IEventProvider } from "@fluidframework/common-definitions";
Expand Down Expand Up @@ -149,6 +150,12 @@ export interface ILoader {
* updates will only be local until the user explicitly attaches the container to a service provider.
*/
createDetachedContainer(source: IFluidCodeDetails): Promise<IContainer>;

/**
* Creates a new container using the specified snapshot but in an unattached state. While unattached all
* updates will only be local until the user explicitly attaches the container to a service provider.
*/
createDetachedContainerFromSnapshot(source: ISnapshotTree): Promise<IContainer>;
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
}

export enum LoaderHeader {
Expand Down
2 changes: 1 addition & 1 deletion packages/loader/container-definitions/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export interface IContainerContext extends IMessageScheduler, IDisposable {
readonly storage: IDocumentStorageService | undefined | null;
readonly connected: boolean;
readonly branch: string;
readonly baseSnapshot: ISnapshotTree | null;
readonly baseSnapshot: ISnapshotTree | undefined;
readonly submitFn: (type: MessageType, contents: any, batch: boolean, appData?: any) => number;
readonly submitSignalFn: (contents: any) => void;
readonly snapshotFn: (message: string) => Promise<void>;
Expand Down
69 changes: 54 additions & 15 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
isOnline,
ensureFluidResolvedUrl,
combineAppAndProtocolSummary,
readAndParseFromBlobs,
} from "@fluidframework/driver-utils";
import { CreateContainerError } from "@fluidframework/container-utils";
import {
Expand Down Expand Up @@ -193,7 +194,8 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
options: any,
scope: IFluidObject,
loader: Loader,
source: IFluidCodeDetails,
codeDetails: IFluidCodeDetails | undefined,
snapshot: ISnapshotTree | undefined,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's easy to differentiate interfaces, I'd rather have single IFluidCodeDetails | ISnapshotTree argument.
Or a type that differentiates them:
{ create: true, code: IFluidCodeDetails } | { create: false, snapshot: ISnapshotTree}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chose this one: { create: true, code: IFluidCodeDetails } | { create: false, snapshot: ISnapshotTree}. It is more explicit, infact it was like this before but later moved to other one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like like this type. i'd rather just have separate methods. also, can the code proposal be updated on a detached container? i think we will want to support this if we don't


In reply to: 479472885 [](ancestors = 479472885)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you basically already have two methods anyway. i don't see a whole lot of value in coupling these


In reply to: 483120067 [](ancestors = 483120067,479472885)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 methods one of which load from codeDetails and the other which load from snapshot are private methods. Loader calls create method on container which is public if it wants to create a detached container either from snapshot or code details. So this type is just used here. Aesthetically this looks good to me:
{ create: true, code: IFluidCodeDetails } | { create: false, snapshot: ISnapshotTree}

Code can be proposed again by the quorum, isn't it? I mean can we update the code and load from snapshot at the same time? I don't think so.

serviceFactory: IDocumentServiceFactory,
urlResolver: IUrlResolver,
logger?: ITelemetryBaseLogger,
Expand All @@ -207,7 +209,13 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
urlResolver,
{},
logger);
await container.createDetached(source);

if (snapshot !== undefined) {
await container.createDetachedFromSnapshot(snapshot);
} else {
assert(codeDetails, "One of the source should be there to load from!!");
await container.createDetached(codeDetails);
}

return container;
}
Expand Down Expand Up @@ -564,8 +572,6 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this._attachState = AttachState.Attached;
this.emit("attached");
this.cachedAttachSummary = undefined;
// We know this is create new flow.
this._existing = false;
this._parentBranch = this._id;

// Propagate current connection state through the system.
Expand Down Expand Up @@ -1016,6 +1022,9 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

this.attachDeltaManagerOpHandler(attributes);

// We know this is create detached flow without snapshot.
this._existing = false;

// Need to just seed the source data in the code quorum. Quorum itself is empty
this._protocolHandler = this.initializeProtocolState(
attributes,
Expand All @@ -1031,6 +1040,26 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.propagateConnectionState();
}

private async createDetachedFromSnapshot(snapshotTree: ISnapshotTree) {
const attributes = await this.getDocumentAttributes(undefined, snapshotTree);
assert.strictEqual(attributes.sequenceNumber, 0, "Seq number in detached container should be 0!!");
this.attachDeltaManagerOpHandler(attributes);

// We know this is create detached flow with snapshot.
this._existing = true;

// ...load in the existing quorum
// Initialize the protocol handler
this._protocolHandler =
await this.loadAndInitializeProtocolState(attributes, undefined, snapshotTree);

await this.createDetachedContext(attributes, snapshotTree);

this.loaded = true;

this.propagateConnectionState();
}

private async getDocumentStorageService(): Promise<IDocumentStorageService> {
if (this.service === undefined) {
throw new Error("Not attached");
Expand All @@ -1040,7 +1069,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}

private async getDocumentAttributes(
storage: IDocumentStorageService,
storage: IDocumentStorageService | undefined,
tree: ISnapshotTree | undefined,
): Promise<IDocumentAttributes> {
if (tree === undefined) {
Expand All @@ -1057,7 +1086,8 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
? tree.trees[".protocol"].blobs.attributes
: tree.blobs[".attributes"];

const attributes = await readAndParse<IDocumentAttributes>(storage, attributesHash);
const attributes = storage !== undefined ? await readAndParse<IDocumentAttributes>(storage, attributesHash)
: readAndParseFromBlobs<IDocumentAttributes>(tree.trees[".protocol"].blobs, attributesHash);
jatgarg marked this conversation as resolved.
Show resolved Hide resolved

// Back-compat for older summaries with no term
if (attributes.term === undefined) {
Expand All @@ -1069,7 +1099,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

private async loadAndInitializeProtocolState(
attributes: IDocumentAttributes,
storage: IDocumentStorageService,
storage: IDocumentStorageService | undefined,
snapshot: ISnapshotTree | undefined,
): Promise<ProtocolOpHandler> {
let members: [string, ISequencedClient][] = [];
Expand All @@ -1078,11 +1108,20 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

if (snapshot !== undefined) {
const baseTree = ".protocol" in snapshot.trees ? snapshot.trees[".protocol"] : snapshot;
[members, proposals, values] = await Promise.all([
readAndParse<[string, ISequencedClient][]>(storage, baseTree.blobs.quorumMembers),
readAndParse<[number, ISequencedProposal, string[]][]>(storage, baseTree.blobs.quorumProposals),
readAndParse<[string, ICommittedProposal][]>(storage, baseTree.blobs.quorumValues),
]);
if (storage !== undefined) {
[members, proposals, values] = await Promise.all([
readAndParse<[string, ISequencedClient][]>(storage, baseTree.blobs.quorumMembers),
readAndParse<[number, ISequencedProposal, string[]][]>(storage, baseTree.blobs.quorumProposals),
readAndParse<[string, ICommittedProposal][]>(storage, baseTree.blobs.quorumValues),
]);
} else {
members = readAndParseFromBlobs<[string, ISequencedClient][]>(snapshot.trees[".protocol"].blobs,
baseTree.blobs.quorumMembers);
proposals = readAndParseFromBlobs<[number, ISequencedProposal, string[]][]>(
snapshot.trees[".protocol"].blobs, baseTree.blobs.quorumProposals);
values = readAndParseFromBlobs<[string, ICommittedProposal][]>(snapshot.trees[".protocol"].blobs,
baseTree.blobs.quorumValues);
}
}

const protocolHandler = this.initializeProtocolState(
Expand Down Expand Up @@ -1507,7 +1546,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.scope,
this.codeLoader,
chaincode,
snapshot ?? null,
snapshot,
attributes,
this.blobManager,
new DeltaManagerProxy(this._deltaManager),
Expand All @@ -1529,7 +1568,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
/**
* Creates a new, unattached container context
*/
private async createDetachedContext(attributes: IDocumentAttributes) {
private async createDetachedContext(attributes: IDocumentAttributes, snapshot?: ISnapshotTree) {
this.pkg = this.getCodeDetailsFromQuorum();
if (this.pkg === undefined) {
throw new Error("pkg should be provided in create flow!!");
Expand All @@ -1545,7 +1584,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.scope,
this.codeLoader,
runtimeFactory,
{ id: null, blobs: {}, commits: {}, trees: {} }, // TODO this will be from the offline store
snapshot,
attributes,
this.blobManager,
new DeltaManagerProxy(this._deltaManager),
Expand Down
Loading