diff --git a/README.md b/README.md index 8c87dc12..ece15f73 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ -# gocraft/work [![GoDoc](https://godoc.org/github.com/gocraft/work?status.png)](https://godoc.org/github.com/gocraft/work) +# DispatchMe/go-work [![GoDoc](https://godoc.org/github.com/DispatchMe/go-work?status.png)](https://godoc.org/github.com/DispatchMe/go-work) -gocraft/work lets you enqueue and processes background jobs in Go. Jobs are durable and backed by Redis. Very similar to Sidekiq for Go. +**This is a fork of [gocraft/work](https://www.github.com/gocraft/work) that removes their strange usage of custom contexts. The use of reflection to support that was detracting from efficiency and didn't feel very "go-like", so we made this change** + +DispatchMe/go-work lets you enqueue and processes background jobs in Go. Jobs are durable and backed by Redis. Very similar to Sidekiq for Go. * Fast and efficient. Faster than [this](https://www.github.com/jrallison/go-workers), [this](https://www.github.com/benmanns/goworker), and [this](https://www.github.com/albrow/jobs). See below for benchmarks. * Reliable - don't lose jobs even if your process crashes. @@ -11,6 +13,10 @@ gocraft/work lets you enqueue and processes background jobs in Go. Jobs are dura * Web UI to manage failed jobs and observe the system. * Periodically enqueue jobs on a cron-like schedule. +## Run tests + +Redis must be installed to avoid a panic when running tests. + ## Enqueue new jobs To enqueue jobs, you need to make an Enqueuer with a redis namespace and a redigo pool. Each enqueued job has a name and can take optional arguments. Arguments are k/v pairs (serialized as JSON internally). @@ -19,8 +25,8 @@ To enqueue jobs, you need to make an Enqueuer with a redis namespace and a redig package main import ( - "github.com/garyburd/redigo/redis" - "github.com/gocraft/work" + "github.com/gomodule/redigo/redis" + "github.com/DispatchMe/go-work" ) // Make a redis pool @@ -33,12 +39,18 @@ var redisPool = &redis.Pool{ }, } +type SendEmailJobParameters struct { + Address string + Subject string + CustomerID int +} + // Make an enqueuer with a particular namespace var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool) func main() { // Enqueue a job named "send_email" with the specified parameters. - _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4}) + _, err := enqueuer.Enqueue("send_email", &SendEmailJobParameters{Address: "test@example.com", Subject: "hello world", CustomerID: 4}) if err != nil { log.Fatal(err) } @@ -55,8 +67,8 @@ In order to process jobs, you'll need to make a WorkerPool. Add middleware and j package main import ( - "github.com/garyburd/redigo/redis" - "github.com/gocraft/work" + "github.com/gomodule/redigo/redis" + "github.com/DispatchMe/go-work" "os" "os/signal" ) @@ -71,27 +83,21 @@ var redisPool = &redis.Pool{ }, } -type Context struct{ - customerID int64 -} - func main() { // Make a new pool. Arguments: - // Context{} is a struct that will be the context for the request. // 10 is the max concurrency // "my_app_namespace" is the Redis namespace // redisPool is a Redis pool - pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool) + pool := work.NewWorkerPool(10, "my_app_namespace", redisPool) // Add middleware that will be executed for each job - pool.Middleware((*Context).Log) - pool.Middleware((*Context).FindCustomer) + pool.Middleware(LogMiddleware) // Map the name of jobs to handler functions - pool.Job("send_email", (*Context).SendEmail) + pool.Job("send_email", SendEmailHandler) // Customize options: - pool.JobWithOptions("export", JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export) + pool.JobWithOptions("export", JobOptions{Priority: 10, MaxFails: 1}, ExportHandler) // Start processing jobs pool.Start() @@ -105,50 +111,32 @@ func main() { pool.Stop() } -func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error { - fmt.Println("Starting job: ", job.Name) +func LogMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error { + fmt.Println("Starting job: ", ctx.Job.Name) return next() } -func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error { - // If there's a customer_id param, set it in the context for future middleware and handlers to use. - if _, ok := job.Args["customer_id"]; ok { - c.customerID = job.ArgInt64("customer_id") - if err := job.ArgError(); err != nil { - return err - } - } - - return next() -} - -func (c *Context) SendEmail(job *work.Job) error { +func SendEmailHandler(ctx *work.Context) error { // Extract arguments: - addr := job.ArgString("address") - subject := job.ArgString("subject") - if err := job.ArgError(); err != nil { + args := new(SendEmailJobParameters) + err := ctx.Job.UnmarshalPayload(args) + if err != nil { return err } // Go ahead and send the email... - // sendEmailTo(addr, subject) + // sendEmailTo(args.Address, args.Subject) return nil } -func (c *Context) Export(job *work.Job) error { +func ExportHandler(ctx *work.Context) error { return nil } ``` ## Special Features -### Contexts - -Just like in [gocraft/web](https://www.github.com/gocraft/web), gocraft/work lets you use your own contexts. Your context can be empty or it can have various fields in it. The fields can be whatever you want - it's your type! When a new job is processed by a worker, we'll allocate an instance of this struct and pass it to your middleware and handlers. This allows you to pass information from one middleware function to the next, and onto your handlers. - -Custom contexts aren't really needed for trivial example applications, but are very important for production apps. For instance, one field in your context can be your tagged logger. Your tagged logger augments your log statements with a job-id. This lets you filter your logs by that job-id. - ### Check-ins Since this is a background job processing library, it's fairly common to have jobs that that take a long time to execute. Imagine you have a job that takes an hour to run. It can often be frustrating to know if it's hung, or about to finish, or if it has 30 more minutes to go. @@ -156,12 +144,12 @@ Since this is a background job processing library, it's fairly common to have jo To solve this, you can instrument your jobs to "checkin" every so often with a string message. This checkin status will show up in the web UI. For instance, your job could look like this: ```go -func (c *Context) Export(job *work.Job) error { +func ExportHandler(ctx *work.Context) error { rowsToExport := getRows() for i, row := range rowsToExport { exportRow(row) if i % 1000 == 0 { - job.Checkin("i=" + fmt.Sprint(i)) // Here's the magic! This tells gocraft/work our status + ctx.Job.Checkin("i=" + fmt.Sprint(i)) // Here's the magic! This tells DispatchMe/go-work our status } } } @@ -170,9 +158,11 @@ func (c *Context) Export(job *work.Job) error { Then in the web UI, you'll see the status of the worker: +``` | Name | Arguments | Started At | Check-in At | Check-in | | --- | --- | --- | --- | --- | | export | {"account_id": 123} | 2016/07/09 04:16:51 | 2016/07/09 05:03:13 | i=335000 | +``` ### Scheduled Jobs @@ -197,22 +187,22 @@ job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "78 ### Periodic Enqueueing (Cron) -You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once. +You can periodically enqueue jobs on your DispatchMe/go-work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once. ```go -pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool) +pool := work.NewWorkerPool(10, "my_app_namespace", redisPool) pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // This will enqueue a "calculate_caches" job every hour -pool.Job("calculate_caches", (*Context).CalculateCaches) // Still need to register a handler for this job separately +pool.Job("calculate_caches", CalculateCaches) // Still need to register a handler for this job separately ``` ## Run the Web UI -The web UI provides a view to view the state of your gocraft/work cluster, inspect queued jobs, and retry or delete dead jobs. +The web UI provides a view to view the state of your DispatchMe/go-work cluster, inspect queued jobs, and retry or delete dead jobs. Building an installing the binary: ```bash -go get github.com/gocraft/work/cmd/workwebui -go install github.com/gocraft/work/cmd/workwebui +go get github.com/DispatchMe/go-work/cmd/workwebui +go install github.com/DispatchMe/go-work/cmd/workwebui ``` Then, you can run it: @@ -248,11 +238,11 @@ You'll see a view that looks like this: * The worker will then run the job. The job will either finish successfully or result in an error or panic. * If the process completely crashes, the reaper will eventually find it in its in-progress queue and requeue it. * If the job is successful, we'll simply remove the job from the in-progress queue. -* If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue. +* If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue. ### Workers and WorkerPools -* WorkerPools provide the public API of gocraft/work. +* WorkerPools provide the public API of DispatchMe/go-work. * You can attach jobs and middleware to them. * You can start and stop them. * Based on their concurrency setting, they'll spin up N worker goroutines. @@ -298,7 +288,7 @@ You'll see a view that looks like this: * "worker observation" - A snapshot made by an observer of what a worker is working on. * "periodic enqueuer" - A process that runs with a worker pool that periodically enqueues new jobs based on cron schedules. * "job" - the actual bundle of data that constitutes one job -* "job name" - each job has a name, like "create_watch" +* "job name" - each job has a name, like `create_watch` * "job type" - backend/private nomenclature for the handler+options for processing a job * "queue" - each job creates a queue with the same name as the job. only jobs named X go into the X queue. * "retry jobs" - If a job fails and needs to be retried, it will be put on this queue. @@ -308,26 +298,18 @@ You'll see a view that looks like this: ## Benchmarks -The benches folder contains various benchmark code. In each case, we enqueue 100k jobs across 5 queues. The jobs are almost no-op jobs: they simply increment an atomic counter. We then measure the rate of change of the counter to obtain our measurement. +The benches folder used to contain various benchmark code. In each case, we enqueued 100k jobs across 5 queues. The jobs were almost no-op jobs: they simply incremented an atomic counter. We then measured the rate of change of the counter to obtain our measurement. These were some test results: | Library | Speed | | --- | --- | -| [gocraft/work](https://www.github.com/gocraft/work) | **20944 jobs/s** | +| [DispatchMe/go-work](https://www.github.com/DispatchMe/go-work) | **20944 jobs/s** | | [jrallison/go-workers](https://www.github.com/jrallison/go-workers) | 19945 jobs/s | | [benmanns/goworker](https://www.github.com/benmanns/goworker) | 10328.5 jobs/s | | [albrow/jobs](https://www.github.com/albrow/jobs) | 40 jobs/s | - -## gocraft - -gocraft offers a toolkit for building web apps. Currently these packages are available: - -* [gocraft/web](https://github.com/gocraft/web) - Go Router + Middleware. Your Contexts. -* [gocraft/dbr](https://github.com/gocraft/dbr) - Additions to Go's database/sql for super fast performance and convenience. -* [gocraft/health](https://github.com/gocraft/health) - Instrument your web apps with logging and metrics. -* [gocraft/work](https://github.com/gocraft/work) - Process background jobs in Go. - -These packages were developed by the [engineering team](https://eng.uservoice.com) at [UserVoice](https://www.uservoice.com) and currently power much of its infrastructure and tech stack. +The comparison benchmarks were run against repositories that were stale and unmaintained by fall of 2018. Invalid +import paths were causing tests to fail in go-lib, which has background and indexer packages that rely on this +repository. As the benchmarks were not currently needed, they were removed. ## Authors diff --git a/benches/bench_goworker/main.go b/benches/bench_goworker/main.go deleted file mode 100644 index cfc8243a..00000000 --- a/benches/bench_goworker/main.go +++ /dev/null @@ -1,123 +0,0 @@ -package main - -import ( - "fmt" - "github.com/benmanns/goworker" - "github.com/garyburd/redigo/redis" - "github.com/gocraft/health" - "os" - "sync/atomic" - "time" -) - -func myJob(queue string, args ...interface{}) error { - atomic.AddInt64(&totcount, 1) - //fmt.Println("job! ", queue) - return nil -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -// go run *.go -queues="myqueue,myqueue2,myqueue3,myqueue4,myqueue5" -namespace="bench_test:" -concurrency=50 -use-nuber -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - goworker.Register("MyClass", myJob) - - go monitor() - - // Blocks until process is told to exit via unix signal - goworker.Work() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - conn := pool.Get() - defer conn.Close() - - for i := 0; i < count; i++ { - //workers.Enqueue(queue, "Foo", []int{i}) - conn.Do("RPUSH", "bench_test:queue:"+queue, `{"class":"MyClass","args":[]}`) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_goworkers/main.go b/benches/bench_goworkers/main.go deleted file mode 100644 index bd67b7d5..00000000 --- a/benches/bench_goworkers/main.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "fmt" - "github.com/garyburd/redigo/redis" - "github.com/gocraft/health" - "github.com/jrallison/go-workers" - "os" - "sync/atomic" - "time" -) - -func myJob(m *workers.Msg) { - atomic.AddInt64(&totcount, 1) -} - -var namespace = "bench_test" -var pool = newPool(":6379") - -func main() { - - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - stream.Event("wat") - cleanKeyspace() - - workers.Configure(map[string]string{ - // location of redis instance - "server": "localhost:6379", - // instance of the database - "database": "0", - // number of connections to keep open with redis - "pool": "10", - // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) - "process": "1", - "namespace": namespace, - }) - workers.Middleware = &workers.Middlewares{} - - queues := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - numJobs := 100000 / len(queues) - - job := stream.NewJob("enqueue_all") - for _, q := range queues { - enqueueJobs(q, numJobs) - } - job.Complete(health.Success) - - for _, q := range queues { - workers.Process(q, myJob, 10) - } - - go monitor() - - // Blocks until process is told to exit via unix signal - workers.Run() -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(queue string, count int) { - for i := 0; i < count; i++ { - workers.Enqueue(queue, "Foo", []int{i}) - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 3, - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_jobs/main.go b/benches/bench_jobs/main.go deleted file mode 100644 index cf68f5fc..00000000 --- a/benches/bench_jobs/main.go +++ /dev/null @@ -1,140 +0,0 @@ -package main - -import ( - "fmt" - "github.com/albrow/jobs" - "github.com/garyburd/redigo/redis" - "github.com/gocraft/health" - "os" - "sync/atomic" - "time" -) - -var namespace = "jobs" -var pool = newPool(":6379") - -func epsilonHandler(i int) error { - atomic.AddInt64(&totcount, 1) - return nil -} - -func main() { - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - cleanKeyspace() - - queueNames := []string{"myqueue", "myqueue2", "myqueue3", "myqueue4", "myqueue5"} - queues := []*jobs.Type{} - - for _, qn := range queueNames { - q, err := jobs.RegisterType(qn, 3, epsilonHandler) - if err != nil { - panic(err) - } - queues = append(queues, q) - } - - job := stream.NewJob("enqueue_all") - - numJobs := 40000 / len(queues) - for _, q := range queues { - for i := 0; i < numJobs; i++ { - _, err := q.Schedule(100, time.Now(), i) - if err != nil { - panic(err) - } - } - } - - job.Complete(health.Success) - - go monitor() - - job = stream.NewJob("run_all") - pool, err := jobs.NewPool(&jobs.PoolConfig{ - // NumWorkers: 1000, - // BatchSize: 3000, - }) - if err != nil { - panic(err) - } - defer func() { - pool.Close() - if err := pool.Wait(); err != nil { - panic(err) - } - }() - if err := pool.Start(); err != nil { - panic(err) - } - job.Complete(health.Success) - select {} -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/benches/bench_work/main.go b/benches/bench_work/main.go deleted file mode 100644 index 9184b61b..00000000 --- a/benches/bench_work/main.go +++ /dev/null @@ -1,130 +0,0 @@ -package main - -import ( - "fmt" - "github.com/garyburd/redigo/redis" - "github.com/gocraft/health" - "github.com/gocraft/work" - "os" - "sync/atomic" - "time" -) - -var namespace = "bench_test" -var pool = newPool(":6379") - -type context struct{} - -func epsilonHandler(job *work.Job) error { - //fmt.Println("hi") - //a := job.Args[0] - //fmt.Printf("job: %s arg: %v\n", job.Name, a) - atomic.AddInt64(&totcount, 1) - return nil -} - -func main() { - stream := health.NewStream().AddSink(&health.WriterSink{os.Stdout}) - cleanKeyspace() - - numJobs := 10 - jobNames := []string{} - - for i := 0; i < numJobs; i++ { - jobNames = append(jobNames, fmt.Sprintf("job%d", i)) - } - - job := stream.NewJob("enqueue_all") - enqueueJobs(jobNames, 10000) - job.Complete(health.Success) - - workerPool := work.NewWorkerPool(context{}, 20, namespace, pool) - for _, jobName := range jobNames { - workerPool.Job(jobName, epsilonHandler) - } - go monitor() - - job = stream.NewJob("run_all") - workerPool.Start() - workerPool.Drain() - job.Complete(health.Success) - select {} -} - -var totcount int64 - -func monitor() { - t := time.Tick(1 * time.Second) - - curT := 0 - c1 := int64(0) - c2 := int64(0) - prev := int64(0) - -DALOOP: - for { - select { - case <-t: - curT++ - v := atomic.AddInt64(&totcount, 0) - fmt.Printf("after %d seconds, count is %d\n", curT, v) - if curT == 1 { - c1 = v - } else if curT == 3 { - c2 = v - } - if v == prev { - break DALOOP - } - prev = v - } - } - fmt.Println("Jobs/sec: ", float64(c2-c1)/2.0) - os.Exit(0) -} - -func enqueueJobs(jobs []string, count int) { - enq := work.NewEnqueuer(namespace, pool) - for _, jobName := range jobs { - for i := 0; i < count; i++ { - enq.Enqueue(jobName, work.Q{"i": i}) - } - } -} - -func cleanKeyspace() { - conn := pool.Get() - defer conn.Close() - - keys, err := redis.Strings(conn.Do("KEYS", namespace+"*")) - if err != nil { - panic("could not get keys: " + err.Error()) - } - for _, k := range keys { - //fmt.Println("deleting ", k) - if _, err := conn.Do("DEL", k); err != nil { - panic("could not del: " + err.Error()) - } - } -} - -func newPool(addr string) *redis.Pool { - return &redis.Pool{ - MaxActive: 20, - MaxIdle: 20, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", addr) - if err != nil { - return nil, err - } - return c, nil - //return redis.NewLoggingConn(c, log.New(os.Stdout, "", 0), "redis"), err - }, - Wait: true, - //TestOnBorrow: func(c redis.Conn, t time.Time) error { - // _, err := c.Do("PING") - // return err - //}, - } -} diff --git a/client.go b/client.go index c7a2e7e9..3e614a37 100644 --- a/client.go +++ b/client.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "sort" "strconv" "strings" @@ -125,7 +125,7 @@ type WorkerObservation struct { JobName string `json:"job_name"` JobID string `json:"job_id"` StartedAt int64 `json:"started_at"` - ArgsJSON string `json:"args_json"` + Payload []byte `json:"payload"` Checkin string `json:"checkin"` CheckinAt int64 `json:"checkin_at"` } @@ -182,8 +182,8 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) { ob.JobID = value } else if key == "started_at" { ob.StartedAt, err = strconv.ParseInt(value, 10, 64) - } else if key == "args" { - ob.ArgsJSON = value + } else if key == "payload" { + ob.Payload = []byte(value) } else if key == "checkin" { ob.Checkin = value } else if key == "checkin_at" { @@ -480,7 +480,7 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error { } if job.Unique { - uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Args) + uniqueKey, err := redisKeyUniqueJob(c.namespace, job.Name, job.Payload) if err != nil { logError("client.delete_scheduled_job.redis_key_unique_job", err) return err diff --git a/client_test.go b/client_test.go index a4cd7929..a9a8a6cb 100644 --- a/client_test.go +++ b/client_test.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "testing" "time" @@ -15,14 +15,14 @@ func TestClientWorkerPoolHeartbeats(t *testing.T) { ns := "work" cleanKeyspace(ns, pool) - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(job *Job) error { return nil }) - wp.Job("bob", func(job *Job) error { return nil }) + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { return nil }) + wp.Job("bob", func(ctx *Context) error { return nil }) wp.Start() - wp2 := NewWorkerPool(TestContext{}, 11, ns, pool) - wp2.Job("foo", func(job *Job) error { return nil }) - wp2.Job("bar", func(job *Job) error { return nil }) + wp2 := NewWorkerPool(11, ns, pool) + wp2.Job("foo", func(ctx *Context) error { return nil }) + wp2.Job("bar", func(ctx *Context) error { return nil }) wp2.Start() time.Sleep(20 * time.Millisecond) @@ -73,12 +73,12 @@ func TestClientWorkerObservations(t *testing.T) { _, err = enqueuer.Enqueue("foo", Q{"a": 3, "b": 4}) assert.Nil(t, err) - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { time.Sleep(50 * time.Millisecond) return nil }) - wp.Job("foo", func(job *Job) error { + wp.Job("foo", func(ctx *Context) error { time.Sleep(50 * time.Millisecond) return nil }) @@ -97,13 +97,13 @@ func TestClientWorkerObservations(t *testing.T) { if ob.JobName == "foo" { fooCount++ assert.True(t, ob.IsBusy) - assert.Equal(t, `{"a":3,"b":4}`, ob.ArgsJSON) + assert.Equal(t, []byte(`{"a":3,"b":4}`), ob.Payload) assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3) assert.True(t, ob.JobID != "") } else if ob.JobName == "wat" { watCount++ assert.True(t, ob.IsBusy) - assert.Equal(t, `{"a":1,"b":2}`, ob.ArgsJSON) + assert.Equal(t, []byte(`{"a":1,"b":2}`), ob.Payload) assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3) assert.True(t, ob.JobID != "") } else { @@ -143,14 +143,14 @@ func TestClientQueues(t *testing.T) { // Start a pool to work on it. It's going to work on the queues // side effect of that is knowing which jobs are avail - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { return nil }) - wp.Job("foo", func(job *Job) error { + wp.Job("foo", func(ctx *Context) error { return nil }) - wp.Job("zaz", func(job *Job) error { + wp.Job("zaz", func(ctx *Context) error { return nil }) wp.Start() @@ -213,8 +213,11 @@ func TestClientScheduledJobs(t *testing.T) { assert.EqualValues(t, 1425263409, jobs[1].EnqueuedAt) assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) - assert.EqualValues(t, interface{}(2), jobs[0].Args["b"]) + var q Q + err = jobs[0].UnmarshalPayload(&q) + assert.Nil(t, err) + assert.EqualValues(t, interface{}(1), q["a"]) + assert.EqualValues(t, interface{}(2), q["b"]) assert.EqualValues(t, 0, jobs[0].Fails) assert.EqualValues(t, 0, jobs[1].Fails) @@ -244,8 +247,8 @@ func TestClientRetryJobs(t *testing.T) { setNowEpochSecondsMock(1425263429) - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { return fmt.Errorf("ohno") }) wp.Start() @@ -262,7 +265,9 @@ func TestClientRetryJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + var q Q + assert.Nil(t, jobs[0].UnmarshalPayload(&q)) + assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -283,8 +288,8 @@ func TestClientDeadJobs(t *testing.T) { setNowEpochSecondsMock(1425263429) - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(ctx *Context) error { return fmt.Errorf("ohno") }) wp.Start() @@ -302,7 +307,9 @@ func TestClientDeadJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + var q Q + assert.Nil(t, jobs[0].UnmarshalPayload(&q)) + assert.EqualValues(t, interface{}(1), q["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -422,7 +429,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { Name: name, ID: makeIdentifier(), EnqueuedAt: encAt, - Args: map[string]interface{}{"a": "wat"}, + Payload: []byte(`{"a": "wat"}`), Fails: 3, LastErr: "sorry", FailedAt: failAt, @@ -447,9 +454,10 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { job1 := getQueuedJob(ns, pool, name) if assert.NotNil(t, job1) { + var q Q + assert.Nil(t, job1.UnmarshalPayload(&q)) assert.Equal(t, name, job1.Name) - assert.Equal(t, "wat", job1.ArgString("a")) - assert.NoError(t, job1.ArgError()) + assert.Equal(t, "wat", q["a"].(string)) } } @@ -553,7 +561,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { Name: "wat1", ID: makeIdentifier(), EnqueuedAt: 12345, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: 12347, @@ -574,7 +582,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { Name: "dontexist", ID: makeIdentifier(), EnqueuedAt: 12345, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: 12347, @@ -662,8 +670,8 @@ func TestClientDeleteRetryJob(t *testing.T) { setNowEpochSecondsMock(1425263429) - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { return fmt.Errorf("ohno") }) wp.Start() @@ -687,7 +695,7 @@ func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64 Name: name, ID: makeIdentifier(), EnqueuedAt: encAt, - Args: nil, + Payload: nil, Fails: 3, LastErr: "sorry", FailedAt: failAt, diff --git a/cmd/workenqueue/main.go b/cmd/workenqueue/main.go index fb08c3c3..9e5ac026 100644 --- a/cmd/workenqueue/main.go +++ b/cmd/workenqueue/main.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gocraft/work" "os" "time" @@ -13,7 +13,7 @@ import ( var redisHostPort = flag.String("redis", ":6379", "redis hostport") var redisNamespace = flag.String("ns", "work", "redis namespace") var jobName = flag.String("job", "", "job name") -var jobArgs = flag.String("args", "{}", "job arguments") +var jobPayload = flag.String("payload", "{}", "job payload") func main() { flag.Parse() @@ -25,15 +25,15 @@ func main() { pool := newPool(*redisHostPort) - var args map[string]interface{} - err := json.Unmarshal([]byte(*jobArgs), &args) + var payload map[string]interface{} + err := json.Unmarshal([]byte(*jobPayload), &payload) if err != nil { fmt.Println("invalid args:", err) os.Exit(1) } en := work.NewEnqueuer(*redisNamespace, pool) - en.Enqueue(*jobName, args) + en.Enqueue(*jobName, payload) } func newPool(addr string) *redis.Pool { diff --git a/cmd/workfakedata/main.go b/cmd/workfakedata/main.go index 357fbf0d..0cb6e577 100644 --- a/cmd/workfakedata/main.go +++ b/cmd/workfakedata/main.go @@ -3,7 +3,7 @@ package main import ( "flag" "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gocraft/work" "math/rand" "time" diff --git a/cmd/workwebui/main.go b/cmd/workwebui/main.go index a596f760..9bb21731 100644 --- a/cmd/workwebui/main.go +++ b/cmd/workwebui/main.go @@ -8,7 +8,7 @@ import ( "strconv" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gocraft/work/webui" ) diff --git a/context.go b/context.go new file mode 100644 index 00000000..826c4661 --- /dev/null +++ b/context.go @@ -0,0 +1,21 @@ +package work + +type Context struct { + Job *Job + data map[string]interface{} +} + +func NewContext(job *Job) *Context { + return &Context{ + Job: job, + data: make(map[string]interface{}), + } +} + +func (c *Context) Get(key string) interface{} { + return c.data[key] +} + +func (c *Context) Set(key string, value interface{}) { + c.data[key] = value +} diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index f8caa2e6..26f2e234 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) const ( diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index d23ba4fb..d1f0b046 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -1,7 +1,7 @@ package work import ( - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "testing" "time" diff --git a/enqueue.go b/enqueue.go index c1f4ed6c..4ecd4148 100644 --- a/enqueue.go +++ b/enqueue.go @@ -1,10 +1,9 @@ package work import ( + "github.com/gomodule/redigo/redis" "sync" "time" - - "github.com/garyburd/redigo/redis" ) // Enqueuer can enqueue jobs. @@ -37,12 +36,18 @@ func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer { // Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"}) -func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) { +func (e *Enqueuer) Enqueue(jobName string, payload interface{}) (*Job, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, + } + + if payload != nil { + err := job.SetPayload(payload) + if err != nil { + return nil, err + } } rawJSON, err := job.serialize() @@ -65,12 +70,18 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e } // EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds. -func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, + } + + if payload != nil { + err := job.SetPayload(payload) + if err != nil { + return nil, err + } } rawJSON, err := job.serialize() @@ -101,8 +112,8 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri // EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. // EnqueueUnique returns the job if it was enqueued and nil if it wasn't -func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) +func (e *Enqueuer) EnqueueUnique(jobName string, payload interface{}) (*Job, error) { + uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload) if err != nil { return nil, err } @@ -111,9 +122,14 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (* Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, Unique: true, } + if payload != nil { + err = job.SetPayload(payload) + if err != nil { + return nil, err + } + } rawJSON, err := job.serialize() if err != nil { @@ -140,8 +156,8 @@ func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (* } // EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) +func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error) { + uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, payload) if err != nil { return nil, err } @@ -150,10 +166,16 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma Name: jobName, ID: makeIdentifier(), EnqueuedAt: nowEpochSeconds(), - Args: args, Unique: true, } + if payload != nil { + err = job.SetPayload(payload) + if err != nil { + return nil, err + } + } + rawJSON, err := job.serialize() if err != nil { return nil, err @@ -182,6 +204,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma if res == "ok" && err == nil { return scheduledJob, nil } + return nil, err } diff --git a/enqueue_test.go b/enqueue_test.go index b14af410..216ef238 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -18,9 +18,11 @@ func TestEnqueue(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var args Q + assert.Nil(t, job.UnmarshalPayload(&args)) + + assert.Equal(t, "cool", args["b"].(string)) + assert.EqualValues(t, 1, args["a"].(float64)) // Make sure "wat" is in the known jobs assert.EqualValues(t, []string{"wat"}, knownJobs(pool, redisKeyKnownJobs(ns))) @@ -38,9 +40,10 @@ func TestEnqueue(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) // Now enqueue another job, make sure that we can enqueue multiple _, err = enqueuer.Enqueue("wat", Q{"a": 1, "b": "cool"}) @@ -65,9 +68,10 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var q Q + assert.Nil(t, job.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) } @@ -91,9 +95,10 @@ func TestEnqueueIn(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) } func TestEnqueueUnique(t *testing.T) { @@ -109,9 +114,10 @@ func TestEnqueueUnique(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + var q Q + assert.Nil(t, job.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) } job, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"}) @@ -134,14 +140,14 @@ func TestEnqueueUnique(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, job) - // Process the queues. Ensure the right numbero of jobs was processed + // Process the queues. Ensure the right number of jobs was processed var wats, taws int64 - wp := NewWorkerPool(TestContext{}, 3, ns, pool) - wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + wp := NewWorkerPool(3, ns, pool) + wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(ctx *Context) error { wats++ return nil }) - wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(ctx *Context) error { taws++ return fmt.Errorf("ohno") }) @@ -182,9 +188,11 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, len(job.ID) > 10) // Something is in it assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) + + var q Q + assert.Nil(t, job.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) } @@ -202,9 +210,10 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.True(t, len(j.ID) > 10) // Something is in it assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) + var q Q + assert.Nil(t, j.UnmarshalPayload(&q)) + assert.Equal(t, "cool", q["b"].(string)) + assert.EqualValues(t, 1, q["a"].(float64)) assert.True(t, j.Unique) // Now try to enqueue more stuff and ensure it diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..a74e1998 --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module github.com/DispatchMe/go-work + +require ( + github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gocraft/web v0.0.0-20170925135945-d8611de039df + github.com/gocraft/work v0.5.1 + github.com/gomodule/redigo v2.0.0+incompatible + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 + github.com/stretchr/testify v1.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..ec406cb1 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd h1:ePesaBzdTmoMQjwqRCLP2jY+jjWMBpwws/LEQdt1fMM= +github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd/go.mod h1:TNehV1AhBwtT7Bd+rh8G6MoGDbBLNs/sKdk3nvr4Yzg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gocraft/web v0.0.0-20170925135945-d8611de039df h1:3gd2HoQ1d5bN0/U4aTmrB4hRKUnycYeYYBsNGY/A9fU= +github.com/gocraft/web v0.0.0-20170925135945-d8611de039df/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= +github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= +github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 h1:x7xEyJDP7Hv3LVgvWhzioQqbC/KtuUhTigKlH/8ehhE= +github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/heartbeater.go b/heartbeater.go index 14714e6a..55c214d4 100644 --- a/heartbeater.go +++ b/heartbeater.go @@ -2,7 +2,7 @@ package work import ( // "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "os" "sort" "strings" diff --git a/heartbeater_test.go b/heartbeater_test.go index dbc436b1..0247416a 100644 --- a/heartbeater_test.go +++ b/heartbeater_test.go @@ -2,7 +2,7 @@ package work import ( // "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "testing" "time" diff --git a/job.go b/job.go index 7b3a7076..ef3d9617 100644 --- a/job.go +++ b/job.go @@ -2,19 +2,21 @@ package work import ( "encoding/json" - "fmt" - "math" - "reflect" ) // Job represents a job. type Job struct { // Inputs when making a new job - Name string `json:"name,omitempty"` - ID string `json:"id"` - EnqueuedAt int64 `json:"t"` - Args map[string]interface{} `json:"args"` - Unique bool `json:"unique,omitempty"` + Name string `json:"name,omitempty"` + ID string `json:"id"` + EnqueuedAt int64 `json:"t"` + + // Payload can be any json-marshallable object. This leads to a + // double json-marshal which could be a bit better optimized, + // but makes it easier to have custom job definitions. + Payload []byte `json:"payload"` + + Unique bool `json:"unique,omitempty"` // Inputs when retrying Fails int64 `json:"fails,omitempty"` // number of times this job has failed @@ -28,6 +30,23 @@ type Job struct { observer *observer } +func (j *Job) SetPayload(payload interface{}) error { + marshaled, err := json.Marshal(payload) + if err != nil { + return err + } + j.Payload = marshaled + return nil +} + +func (j *Job) UnmarshalPayload(dest interface{}) error { + if len(j.Payload) == 0 { + return nil + } + + return json.Unmarshal(j.Payload, dest) +} + // Q is a shortcut to easily specify arguments for jobs when enqueueing them. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true}) type Q map[string]interface{} @@ -48,14 +67,6 @@ func (j *Job) serialize() ([]byte, error) { return json.Marshal(j) } -// setArg sets a single named argument on the job. -func (j *Job) setArg(key string, val interface{}) { - if j.Args == nil { - j.Args = make(map[string]interface{}) - } - j.Args[key] = val -} - func (j *Job) failed(err error) { j.Fails++ j.LastErr = err.Error() @@ -68,115 +79,3 @@ func (j *Job) Checkin(msg string) { j.observer.observeCheckin(j.Name, j.ID, msg) } } - -// ArgString returns j.Args[key] typed to a string. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgString(key string) string { - v, ok := j.Args[key] - if ok { - typedV, ok := v.(string) - if ok { - return typedV - } - j.argError = typecastError("string", key, v) - } else { - j.argError = missingKeyError("string", key) - } - return "" -} - -// ArgInt64 returns j.Args[key] typed to an int64. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgInt64(key string) int64 { - v, ok := j.Args[key] - if ok { - rVal := reflect.ValueOf(v) - if isIntKind(rVal) { - return rVal.Int() - } else if isUintKind(rVal) { - vUint := rVal.Uint() - if vUint <= math.MaxInt64 { - return int64(vUint) - } - } else if isFloatKind(rVal) { - vFloat64 := rVal.Float() - vInt64 := int64(vFloat64) - if vFloat64 == math.Trunc(vFloat64) && vInt64 <= 9007199254740892 && vInt64 >= -9007199254740892 { - return vInt64 - } - } - j.argError = typecastError("int64", key, v) - } else { - j.argError = missingKeyError("int64", key) - } - return 0 -} - -// ArgFloat64 returns j.Args[key] typed to a float64. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgFloat64(key string) float64 { - v, ok := j.Args[key] - if ok { - rVal := reflect.ValueOf(v) - if isIntKind(rVal) { - return float64(rVal.Int()) - } else if isUintKind(rVal) { - return float64(rVal.Uint()) - } else if isFloatKind(rVal) { - return rVal.Float() - } - j.argError = typecastError("float64", key, v) - } else { - j.argError = missingKeyError("float64", key) - } - return 0.0 -} - -// ArgBool returns j.Args[key] typed to a bool. If the key is missing or of the wrong type, it sets an argument error -// on the job. This function is meant to be used in the body of a job handling function while extracting arguments, -// followed by a single call to j.ArgError(). -func (j *Job) ArgBool(key string) bool { - v, ok := j.Args[key] - if ok { - typedV, ok := v.(bool) - if ok { - return typedV - } - j.argError = typecastError("bool", key, v) - } else { - j.argError = missingKeyError("bool", key) - } - return false -} - -// ArgError returns the last error generated when extracting typed params. Returns nil if extracting the args went fine. -func (j *Job) ArgError() error { - return j.argError -} - -func isIntKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Int || k == reflect.Int8 || k == reflect.Int16 || k == reflect.Int32 || k == reflect.Int64 -} - -func isUintKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Uint || k == reflect.Uint8 || k == reflect.Uint16 || k == reflect.Uint32 || k == reflect.Uint64 -} - -func isFloatKind(v reflect.Value) bool { - k := v.Kind() - return k == reflect.Float32 || k == reflect.Float64 -} - -func missingKeyError(jsonType, key string) error { - return fmt.Errorf("looking for a %s in job.Arg[%s] but key wasn't found", jsonType, key) -} - -func typecastError(jsonType, key string, v interface{}) error { - actualType := reflect.TypeOf(v) - return fmt.Errorf("looking for a %s in job.Arg[%s] but value wasn't right type: %v(%v)", jsonType, key, actualType, v) -} diff --git a/job_test.go b/job_test.go index 97e2b99e..578f9001 100644 --- a/job_test.go +++ b/job_test.go @@ -2,254 +2,23 @@ package work import ( "github.com/stretchr/testify/assert" - "math" "testing" ) -func TestJobArgumentExtraction(t *testing.T) { - j := Job{} - j.setArg("str1", "bar") - - j.setArg("int1", int64(77)) - j.setArg("int2", 77) - j.setArg("int3", uint64(77)) - j.setArg("int4", float64(77.0)) - - j.setArg("bool1", true) - - j.setArg("float1", 3.14) - - // - // Success cases: - // - vString := j.ArgString("str1") - assert.Equal(t, vString, "bar") - assert.NoError(t, j.ArgError()) - - vInt64 := j.ArgInt64("int1") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int2") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int3") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int4") - assert.EqualValues(t, vInt64, 77) - assert.NoError(t, j.ArgError()) - - vBool := j.ArgBool("bool1") - assert.Equal(t, vBool, true) - assert.NoError(t, j.ArgError()) - - vFloat := j.ArgFloat64("float1") - assert.Equal(t, vFloat, 3.14) - assert.NoError(t, j.ArgError()) - - // Missing key results in error: - vString = j.ArgString("str_missing") - assert.Equal(t, vString, "") - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vInt64 = j.ArgInt64("int_missing") - assert.EqualValues(t, vInt64, 0) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vBool = j.ArgBool("bool_missing") - assert.Equal(t, vBool, false) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - vFloat = j.ArgFloat64("float_missing") - assert.Equal(t, vFloat, 0.0) - assert.Error(t, j.ArgError()) - j.argError = nil - assert.NoError(t, j.ArgError()) - - // Missing string; Make sure we don't reset it with successes after - vString = j.ArgString("str_missing") - assert.Equal(t, vString, "") - assert.Error(t, j.ArgError()) - _ = j.ArgString("str1") - _ = j.ArgInt64("int1") - _ = j.ArgBool("bool1") - _ = j.ArgFloat64("float1") - assert.Error(t, j.ArgError()) +type FakeJobPayload struct { + Str1 string `json:"str1"` + Int1 int `json:"int1"` } -func TestJobArgumentExtractionBadString(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", 1, false}, - {"b", false, false}, - {"c", "yay", true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgString(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - if r != tc.val.(string) { - t.Errorf("Failed test case: %v; r = %v\n", tc, r) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != "" { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadBool(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", 1, false}, - {"b", "boo", false}, - {"c", true, true}, - {"d", false, true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgBool(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - if r != tc.val.(bool) { - t.Errorf("Failed test case: %v; r = %v\n", tc, r) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != false { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadInt(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", "boo", false}, - {"b", true, false}, - {"c", 1.1, false}, - {"d", 19007199254740892.0, false}, - {"e", -19007199254740892.0, false}, - {"f", uint64(math.MaxInt64) + 1, false}, - - {"z", 0, true}, - {"y", 9007199254740892, true}, - {"x", 9007199254740892.0, true}, - {"w", 573839921, true}, - {"v", -573839921, true}, - {"u", uint64(math.MaxInt64), true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgInt64(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != 0 { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } -} - -func TestJobArgumentExtractionBadFloat(t *testing.T) { - var testCases = []struct { - key string - val interface{} - good bool - }{ - {"a", "boo", false}, - {"b", true, false}, - - {"z", 0, true}, - {"y", 9007199254740892, true}, - {"x", 9007199254740892.0, true}, - {"w", 573839921, true}, - {"v", -573839921, true}, - {"u", math.MaxFloat64, true}, - {"t", math.SmallestNonzeroFloat64, true}, - } - - j := Job{} - - for _, tc := range testCases { - j.setArg(tc.key, tc.val) - } - - for _, tc := range testCases { - r := j.ArgFloat64(tc.key) - err := j.ArgError() - if tc.good { - if err != nil { - t.Errorf("Failed test case: %v; err = %v\n", tc, err) - } - } else { - if err == nil { - t.Errorf("Failed test case: %v; but err was nil\n", tc) - } - if r != 0 { - t.Errorf("Failed test case: %v; but r was %v\n", tc, r) - } - } - j.argError = nil - } +func TestJobPayload(t *testing.T) { + j := new(Job) + assert.Nil(t, j.SetPayload(&FakeJobPayload{ + Str1: "foo", + Int1: 2, + })) + + payload := new(FakeJobPayload) + assert.Nil(t, j.UnmarshalPayload(payload)) + assert.Equal(t, "foo", payload.Str1) + assert.Equal(t, 2, payload.Int1) } diff --git a/observer.go b/observer.go index d92c74e5..51f556e9 100644 --- a/observer.go +++ b/observer.go @@ -1,9 +1,8 @@ package work import ( - "encoding/json" "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "time" ) @@ -49,7 +48,7 @@ type observation struct { // These need to be set when starting a job startedAt int64 - arguments map[string]interface{} + payload []byte // If we're done w/ the job, err will indicate the success/failure of it err error // nil: success. not nil: the error we got when running the job @@ -90,13 +89,13 @@ func (o *observer) drain() { <-o.doneDrainingChan } -func (o *observer) observeStarted(jobName, jobID string, arguments map[string]interface{}) { +func (o *observer) observeStarted(jobName, jobID string, payload []byte) { o.observationsChan <- &observation{ kind: observationKindStarted, jobName: jobName, jobID: jobID, startedAt: nowEpochSeconds(), - arguments: arguments, + payload: payload, } } @@ -200,24 +199,13 @@ func (o *observer) writeStatus(obv *observation) error { // checkin -> obv.checkin // checkin_at -> obv.checkinAt - var argsJSON []byte - if len(obv.arguments) == 0 { - argsJSON = []byte("") - } else { - var err error - argsJSON, err = json.Marshal(obv.arguments) - if err != nil { - return err - } - } - args := make([]interface{}, 0, 13) args = append(args, key, "job_name", obv.jobName, "job_id", obv.jobID, "started_at", obv.startedAt, - "args", argsJSON, + "payload", obv.payload, ) if (obv.checkin != "") && (obv.checkinAt > 0) { diff --git a/observer_test.go b/observer_test.go index 086d83c7..633f481f 100644 --- a/observer_test.go +++ b/observer_test.go @@ -1,13 +1,19 @@ package work import ( + "encoding/json" "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "testing" // "time" ) +func parsePayload(payload Q) []byte { + data, _ := json.Marshal(payload) + return data +} + func TestObserverStarted(t *testing.T) { pool := newTestPool(":6379") ns := "work" @@ -18,7 +24,7 @@ func TestObserverStarted(t *testing.T) { observer := newObserver(ns, pool, "abcd") observer.start() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) //observer.observeDone("foo", "bar", nil) observer.drain() observer.stop() @@ -27,7 +33,7 @@ func TestObserverStarted(t *testing.T) { assert.Equal(t, "foo", h["job_name"]) assert.Equal(t, "bar", h["job_id"]) assert.Equal(t, fmt.Sprint(tMock), h["started_at"]) - assert.Equal(t, `{"a":1,"b":"wat"}`, h["args"]) + assert.Equal(t, `{"a":1,"b":"wat"}`, h["payload"]) } func TestObserverStartedDone(t *testing.T) { @@ -40,7 +46,7 @@ func TestObserverStartedDone(t *testing.T) { observer := newObserver(ns, pool, "abcd") observer.start() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) observer.observeDone("foo", "bar", nil) observer.drain() observer.stop() @@ -59,7 +65,7 @@ func TestObserverCheckin(t *testing.T) { tMock := int64(1425263401) setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer.observeStarted("foo", "bar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "bar", parsePayload(Q{"a": 1, "b": "wat"})) tMockCheckin := int64(1425263402) setNowEpochSecondsMock(tMockCheckin) @@ -71,7 +77,7 @@ func TestObserverCheckin(t *testing.T) { assert.Equal(t, "foo", h["job_name"]) assert.Equal(t, "bar", h["job_id"]) assert.Equal(t, fmt.Sprint(tMock), h["started_at"]) - assert.Equal(t, `{"a":1,"b":"wat"}`, h["args"]) + assert.Equal(t, `{"a":1,"b":"wat"}`, h["payload"]) assert.Equal(t, "doin it", h["checkin"]) assert.Equal(t, fmt.Sprint(tMockCheckin), h["checkin_at"]) } @@ -86,7 +92,7 @@ func TestObserverCheckinFromJob(t *testing.T) { tMock := int64(1425263401) setNowEpochSecondsMock(tMock) defer resetNowEpochSecondsMock() - observer.observeStarted("foo", "barbar", Q{"a": 1, "b": "wat"}) + observer.observeStarted("foo", "barbar", parsePayload(Q{"a": 1, "b": "wat"})) tMockCheckin := int64(1425263402) setNowEpochSecondsMock(tMockCheckin) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index 91085f70..bc00df72 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/robfig/cron" "math/rand" "time" @@ -101,7 +101,7 @@ func (pe *periodicEnqueuer) enqueue() error { // This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history. EnqueuedAt: epoch, - Args: nil, + Payload: nil, } rawJSON, err := job.serialize() diff --git a/periodic_enqueuer_test.go b/periodic_enqueuer_test.go index 68d52b2f..ac6ba67c 100644 --- a/periodic_enqueuer_test.go +++ b/periodic_enqueuer_test.go @@ -1,7 +1,7 @@ package work import ( - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/robfig/cron" "github.com/stretchr/testify/assert" "testing" @@ -59,7 +59,7 @@ func TestPeriodicEnqueuer(t *testing.T) { for i, e := range expected { assert.EqualValues(t, scheduledJobs[i].RunAt, scheduledJobs[i].EnqueuedAt) - assert.Nil(t, scheduledJobs[i].Args) + assert.Nil(t, scheduledJobs[i].Payload) assert.Equal(t, e.name, scheduledJobs[i].Name) assert.Equal(t, e.id, scheduledJobs[i].ID) diff --git a/redis.go b/redis.go index c3f8c45b..f0a06c95 100644 --- a/redis.go +++ b/redis.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "strings" ) func redisNamespacePrefix(namespace string) string { @@ -56,7 +57,7 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string { return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID } -func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { +func redisKeyUniqueJob(namespace, jobName string, payload interface{}) (string, error) { var buf bytes.Buffer buf.WriteString(redisNamespacePrefix(namespace)) @@ -64,14 +65,25 @@ func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) ( buf.WriteString(jobName) buf.WriteRune(':') - if args != nil { - err := json.NewEncoder(&buf).Encode(args) + if payload != nil { + var err error + // If it's already a byte array, use that. Otherwise encode it + if byteArr, ok := payload.([]byte); ok { + _, err = buf.Write(byteArr) + } else { + + err = json.NewEncoder(&buf).Encode(payload) + } + if err != nil { return "", err } } - return buf.String(), nil + // JSON encode adds a new line at the end so will not be the same as + // the decoded byte array. Need to strip it. + // https://github.com/golang/go/issues/7767 + return strings.TrimSuffix(buf.String(), "\n"), nil } func redisKeyLastPeriodicEnqueue(namespace string) string { diff --git a/requeuer.go b/requeuer.go index cd79de3f..26da9140 100644 --- a/requeuer.go +++ b/requeuer.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type requeuer struct { diff --git a/requeuer_test.go b/requeuer_test.go index 3d1a7f40..7a7a8d13 100644 --- a/requeuer_test.go +++ b/requeuer_test.go @@ -1,7 +1,7 @@ package work import ( - // "github.com/garyburd/redigo/redis" + // "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" "testing" // "fmt" diff --git a/run.go b/run.go index c6f38615..b6fc579d 100644 --- a/run.go +++ b/run.go @@ -2,13 +2,11 @@ package work import ( "fmt" - "reflect" ) // returns an error if the job fails, or there's a panic, or we couldn't reflect correctly. // if we return an error, it signals we want the job to be retried. -func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) { - returnCtx = reflect.New(ctxType) +func runJob(ctx *Context, middleware []Middleware, jt *jobType) (returnError error) { currentMiddleware := 0 maxMiddleware := len(middleware) @@ -17,25 +15,10 @@ func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt if currentMiddleware < maxMiddleware { mw := middleware[currentMiddleware] currentMiddleware++ - if mw.IsGeneric { - return mw.GenericMiddlewareHandler(job, next) - } - res := mw.DynamicMiddleware.Call([]reflect.Value{returnCtx, reflect.ValueOf(job), reflect.ValueOf(next)}) - x := res[0].Interface() - if x == nil { - return nil - } - return x.(error) + return mw(ctx, next) } - if jt.IsGeneric { - return jt.GenericHandler(job) - } - res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)}) - x := res[0].Interface() - if x == nil { - return nil - } - return x.(error) + + return jt.Handler(ctx) } defer func() { diff --git a/run_test.go b/run_test.go index deb62dc2..4a7a696f 100644 --- a/run_test.go +++ b/run_test.go @@ -3,168 +3,164 @@ package work import ( "fmt" "github.com/stretchr/testify/assert" - "reflect" "testing" ) +func appendToContext(ctx *Context, val interface{}) { + existing := ctx.Get("record") + vs := "" + if existing != nil { + vs = existing.(string) + } + vs = vs + val.(string) + ctx.Set("record", vs) +} + func TestRunBasicMiddleware(t *testing.T) { - mw1 := func(j *Job, next NextMiddlewareFunc) error { - j.setArg("mw1", "mw1") + mw1 := func(ctx *Context, next NextMiddlewareFunc) error { + ctx.Job.SetPayload(Q{"mw1": "mw1", "a": "foo"}) return next() } - mw2 := func(c *tstCtx, j *Job, next NextMiddlewareFunc) error { - c.record(j.Args["mw1"].(string)) - c.record("mw2") + mw2 := func(ctx *Context, next NextMiddlewareFunc) error { + var q Q + assert.NoError(t, ctx.Job.UnmarshalPayload(&q)) + appendToContext(ctx, q["mw1"]) + appendToContext(ctx, "mw2") return next() } - mw3 := func(c *tstCtx, j *Job, next NextMiddlewareFunc) error { - c.record("mw3") + mw3 := func(ctx *Context, next NextMiddlewareFunc) error { + appendToContext(ctx, "mw3") return next() } - h1 := func(c *tstCtx, j *Job) error { - c.record("h1") - c.record(j.Args["a"].(string)) + h1 := func(ctx *Context) error { + var q Q + assert.NoError(t, ctx.Job.UnmarshalPayload(&q)) + appendToContext(ctx, "h1") + appendToContext(ctx, q["a"]) return nil } - middleware := []*middlewareHandler{ - {IsGeneric: true, GenericMiddlewareHandler: mw1}, - {IsGeneric: false, DynamicMiddleware: reflect.ValueOf(mw2)}, - {IsGeneric: false, DynamicMiddleware: reflect.ValueOf(mw3)}, - } + middleware := []Middleware{mw1, mw2, mw3} jt := &jobType{ - Name: "foo", - IsGeneric: false, - DynamicHandler: reflect.ValueOf(h1), + Name: "foo", + Handler: h1, } job := &Job{ Name: "foo", - Args: map[string]interface{}{"a": "foo"}, } - v, err := runJob(job, tstCtxType, middleware, jt) + ctx := NewContext(job) + err := runJob(ctx, middleware, jt) assert.NoError(t, err) - c := v.Interface().(*tstCtx) - assert.Equal(t, "mw1mw2mw3h1foo", c.String()) + assert.Equal(t, "mw1mw2mw3h1foo", ctx.Get("record")) } func TestRunHandlerError(t *testing.T) { - mw1 := func(j *Job, next NextMiddlewareFunc) error { + mw1 := func(ctx *Context, next NextMiddlewareFunc) error { return next() } - h1 := func(c *tstCtx, j *Job) error { - c.record("h1") + h1 := func(ctx *Context) error { + appendToContext(ctx, "h1") return fmt.Errorf("h1_err") } - middleware := []*middlewareHandler{ - {IsGeneric: true, GenericMiddlewareHandler: mw1}, - } + middleware := []Middleware{mw1} jt := &jobType{ - Name: "foo", - IsGeneric: false, - DynamicHandler: reflect.ValueOf(h1), + Name: "foo", + Handler: h1, } job := &Job{ Name: "foo", } - v, err := runJob(job, tstCtxType, middleware, jt) + ctx := NewContext(job) + err := runJob(ctx, middleware, jt) assert.Error(t, err) assert.Equal(t, "h1_err", err.Error()) - c := v.Interface().(*tstCtx) - assert.Equal(t, "h1", c.String()) + assert.Equal(t, "h1", ctx.Get("record")) } func TestRunMwError(t *testing.T) { - mw1 := func(j *Job, next NextMiddlewareFunc) error { + mw1 := func(ctx *Context, next NextMiddlewareFunc) error { return fmt.Errorf("mw1_err") } - h1 := func(c *tstCtx, j *Job) error { - c.record("h1") + h1 := func(ctx *Context) error { + appendToContext(ctx, "h1") return fmt.Errorf("h1_err") } - middleware := []*middlewareHandler{ - {IsGeneric: true, GenericMiddlewareHandler: mw1}, - } + middleware := []Middleware{mw1} jt := &jobType{ - Name: "foo", - IsGeneric: false, - DynamicHandler: reflect.ValueOf(h1), + Name: "foo", + Handler: h1, } job := &Job{ Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + ctx := NewContext(job) + err := runJob(ctx, middleware, jt) assert.Error(t, err) assert.Equal(t, "mw1_err", err.Error()) } func TestRunHandlerPanic(t *testing.T) { - mw1 := func(j *Job, next NextMiddlewareFunc) error { + mw1 := func(ctx *Context, next NextMiddlewareFunc) error { return next() } - h1 := func(c *tstCtx, j *Job) error { - c.record("h1") - + h1 := func(ctx *Context) error { + appendToContext(ctx, "h1") panic("dayam") } - middleware := []*middlewareHandler{ - {IsGeneric: true, GenericMiddlewareHandler: mw1}, - } - + middleware := []Middleware{mw1} jt := &jobType{ - Name: "foo", - IsGeneric: false, - DynamicHandler: reflect.ValueOf(h1), + Name: "foo", + Handler: h1, } job := &Job{ Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + ctx := NewContext(job) + err := runJob(ctx, middleware, jt) assert.Error(t, err) assert.Equal(t, "dayam", err.Error()) } func TestRunMiddlewarePanic(t *testing.T) { - mw1 := func(j *Job, next NextMiddlewareFunc) error { + mw1 := func(ctx *Context, next NextMiddlewareFunc) error { panic("dayam") } - h1 := func(c *tstCtx, j *Job) error { - c.record("h1") + h1 := func(ctx *Context) error { + appendToContext(ctx, "h1") return nil } - middleware := []*middlewareHandler{ - {IsGeneric: true, GenericMiddlewareHandler: mw1}, - } + middleware := []Middleware{mw1} jt := &jobType{ - Name: "foo", - IsGeneric: false, - DynamicHandler: reflect.ValueOf(h1), + Name: "foo", + Handler: h1, } job := &Job{ Name: "foo", } - _, err := runJob(job, tstCtxType, middleware, jt) + ctx := NewContext(job) + err := runJob(ctx, middleware, jt) assert.Error(t, err) assert.Equal(t, "dayam", err.Error()) } diff --git a/webui/internal/assets/src/DeadJobs.js b/webui/internal/assets/src/DeadJobs.js index ea044acf..cbf846ba 100644 --- a/webui/internal/assets/src/DeadJobs.js +++ b/webui/internal/assets/src/DeadJobs.js @@ -143,7 +143,7 @@ export default class DeadJobs extends React.Component { this.check(job)}/> {job.name} - {JSON.stringify(job.args)} + {job.payload} {job.err} diff --git a/webui/internal/assets/src/DeadJobs.test.js b/webui/internal/assets/src/DeadJobs.test.js index c5aa1a3c..531f1689 100644 --- a/webui/internal/assets/src/DeadJobs.test.js +++ b/webui/internal/assets/src/DeadJobs.test.js @@ -16,8 +16,8 @@ describe('DeadJobs', () => { deadJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -86,7 +86,7 @@ describe('DeadJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/internal/assets/src/Processes.js b/webui/internal/assets/src/Processes.js index d45a6f08..8052b78e 100644 --- a/webui/internal/assets/src/Processes.js +++ b/webui/internal/assets/src/Processes.js @@ -26,7 +26,7 @@ class BusyWorkers extends React.Component { return ( {worker.job_name} - {worker.args_json} + {worker.payload} {worker.checkin} diff --git a/webui/internal/assets/src/Processes.test.js b/webui/internal/assets/src/Processes.test.js index 5fb3892a..24991210 100644 --- a/webui/internal/assets/src/Processes.test.js +++ b/webui/internal/assets/src/Processes.test.js @@ -21,7 +21,7 @@ describe('Processes', () => { started_at: 1467753603, checkin_at: 1467753603, checkin: '123', - args_json: '{}' + payload: '{}' } ], workerPool: [ @@ -44,7 +44,7 @@ describe('Processes', () => { expect(processes.state.workerPool.length).toEqual(1); expect(processes.workerCount).toEqual(3); - const expectedBusyWorker = [ { args_json: '{}', checkin: '123', checkin_at: 1467753603, job_name: 'job1', started_at: 1467753603, worker_id: '2' } ]; + const expectedBusyWorker = [ { payload: '{}', checkin: '123', checkin_at: 1467753603, job_name: 'job1', started_at: 1467753603, worker_id: '2' } ]; let output = r.getRenderOutput(); let busyWorkers = findAllByTag(output, 'BusyWorkers'); diff --git a/webui/internal/assets/src/RetryJobs.js b/webui/internal/assets/src/RetryJobs.js index c274b6e7..541a752f 100644 --- a/webui/internal/assets/src/RetryJobs.js +++ b/webui/internal/assets/src/RetryJobs.js @@ -59,7 +59,7 @@ export default class RetryJobs extends React.Component { return ( {job.name} - {JSON.stringify(job.args)} + {job.payload} {job.err} diff --git a/webui/internal/assets/src/RetryJobs.test.js b/webui/internal/assets/src/RetryJobs.test.js index 1ad7f680..e304e4f1 100644 --- a/webui/internal/assets/src/RetryJobs.test.js +++ b/webui/internal/assets/src/RetryJobs.test.js @@ -15,8 +15,8 @@ describe('RetryJobs', () => { retryJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -34,7 +34,7 @@ describe('RetryJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/internal/assets/src/ScheduledJobs.js b/webui/internal/assets/src/ScheduledJobs.js index 2a576605..c8201dff 100644 --- a/webui/internal/assets/src/ScheduledJobs.js +++ b/webui/internal/assets/src/ScheduledJobs.js @@ -58,7 +58,7 @@ export default class ScheduledJobs extends React.Component { return ( {job.name} - {JSON.stringify(job.args)} + {job.payload} ); diff --git a/webui/internal/assets/src/ScheduledJobs.test.js b/webui/internal/assets/src/ScheduledJobs.test.js index 0eb8805e..49fde430 100644 --- a/webui/internal/assets/src/ScheduledJobs.test.js +++ b/webui/internal/assets/src/ScheduledJobs.test.js @@ -15,8 +15,8 @@ describe('ScheduledJobs', () => { scheduledJobs.setState({ count: 2, jobs: [ - {id: 1, name: 'test', args: {}, t: 1467760821, err: 'err1'}, - {id: 2, name: 'test2', args: {}, t: 1467760822, err: 'err2'} + {id: 1, name: 'test', payload: {}, t: 1467760821, err: 'err1'}, + {id: 2, name: 'test2', payload: {}, t: 1467760822, err: 'err2'} ] }); @@ -34,7 +34,7 @@ describe('ScheduledJobs', () => { job.push({ id: i, name: 'test', - args: {}, + payload: '{}', t: 1467760821, err: 'err', }); diff --git a/webui/webui.go b/webui/webui.go index 428a761e..8e24e52d 100644 --- a/webui/webui.go +++ b/webui/webui.go @@ -7,11 +7,11 @@ import ( "strconv" "sync" + "github.com/DispatchMe/go-work" + "github.com/DispatchMe/go-work/webui/internal/assets" "github.com/braintree/manners" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gocraft/web" - "github.com/gocraft/work" - "github.com/gocraft/work/webui/internal/assets" ) // Server implements an HTTP server which exposes a JSON API to view and manage gocraft/work items. diff --git a/webui/webui_test.go b/webui/webui_test.go index 3dda02dd..68884b59 100644 --- a/webui/webui_test.go +++ b/webui/webui_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gocraft/work" "github.com/stretchr/testify/assert" ) diff --git a/worker.go b/worker.go index ab42ea53..6ccf2501 100644 --- a/worker.go +++ b/worker.go @@ -3,20 +3,18 @@ package work import ( "fmt" "math/rand" - "reflect" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type worker struct { - workerID string - poolID string - namespace string - pool *redis.Pool - jobTypes map[string]*jobType - middleware []*middlewareHandler - contextType reflect.Type + workerID string + poolID string + namespace string + pool *redis.Pool + jobTypes map[string]*jobType + middleware []Middleware redisFetchScript *redis.Script sampler prioritySampler @@ -29,16 +27,15 @@ type worker struct { doneDrainingChan chan struct{} } -func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType) *worker { +func newWorker(namespace string, poolID string, pool *redis.Pool, middleware []Middleware, jobTypes map[string]*jobType) *worker { workerID := makeIdentifier() ob := newObserver(namespace, pool, workerID) w := &worker{ - workerID: workerID, - poolID: poolID, - namespace: namespace, - pool: pool, - contextType: contextType, + workerID: workerID, + poolID: poolID, + namespace: namespace, + pool: pool, observer: ob, @@ -55,7 +52,7 @@ func newWorker(namespace string, poolID string, pool *redis.Pool, contextType re } // note: can't be called while the thing is started -func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jobTypes map[string]*jobType) { +func (w *worker) updateMiddlewareAndJobTypes(middleware []Middleware, jobTypes map[string]*jobType) { w.middleware = middleware sampler := prioritySampler{} for _, jt := range jobTypes { @@ -182,9 +179,10 @@ func (w *worker) processJob(job *Job) { w.deleteUniqueJob(job) } if jt, ok := w.jobTypes[job.Name]; ok { - w.observeStarted(job.Name, job.ID, job.Args) + ctx := NewContext(job) + w.observeStarted(job.Name, job.ID, job.Payload) job.observer = w.observer // for Checkin - _, runErr := runJob(job, w.contextType, w.middleware, jt) + runErr := runJob(ctx, w.middleware, jt) w.observeDone(job.Name, job.ID, runErr) if runErr != nil { job.failed(runErr) @@ -202,7 +200,7 @@ func (w *worker) processJob(job *Job) { } func (w *worker) deleteUniqueJob(job *Job) { - uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args) + uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Payload) if err != nil { logError("worker.delete_unique_job.key", err) } @@ -249,7 +247,8 @@ func (w *worker) addToRetry(job *Job, runErr error) { var backoff BackoffCalculator // Choose the backoff provider - jt, ok := w.jobTypes[job.Name]; if ok { + jt, ok := w.jobTypes[job.Name] + if ok { backoff = jt.Backoff } diff --git a/worker_pool.go b/worker_pool.go index e4930f59..f95ff18b 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -1,11 +1,9 @@ package work import ( - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/robfig/cron" - "reflect" "sort" - "strings" "sync" ) @@ -16,9 +14,8 @@ type WorkerPool struct { namespace string // eg, "myapp-work" pool *redis.Pool - contextType reflect.Type jobTypes map[string]*jobType - middleware []*middlewareHandler + middleware []Middleware started bool periodicJobs []*periodicJob @@ -30,13 +27,12 @@ type WorkerPool struct { periodicEnqueuer *periodicEnqueuer } +type Handler func(ctx *Context) error + type jobType struct { Name string JobOptions - - IsGeneric bool - GenericHandler GenericHandler - DynamicHandler reflect.Value + Handler Handler } // You may provide your own backoff function for retrying failed jobs or use the builtin one. @@ -47,68 +43,43 @@ type BackoffCalculator func(job *Job) int64 // JobOptions can be passed to JobWithOptions. type JobOptions struct { - Priority uint // Priority from 1 to 10000 - MaxFails uint // 1: send straight to dead (unless SkipDead) - SkipDead bool // If true, don't send failed jobs to the dead queue when retries are exhausted. + Priority uint // Priority from 1 to 10000 + MaxFails uint // 1: send straight to dead (unless SkipDead) + SkipDead bool // If true, don't send failed jobs to the dead queue when retries are exhausted. Backoff BackoffCalculator // If not set, uses the default backoff algorithm } -// GenericHandler is a job handler without any custom context. -type GenericHandler func(*Job) error - -// GenericMiddlewareHandler is a middleware without any custom context. -type GenericMiddlewareHandler func(*Job, NextMiddlewareFunc) error - // NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware. type NextMiddlewareFunc func() error -type middlewareHandler struct { - IsGeneric bool - DynamicMiddleware reflect.Value - GenericMiddlewareHandler GenericMiddlewareHandler -} - // NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently. -func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { +func NewWorkerPool(concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { if pool == nil { panic("NewWorkerPool needs a non-nil *redis.Pool") } - ctxType := reflect.TypeOf(ctx) - validateContextType(ctxType) wp := &WorkerPool{ workerPoolID: makeIdentifier(), concurrency: concurrency, namespace: namespace, pool: pool, - contextType: ctxType, jobTypes: make(map[string]*jobType), } for i := uint(0); i < wp.concurrency; i++ { - w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes) + w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, nil, wp.jobTypes) wp.workers = append(wp.workers, w) } return wp } +type Middleware func(ctx *Context, next NextMiddlewareFunc) error + // Middleware appends the specified function to the middleware chain. The fn can take one of these forms: // (*ContextType).func(*Job, NextMiddlewareFunc) error, (ContextType matches the type of ctx specified when creating a pool) // func(*Job, NextMiddlewareFunc) error, for the generic middleware format. -func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool { - vfn := reflect.ValueOf(fn) - validateMiddlewareType(wp.contextType, vfn) - - mw := &middlewareHandler{ - DynamicMiddleware: vfn, - } - - if gmh, ok := fn.(func(*Job, NextMiddlewareFunc) error); ok { - mw.IsGeneric = true - mw.GenericMiddlewareHandler = gmh - } - +func (wp *WorkerPool) Middleware(mw Middleware) *WorkerPool { wp.middleware = append(wp.middleware, mw) for _, w := range wp.workers { @@ -122,24 +93,18 @@ func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool { // fn can take one of these forms: // (*ContextType).func(*Job) error, (ContextType matches the type of ctx specified when creating a pool) // func(*Job) error, for the generic handler format. -func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool { - return wp.JobWithOptions(name, JobOptions{}, fn) +func (wp *WorkerPool) Job(name string, handler Handler) *WorkerPool { + return wp.JobWithOptions(name, JobOptions{}, handler) } // JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. -func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool { +func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, handler Handler) *WorkerPool { jobOpts = applyDefaultsAndValidate(jobOpts) - vfn := reflect.ValueOf(fn) - validateHandlerType(wp.contextType, vfn) jt := &jobType{ - Name: name, - DynamicHandler: vfn, - JobOptions: jobOpts, - } - if gh, ok := fn.(func(*Job) error); ok { - jt.IsGeneric = true - jt.GenericHandler = gh + Name: name, + Handler: handler, + JobOptions: jobOpts, } wp.jobTypes[name] = jt @@ -265,158 +230,6 @@ func (wp *WorkerPool) writeKnownJobsToRedis() { } } -func newJobTypeGeneric(name string, opts JobOptions, handler GenericHandler) *jobType { - return &jobType{ - Name: name, - JobOptions: opts, - IsGeneric: true, - GenericHandler: handler, - } -} - -// validateContextType will panic if context is invalid -func validateContextType(ctxType reflect.Type) { - if ctxType.Kind() != reflect.Struct { - panic("work: Context needs to be a struct type") - } -} - -func validateHandlerType(ctxType reflect.Type, vfn reflect.Value) { - if !isValidHandlerType(ctxType, vfn) { - panic(instructiveMessage(vfn, "a handler", "handler", "job *work.Job", ctxType)) - } -} - -func validateMiddlewareType(ctxType reflect.Type, vfn reflect.Value) { - if !isValidMiddlewareType(ctxType, vfn) { - panic(instructiveMessage(vfn, "middleware", "middleware", "job *work.Job, next NextMiddlewareFunc", ctxType)) - } -} - -// Since it's easy to pass the wrong method as a middleware/handler, and since the user can't rely on static type checking since we use reflection, -// lets be super helpful about what they did and what they need to do. -// Arguments: -// - vfn is the failed method -// - addingType is for "You are adding {addingType} to a worker pool...". Eg, "middleware" or "a handler" -// - yourType is for "Your {yourType} function can have...". Eg, "middleware" or "handler" or "error handler" -// - args is like "rw web.ResponseWriter, req *web.Request, next web.NextMiddlewareFunc" -// - NOTE: args can be calculated if you pass in each type. BUT, it doesn't have example argument name, so it has less copy/paste value. -func instructiveMessage(vfn reflect.Value, addingType string, yourType string, args string, ctxType reflect.Type) string { - // Get context type without package. - ctxString := ctxType.String() - splitted := strings.Split(ctxString, ".") - if len(splitted) <= 1 { - ctxString = splitted[0] - } else { - ctxString = splitted[1] - } - - str := "\n" + strings.Repeat("*", 120) + "\n" - str += "* You are adding " + addingType + " to a worker pool with context type '" + ctxString + "'\n" - str += "*\n*\n" - str += "* Your " + yourType + " function can have one of these signatures:\n" - str += "*\n" - str += "* // If you don't need context:\n" - str += "* func YourFunctionName(" + args + ") error\n" - str += "*\n" - str += "* // If you want your " + yourType + " to accept a context:\n" - str += "* func (c *" + ctxString + ") YourFunctionName(" + args + ") error // or,\n" - str += "* func YourFunctionName(c *" + ctxString + ", " + args + ") error\n" - str += "*\n" - str += "* Unfortunately, your function has this signature: " + vfn.Type().String() + "\n" - str += "*\n" - str += strings.Repeat("*", 120) + "\n" - - return str -} - -func isValidHandlerType(ctxType reflect.Type, vfn reflect.Value) bool { - fnType := vfn.Type() - - if fnType.Kind() != reflect.Func { - return false - } - - numIn := fnType.NumIn() - numOut := fnType.NumOut() - - if numOut != 1 { - return false - } - - outType := fnType.Out(0) - var e *error - - if outType != reflect.TypeOf(e).Elem() { - return false - } - - var j *Job - if numIn == 1 { - if fnType.In(0) != reflect.TypeOf(j) { - return false - } - } else if numIn == 2 { - if fnType.In(0) != reflect.PtrTo(ctxType) { - return false - } - if fnType.In(1) != reflect.TypeOf(j) { - return false - } - } else { - return false - } - - return true -} - -func isValidMiddlewareType(ctxType reflect.Type, vfn reflect.Value) bool { - fnType := vfn.Type() - - if fnType.Kind() != reflect.Func { - return false - } - - numIn := fnType.NumIn() - numOut := fnType.NumOut() - - if numOut != 1 { - return false - } - - outType := fnType.Out(0) - var e *error - - if outType != reflect.TypeOf(e).Elem() { - return false - } - - var j *Job - var nfn NextMiddlewareFunc - if numIn == 2 { - if fnType.In(0) != reflect.TypeOf(j) { - return false - } - if fnType.In(1) != reflect.TypeOf(nfn) { - return false - } - } else if numIn == 3 { - if fnType.In(0) != reflect.PtrTo(ctxType) { - return false - } - if fnType.In(1) != reflect.TypeOf(j) { - return false - } - if fnType.In(2) != reflect.TypeOf(nfn) { - return false - } - } else { - return false - } - - return true -} - func applyDefaultsAndValidate(jobOpts JobOptions) JobOptions { if jobOpts.Priority == 0 { jobOpts.Priority = 1 diff --git a/worker_pool_test.go b/worker_pool_test.go index 7cd8b06b..23eae03a 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -1,78 +1,13 @@ package work import ( - "bytes" - "fmt" - "github.com/stretchr/testify/assert" - "reflect" "testing" ) -type tstCtx struct { - a int - bytes.Buffer -} - -func (c *tstCtx) record(s string) { - _, _ = c.WriteString(s) -} - -var tstCtxType = reflect.TypeOf(tstCtx{}) - -func TestWorkerPoolHandlerValidations(t *testing.T) { - var cases = []struct { - fn interface{} - good bool - }{ - {func(j *Job) error { return nil }, true}, - {func(c *tstCtx, j *Job) error { return nil }, true}, - {func(c *tstCtx, j *Job) {}, false}, - {func(c *tstCtx, j *Job) string { return "" }, false}, - {func(c *tstCtx, j *Job) (error, string) { return nil, "" }, false}, - {func(c *tstCtx) error { return nil }, false}, - {func(c tstCtx, j *Job) error { return nil }, false}, - {func() error { return nil }, false}, - {func(c *tstCtx, j *Job, wat string) error { return nil }, false}, - } - - for i, testCase := range cases { - r := isValidHandlerType(tstCtxType, reflect.ValueOf(testCase.fn)) - if testCase.good != r { - t.Errorf("idx %d: should return %v but returned %v", i, testCase.good, r) - } - } -} - -func TestWorkerPoolMiddlewareValidations(t *testing.T) { - var cases = []struct { - fn interface{} - good bool - }{ - {func(j *Job, n NextMiddlewareFunc) error { return nil }, true}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc) error { return nil }, true}, - {func(c *tstCtx, j *Job) error { return nil }, false}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc) {}, false}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc) string { return "" }, false}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc) (error, string) { return nil, "" }, false}, - {func(c *tstCtx, n NextMiddlewareFunc) error { return nil }, false}, - {func(c tstCtx, j *Job, n NextMiddlewareFunc) error { return nil }, false}, - {func() error { return nil }, false}, - {func(c *tstCtx, j *Job, wat string) error { return nil }, false}, - {func(c *tstCtx, j *Job, n NextMiddlewareFunc, wat string) error { return nil }, false}, - } - - for i, testCase := range cases { - r := isValidMiddlewareType(tstCtxType, reflect.ValueOf(testCase.fn)) - if testCase.good != r { - t.Errorf("idx %d: should return %v but returned %v", i, testCase.good, r) - } - } -} - func TestWorkerPoolStartStop(t *testing.T) { pool := newTestPool(":6379") ns := "work" - wp := NewWorkerPool(TestContext{}, 10, ns, pool) + wp := NewWorkerPool(10, ns, pool) wp.Start() wp.Start() wp.Stop() @@ -80,33 +15,3 @@ func TestWorkerPoolStartStop(t *testing.T) { wp.Start() wp.Stop() } - -func TestWorkerPoolValidations(t *testing.T) { - pool := newTestPool(":6379") - ns := "work" - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - - func() { - defer func() { - if panicErr := recover(); panicErr != nil { - assert.Regexp(t, "Your middleware function can have one of these signatures", fmt.Sprintf("%v", panicErr)) - } else { - t.Errorf("expected a panic when using bad middleware") - } - }() - - wp.Middleware(TestWorkerPoolValidations) - }() - - func() { - defer func() { - if panicErr := recover(); panicErr != nil { - assert.Regexp(t, "Your handler function can have one of these signatures", fmt.Sprintf("%v", panicErr)) - } else { - t.Errorf("expected a panic when using a bad handler") - } - }() - - wp.Job("wat", TestWorkerPoolValidations) - }() -} diff --git a/worker_test.go b/worker_test.go index 045e4713..e5cea8b9 100644 --- a/worker_test.go +++ b/worker_test.go @@ -6,10 +6,14 @@ import ( "testing" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" ) +type WorkerTestJobArgs struct { + A float64 +} + func TestWorkerBasics(t *testing.T) { pool := newTestPool(":6379") ns := "work" @@ -27,40 +31,43 @@ func TestWorkerBasics(t *testing.T) { jobTypes[job1] = &jobType{ Name: job1, JobOptions: JobOptions{Priority: 1}, - IsGeneric: true, - GenericHandler: func(job *Job) error { - arg1 = job.Args["a"].(float64) + Handler: func(ctx *Context) error { + args := new(WorkerTestJobArgs) + ctx.Job.UnmarshalPayload(args) + arg1 = args.A return nil }, } jobTypes[job2] = &jobType{ Name: job2, JobOptions: JobOptions{Priority: 1}, - IsGeneric: true, - GenericHandler: func(job *Job) error { - arg2 = job.Args["a"].(float64) + Handler: func(ctx *Context) error { + args := new(WorkerTestJobArgs) + ctx.Job.UnmarshalPayload(args) + arg2 = args.A return nil }, } jobTypes[job3] = &jobType{ Name: job3, JobOptions: JobOptions{Priority: 1}, - IsGeneric: true, - GenericHandler: func(job *Job) error { - arg3 = job.Args["a"].(float64) + Handler: func(ctx *Context) error { + args := new(WorkerTestJobArgs) + ctx.Job.UnmarshalPayload(args) + arg3 = args.A return nil }, } enqueuer := NewEnqueuer(ns, pool) - _, err := enqueuer.Enqueue(job1, Q{"a": 1}) + _, err := enqueuer.Enqueue(job1, &WorkerTestJobArgs{1}) assert.Nil(t, err) - _, err = enqueuer.Enqueue(job2, Q{"a": 2}) + _, err = enqueuer.Enqueue(job2, &WorkerTestJobArgs{2}) assert.Nil(t, err) - _, err = enqueuer.Enqueue(job3, Q{"a": 3}) + _, err = enqueuer.Enqueue(job3, &WorkerTestJobArgs{3}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, nil, jobTypes) w.start() w.drain() w.stop() @@ -98,8 +105,7 @@ func TestWorkerInProgress(t *testing.T) { jobTypes[job1] = &jobType{ Name: job1, JobOptions: JobOptions{Priority: 1}, - IsGeneric: true, - GenericHandler: func(job *Job) error { + Handler: func(ctx *Context) error { time.Sleep(30 * time.Millisecond) return nil }, @@ -109,7 +115,7 @@ func TestWorkerInProgress(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, nil, jobTypes) w.start() // instead of w.forceIter(), we'll wait for 10 milliseconds to let the job start @@ -122,7 +128,7 @@ func TestWorkerInProgress(t *testing.T) { w.observer.drain() h := readHash(pool, redisKeyWorkerObservation(ns, w.workerID)) assert.Equal(t, job1, h["job_name"]) - assert.Equal(t, `{"a":1}`, h["args"]) + assert.Equal(t, `{"a":1}`, h["payload"]) // NOTE: we could check for job_id and started_at, but it's a PITA and it's tested in observer_test. w.drain() @@ -148,8 +154,7 @@ func TestWorkerRetry(t *testing.T) { jobTypes[job1] = &jobType{ Name: job1, JobOptions: JobOptions{Priority: 1, MaxFails: 3}, - IsGeneric: true, - GenericHandler: func(job *Job) error { + Handler: func(ctx *Context) error { return fmt.Errorf("sorry kid") }, } @@ -157,7 +162,7 @@ func TestWorkerRetry(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, nil, jobTypes) w.start() w.drain() w.stop() @@ -198,8 +203,7 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) { jobTypes[job1] = &jobType{ Name: job1, JobOptions: JobOptions{Priority: 1, MaxFails: 3, Backoff: custombo}, - IsGeneric: true, - GenericHandler: func(job *Job) error { + Handler: func(ctx *Context) error { return fmt.Errorf("sorry kid") }, } @@ -207,7 +211,7 @@ func TestWorkerRetryWithCustomBackoff(t *testing.T) { enqueuer := NewEnqueuer(ns, pool) _, err := enqueuer.Enqueue(job1, Q{"a": 1}) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, nil, jobTypes) w.start() w.drain() w.stop() @@ -244,16 +248,14 @@ func TestWorkerDead(t *testing.T) { jobTypes[job1] = &jobType{ Name: job1, JobOptions: JobOptions{Priority: 1, MaxFails: 0}, - IsGeneric: true, - GenericHandler: func(job *Job) error { + Handler: func(ctx *Context) error { return fmt.Errorf("sorry kid1") }, } jobTypes[job2] = &jobType{ Name: job2, JobOptions: JobOptions{Priority: 1, MaxFails: 0, SkipDead: true}, - IsGeneric: true, - GenericHandler: func(job *Job) error { + Handler: func(ctx *Context) error { return fmt.Errorf("sorry kid2") }, } @@ -263,7 +265,7 @@ func TestWorkerDead(t *testing.T) { assert.Nil(t, err) _, err = enqueuer.Enqueue(job2, nil) assert.Nil(t, err) - w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes) + w := newWorker(ns, "1", pool, nil, jobTypes) w.start() w.drain() w.stop() @@ -297,7 +299,7 @@ func TestStop(t *testing.T) { return c, nil }, } - wp := NewWorkerPool(TestContext{}, 10, "work", redisPool) + wp := NewWorkerPool(10, "work", redisPool) wp.Start() wp.Stop() } @@ -315,8 +317,8 @@ func BenchmarkJobProcessing(b *testing.B) { } } - wp := NewWorkerPool(TestContext{}, 10, ns, pool) - wp.Job("wat", func(c *TestContext, job *Job) error { + wp := NewWorkerPool(10, ns, pool) + wp.Job("wat", func(ctx *Context) error { return nil })