Skip to content

Commit

Permalink
Filter out unsubscribed documents key in DocEvent (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
chacha912 authored Feb 15, 2023
1 parent c42b11d commit 61e6953
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 1 deletion.
7 changes: 6 additions & 1 deletion server/backend/sync/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,15 @@ func (m *PubSub) Publish(
)
}

watchDocEvent := sync.DocEvent{
Type: event.Type,
Publisher: event.Publisher,
DocumentKeys: []key.Key{docKey},
}
// NOTE: When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case sub.Events() <- event:
case sub.Events() <- watchDocEvent:
case <-gotime.After(100 * gotime.Millisecond):
logging.From(ctx).Warnf(
`Publish(%s,%s) to %s timeout`,
Expand Down
79 changes: 79 additions & 0 deletions test/integration/peer_awareness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,83 @@ func TestPeerAwareness(t *testing.T) {

assert.Equal(t, expected, responsePairs)
})

t.Run("Watch multiple documents test", func(t *testing.T) {
ctx := context.Background()

d1 := document.New(key.Key(t.Name()))
d2 := document.New(key.Key(t.Name()))
d3 := document.New(key.Key(t.Name() + "2"))
assert.NoError(t, c1.Attach(ctx, d1))
defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }()

var expected []watchResponsePair
var responsePairs []watchResponsePair
wgEvents := sync.WaitGroup{}
wgEvents.Add(1)

// starting to watch a document
watch1Ctx, cancel1 := context.WithCancel(ctx)
defer cancel1()
wrch, err := c1.Watch(watch1Ctx, d1)
assert.NoError(t, err)
go func() {
defer func() {
wgEvents.Done()
}()
for {
select {
case <-time.After(time.Second):
assert.Fail(t, "timeout")
return
case wr := <-wrch:
if wr.Err != nil {
assert.Fail(t, "unexpected stream closing", wr.Err)
return
}

if wr.Type == client.PeersChanged {
peers := wr.PeersMapByDoc[d1.Key().String()]
responsePairs = append(responsePairs, watchResponsePair{
Type: wr.Type,
Peers: peers,
})

if len(peers) == 1 {
return
}
}
}
}
}()

// 01. PeersChanged is triggered when another client watches the document
expected = append(expected, watchResponsePair{
Type: client.PeersChanged,
Peers: map[string]types.Presence{
c1.ID().String(): c1.Presence(),
c2.ID().String(): c2.Presence(),
},
})
assert.NoError(t, c2.Attach(ctx, d2))
defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }()
assert.NoError(t, c2.Attach(ctx, d3))
defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }()
watch2Ctx, cancel2 := context.WithCancel(ctx)
_, err = c2.Watch(watch2Ctx, d2, d3)
assert.NoError(t, err)

// 02. PeersChanged is triggered when another client closes the watch
expected = append(expected, watchResponsePair{
Type: client.PeersChanged,
Peers: map[string]types.Presence{
c1.ID().String(): c1.Presence(),
},
})
cancel2()

wgEvents.Wait()

assert.Equal(t, expected, responsePairs)
})
}

0 comments on commit 61e6953

Please sign in to comment.