Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle limited sync responses by resetting the thread timeline #3056

Merged
merged 12 commits into from
Jan 16, 2023
132 changes: 131 additions & 1 deletion spec/unit/models/thread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/t
import { mkThread } from "../../test-utils/thread";
import { TestClient } from "../../TestClient";
import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils";
import { EventStatus, MatrixEvent } from "../../../src";
import { Direction, EventStatus, MatrixEvent } from "../../../src";
import { ReceiptType } from "../../../src/@types/read_receipts";
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
import { ReEmitter } from "../../../src/ReEmitter";
Expand Down Expand Up @@ -283,4 +283,134 @@ describe("Thread", () => {
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
});
});

describe("resetLiveTimeline", () => {
it("correctly resets the live timeline", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);

jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
Promise.resolve({
chunk: [],
start: `${token}-new`,
end: `${token}-new`,
}),
);

function timelines(): [string | null, string | null][] {
return thread.timelineSet
.getTimelines()
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
}

expect(timelines()).toEqual([[null, null]]);
const promise = thread.resetLiveTimeline("b1", "f1");
expect(timelines()).toEqual([
[null, "f1"],
["b1", null],
]);
await promise;
expect(timelines()).toEqual([
[null, "f1-new"],
["b1-new", null],
]);
});

it("does not modify changed tokens", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);

jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
Promise.resolve({
chunk: [],
start: `${token}-new`,
end: `${token}-new`,
}),
);

function timelines(): [string | null, string | null][] {
return thread.timelineSet
.getTimelines()
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
}

expect(timelines()).toEqual([[null, null]]);
const promise = thread.resetLiveTimeline("b1", "f1");
expect(timelines()).toEqual([
[null, "f1"],
["b1", null],
]);
thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward);
thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward);
await promise;
expect(timelines()).toEqual([
[null, "f2"],
["b2", null],
]);
});

it("is correctly called by the room", async () => {
const myUserId = "@bob:example.org";
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
timelineSupport: false,
});
const client = testClient.client;
const room = new Room("123", client, myUserId, {
pendingEventOrdering: PendingEventOrdering.Detached,
});

jest.spyOn(client, "getRoom").mockReturnValue(room);

const { thread } = mkThread({
room,
client,
authorId: myUserId,
participantUserIds: ["@alice:example.org"],
length: 3,
});
await emitPromise(thread, ThreadEvent.Update);
expect(thread.length).toBe(2);
const mock = jest.spyOn(thread, "resetLiveTimeline");
mock.mockReturnValue(Promise.resolve());

room.resetLiveTimeline("b1", "f1");
expect(mock).toHaveBeenCalledWith("b1", "f1");
});
});
});
5 changes: 4 additions & 1 deletion src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
for (const timelineSet of this.timelineSets) {
timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
}
for (const thread of this.threads.values()) {
thread.resetLiveTimeline(backPaginationToken, forwardPaginationToken);
}

this.fixUpLegacyTimelineFields();
}
Expand Down Expand Up @@ -1223,7 +1226,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
const event = this.findEventById(eventId);
const thread = this.findThreadForEvent(event);
if (thread) {
return thread.timelineSet.getLiveTimeline();
return thread.timelineSet.getTimelineForEvent(eventId);
} else {
return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId);
}
Expand Down
56 changes: 55 additions & 1 deletion src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
this.setEventMetadata(event);

const lastReply = this.lastReply();
const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp;
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;

// Add all incoming events to the thread's timeline set when there's no server support
if (!Thread.hasServerSideSupport) {
Expand Down Expand Up @@ -358,6 +358,60 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
this.pendingReplyCount = pendingEvents.length;
}

/**
* Reset the live timeline of all timelineSets, and start new ones.
*
* <p>This is used when /sync returns a 'limited' timeline.
justjanne marked this conversation as resolved.
Show resolved Hide resolved
*
* @param backPaginationToken - token for back-paginating the new timeline
* @param forwardPaginationToken - token for forward-paginating the old live timeline,
* if absent or null, all timelines are reset, removing old ones (including the previous live
* timeline which would otherwise be unable to paginate forwards without this token).
* Removing just the old live timeline whilst preserving previous ones is not supported.
*/
public async resetLiveTimeline(
backPaginationToken?: string | null,
forwardPaginationToken?: string | null,
): Promise<void> {
const oldLive = this.liveTimeline;
this.timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
const newLive = this.liveTimeline;

// FIXME: Remove the following as soon as https://github.com/matrix-org/synapse/issues/14830 is resolved.
//
// The pagination API for thread timelines currently can't handle the type of pagination tokens returned by sync
//
// To make this work anyway, we'll have to transform them into one of the types that the API can handle.
// One option is passing the tokens to /messages, which can handle sync tokens, and returns the right format.
// /messages does not return new tokens on requests with a limit of 0.
// This means our timelines might overlap a slight bit, but that's not an issue, as we deduplicate messages
// anyway.

let newBackward: string | undefined;
let oldForward: string | undefined;
if (backPaginationToken) {
const res = await this.client.createMessagesRequest(this.roomId, backPaginationToken, 1, Direction.Forward);
newBackward = res.end;
}
if (forwardPaginationToken) {
const res = await this.client.createMessagesRequest(
this.roomId,
forwardPaginationToken,
1,
Direction.Backward,
);
oldForward = res.start;
}
// Only replace the token if we don't have paginated away from this position already. This situation doesn't
// occur today, but if the above issue is resolved, we'd have to go down this path.
if (forwardPaginationToken && oldLive.getPaginationToken(Direction.Forward) === forwardPaginationToken) {
oldLive.setPaginationToken(oldForward ?? null, Direction.Forward);
}
if (backPaginationToken && newLive.getPaginationToken(Direction.Backward) === backPaginationToken) {
newLive.setPaginationToken(newBackward ?? null, Direction.Backward);
}
}

private async updateThreadMetadata(): Promise<void> {
this.updatePendingReplyCount();

Expand Down