From 2dd5a5b46ef536ca4c1df088fd0fd16953342509 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 25 Mar 2022 12:40:36 -0500 Subject: [PATCH] Test case to try to reproduce missing state in sync See https://github.com/matrix-org/synapse/issues/12281 --- internal/b/hs_with_application_service.go | 4 + internal/client/client.go | 8 +- tests/msc2716_test.go | 114 ++++++++++++++++++++++ 3 files changed, 122 insertions(+), 4 deletions(-) diff --git a/internal/b/hs_with_application_service.go b/internal/b/hs_with_application_service.go index e3aa059b..2bb7b154 100644 --- a/internal/b/hs_with_application_service.go +++ b/internal/b/hs_with_application_service.go @@ -11,6 +11,10 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{ Localpart: "@alice", DisplayName: "Alice", }, + { + Localpart: "@bob", + DisplayName: "Bob", + }, }, ApplicationServices: []ApplicationService{ { diff --git a/internal/client/client.go b/internal/client/client.go index aa81355c..6b0785fa 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -608,7 +608,7 @@ func GjsonEscape(in string) string { // Check that the timeline for `roomID` has an event which passes the check function. func SyncTimelineHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { - err := loopArray( + err := LoopArray( topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".timeline.events", check, ) if err == nil { @@ -637,7 +637,7 @@ func SyncInvitedTo(userID, roomID string) SyncCheckOpt { // - actively being invited to a room. if clientUserID == userID { // active - err := loopArray( + err := LoopArray( topLevelSyncJSON, "rooms.invite."+GjsonEscape(roomID)+".invite_state.events", func(ev gjson.Result) bool { return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite" @@ -696,11 +696,11 @@ func SyncLeftFrom(userID, roomID string) SyncCheckOpt { // `check` function returns true for at least one event. func SyncGlobalAccountDataHas(check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { - return loopArray(topLevelSyncJSON, "account_data.events", check) + return LoopArray(topLevelSyncJSON, "account_data.events", check) } } -func loopArray(object gjson.Result, key string, check func(gjson.Result) bool) error { +func LoopArray(object gjson.Result, key string, check func(gjson.Result) bool) error { array := object.Get(key) if !array.Exists() { return fmt.Errorf("Key %s does not exist", key) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 010dd8a5..89aab995 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/matrix-org/complement/internal/b" @@ -541,6 +542,119 @@ func TestImportHistoricalMessages(t *testing.T) { ) }) + t.Run("try reproduce missing state in sync synapse#12281", func(t *testing.T) { + t.Parallel() + + // Initial /sync for Alice + _, since := alice.MustSync(t, client.SyncReq{}) + + // Bridge creates a room and invites Alice to it. + // Along with some `m.bridge` `initial_state`. + roomID := as.CreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + "name": "the hangout spot", + "room_version": "org.matrix.msc2716v3", + "invite": []string{alice.UserID}, + "initial_state": []map[string]interface{}{ + { + "type": "m.bridge", + "state_key": "", + "content": map[string]interface{}{ + "bridgebot": "@bridgebot:hs1", + "protocol": map[string]interface{}{ + "id": "gitter", + "displayname": "Gitter", + "external_url": "https://gitter.im/", + }, + "channel": map[string]interface{}{ + "id": "123abc", + "displayname": "foo/bar", + "external_url": "https://gitter.im/foo/bar", + }, + }, + }, { + "type": "m.other.state", + "state_key": "", + "content": map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }) + + // We expect to find the `m.bridge` event in the `invite_state` + // TODO: This is commented out because `m.bridge` is not part of `invite_state` + // since = alice.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + // err := client.LoopArray( + // topLevelSyncJSON, "rooms.invite."+client.GjsonEscape(roomID)+".invite_state.events", + // func(ev gjson.Result) bool { + // return ev.Get("type").Str == "m.bridge" + // }, + // ) + // if err != nil { + // return fmt.Errorf("SyncInvitedTo(%s): %s", roomID, err) + // } + // return nil + // }) + + // As the bot, join the user to the room via `m.room.member` events + // TODO: This results in HTTP 403 : {"errcode":"M_FORBIDDEN","error":"Cannot force another user to join."} + // as.SendEventSynced(t, roomID, b.Event{ + // Type: "m.room.member", + // Sender: as.UserID, + // StateKey: &alice.UserID, + // Content: map[string]interface{}{ + // "membership": "join", + // }, + // }) + + // Alternative to bot joins Alice via m.room.member commented out above. + // Alice joins the room + alice.JoinRoom(t, roomID, nil) + + // Create the "live" event we are going to import our historical events next to + // We can't use as.SendEventSynced(...) because application services can't use the /sync API + eventBefore := b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "foo", + "msgtype": "m.text", + }, + } + txnId := getTxnID("sendLiveEvent-txn") + eventBeforeSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", eventBefore.Type, txnId}, client.WithJSONBody(t, eventBefore.Content)) + eventBeforeSendBody := client.ParseJSON(t, eventBeforeSendRes) + eventIdBefore := client.GetJSONFieldStr(t, eventBeforeSendBody, "event_id") + timeAfterEventBefore := time.Now() + + // Import some historical events + batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 3), + // Status + 200, + ) + + syncCount := 0 + alice.MustSyncUntil(t, client.SyncReq{Since: since}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + logrus.WithFields(logrus.Fields{ + "topLevelSyncJSON": topLevelSyncJSON.Raw, + }).Error("/sync") + + if syncCount == 3 { + return nil + } + + syncCount++ + return fmt.Errorf("another sync...") + }) + }) + t.Run("TODO: What happens when you point multiple batches at the same insertion event?", func(t *testing.T) { t.Skip("Skipping until implemented") })