Skip to content

Commit

Permalink
Feat/v2 (#33)
Browse files Browse the repository at this point in the history
* refactor: accept job id instead of uuid

* chore: bump to v2

* feat: support for eta
  • Loading branch information
kalbhor authored Jun 8, 2023
1 parent 0a8a376 commit 93c7056
Show file tree
Hide file tree
Showing 22 changed files with 279 additions and 223 deletions.
54 changes: 27 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

![taskqueue](https://user-images.githubusercontent.com/14031096/170992942-3b62e055-6d9e-4c08-a277-ed6d6e9a4c2a.png)

[![Run Tests](https://github.com/kalbhor/Tasqueue/actions/workflows/test.yml/badge.svg)](https://github.com/kalbhor/Tasqueue/actions/workflows/test.yml) [![Go Report Card](https://goreportcard.com/badge/github.com/kalbhor/tasqueue)](https://goreportcard.com/report/github.com/kalbhor/tasqueue)
[![Run Tests](https://github.com/kalbhor/tasqueue/v2/actions/workflows/test.yml/badge.svg)](https://github.com/kalbhor/tasqueue/v2/actions/workflows/test.yml) [![Go Report Card](https://goreportcard.com/badge/github.com/kalbhor/tasqueue/v2)](https://goreportcard.com/report/github.com/kalbhor/tasqueue/v2)

**Tasqueue** is a simple, lightweight distributed job/worker implementation in Go

### Installation

`go get -u github.com/kalbhor/tasqueue`
`go get -u github.com/kalbhor/tasqueue/v2`

- [Concepts](#concepts)
- [Server](#server)
Expand Down Expand Up @@ -73,9 +73,9 @@ package main
import (
"log"

"github.com/kalbhor/tasqueue"
rb "github.com/kalbhor/tasqueue/brokers/redis"
rr "github.com/kalbhor/tasqueue/results/redis"
"github.com/kalbhor/tasqueue/v2"
rb "github.com/kalbhor/tasqueue/v2/brokers/redis"
rr "github.com/kalbhor/tasqueue/v2/results/redis"
"github.com/zerodha/logf"
)

Expand Down Expand Up @@ -132,7 +132,7 @@ package tasks
import (
"encoding/json"

"github.com/kalbhor/tasqueue"
"github.com/kalbhor/tasqueue/v2"
)

type SumPayload struct {
Expand Down Expand Up @@ -183,8 +183,8 @@ A tasqueue job represents a unit of work pushed onto the queue, that requires pr
```go
// JobOpts holds the various options available to configure a job.
type JobOpts struct {
// Optional UUID passed by client. If empty, Tasqueue generates it.
UUID string
// Optional ID passed by client. If empty, Tasqueue generates it.
ID string

Queue string
MaxRetries uint32
Expand All @@ -207,10 +207,10 @@ if err != nil {

#### Enqueuing a job

Once a job is created, it can be enqueued via the server for processing. Calling `srv.Enqueue` returns a job uuid which can be used to query the status of the job.
Once a job is created, it can be enqueued via the server for processing. Calling `srv.Enqueue` returns a job id which can be used to query the status of the job.

```go
uuid, err := srv.Enqueue(ctx, job)
id, err := srv.Enqueue(ctx, job)
if err != nil {
log.Fatal(err)
}
Expand All @@ -221,7 +221,7 @@ if err != nil {
To query the details of a job that was enqueued, we can use `srv.GetJob`. It returns a `JobMessage` which contains details related to a job.

```go
jobMsg, err := srv.GetJob(ctx, uuid)
jobMsg, err := srv.GetJob(ctx, id)
if err != nil {
log.Fatal(err)
}
Expand All @@ -232,8 +232,8 @@ Fields available in a `JobMessage` (embeds `Meta`):
```go
// Meta contains fields related to a job. These are updated when a task is consumed.
type Meta struct {
UUID string
OnSuccessUUID string
ID string
OnSuccessID string
Status string
Queue string
Schedule string
Expand Down Expand Up @@ -280,10 +280,10 @@ if err != nil {

#### Enqueuing a group

Once a group is created, it can be enqueued via the server for processing. Calling `srv.EnqueueGroup` returns a group uuid which can be used to query the status of the group.
Once a group is created, it can be enqueued via the server for processing. Calling `srv.EnqueueGroup` returns a group id which can be used to query the status of the group.

```go
groupUUID, err := srv.EnqueueGroup(ctx, grp)
groupID, err := srv.EnqueueGroup(ctx, grp)
if err != nil {
log.Fatal(err)
}
Expand All @@ -294,7 +294,7 @@ if err != nil {
To query the details of a group that was enqueued, we can use `srv.GetGroup`. It returns a `GroupMessage` which contains details related to a group.

```go
groupMsg, err := srv.GetGroup(ctx, groupUUID)
groupMsg, err := srv.GetGroup(ctx, groupID)
if err != nil {
log.Fatal(err)
}
Expand All @@ -305,9 +305,9 @@ Fields available in a `GroupMessage` (embeds `GroupMeta`):
```go
// GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type GroupMeta struct {
UUID string
ID string
Status string
// JobStatus is a map of individual job uuid -> status
// JobStatus is a map of individual job id -> status
JobStatus map[string]string
}
```
Expand Down Expand Up @@ -340,10 +340,10 @@ if err != nil {

#### Enqueuing a chain

Once a chain is created, it can be enqueued via the server for processing. Calling `srv.EnqueueChain` returns a chain uuid which can be used to query the status of the chain.
Once a chain is created, it can be enqueued via the server for processing. Calling `srv.EnqueueChain` returns a chain id which can be used to query the status of the chain.

```go
chainUUID, err := srv.EnqueueChain(ctx, chn)
chainID, err := srv.EnqueueChain(ctx, chn)
if err != nil {
log.Fatal(err)
}
Expand All @@ -358,7 +358,7 @@ A job in the chain can access the results of the previous job in the chain by ge
To query the details of a chain that was enqueued, we can use `srv.GetChain`. It returns a `ChainMessage` which contains details related to a chian.

```go
chainMsg, err := srv.GetChain(ctx, chainUUID)
chainMsg, err := srv.GetChain(ctx, chainID)
if err != nil {
log.Fatal(err)
}
Expand All @@ -369,12 +369,12 @@ Fields available in a `ChainMessage` (embeds `ChainMeta`):
```go
// ChainMeta contains fields related to a chain job.
type ChainMeta struct {
UUID string
ID string
// Status of the overall chain
Status string
// UUID of the current job part of chain
JobUUID string
// List of UUIDs of completed jobs
// ID of the current job part of chain
JobID string
// List of IDs of completed jobs
PrevJobs []string
}
```
Expand All @@ -386,7 +386,7 @@ A result is arbitrary `[]byte` data saved by a handler or callback via `JobCtx.S
#### Get Result

```go
b, err := srv.GetResult(ctx, jobUUID)
b, err := srv.GetResult(ctx, jobID)
if err != nil {
log.Fatal(err)
}
Expand All @@ -397,7 +397,7 @@ if err != nil {
DeleteJob removes the job's saved metadata from the store

```go
err := srv.DeleteResult(ctx, jobUUID)
err := srv.DeleteResult(ctx, jobID)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"testing"

"github.com/go-redis/redis"
rb "github.com/kalbhor/tasqueue/brokers/redis"
rr "github.com/kalbhor/tasqueue/results/redis"
rb "github.com/kalbhor/tasqueue/v2/brokers/redis"
rr "github.com/kalbhor/tasqueue/v2/results/redis"
"github.com/zerodha/logf"
)

Expand Down
5 changes: 5 additions & 0 deletions brokers/in-memory/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"
)

type Broker struct {
Expand Down Expand Up @@ -74,3 +75,7 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error)

return pending, nil
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return fmt.Errorf("in-memory broker does not support this method")
}
5 changes: 5 additions & 0 deletions brokers/nats-js/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nats
import (
"context"
"fmt"
"time"

"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
Expand Down Expand Up @@ -110,3 +111,7 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
func (b *Broker) GetPending(ctx context.Context, queue string) ([]string, error) {
return nil, fmt.Errorf("nats broker does not support this method")
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return fmt.Errorf("nats broker does not support this method")
}
51 changes: 51 additions & 0 deletions brokers/redis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/go-redis/redis/v8"
Expand All @@ -12,6 +13,7 @@ import (

const (
DefaultPollPeriod = time.Second
sortedSetKey = "tasqueue:ss:%s"
)

type Options struct {
Expand Down Expand Up @@ -68,7 +70,16 @@ func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
return b.conn.LPush(ctx, queue, msg).Err()
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return b.conn.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
}).Err()
}

func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
go b.consumeScheduled(ctx, queue)

for {
select {
case <-ctx.Done():
Expand All @@ -93,6 +104,46 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
}
}

func (b *Broker) consumeScheduled(ctx context.Context, queue string) {
poll := time.NewTicker(b.pollPeriod)

for {
select {
case <-ctx.Done():
b.log.Debug("shutting down scheduled consumer..")
return
case <-poll.C:
b.conn.Watch(ctx, func(tx *redis.Tx) error {
// Fetch the tasks with score less than current time. These tasks have been scheduled
// to be queued.
tasks, err := tx.ZRevRangeByScore(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
Offset: 0,
Count: 1,
}).Result()
if err != nil {
return err
}

for _, task := range tasks {
if err := b.Enqueue(ctx, []byte(task), queue); err != nil {
return err
}
}

// Remove the tasks
if err := tx.ZRem(ctx, fmt.Sprintf(sortedSetKey, queue), tasks).Err(); err != nil {
return err
}

return nil
})
}

}
}

func blpopResult(rs []string) (string, error) {
if len(rs) != 2 {
return "", fmt.Errorf("BLPop result should have exactly 2 strings. Got : %v", rs)
Expand Down
Loading

0 comments on commit 93c7056

Please sign in to comment.