diff --git a/group.go b/group.go index 37c7170..a5f62af 100644 --- a/group.go +++ b/group.go @@ -86,9 +86,12 @@ UnexpectedEnd: } func (g group) toString(rname string, tokens []string) string { + if g == nil { + return rname + } l := len(g) if l == 0 { - return rname + return "" } if l == 1 && g[0].str != "" { return g[0].str diff --git a/mux.go b/mux.go index 7bea8ce..58957fa 100644 --- a/mux.go +++ b/mux.go @@ -200,9 +200,15 @@ func (m *Mux) Handle(pattern string, hf ...Option) { // AddHandler register a handler for the given resource pattern. // The pattern used is the same as described for Handle. func (m *Mux) AddHandler(pattern string, hs Handler) { + var g group + if hs.Parallel { + g = []gpart{} + } else { + g = parseGroup(hs.Group, pattern) + } h := regHandler{ Handler: hs, - group: parseGroup(hs.Group, pattern), + group: g, } m.add(pattern, &h) diff --git a/restest/assert.go b/restest/assert.go index 1f323f5..b247f02 100644 --- a/restest/assert.go +++ b/restest/assert.go @@ -12,8 +12,8 @@ import ( // AssertEqualJSON expects that a and b json marshals into equal values, and // returns true if they do, otherwise logs a fatal error and returns false. func AssertEqualJSON(t *testing.T, name string, result, expected interface{}, ctx ...interface{}) bool { - aa, aj := jsonMap(t, result) - bb, bj := jsonMap(t, expected) + aa, aj := jsonMap(result) + bb, bj := jsonMap(expected) if !reflect.DeepEqual(aa, bb) { t.Fatalf("expected %s to be:\n\t%s\nbut got:\n\t%s%s", name, bj, aj, ctxString(ctx)) @@ -117,7 +117,7 @@ func ctxString(ctx []interface{}) string { return "\nin " + fmt.Sprint(ctx...) 
} -func jsonMap(t *testing.T, v interface{}) (interface{}, []byte) { +func jsonMap(v interface{}) (interface{}, []byte) { var err error j, err := json.Marshal(v) if err != nil { diff --git a/restest/natsrequest.go b/restest/natsrequest.go index bf5b272..5f9233c 100644 --- a/restest/natsrequest.go +++ b/restest/natsrequest.go @@ -8,6 +8,10 @@ type NATSRequest struct { inb string } +// NATSRequests represents a slice of requests sent over NATS to the service, +// but that may get responses in undetermined order. +type NATSRequests []*NATSRequest + // Response gets the next pending message that is published to NATS by the // service. // @@ -19,6 +23,26 @@ func (nr *NATSRequest) Response() *Msg { return m } +// Response gets the next pending message that is published to NATS by the +// service, and matches it to one of the requests. +// +// If no message is received within a set amount of time, or if the message is +// not a response to one of the requests, it will log it as a fatal error. +// +// The matching request will be set to nil. +func (nrs NATSRequests) Response(c *MockConn) *Msg { + m := c.GetMsg() + for i := 0; i < len(nrs); i++ { + nr := nrs[i] + if nr != nil && nr.inb == m.Subject { + nrs[i] = nil + return m + } + } + c.t.Fatalf("expected to find request matching response %s, but found none", m.Subject) + return nil +} + // Get sends a get request to the service. // // The resource ID, rid, may contain a query part: diff --git a/service.go b/service.go index 107819a..3a2775b 100644 --- a/service.go +++ b/service.go @@ -137,6 +137,11 @@ type Handler struct { // resource name will be used as identifier. Group string + // Parallel is a flag telling that all requests to the handler may be + // handled in parallel on different goroutines. If set to true, any value in + // Group will be ignored. + Parallel bool + // OnRegister is callback that is to be call when the handler has been // registered to a service. 
// @@ -196,7 +201,9 @@ type Service struct { nc Conn // NATS Server connection inCh chan *nats.Msg // Channel for incoming nats messages rwork map[string]*work // map of resource work - workCh chan *work // Resource work channel, listened to by the workers + workqueue []*work // Resource work queue. + workbuf []*work // Underlying buffer of the workqueue + workcond sync.Cond // Cond waited on by workers and signaled when work is added to workqueue wg sync.WaitGroup // WaitGroup for all workers mu sync.Mutex // Mutex to protect rwork map logger logger.Logger // Logger @@ -517,6 +524,16 @@ func Group(group string) Option { }) } +// Parallel sets the parallel flag. All requests for the handler may be handled +// in parallel on different worker goroutines. +// +// If set to true, any value in Group will be ignored. +func Parallel(parallel bool) Option { + return OptionFunc(func(hs *Handler) { + hs.Parallel = parallel + }) +} + // OnRegister sets a callback to be called when the handler is registered to a // service. 
// @@ -648,14 +665,15 @@ func (s *Service) serve(nc Conn) error { - workCh := make(chan *work, 1) s.nc = nc s.inCh = inCh - s.workCh = workCh - s.rwork = make(map[string]*work) + s.workcond = sync.Cond{L: &s.mu} + s.workbuf = make([]*work, s.inChannelSize) + s.workqueue = s.workbuf[:0] + s.rwork = make(map[string]*work, s.inChannelSize) s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration) // Start workers s.wg.Add(s.workerCount) for i := 0; i < s.workerCount; i++ { - go s.startWorker(s.workCh) + go s.startWorker() } atomic.StoreInt32(&s.state, stateStarted) @@ -699,7 +717,6 @@ func (s *Service) Shutdown() error { s.inCh = nil s.nc = nil - s.workCh = nil atomic.StoreInt32(&s.state, stateStopped) @@ -709,6 +726,11 @@ // close calls Close on the NATS connection, and closes the incoming channel func (s *Service) close() { + s.mu.Lock() + s.workqueue = nil + s.mu.Unlock() + s.workcond.Broadcast() + s.nc.Close() close(s.inCh) } @@ -956,17 +978,25 @@ func (s *Service) runWith(wid string, cb func()) { s.mu.Lock() // Get current work queue for the resource - w, ok := s.rwork[wid] + var w *work + var ok bool + if wid != "" { + w, ok = s.rwork[wid] + } if !ok { // Create a new work queue and pass it to a worker w = &work{ - s: s, - wid: wid, - queue: []func(){cb}, + s: s, + wid: wid, + single: [1]func(){cb}, + } + w.queue = w.single[:1] + if wid != "" { + s.rwork[wid] = w } - s.rwork[wid] = w + s.workqueue = append(s.workqueue, w) s.mu.Unlock() - s.workCh <- w + s.workcond.Signal() } else { // Append callback to existing work queue w.queue = append(w.queue, cb) diff --git a/test/00service_test.go b/test/00service_test.go index 9edc362..d1770cf 100644 --- a/test/00service_test.go +++ b/test/00service_test.go @@ -203,9 +203,7 @@ func TestServiceSetOnServe_ValidCallback_IsCalledOnServe(t *testing.T) { select { case <-ch: case <-time.After(timeoutDuration): - if t == nil { - t.Fatal("expected OnServe callback to be called, but it
wasn't") - } + t.Fatal("expected OnServe callback to be called, but it wasn't") } }) } @@ -257,9 +255,7 @@ func TestServiceWithResource_WithMatchingResource_CallsCallback(t *testing.T) { select { case <-ch: case <-time.After(timeoutDuration): - if t == nil { - t.Fatal("expected WithResource callback to be called, but it wasn't") - } + t.Fatal("expected WithResource callback to be called, but it wasn't") } }) } @@ -276,9 +272,7 @@ func TestServiceWithGroup_WithMatchingResource_CallsCallback(t *testing.T) { select { case <-ch: case <-time.After(timeoutDuration): - if t == nil { - t.Fatal("expected WithGroup callback to be called, but it wasn't") - } + t.Fatal("expected WithGroup callback to be called, but it wasn't") } }) } @@ -380,3 +374,41 @@ func TestServiceSetInChannelSize_GreaterThanZero_DoesNotPanic(t *testing.T) { s.SetInChannelSize(10) }, nil, restest.WithoutReset) } + +func TestServiceWithParallel_WithMultipleCallsOnSameResource_CallsCallbacksInParallel(t *testing.T) { + ch := make(chan bool) + done := make(chan bool) + runTest(t, func(s *res.Service) { + s.Handle("model", + res.Parallel(true), + res.GetResource(func(r res.GetRequest) { + ch <- true + <-done + r.NotFound() + }), + ) + }, func(s *restest.Session) { + // Test getting the same model three times + reqs := restest.NATSRequests{ + s.Get("test.model"), + s.Get("test.model"), + s.Get("test.model"), + } + + for i := 0; i < len(reqs); i++ { + select { + case <-ch: + case <-time.After(timeoutDuration): + t.Fatal("expected get handler to be called three times in parallel, but it wasn't") + } + } + + close(done) + + for i := len(reqs); i > 0; i-- { + reqs. + Response(s.MockConn).
+ AssertError(res.ErrNotFound) + } + }) +} diff --git a/test/22query_event_test.go b/test/22query_event_test.go index 63bdeb5..7fb33a1 100644 --- a/test/22query_event_test.go +++ b/test/22query_event_test.go @@ -580,9 +580,7 @@ func TestInvalidQueryResponse(t *testing.T) { select { case <-ch: case <-time.After(timeoutDuration): - if t == nil { - t.Fatal("expected query request to get a query response, but it timed out") - } + t.Fatal("expected query request to get a query response, but it timed out") } if t.Failed() { t.Logf("failed on test idx %d", i) diff --git a/worker.go b/worker.go index c012e49..c9e5e52 100644 --- a/worker.go +++ b/worker.go @@ -1,25 +1,40 @@ package res type work struct { - s *Service - wid string // Worker ID for the work queue - queue []func() // Callback queue + s *Service + wid string // Worker ID for the work queue + single [1]func() + queue []func() // Callback queue } // startWorker starts a new resource worker that will listen for resources to // process requests on. -func (s *Service) startWorker(ch chan *work) { - for w := range ch { +func (s *Service) startWorker() { + s.mu.Lock() + defer s.mu.Unlock() + defer s.wg.Done() + // workqueue being nil signals that the service is closing + for s.workqueue != nil { + for len(s.workqueue) == 0 { + s.workcond.Wait() + if s.workqueue == nil { + return + } + } + w := s.workqueue[0] + if len(s.workqueue) == 1 { + s.workqueue = s.workbuf[:0] + } else { + s.workqueue = s.workqueue[1:] + } w.processQueue() } - s.wg.Done() } func (w *work) processQueue() { var f func() idx := 0 - w.s.mu.Lock() for len(w.queue) > idx { f = w.queue[idx] w.s.mu.Unlock() @@ -27,7 +42,8 @@ func (w *work) processQueue() { f() w.s.mu.Lock() } - // Work complete - delete(w.s.rwork, w.wid) - w.s.mu.Unlock() + // Work complete. Delete if it has a work ID. + if w.wid != "" { + delete(w.s.rwork, w.wid) + }