Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we do not add relations to the wrong timeline #3427

Merged
merged 19 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions spec/integ/matrix-client-event-timeline.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ describe("MatrixClient event timelines", function () {

const prom = emitPromise(room, ThreadEvent.Update);
// Assume we're seeing the reply while loading backlog
room.addLiveEvents([THREAD_REPLY2]);
await room.addLiveEvents([THREAD_REPLY2]);
httpBackend
.when(
"GET",
Expand All @@ -1156,7 +1156,7 @@ describe("MatrixClient event timelines", function () {
});
await flushHttp(prom);
// but while loading the metadata, a new reply has arrived
room.addLiveEvents([THREAD_REPLY3]);
await room.addLiveEvents([THREAD_REPLY3]);
const thread = room.getThread(THREAD_ROOT_UPDATED.event_id!)!;
// then the events should still be all in the right order
expect(thread.events.map((it) => it.getId())).toEqual([
Expand Down Expand Up @@ -1248,7 +1248,7 @@ describe("MatrixClient event timelines", function () {

const prom = emitPromise(room, ThreadEvent.Update);
// Assume we're seeing the reply while loading backlog
room.addLiveEvents([THREAD_REPLY2]);
await room.addLiveEvents([THREAD_REPLY2]);
httpBackend
.when(
"GET",
Expand All @@ -1267,7 +1267,7 @@ describe("MatrixClient event timelines", function () {
});
await flushHttp(prom);
// but while loading the metadata, a new reply has arrived
room.addLiveEvents([THREAD_REPLY3]);
await room.addLiveEvents([THREAD_REPLY3]);
const thread = room.getThread(THREAD_ROOT_UPDATED.event_id!)!;
// then the events should still be all in the right order
expect(thread.events.map((it) => it.getId())).toEqual([
Expand Down Expand Up @@ -1572,7 +1572,7 @@ describe("MatrixClient event timelines", function () {
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD_ROOT_UPDATED);
respondToEvent(THREAD2_ROOT);
room.addLiveEvents([THREAD_REPLY2]);
await room.addLiveEvents([THREAD_REPLY2]);
await httpBackend.flushAllExpected();
await prom;
expect(thread.length).toBe(2);
Expand Down
2 changes: 1 addition & 1 deletion spec/integ/matrix-client-unread-notifications.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe("MatrixClient syncing", () => {

const thread = mkThread({ room, client: client!, authorId: selfUserId, participantUserIds: [selfUserId] });
const threadReply = thread.events.at(-1)!;
room.addLiveEvents([thread.rootEvent]);
await room.addLiveEvents([thread.rootEvent]);

// Initialize read receipt datastructure before testing the reaction
room.addReceiptToStructure(thread.rootEvent.getId()!, ReceiptType.Read, selfUserId, { ts: 1 }, false);
Expand Down
152 changes: 70 additions & 82 deletions spec/integ/sliding-sync-sdk.spec.ts

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion spec/unit/models/thread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ async function createThread(client: MatrixClient, user: string, roomId: string):

// Ensure the root is in the room timeline
root.setThreadId(root.getId());
room.addLiveEvents([root]);
await room.addLiveEvents([root]);

// Create the thread and wait for it to be initialised
const thread = room.createThread(root.getId()!, root, [], false);
Expand Down
390 changes: 199 additions & 191 deletions spec/unit/room.spec.ts

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5573,11 +5573,13 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
room.currentState.setUnknownStateEvents(stateEvents);
}

const [timelineEvents, threadedEvents] = room.partitionThreadedEvents(matrixEvents);
const [timelineEvents, threadedEvents, unknownRelations] =
room.partitionThreadedEvents(matrixEvents);

this.processAggregatedTimelineEvents(room, timelineEvents);
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
this.processThreadEvents(room, threadedEvents, true);
unknownRelations.forEach((event) => room.relations.aggregateChildEvent(event));

room.oldState.paginationToken = res.end ?? null;
if (res.chunk.length === 0) {
Expand Down Expand Up @@ -5686,11 +5688,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
timeline.getState(EventTimeline.FORWARDS)!.paginationToken = res.end;
}

const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
const [timelineEvents, threadedEvents, unknownRelations] = timelineSet.room.partitionThreadedEvents(events);
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
// The target event is not in a thread but process the contextual events, so we can show any threads around it.
this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processAggregatedTimelineEvents(timelineSet.room, timelineEvents);
unknownRelations.forEach((event) => timelineSet.relations.aggregateChildEvent(event));

// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
// timeline) - so check the room's index again. On the other hand, there's no guarantee the event ended up
Expand Down Expand Up @@ -6230,14 +6233,15 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const matrixEvents = res.chunk.filter(noUnsafeEventProps).map(this.getEventMapper());

const timelineSet = eventTimeline.getTimelineSet();
const [timelineEvents] = room.partitionThreadedEvents(matrixEvents);
const [timelineEvents, , unknownRelations] = room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processAggregatedTimelineEvents(room, timelineEvents);
this.processThreadRoots(
room,
timelineEvents.filter((it) => it.getServerAggregatedRelation(THREAD_RELATION_TYPE.name)),
false,
);
unknownRelations.forEach((event) => room.relations.aggregateChildEvent(event));

const atEnd = res.end === undefined || res.end === res.start;

Expand Down
1 change: 1 addition & 0 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface IUnsigned {
"transaction_id"?: string;
"invite_room_state"?: StrippedState[];
"m.relations"?: Record<RelationType | string, any>; // No common pattern for aggregated relations
"io.element.relation_thread_id"?: string;
t3chguy marked this conversation as resolved.
Show resolved Hide resolved
}

export interface IThreadBundledRelationship {
Expand Down
84 changes: 72 additions & 12 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ import { isPollEvent, Poll, PollEvent } from "./poll";
export const KNOWN_SAFE_ROOM_VERSION = "9";
const SAFE_ROOM_VERSIONS = ["1", "2", "3", "4", "5", "6", "7", "8", "9"];

const UNSIGNED_THREAD_ID_FIELD = "io.element.relation_thread_id";

interface IOpts {
/**
* Controls where pending messages appear in a room's timeline.
Expand Down Expand Up @@ -2132,6 +2134,13 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
return this.eventShouldLiveIn(parentEvent, events, roots);
}

if (!event.isRelation()) {
return {
shouldLiveInRoom: true,
shouldLiveInThread: false,
};
}

// Edge case where we know the event is a relation but don't have the parentEvent
if (roots?.has(event.relationEventId!)) {
return {
Expand All @@ -2141,9 +2150,19 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
};
}

// We've exhausted all scenarios, can safely assume that this event should live in the room timeline only
const unsigned = event.getUnsigned();
if (typeof unsigned[UNSIGNED_THREAD_ID_FIELD] === "string") {
return {
shouldLiveInRoom: false,
shouldLiveInThread: true,
threadId: unsigned[UNSIGNED_THREAD_ID_FIELD],
};
}

// We've exhausted all scenarios,
// we cannot assume that it lives in the main timeline as this may be a relation for an unknown thread
t3chguy marked this conversation as resolved.
Show resolved Hide resolved
return {
shouldLiveInRoom: true,
shouldLiveInRoom: false,
shouldLiveInThread: false,
};
}
Expand Down Expand Up @@ -2700,16 +2719,20 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
* @param addLiveEventOptions - addLiveEvent options
* @throws If `duplicateStrategy` is not falsey, 'replace' or 'ignore'.
*/
public addLiveEvents(events: MatrixEvent[], addLiveEventOptions?: IAddLiveEventOptions): void;
public async addLiveEvents(events: MatrixEvent[], addLiveEventOptions?: IAddLiveEventOptions): Promise<void>;
/**
* @deprecated In favor of the overload with `IAddLiveEventOptions`
*/
public addLiveEvents(events: MatrixEvent[], duplicateStrategy?: DuplicateStrategy, fromCache?: boolean): void;
public addLiveEvents(
public async addLiveEvents(
events: MatrixEvent[],
duplicateStrategy?: DuplicateStrategy,
fromCache?: boolean,
): Promise<void>;
public async addLiveEvents(
events: MatrixEvent[],
duplicateStrategyOrOpts?: DuplicateStrategy | IAddLiveEventOptions,
fromCache = false,
): void {
): Promise<void> {
let duplicateStrategy: DuplicateStrategy | undefined = duplicateStrategyOrOpts as DuplicateStrategy;
let timelineWasEmpty: boolean | undefined = false;
if (typeof duplicateStrategyOrOpts === "object") {
Expand Down Expand Up @@ -2760,6 +2783,9 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
timelineWasEmpty,
};

// List of extra events to check for being parents of any relations encountered
const neighbouringEvents = [...events];

for (const event of events) {
// TODO: We should have a filter to say "only add state event types X Y Z to the timeline".
this.processLiveEvent(event);
Expand All @@ -2773,19 +2799,44 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
}
}

const { shouldLiveInRoom, shouldLiveInThread, threadId } = this.eventShouldLiveIn(
let { shouldLiveInRoom, shouldLiveInThread, threadId } = this.eventShouldLiveIn(
event,
events,
neighbouringEvents,
threadRoots,
);

if (!shouldLiveInThread && !shouldLiveInRoom && event.isRelation()) {
try {
const parentEvent = new MatrixEvent(
await this.client.fetchRoomEvent(this.roomId, event.relationEventId!),
);
neighbouringEvents.push(parentEvent);
if (parentEvent.threadRootId) {
threadRoots.add(parentEvent.threadRootId);
const unsigned = event.getUnsigned();
unsigned[UNSIGNED_THREAD_ID_FIELD] = parentEvent.threadRootId;
event.setUnsigned(unsigned);
}

({ shouldLiveInRoom, shouldLiveInThread, threadId } = this.eventShouldLiveIn(
event,
neighbouringEvents,
threadRoots,
));
} catch (e) {
logger.error("Failed to load parent event of unhandled relation", e);
}
}

if (shouldLiveInThread && !eventsByThread[threadId ?? ""]) {
eventsByThread[threadId ?? ""] = [];
}
eventsByThread[threadId ?? ""]?.push(event);

if (shouldLiveInRoom) {
this.addLiveEvent(event, options);
} else if (!shouldLiveInThread && event.isRelation()) {
this.relations.aggregateChildEvent(event);
}
}

Expand All @@ -2796,13 +2847,14 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {

public partitionThreadedEvents(
events: MatrixEvent[],
): [timelineEvents: MatrixEvent[], threadedEvents: MatrixEvent[]] {
): [timelineEvents: MatrixEvent[], threadedEvents: MatrixEvent[], unknownRelations: MatrixEvent[]] {
// Indices to the events array, for readability
const ROOM = 0;
const THREAD = 1;
const UNKNOWN_RELATION = 2;
if (this.client.supportsThreads()) {
const threadRoots = this.findThreadRoots(events);
return events.reduce(
return events.reduce<[MatrixEvent[], MatrixEvent[], MatrixEvent[]]>(
(memo, event: MatrixEvent) => {
const { shouldLiveInRoom, shouldLiveInThread, threadId } = this.eventShouldLiveIn(
event,
Expand All @@ -2819,13 +2871,17 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
memo[THREAD].push(event);
}

if (!shouldLiveInThread && !shouldLiveInRoom) {
memo[UNKNOWN_RELATION].push(event);
}

return memo;
},
[[] as MatrixEvent[], [] as MatrixEvent[]],
[[], [], []],
);
} else {
// When `experimentalThreadSupport` is disabled treat all events as timelineEvents
return [events as MatrixEvent[], [] as MatrixEvent[]];
return [events as MatrixEvent[], [] as MatrixEvent[], [] as MatrixEvent[]];
}
}

Expand All @@ -2838,6 +2894,10 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
if (event.isRelation(THREAD_RELATION_TYPE.name)) {
threadRoots.add(event.relationEventId ?? "");
}
const unsigned = event.getUnsigned();
if (typeof unsigned[UNSIGNED_THREAD_ID_FIELD] === "string") {
threadRoots.add(unsigned[UNSIGNED_THREAD_ID_FIELD]);
}
}
return threadRoots;
}
Expand Down
12 changes: 6 additions & 6 deletions src/sliding-sync-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ export class SlidingSyncSdk {

if (roomData.invite_state) {
const inviteStateEvents = mapEvents(this.client, room.roomId, roomData.invite_state);
this.injectRoomEvents(room, inviteStateEvents);
await this.injectRoomEvents(room, inviteStateEvents);
if (roomData.initial) {
room.recalculate();
this.client.store.storeRoom(room);
Expand Down Expand Up @@ -700,7 +700,7 @@ export class SlidingSyncSdk {
}
} */

this.injectRoomEvents(room, stateEvents, timelineEvents, roomData.num_live);
await this.injectRoomEvents(room, stateEvents, timelineEvents, roomData.num_live);

// we deliberately don't add ephemeral events to the timeline
room.addEphemeralEvents(ephemeralEvents);
Expand Down Expand Up @@ -747,12 +747,12 @@ export class SlidingSyncSdk {
* @param numLive - the number of events in timelineEventList which just happened,
* supplied from the server.
*/
public injectRoomEvents(
public async injectRoomEvents(
room: Room,
stateEventList: MatrixEvent[],
timelineEventList?: MatrixEvent[],
numLive?: number,
): void {
): Promise<void> {
timelineEventList = timelineEventList || [];
stateEventList = stateEventList || [];
numLive = numLive || 0;
Expand Down Expand Up @@ -811,11 +811,11 @@ export class SlidingSyncSdk {
// if the timeline has any state events in it.
// This also needs to be done before running push rules on the events as they need
// to be decorated with sender etc.
room.addLiveEvents(timelineEventList, {
await room.addLiveEvents(timelineEventList, {
fromCache: true,
});
if (liveTimelineEvents.length > 0) {
room.addLiveEvents(liveTimelineEvents, {
await room.addLiveEvents(liveTimelineEvents, {
fromCache: false,
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export class SyncApi {
},
)
.then(
(res) => {
async (res) => {
if (this._peekRoom !== peekRoom) {
debuglog("Stopped peeking in room %s", peekRoom.roomId);
return;
Expand Down Expand Up @@ -541,7 +541,7 @@ export class SyncApi {
})
.map(this.client.getEventMapper());

peekRoom.addLiveEvents(events);
await peekRoom.addLiveEvents(events);
this.peekPoll(peekRoom, res.end);
},
(err) => {
Expand Down Expand Up @@ -1773,7 +1773,7 @@ export class SyncApi {
// if the timeline has any state events in it.
// This also needs to be done before running push rules on the events as they need
// to be decorated with sender etc.
room.addLiveEvents(timelineEventList || [], {
await room.addLiveEvents(timelineEventList || [], {
fromCache,
timelineWasEmpty,
});
Expand Down