Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetching ops from PUSH #6954

Merged
merged 2 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/drivers/driver-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"@fluidframework/driver-definitions": "^0.39.6",
"@fluidframework/driver-utils": "^0.45.0",
"@fluidframework/protocol-definitions": "^0.1024.0",
"@fluidframework/telemetry-utils": "^0.45.0",
"debug": "^4.1.1"
},
"devDependencies": {
Expand Down
6 changes: 5 additions & 1 deletion packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ScopeType,
} from "@fluidframework/protocol-definitions";
import { IDisposable, ITelemetryLogger } from "@fluidframework/common-definitions";
import { ChildLogger } from "@fluidframework/telemetry-utils";
import { debug } from "./debug";

/**
Expand Down Expand Up @@ -71,6 +72,7 @@ export class DocumentDeltaConnection
* After disconnection, we flip this to prevent any stale messages from being emitted.
*/
protected _disposed: boolean = false;
protected readonly logger: ITelemetryLogger;

public get details(): IConnected {
if (!this._details) {
Expand All @@ -86,10 +88,12 @@ export class DocumentDeltaConnection
protected constructor(
protected readonly socket: SocketIOClient.Socket,
public documentId: string,
protected readonly logger: ITelemetryLogger,
logger: ITelemetryLogger,
) {
super();

this.logger = ChildLogger.create(logger, "DeltaConnection");

this.submitManager = new BatchManager<IDocumentMessage[]>(
(submitType, work) => {
// Although the implementation here disconnects the socket and does not reuse it, other subclasses
Expand Down
28 changes: 26 additions & 2 deletions packages/drivers/odsp-driver/src/odspDocumentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { ITelemetryLogger } from "@fluidframework/common-definitions";
import { assert } from "@fluidframework/common-utils";
import { DocumentDeltaConnection } from "@fluidframework/driver-base";
import { IDocumentDeltaConnection, DriverError } from "@fluidframework/driver-definitions";
import { DriverError } from "@fluidframework/driver-definitions";
import {
IClient,
IConnect,
Expand Down Expand Up @@ -182,7 +182,7 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
url: string,
telemetryLogger: ITelemetryLogger,
timeoutMs: number,
epochTracker: EpochTracker): Promise<IDocumentDeltaConnection>
epochTracker: EpochTracker): Promise<OdspDocumentDeltaConnection>
{
// enable multiplexing when the websocket url does not include the tenant/document id
const parsedUrl = new URL(url);
Expand Down Expand Up @@ -244,6 +244,7 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
}

private socketReference: SocketReference | undefined;
private readonly requestOpsNoncePrefix: string;

/**
* Error raising for socket.io issues
Expand Down Expand Up @@ -309,6 +310,15 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
{
super(socket, documentId, logger);
this.socketReference = socketReference;
this.requestOpsNoncePrefix = `${this.documentId}-`;
}

public requestOps(from: number, to: number) {
this.socket.emit("get_ops", this.clientId, {
nonce: `${this.requestOpsNoncePrefix}${uuid()}`,
from,
to,
});
}

protected async initialize(connectMessage: IConnect, timeout: number) {
Expand All @@ -321,6 +331,20 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
};
}

this.socket.on("get_ops_response", (result) => {
const messages = result.messages as ISequencedDocumentMessage[];
if ((result.nonce as string).startsWith(this.requestOpsNoncePrefix) && messages.length > 0) {
this.logger.sendTelemetryEvent({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the latency of the get_ops request be logged with this event? Could track the start time for the given nonce so it's accessible here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will make code a bit more complicated. I want to get this in first, and then keep improving based on feedback.
Including getting data on if this is the actual direction we want to go. I.e. I believe we will need to make a choice of either using this flow to understand better / easier why gaps are happening and keep improving PUSH, or vice verse - experiment with simplifying PUSH toward putting complex on client side

eventName: "GetOps",
first: messages[0].sequenceNumber,
last: messages[messages.length - 1].sequenceNumber,
code: result.code,
length: messages.length,
});
this.socket.emit("op", this.documentId, messages);
}
});

return super.initialize(connectMessage, timeout);
}

Expand Down
8 changes: 7 additions & 1 deletion packages/drivers/odsp-driver/src/odspDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ export class OdspDocumentService implements IDocumentService {

private _opsCache?: OpsCache;

private currentConnection?: OdspDocumentDeltaConnection;

/**
* @param odspResolvedUrl - resolved url identifying document that will be managed by this service instance.
* @param getStorageToken - function that can provide the storage token. This is is also referred to as
Expand Down Expand Up @@ -184,6 +186,9 @@ export class OdspDocumentService implements IDocumentService {
concurrency,
async (from, to, telemetryProps) => service.get(from, to, telemetryProps),
async (from, to) => {
if (this.currentConnection !== undefined && !this.currentConnection.disposed) {
this.currentConnection.requestOps(from, to);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be behind some kind of flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels pretty safe to me, with two caveats:

  1. We have a chance to run with this change through ODSP stress test before it goes to prod. We will do it right away once it's in.
  2. There is really nothing interesting here from client side (in terms of potential introduction of bugs), so introduction of policy/flag is more a kill switch for PUSH functionality. I'd rather have kill switch on PUSH side than on client here - if server disables this functionality, it's noop on client (and does not require code change on client).

@GaryWilber - any thoughts on state of # 2?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll have a killswitch on the push side to disable processing get_ops requests - it would make it a no-op.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GaryWilber , do you have it implemented already? Or planning to add? This change will hit prod in a matter of 1-2 weeks after being approved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The killswitch is implemented but it's not deployed to production yet. ETA to full prod deployment would be 2 weeks from now (8/16)

}
const res = await this.opsCache?.get(from, to);
return res as ISequencedDocumentMessage[] ?? [];
},
Expand Down Expand Up @@ -247,6 +252,7 @@ export class OdspDocumentService implements IDocumentService {
connection.on("op", (documentId, ops: ISequencedDocumentMessage[]) => {
this.opsReceived(ops);
});
this.currentConnection = connection;
return connection;
} catch (error) {
this.cache.sessionJoinCache.remove(this.joinSessionKey);
Expand Down Expand Up @@ -294,7 +300,7 @@ export class OdspDocumentService implements IDocumentService {
io: SocketIOClientStatic,
client: IClient,
webSocketUrl: string,
): Promise<IDocumentDeltaConnection> {
): Promise<OdspDocumentDeltaConnection> {
const startTime = performance.now();
const connection = await OdspDocumentDeltaConnection.create(
tenantId,
Expand Down