Skip to content

Commit

Permalink
GH-257: Added tests for cache unsubscribe and removal.
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed Jul 3, 2024
1 parent 2b16e64 commit 3c09e14
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 16 deletions.
3 changes: 2 additions & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type Config struct {
ResetThrottle int `json:"resetThrottle"`
ReferenceThrottle int `json:"referenceThrottle"`

NoHTTP bool `json:"-"` // Disable start of the HTTP server. Used for testing
NoHTTP bool `json:"-"` // Disable start of the HTTP server. Used for testing
NoUnsubscribeDelay bool `json:"-"` // Set remove and unsubscribe from cache delay to 0. Used for testing.

scheme string
netAddr string
Expand Down
6 changes: 5 additions & 1 deletion server/mqClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
)

func (s *Service) initMQClient() {
s.cache = rescache.NewCache(s.mq, CacheWorkers, s.cfg.ResetThrottle, UnsubscribeDelay, s.logger, s.metrics)
unsubdelay := UnsubscribeDelay
if s.cfg.NoUnsubscribeDelay {
unsubdelay = 0
}
s.cache = rescache.NewCache(s.mq, CacheWorkers, s.cfg.ResetThrottle, unsubdelay, s.logger, s.metrics)
}

// startMQClients creates a connection to the messaging system.
Expand Down
19 changes: 17 additions & 2 deletions server/rescache/rescache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Cache struct {
unsubQueue *timerqueue.Queue
resetSub mq.Unsubscriber

// Handlers for testing
onUnsubscribe func(rid string)

// Deprecated behavior logging
depMutex sync.Mutex
depLogged map[string]featureType
Expand Down Expand Up @@ -81,13 +84,21 @@ func NewCache(mq mq.Client, workers int, resetThrottle int, unsubscribeDelay tim
}
}

// SetLogger sets the logger
// SetLogger sets the logger.
// Must be called before Start is called.
func (c *Cache) SetLogger(l logger.Logger) {
c.logger = l
}

// SetOnUnsubscribe sets a callback that is called when a resource is removed
// from the cache and unsubscribed. Used for testing purpose.
// Must be called before Start is called.
func (c *Cache) SetOnUnsubscribe(cb func(rid string)) {
c.onUnsubscribe = cb
}

// Start will initialize the cache, subscribing to global events
// It is assumed mq.Connect has already been called
// It is assumed mq.Connect has already been called.
func (c *Cache) Start() error {
if c.started {
return errors.New("cache: already started")
Expand Down Expand Up @@ -323,6 +334,10 @@ func (c *Cache) mqUnsubscribe(v interface{}) {
if c.metrics != nil {
c.metrics.CacheResources.Add(-1)
}

if c.onUnsubscribe != nil {
c.onUnsubscribe(eventSub.ResourceName)
}
}

func (c *Cache) handleSystemReset(payload []byte) {
Expand Down
16 changes: 7 additions & 9 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ func (s *Service) SetLogger(l logger.Logger) *Service {

// SetOnWSClose sets a callback to be calld when a websocket connection is
// closed. Used for testing.
func (s *Service) SetOnWSClose(cb func(ws *websocket.Conn)) *Service {
s.mu.Lock()
defer s.mu.Unlock()

if s.stop != nil {
panic("SetOnWSClose must be called before starting server")
}

func (s *Service) SetOnWSClose(cb func(ws *websocket.Conn)) {
s.onWSClose = cb
return s
}

// SetOnCahceUnsubscribe sets a callback that is called when a resource is

Check failure on line 87 in server/service.go

View workflow job for this annotation

GitHub Actions / lint

comment on exported method SetOnUnsubscribe should be of the form "SetOnUnsubscribe ..." (ST1020)
// removed from the cache and unsubscribed. Used for testing.
func (s *Service) SetOnUnsubscribe(cb func(rid string)) {
s.cache.SetOnUnsubscribe(cb)
}

// Logf writes a formatted log message
Expand Down
64 changes: 64 additions & 0 deletions test/01subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,3 +551,67 @@ func TestSubscribe_WithThrottleOnNestedReferences_ThrottlesRequests(t *testing.T
c.ReferenceThrottle = referenceThrottle
})
}

// Test that two connections subscribing to the same model, both waiting for the
// response of the get request, gets the returned resource.
func TestSubscribe_MultipleSubscribersOnPendingModel_ModelSentToAllSubscribers(t *testing.T) {
model := resourceData("test.model")

runTest(t, func(s *Session) {
c1 := s.Connect()
c2 := s.Connect()
// Subscribe with client 1
creq1 := c1.Request("subscribe.test.model", nil)
mreqs1 := s.GetParallelRequests(t, 2)
// Handle access request
mreqs1.GetRequest(t, "access.test.model").
RespondSuccess(json.RawMessage(`{"get":true}`))
getreq := mreqs1.GetRequest(t, "get.test.model")

// Subscribe with client 2
creq2 := c2.Request("subscribe.test.model", nil)
// Handle access request
s.GetRequest(t).
AssertSubject(t, "access.test.model").
RespondSuccess(json.RawMessage(`{"get":true}`))

// Handle get request
getreq.RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

// Validate client 1 response and validate
creq1.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`))
// Validate client 2 response and validate
creq2.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`))
})
}

// Test that two connections subscribing to the same model, both waiting for the
// response of the get request, gets the returned error.
func TestSubscribe_MultipleSubscribersOnPendingError_ErrorSentToAllSubscribers(t *testing.T) {
runTest(t, func(s *Session) {
c1 := s.Connect()
c2 := s.Connect()
// Subscribe with client 1
creq1 := c1.Request("subscribe.test.model", nil)
mreqs1 := s.GetParallelRequests(t, 2)
// Handle access request
mreqs1.GetRequest(t, "access.test.model").
RespondSuccess(json.RawMessage(`{"get":true}`))
getreq := mreqs1.GetRequest(t, "get.test.model")

// Subscribe with client 2
creq2 := c2.Request("subscribe.test.model", nil)
// Handle access request
s.GetRequest(t).
AssertSubject(t, "access.test.model").
RespondSuccess(json.RawMessage(`{"get":true}`))

// Handle get request
getreq.RespondError(reserr.ErrNotFound)

// Validate client 1 response and validate
creq1.GetResponse(t).AssertError(t, reserr.ErrNotFound)
// Validate client 2 response and validate
creq2.GetResponse(t).AssertError(t, reserr.ErrNotFound)
})
}
31 changes: 31 additions & 0 deletions test/03unsubscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/resgateio/resgate/server"
"github.com/resgateio/resgate/server/reserr"
)

Expand Down Expand Up @@ -266,3 +267,33 @@ func TestUnsubscribe_WithInvalidPayload_DoesNotUnsubscribesModel(t *testing.T) {
})
}
}

// Test that a model that no client subscribes to gets unsubscribed and removed
// from the cache.
func TestUnsubscribe_Model_UnsubcribesFromCache(t *testing.T) {
runTest(t, func(s *Session) {
c := s.Connect()
subscribeToTestModel(t, s, c)

// Call unsubscribe
c.Request("unsubscribe.test.model", nil).GetResponse(t)
s.AssertUnsubscribe("test.model")
}, func(cfg *server.Config) {
cfg.NoUnsubscribeDelay = true
})
}

// Test that a linked model resources that no client subscribes to gets
// unsubscribed and removed from the cache.
func TestUnsubscribe_LinkedModel_UnsubscribesFromCache(t *testing.T) {
runTest(t, func(s *Session) {
c := s.Connect()
subscribeToTestModelParent(t, s, c, false)

// Call unsubscribe
c.Request("unsubscribe.test.model.parent", nil).GetResponse(t)
s.AssertUnsubscribe("test.model", "test.model.parent")
}, func(cfg *server.Config) {
cfg.NoUnsubscribeDelay = true
})
}
49 changes: 49 additions & 0 deletions test/33metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func TestMetrics_CacheResources_ExpectedGaugeValues(t *testing.T) {
table := []struct {
Name string
Actions func(t *testing.T, s *Session, c *Conn)
Config func(cfg *server.Config)
ExpectedResources int
ExpectedSubscriptions int
}{
Expand Down Expand Up @@ -408,6 +409,51 @@ func TestMetrics_CacheResources_ExpectedGaugeValues(t *testing.T) {
ExpectedResources: 2,
ExpectedSubscriptions: 1,
},

{
Name: "unsubscribe simple model and wait for cache unsubscribe",
Actions: func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModel(t, s, c)
c.Request("unsubscribe.test.model", nil).GetResponse(t)
s.AssertUnsubscribe("test.model")
c.AssertNoEvent(t, "test.model")
},
Config: func(cfg *server.Config) {
cfg.NoUnsubscribeDelay = true
},
ExpectedResources: 0,
ExpectedSubscriptions: 0,
},
{
Name: "unsubscribe parent model and wait for cache unsubscribe",
Actions: func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModelParent(t, s, c, false)
c.Request("unsubscribe.test.model.parent", nil).GetResponse(t)
s.AwaitUnsubscribe()
s.AwaitUnsubscribe()
c.AssertNoEvent(t, "test.model.parent")
},
Config: func(cfg *server.Config) {
cfg.NoUnsubscribeDelay = true
},
ExpectedResources: 0,
ExpectedSubscriptions: 0,
},
{
Name: "unsubscribe overlapping parent model and wait for cache unsubscribe",
Actions: func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModel(t, s, c)
subscribeToTestModelParent(t, s, c, true)
c.Request("unsubscribe.test.model.parent", nil).GetResponse(t)
c.AssertNoEvent(t, "test.model.parent")
s.AssertUnsubscribe("test.model.parent")
},
Config: func(cfg *server.Config) {
cfg.NoUnsubscribeDelay = true
},
ExpectedResources: 1,
ExpectedSubscriptions: 1,
},
}
for _, l := range table {
runNamedTest(t, l.Name, func(s *Session) {
Expand All @@ -420,6 +466,9 @@ func TestMetrics_CacheResources_ExpectedGaugeValues(t *testing.T) {
})
}, func(cfg *server.Config) {
cfg.MetricsPort = 8090
if l.Config != nil {
l.Config(cfg)
}
})
}
}
68 changes: 65 additions & 3 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"testing"
"time"

Expand All @@ -27,9 +28,10 @@ var (
type Session struct {
t *testing.T
*NATSTestClient
s *server.Service
conns map[*Conn]struct{}
dcCh chan struct{}
s *server.Service
conns map[*Conn]struct{}
dcCh chan struct{}
unsubs chan string
*CountLogger
}

Expand All @@ -49,6 +51,7 @@ func setup(t *testing.T, cfgs ...func(*server.Config)) *Session {
s: serv,
conns: make(map[*Conn]struct{}),
CountLogger: l,
unsubs: make(chan string, 256),
}

// Set on WS close handler to synchronize tests with WebSocket disconnects.
Expand All @@ -59,6 +62,9 @@ func setup(t *testing.T, cfgs ...func(*server.Config)) *Session {
close(ch)
}
})
serv.SetOnUnsubscribe(func(rid string) {
s.unsubs <- rid
})

if err := serv.Start(); err != nil {
panic("test: failed to start server: " + err.Error())
Expand Down Expand Up @@ -210,6 +216,62 @@ func (s *Session) MetricsHTTPRequest(opts ...func(r *http.Request)) *http.Respon
return rr.Result()
}

// AssertUnsubscribe awaits for one or more resources to be unsubscribed by the
// cache, and asserts that they match the provided resource IDs.
func (s *Session) AssertUnsubscribe(rids ...string) *Session {
if len(rids) == 0 {
return s
}

var rs []string
for count := 0; count < len(rids); count++ {
select {
case r := <-s.unsubs:
rs = append(rs, r)
case <-time.After(3 * time.Second):
if s.t != nil {
s.t.Fatalf("expected %d resource(s) to be unsubscribed from cache, but got %d", len(rids), count)
} else {
panic("test: assert unsubscribe called outside of test")
}
}
}

sort.StringSlice(rs).Sort()
sort.StringSlice(rids).Sort()

NextRID:
for _, rid := range rids {
for _, r := range rs {
if rid == r {
continue NextRID
}
}
if s.t != nil {
s.t.Fatalf("expected unsubscribed resources:\n\t%#v\nbut got :\n\t%#v", rids, rs)
} else {
panic("test: assert unsubscribe called outside of test")
}
}
return s
}

// AwaitUnsubscribe awaits for a resource to be unsubscribed by the cache,
// returning the resource ID.
func (s *Session) AwaitUnsubscribe() string {
var rid string
select {
case rid = <-s.unsubs:
case <-time.After(3 * time.Second):
if s.t != nil {
s.t.Fatalf("expected a cache unsubscribe but got none.")
} else {
panic("test: await unsubscribe timeout")
}
}
return rid
}

func teardown(s *Session) {
for conn := range s.conns {
err := conn.Error()
Expand Down

0 comments on commit 3c09e14

Please sign in to comment.