Skip to content

Commit

Permalink
GH-107: Added Parallel flag to Handler.
Browse files Browse the repository at this point in the history
GH-108: Replaced workCh channel with workqueue slice.
  • Loading branch information
jirenius committed Oct 25, 2024
1 parent 7c7d432 commit 107540c
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 38 deletions.
5 changes: 4 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ UnexpectedEnd:
}

func (g group) toString(rname string, tokens []string) string {
if g == nil {
return rname
}
l := len(g)
if l == 0 {
return rname
return ""
}
if l == 1 && g[0].str != "" {
return g[0].str
Expand Down
8 changes: 7 additions & 1 deletion mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,15 @@ func (m *Mux) Handle(pattern string, hf ...Option) {
// AddHandler register a handler for the given resource pattern.
// The pattern used is the same as described for Handle.
func (m *Mux) AddHandler(pattern string, hs Handler) {
var g group
if hs.Parallel {
g = []gpart{}
} else {
g = parseGroup(hs.Group, pattern)
}
h := regHandler{
Handler: hs,
group: parseGroup(hs.Group, pattern),
group: g,
}

m.add(pattern, &h)
Expand Down
6 changes: 3 additions & 3 deletions restest/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
// AssertEqualJSON expects that a and b json marshals into equal values, and
// returns true if they do, otherwise logs a fatal error and returns false.
func AssertEqualJSON(t *testing.T, name string, result, expected interface{}, ctx ...interface{}) bool {
aa, aj := jsonMap(t, result)
bb, bj := jsonMap(t, expected)
aa, aj := jsonMap(result)
bb, bj := jsonMap(expected)

if !reflect.DeepEqual(aa, bb) {
t.Fatalf("expected %s to be:\n\t%s\nbut got:\n\t%s%s", name, bj, aj, ctxString(ctx))
Expand Down Expand Up @@ -117,7 +117,7 @@ func ctxString(ctx []interface{}) string {
return "\nin " + fmt.Sprint(ctx...)
}

func jsonMap(t *testing.T, v interface{}) (interface{}, []byte) {
func jsonMap(v interface{}) (interface{}, []byte) {
var err error
j, err := json.Marshal(v)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions restest/natsrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ type NATSRequest struct {
inb string
}

// NATSRequests represents a slice of requests sent over NATS to the service,
// but that may get responses in undetermined order.
type NATSRequests []*NATSRequest

// Response gets the next pending message that is published to NATS by the
// service.
//
Expand All @@ -19,6 +23,26 @@ func (nr *NATSRequest) Response() *Msg {
return m
}

// Response gets the next pending message that is published to NATS by the
// service, and matches it to one of the requests.
//
// If no message is received within a set amount of time, or if the message is
// not a response to one of the requests, it will log it as a fatal error.
//
// The matching request will be set to nil.
func (nrs NATSRequests) Response(c *MockConn) *Msg {
m := c.GetMsg()
for i := 0; i < len(nrs); i++ {
nr := nrs[i]
if nr != nil && nr.inb == m.Subject {
nrs[i] = nil
return m
}
}
c.t.Fatalf("expected to find request matching response %s, but found none", m.Subject)
return nil
}

// Get sends a get request to the service.
//
// The resource ID, rid, may contain a query part:
Expand Down
53 changes: 42 additions & 11 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ type Handler struct {
// resource name will be used as identifier.
Group string

// Parallel is a flag telling that all requests to the handler may be
// handled in parallel on different goroutines. If set to true, any value in
// Group will be ignored.
Parallel bool

// OnRegister is callback that is to be call when the handler has been
// registered to a service.
//
Expand Down Expand Up @@ -196,7 +201,9 @@ type Service struct {
nc Conn // NATS Server connection
inCh chan *nats.Msg // Channel for incoming nats messages
rwork map[string]*work // map of resource work
workCh chan *work // Resource work channel, listened to by the workers
workqueue []*work // Resource work queue.
workbuf []*work // Underlying buffer of the workqueue
workcond sync.Cond // Cond waited on by workers and signalled when work is added to workqueue

Check failure on line 206 in service.go

View workflow job for this annotation

GitHub Actions / lint

"signalled" is a misspelling of "signaled"
wg sync.WaitGroup // WaitGroup for all workers
mu sync.Mutex // Mutex to protect rwork map
logger logger.Logger // Logger
Expand Down Expand Up @@ -517,6 +524,16 @@ func Group(group string) Option {
})
}

// Parallel sets the parallel flag. All requests for the handler may be handled
// in parallel on different worker goroutines.
//
// If set to true, any value in Group will be ignored.
func Parallel(parallel bool) Option {
return OptionFunc(func(hs *Handler) {
hs.Parallel = parallel
})
}

// OnRegister sets a callback to be called when the handler is registered to a
// service.
//
Expand Down Expand Up @@ -648,14 +665,16 @@ func (s *Service) serve(nc Conn) error {
workCh := make(chan *work, 1)
s.nc = nc
s.inCh = inCh
s.workCh = workCh
s.rwork = make(map[string]*work)
s.workcond = sync.Cond{L: &s.mu}
s.workbuf = make([]*work, s.inChannelSize)
s.workqueue = s.workbuf[:0]
s.rwork = make(map[string]*work, s.inChannelSize)
s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration)

// Start workers
s.wg.Add(s.workerCount)
for i := 0; i < s.workerCount; i++ {
go s.startWorker(s.workCh)
go s.startWorker()
}

atomic.StoreInt32(&s.state, stateStarted)
Expand Down Expand Up @@ -699,7 +718,6 @@ func (s *Service) Shutdown() error {

s.inCh = nil
s.nc = nil
s.workCh = nil

atomic.StoreInt32(&s.state, stateStopped)

Expand All @@ -709,6 +727,11 @@ func (s *Service) Shutdown() error {

// close calls Close on the NATS connection, and closes the incoming channel
func (s *Service) close() {
s.mu.Lock()
s.workqueue = nil
s.mu.Unlock()
s.workcond.Broadcast()

s.nc.Close()
close(s.inCh)
}
Expand Down Expand Up @@ -956,17 +979,25 @@ func (s *Service) runWith(wid string, cb func()) {

s.mu.Lock()
// Get current work queue for the resource
w, ok := s.rwork[wid]
var w *work
var ok bool
if wid != "" {
w, ok = s.rwork[wid]
}
if !ok {
// Create a new work queue and pass it to a worker
w = &work{
s: s,
wid: wid,
queue: []func(){cb},
s: s,
wid: wid,
single: [1]func(){cb},
}
w.queue = w.single[:1]
if wid != "" {
s.rwork[wid] = w
}
s.rwork[wid] = w
s.workqueue = append(s.workqueue, w)
s.mu.Unlock()
s.workCh <- w
s.workcond.Signal()
} else {
// Append callback to existing work queue
w.queue = append(w.queue, cb)
Expand Down
50 changes: 41 additions & 9 deletions test/00service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,7 @@ func TestServiceSetOnServe_ValidCallback_IsCalledOnServe(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
if t == nil {
t.Fatal("expected OnServe callback to be called, but it wasn't")
}
t.Fatal("expected OnServe callback to be called, but it wasn't")
}
})
}
Expand Down Expand Up @@ -257,9 +255,7 @@ func TestServiceWithResource_WithMatchingResource_CallsCallback(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
if t == nil {
t.Fatal("expected WithResource callback to be called, but it wasn't")
}
t.Fatal("expected WithResource callback to be called, but it wasn't")
}
})
}
Expand All @@ -276,9 +272,7 @@ func TestServiceWithGroup_WithMatchingResource_CallsCallback(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
if t == nil {
t.Fatal("expected WithGroup callback to be called, but it wasn't")
}
t.Fatal("expected WithGroup callback to be called, but it wasn't")
}
})
}
Expand Down Expand Up @@ -380,3 +374,41 @@ func TestServiceSetInChannelSize_GreaterThanZero_DoesNotPanic(t *testing.T) {
s.SetInChannelSize(10)
}, nil, restest.WithoutReset)
}

func TestServiceWithParallel_WithMultipleCallsOnSameResource_CallsCallbacksInParallel(t *testing.T) {
ch := make(chan bool)
done := make(chan bool)
runTest(t, func(s *res.Service) {
s.Handle("model",
res.Parallel(true),
res.GetResource(func(r res.GetRequest) {
ch <- true
<-done
r.NotFound()
}),
)
}, func(s *restest.Session) {
// Test getting the same model twice
reqs := restest.NATSRequests{
s.Get("test.model"),
s.Get("test.model"),
s.Get("test.model"),
}

for i := 0; i < len(reqs); i++ {
select {
case <-ch:
case <-time.After(timeoutDuration):
t.Fatal("expected get handler to be called twice in parallel, but it wasn't")
}
}

close(done)

for i := len(reqs); i > 0; i-- {
reqs.
Response(s.MockConn).
AssertError(res.ErrNotFound)
}
})
}
4 changes: 1 addition & 3 deletions test/22query_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,7 @@ func TestInvalidQueryResponse(t *testing.T) {
select {
case <-ch:
case <-time.After(timeoutDuration):
if t == nil {
t.Fatal("expected query request to get a query response, but it timed out")
}
t.Fatal("expected query request to get a query response, but it timed out")
}
if t.Failed() {
t.Logf("failed on test idx %d", i)
Expand Down
36 changes: 26 additions & 10 deletions worker.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,49 @@
package res

type work struct {
s *Service
wid string // Worker ID for the work queue
queue []func() // Callback queue
s *Service
wid string // Worker ID for the work queue
single [1]func()
queue []func() // Callback queue
}

// startWorker starts a new resource worker that will listen for resources to
// process requests on.
func (s *Service) startWorker(ch chan *work) {
for w := range ch {
func (s *Service) startWorker() {
s.mu.Lock()
defer s.mu.Unlock()
defer s.wg.Done()
// workqueue being nil signals we the service is closing
for s.workqueue != nil {
for len(s.workqueue) == 0 {
s.workcond.Wait()
if s.workqueue == nil {
return
}
}
w := s.workqueue[0]
if len(s.workqueue) == 1 {
s.workqueue = s.workbuf[:0]
} else {
s.workqueue = s.workqueue[1:]
}
w.processQueue()
}
s.wg.Done()
}

func (w *work) processQueue() {
var f func()
idx := 0

w.s.mu.Lock()
for len(w.queue) > idx {
f = w.queue[idx]
w.s.mu.Unlock()
idx++
f()
w.s.mu.Lock()
}
// Work complete
delete(w.s.rwork, w.wid)
w.s.mu.Unlock()
// Work complete. Delete if it has a work ID.
if w.wid != "" {
delete(w.s.rwork, w.wid)
}
}

0 comments on commit 107540c

Please sign in to comment.