This is a redis-based queue for usage in Go. This project is a fork from go-redis-queue I liked this solution a lot but i missed a few features.
- Ability to add arbitrary tasks to a queue in redis
- Option to Dedup tasks based on the task signature.
- Ability to schedule tasks in the future.
- Atomic Push and Pop from queue. Two workers cannot get the same job.
- Sorted FIFO queue.
- Can act like a priority queue by scheduling a job with a really old timestamp
- Simple API
- Insert multiple job at once
- Remove a job
- Have multiple times the same job (same content)
Adding jobs to a queue.
import "github.com/missena-corp/airq"
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()
q := airq.New("queue_name", airq.WithConn(c))
added, taskID, err := q.Push(&airq.Job{Content: "basic item"})
if err != nil { ... }
queueSize, err := q.Pending()
if err != nil { ... }
added, taskID, err := q.Push(&airq.Job{
Content: "scheduled item",
When: time.Now().Add(10*time.Minute),
})
if err != nil { ... }
A simple worker processing jobs from a queue:
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()
q := airq.New("queue_name", airq.WithConn(c))
for !timeToQuit {
job, err = q.Pop()
if err != nil { ... }
if job != "" {
// process the job.
} else {
time.Sleep(2*time.Second)
}
}
A batch worker processing jobs from a queue:
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()
q := airq.New("queue_name", airq.WithConn(c))
for !timeToQuit {
jobs, err := q.PopJobs(100) // argument is "limit"
if err != nil { ... }
if len(jobs) > 0 {
for i, job := range jobs {
// process the job.
}
} else {
time.Sleep(2*time.Second)
}
}