Skip to content

Commit

Permalink
refactor: spawn only N consumers per queue (#35)
Browse files Browse the repository at this point in the history
* refactor: spawn only N consumers per queue

* Update README.md

* chore: increase sleep for ci tests

---------

Co-authored-by: Lakshay Kalbhor <lakshay.kalbhor@zerodha.com>
  • Loading branch information
kalbhor and Lakshay Kalbhor authored Aug 14, 2023
1 parent ebab494 commit 14957be
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 44 deletions.
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ func main() {
```

#### Task Options
Queue is the name of the queue assigned to the task. By default the value is "tasqueue:tasks". Queues can be
shared between tasks.

Concurrency is the number of processors run for this task. Queue is the queue to consume for this task.
Task options contains callbacks that are executed one a state change.
Concurrency defines the number of processor go-routines running on the queue. By default this number is equal
to `runtime.GOMAXPROCS(0)` (number of CPUs on the system). Ideally, it is recommended that the client tweak this number according
to their tasks.

```go
type TaskOpts struct {
Expand Down Expand Up @@ -162,8 +165,33 @@ func SumProcessor(b []byte, m tasqueue.JobCtx) error {
}
```

Once a queue is created if the client creates a task with an existing queue but supplies a different concurrency
in the `TaskOpts`, then `RegisterTask` will return an error.

```go
srv.RegisterTask("add", tasks.SumProcessor, TaskOpts{Concurrency: 5})
// This creates the q1 queue if it doesn't exist and assigns 5 concurrency to it
err := srv.RegisterTask("add", tasks.SumProcessor, TaskOpts{Queue:"q1", Concurrency: 5})
if err != nil {
log.Fatal(err)
}

// No error
err := srv.RegisterTask("div", tasks.DivProcessor, TaskOpts{Queue:"q1"})
if err != nil {
log.Fatal(err)
}

// No error
err := srv.RegisterTask("sub", tasks.SubProcessor, TaskOpts{Queue:"q1", Concurrency: 5})
if err != nil {
log.Fatal(err)
}

// This will return an error since q1 is already created and its concurrency cannot be modified
err := srv.RegisterTask("multiplication", tasks.MulProcessor, TaskOpts{Queue:"q1", Concurrency: 10})
if err != nil {
log.Fatal(err)
}
```

#### Start server
Expand Down
15 changes: 12 additions & 3 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func BenchmarkJob(b *testing.B) {
}

// Register the handler and enqueue the jobs.
srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
err := srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
if err != nil {
b.Fatal(err)
}
for i := 0; i < num; i++ {
if _, err := srv.Enqueue(ctx, newJob(b)); err != nil {
b.Fatalf("could not enqueue job : %v", err)
Expand Down Expand Up @@ -144,7 +147,10 @@ func BenchmarkChain(b *testing.B) {
}

// Register the handler and enqueue the jobs.
srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
err := srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
if err != nil {
b.Fatal(err)
}
for i := 0; i < num; i++ {
// Create a list of jobs to generate a chain.
var jobs = make([]Job, jobNum)
Expand Down Expand Up @@ -205,7 +211,10 @@ func BenchmarkGroup(b *testing.B) {
}

// Register the handler and enqueue the jobs.
srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
err := srv.RegisterTask(sampleHandler, handler, TaskOpts{Concurrency: concurrency})
if err != nil {
b.Fatal(err)
}
for i := 0; i < num; i++ {
// Create a list of jobs to generate a group.
var jobs = make([]Job, jobNum)
Expand Down
10 changes: 8 additions & 2 deletions chains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ func TestGetChain(t *testing.T) {
ctx = context.Background()
)

srv.RegisterTask(chainTask1, initHandler, TaskOpts{})
srv.RegisterTask(chainTask, handler, TaskOpts{})
err := srv.RegisterTask(chainTask1, initHandler, TaskOpts{})
if err != nil {
t.Fatal(err)
}
err = srv.RegisterTask(chainTask, handler, TaskOpts{})
if err != nil {
t.Fatal(err)
}

// Create sequential list of jobs for the chain.
var jobs = []Job{
Expand Down
5 changes: 4 additions & 1 deletion examples/in-memory/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func main() {
log.Fatal(err)
}

srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{})
err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{})
if err != nil {
log.Fatal(err)
}

var chain []tasqueue.Job

Expand Down
7 changes: 4 additions & 3 deletions examples/nats-js/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ func main() {
log.Fatal(err)
}

srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{
Concurrency: 5,
})
err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{})
if err != nil {
log.Fatal(err)
}

var chain []tasqueue.Job

Expand Down
7 changes: 4 additions & 3 deletions examples/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func main() {
log.Fatal(err)
}

srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{
Concurrency: 5,
})
err = srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5})
if err != nil {
log.Fatal(err)
}

var group []tasqueue.Job

Expand Down
12 changes: 9 additions & 3 deletions jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestGetJob(t *testing.T) {
}

// Wait for task to be consumed & processed.
time.Sleep(time.Second)
time.Sleep(time.Second * 2)
msg, err := srv.GetJob(ctx, uuid)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -143,7 +143,10 @@ func TestSaveJob(t *testing.T) {
)

// Register the task and handler
srv.RegisterTask("validate-save", handler, TaskOpts{})
err := srv.RegisterTask("validate-save", handler, TaskOpts{})
if err != nil {
t.Fatal(err)
}

// Create a job that passes the data needed to be saved.
job, err := NewJob("validate-save", []byte(savedData), JobOpts{MaxRetries: 1})
Expand Down Expand Up @@ -188,7 +191,10 @@ func TestDeleteJob(t *testing.T) {
)

// Register the task and handler
srv.RegisterTask("validate-save", handler, TaskOpts{})
err := srv.RegisterTask("validate-save", handler, TaskOpts{})
if err != nil {
t.Fatal(err)
}

// Create a job that passes the data needed to be saved.
job, err := NewJob("validate-save", []byte(savedData), JobOpts{MaxRetries: 1})
Expand Down
75 changes: 52 additions & 23 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasqueue
import (
"context"
"fmt"
"runtime"
"sync"
"time"

Expand All @@ -16,8 +17,6 @@ import (
)

const (
defaultConcurrency = 1

// This is the initial state when a job is pushed onto the broker.
StatusStarted = "queued"

Expand Down Expand Up @@ -63,17 +62,36 @@ type TaskOpts struct {

// RegisterTask maps a new task against the tasks map on the server.
// It accepts different options for the task (to set callbacks).
func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) {
func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error {
s.log.Info("added handler", "name", name)

if opts.Concurrency <= 0 {
opts.Concurrency = defaultConcurrency
}
if opts.Queue == "" {
opts.Queue = DefaultQueue
}
if opts.Concurrency <= 0 {
opts.Concurrency = uint32(s.defaultConc)
}

s.q.RLock()
conc, ok := s.queues[opts.Queue]
s.q.RUnlock()
if !ok {
s.registerQueue(opts.Queue, opts.Concurrency)
s.registerHandler(name, Task{name: name, handler: fn, opts: opts})

return nil

}

// If the queue is already defined and the passed concurrency optional
// is same (it can be default queue/conc) so simply register the handler
if opts.Concurrency == conc {
s.registerHandler(name, Task{name: name, handler: fn, opts: opts})
return nil
}

s.registerHandler(name, Task{name: name, handler: fn, opts: opts})
// If queue is already registered but a new conc was defined, return an err
return fmt.Errorf("queue is already defined with %d concurrency", conc)
}

// Server is the main store that holds the broker and the results communication interfaces.
Expand All @@ -85,8 +103,12 @@ type Server struct {
cron *cron.Cron
traceProv *trace.TracerProvider

p sync.RWMutex
tasks map[string]Task
p sync.RWMutex
tasks map[string]Task
q sync.RWMutex
queues map[string]uint32

defaultConc int
}

type ServerOpts struct {
Expand All @@ -109,12 +131,14 @@ func NewServer(o ServerOpts) (*Server, error) {
}

return &Server{
traceProv: o.TraceProvider,
log: o.Logger,
cron: cron.New(),
broker: o.Broker,
results: o.Results,
tasks: make(map[string]Task),
traceProv: o.TraceProvider,
log: o.Logger,
cron: cron.New(),
broker: o.Broker,
results: o.Results,
tasks: make(map[string]Task),
defaultConc: runtime.GOMAXPROCS(0),
queues: make(map[string]uint32),
}, nil
}

Expand Down Expand Up @@ -182,28 +206,27 @@ func (s *Server) GetSuccess(ctx context.Context) ([]string, error) {
// Start() starts the job consumer and processor. It is a blocking function.
func (s *Server) Start(ctx context.Context) {
go s.cron.Start()
// Loop over each registered task.
s.p.RLock()
tasks := s.tasks
s.p.RUnlock()
// Loop over each registered queue.
s.q.RLock()
queues := s.queues
s.q.RUnlock()

var wg sync.WaitGroup
for _, task := range tasks {
for q, conc := range queues {
if s.traceProv != nil {
var span spans.Span
ctx, span = otel.Tracer(tracer).Start(ctx, "start")
defer span.End()
}

task := task
work := make(chan []byte)
wg.Add(1)
go func() {
s.consume(ctx, work, task.opts.Queue)
s.consume(ctx, work, q)
wg.Done()
}()

for i := 0; i < int(task.opts.Concurrency); i++ {
for i := 0; i < int(conc); i++ {
wg.Add(1)
go func() {
s.process(ctx, work)
Expand Down Expand Up @@ -396,6 +419,12 @@ func (s *Server) retryJob(ctx context.Context, msg JobMessage) error {
return nil
}

func (s *Server) registerQueue(name string, conc uint32) {
s.q.Lock()
s.queues[name] = conc
s.q.Unlock()
}

func (s *Server) registerHandler(name string, t Task) {
s.p.Lock()
s.tasks[name] = t
Expand Down
6 changes: 3 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func newServer(t *testing.T, taskName string, handler func([]byte, JobCtx) error
if err != nil {
t.Fatal(err)
}
srv.RegisterTask(taskName, handler, TaskOpts{
Concurrency: 5,
})
if err := srv.RegisterTask(taskName, handler, TaskOpts{}); err != nil {
t.Fatal(err)
}

return srv
}
Expand Down

0 comments on commit 14957be

Please sign in to comment.