Skip to content

Commit

Permalink
Add TestToDeviceMessagesAreProcessedInOrder
Browse files Browse the repository at this point in the history
Fixes #35
  • Loading branch information
kegsay committed Jun 11, 2024
1 parent 83b5541 commit 8f3ad97
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/api/rust/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (c *RustClient) TrySendMessage(t ct.TestLike, roomID, text string) (eventID
if ev == nil {
continue
}
if ev.Text == text && ev.ID != "" {
if ev.Text == text && ev.Sender == c.userID && ev.ID != "" {
// if we haven't seen this event yet, assign the return arg and signal that
// the function should unblock. It's important to only close the channel once
// else this will panic on the 2nd call.
Expand Down
22 changes: 22 additions & 0 deletions tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func WithAccessToken(accessToken string) func(*api.ClientCreationOpts) {
}
}

// BaseClient is a Complement client along with type information for the HS / language the client
// is associated with.
type BaseClient struct {
*client.CSAPI
ClientType api.ClientType
// TODO : Opts
}

// TestContext provides a consistent set of variables which most tests will need access to.
type TestContext struct {
Deployment *deploy.SlidingSyncDeployment
Expand Down Expand Up @@ -201,6 +209,20 @@ func (c *TestContext) WithClientSyncing(t *testing.T, clientType api.ClientType,
callback(clientUnderTest)
}

func (c *TestContext) WithClientsSyncing(t *testing.T, clients []BaseClient, callback func(clients []api.Client), options ...func(*api.ClientCreationOpts)) {
t.Helper()
cryptoClients := make([]api.Client, len(clients))
for i, cli := range clients {
cryptoClients[i] = c.MustLoginClient(t, cli.CSAPI, cli.ClientType)
defer cryptoClients[i].Close(t)
}
for _, cli := range cryptoClients {
stopSyncing := cli.MustStartSyncing(t)
defer stopSyncing()
}
callback(cryptoClients)
}

// MustCreateMultiprocessClient creates a new RPC process and instructs it to create a client given by the client creation options.
func (c *TestContext) MustCreateMultiprocessClient(t *testing.T, lang api.ClientTypeLang, opts api.ClientCreationOpts) api.Client {
t.Helper()
Expand Down
124 changes: 124 additions & 0 deletions tests/to_device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,127 @@ func TestToDeviceMessagesArentLostWhenKeysQueryFails(t *testing.T) {

})
}

// Regression test for https://github.com/element-hq/element-web/issues/25723
//
// This test doesn't ensure that the messages are processed in-order (as we cannot
// introspect that in a platform agnostic way) but it does cause 100s of to-device
// messages to be sent to the client in one go. If clients process these 100s of
// messages out of order, it will cause decryption errors, hence it serves as a
// canary that something is wrong.

This comment has been minimized.

//
// This test does this by:
// - Alice in a public encrypted room on her own with rotation_period_msgs set to 1.
// - Block Alice's /sync
// - Create 4 new users and join them to the encrypted room.
// - Send 40 messages as each user.
// - This will cause 40x4=160 to-device messages due to the low rotation period msgs value.
// - Unblock Alice's /sync
// - Ensure Alice can decrypt every single event.
//
// Both Sliding Sync and Sync v2 return to-device msgs in batches of 100, so going much above
// 100 here isn't going to do much. We do a good chunk above it (160) just in case the client
// is /syncing before processing the last response, but we also don't want to send too much
// data as it makes this test take a long time to complete.
//
// This is quite a complex stress test so it's possible for this test to fail for reasons
// unrelated to processing out-of-order e.g it will cause fallback keys for alice to be used.
func TestToDeviceMessagesAreProcessedInOrder(t *testing.T) {
numClients := 4
numMsgsPerClient := 40
ForEachClientType(t, func(t *testing.T, clientType api.ClientType) {
tc := CreateTestContext(t, clientType)
roomID := tc.CreateNewEncryptedRoom(
t, tc.Alice, EncRoomOptions.RotationPeriodMsgs(1), EncRoomOptions.PresetPublicChat(),
)
// intercept /sync just so we can observe the number of to-device msgs coming down.
// We also synchronise on this to know when the client has received the to-device msgs
callbackURL, close := deploy.NewCallbackServer(t, tc.Deployment, func(cd deploy.CallbackData) {
// try v2 sync then SS
toDeviceEvents := gjson.ParseBytes(cd.ResponseBody).Get("to_device.events").Array()
if len(toDeviceEvents) == 0 {
toDeviceEvents = gjson.ParseBytes(cd.ResponseBody).Get("extensions.to_device.events").Array()
}
if len(toDeviceEvents) > 0 {
t.Logf("sniffed %d to_device events down /sync", len(toDeviceEvents))
}
})
defer close()
var timelineEvents = []struct {
ID string
Body string
}{}
tc.WithAliceSyncing(t, func(alice api.Client) {
// Block Alice's /sync
tc.Deployment.WithMITMOptions(t, map[string]interface{}{
"statuscode": map[string]interface{}{
"return_status": http.StatusGatewayTimeout,
"block_request": true,
"filter": "~u .*/sync.* ~hq " + alice.CurrentAccessToken(t),
},
"callback": map[string]interface{}{
"callback_url": callbackURL,
"filter": "~u .*/sync.* ~hq " + alice.CurrentAccessToken(t),
},
}, func() {
// create 10 users and join the room
baseClients := make([]BaseClient, numClients)
for i := range baseClients {
baseClients[i] = BaseClient{
CSAPI: tc.Deployment.Register(t, clientType.HS, helpers.RegistrationOpts{
LocalpartSuffix: "ilikebots",
Password: "complement-crypto-password",
}),
ClientType: clientType,
}
baseClients[i].MustJoinRoom(t, roomID, []string{clientType.HS})
}
// send 30 messages as each user (interleaved)
tc.WithClientsSyncing(t, baseClients, func(clients []api.Client) {
for i := 0; i < numMsgsPerClient; i++ {
for _, c := range clients {
body := fmt.Sprintf("Message %d", i+1)
eventID := c.SendMessage(t, roomID, body)
timelineEvents = append(timelineEvents, struct {
ID string
Body string
}{
ID: eventID,
Body: body,
})
}
}
})
t.Logf("sent %d timeline events", len(timelineEvents))
})
// Alice's /sync is unblocked, wait until we see the last event.
// Re-add the callback server TODO: allow composing see https://github.com/matrix-org/complement-crypto/issues/68
tc.Deployment.WithMITMOptions(t, map[string]interface{}{
"callback": map[string]interface{}{
"callback_url": callbackURL,
"filter": "~u .*/sync.* ~hq " + alice.CurrentAccessToken(t),
},
}, func() {
lastTimelineEvent := timelineEvents[len(timelineEvents)-1]
alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(lastTimelineEvent.ID)).Waitf(
// wait a while here as we need to wait for both /sync to retry and a large response
// to be processed.
t, 20*time.Second, "did not see latest timeline event %s", lastTimelineEvent.ID,
)
// now verify we can decrypt all the events
time.Sleep(10 * time.Second)
// backpaginate 10 times. We don't do a single huge backpagination call because
// this can cause failures on JS "Promise was collected".
for i := 0; i < 10; i++ {
alice.MustBackpaginate(t, roomID, len(timelineEvents)/10)
}
for i := len(timelineEvents) - 1; i >= 0; i-- {
nextTimelineEvent := timelineEvents[i]
ev := alice.MustGetEvent(t, roomID, nextTimelineEvent.ID)
must.Equal(t, ev.FailedToDecrypt, false, "failed to decrypt event ID "+nextTimelineEvent.ID)
must.Equal(t, ev.Text, nextTimelineEvent.Body, "failed to decrypt body of event "+nextTimelineEvent.ID)
}
})
})
})
}

0 comments on commit 8f3ad97

Please sign in to comment.