Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metadata task #838

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
tasks := make([]*Task, len(msgs))
for i, m := range msgs {
tasks[i] = NewTask(m.Type, m.Payload)
tasks[i] = NewTask(m.Type, m.Payload, nil)
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
Expand Down
30 changes: 15 additions & 15 deletions aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func TestAggregator(t *testing.T) {
maxDelay: 0, // no maxdelay limit
maxSize: 0, // no maxsize limit
aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
},
tasks: []*Task{
NewTask("task1", nil, Group("mygroup")),
NewTask("task2", nil, Group("mygroup")),
NewTask("task3", nil, Group("mygroup")),
NewTask("task1", nil, nil, Group("mygroup")),
NewTask("task2", nil, nil, Group("mygroup")),
NewTask("task3", nil, nil, Group("mygroup")),
},
enqueueFrequency: 300 * time.Millisecond,
waitTime: 3 * time.Second,
Expand All @@ -65,13 +65,13 @@ func TestAggregator(t *testing.T) {
maxDelay: 4 * time.Second,
maxSize: 0, // no maxsize limit
aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
},
tasks: []*Task{
NewTask("task1", nil, Group("mygroup")), // time 0
NewTask("task2", nil, Group("mygroup")), // time 1s
NewTask("task3", nil, Group("mygroup")), // time 2s
NewTask("task4", nil, Group("mygroup")), // time 3s
NewTask("task1", nil, nil, Group("mygroup")), // time 0
NewTask("task2", nil, nil, Group("mygroup")), // time 1s
NewTask("task3", nil, nil, Group("mygroup")), // time 2s
NewTask("task4", nil, nil, Group("mygroup")), // time 3s
},
enqueueFrequency: 1 * time.Second,
waitTime: 4 * time.Second,
Expand All @@ -92,14 +92,14 @@ func TestAggregator(t *testing.T) {
maxDelay: 0, // no maxdelay limit
maxSize: 5,
aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
},
tasks: []*Task{
NewTask("task1", nil, Group("mygroup")),
NewTask("task2", nil, Group("mygroup")),
NewTask("task3", nil, Group("mygroup")),
NewTask("task4", nil, Group("mygroup")),
NewTask("task5", nil, Group("mygroup")),
NewTask("task1", nil, nil, Group("mygroup")),
NewTask("task2", nil, nil, Group("mygroup")),
NewTask("task3", nil, nil, Group("mygroup")),
NewTask("task4", nil, nil, Group("mygroup")),
NewTask("task5", nil, nil, Group("mygroup")),
},
enqueueFrequency: 300 * time.Millisecond,
waitTime: defaultAggregationCheckInterval * 2,
Expand Down
25 changes: 16 additions & 9 deletions asynq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)

// Task represents a unit of work to be performed.
Expand All @@ -31,10 +31,14 @@ type Task struct {

// w is the ResultWriter for the task.
w *ResultWriter

// md holds metadata of the task.
md Metadata
}

func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Metadata() map[string]string { return t.md }

// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
Expand All @@ -44,19 +48,21 @@ func (t *Task) ResultWriter() *ResultWriter { return t.w }

// NewTask returns a new Task given a type name and payload data.
// Options can be passed to configure task processing behavior.
func NewTask(typename string, payload []byte, opts ...Option) *Task {
func NewTask(typename string, payload []byte, md Metadata, opts ...Option) *Task {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change.

return &Task{
typename: typename,
payload: payload,
md: md,
opts: opts,
}
}

// newTask creates a task with the given typename, payload and ResultWriter.
func newTask(typename string, payload []byte, w *ResultWriter) *Task {
func newTask(typename string, payload []byte, md Metadata, w *ResultWriter) *Task {
return &Task{
typename: typename,
payload: payload,
md: md,
w: w,
}
}
Expand Down Expand Up @@ -438,10 +444,11 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
//
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func makeTask(n int) *Task {
if err != nil {
panic(err)
}
return NewTask(fmt.Sprintf("task%d", n), b)
return NewTask(fmt.Sprintf("task%d", n), b, nil)
}

// Simple E2E Benchmark testing with no scheduled tasks and retries.
Expand Down Expand Up @@ -222,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.Log("Starting enqueueing")
enqueued := 0
for enqueued < 100000 {
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}))
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}), nil)
if _, err := client.Enqueue(t); err != nil {
b.Logf("could not enqueue task %d: %v", enqueued, err)
continue
Expand Down
9 changes: 5 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// A Client is responsible for scheduling tasks.
Expand Down Expand Up @@ -150,9 +150,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
Expand Down Expand Up @@ -369,6 +369,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
}
msg := &base.TaskMessage{
ID: opt.taskID,
Metadata: task.Metadata(),
Type: task.Type(),
Payload: task.Payload(),
Queue: opt.queue,
Expand Down
32 changes: 16 additions & 16 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)

var (
now = time.Now()
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestClientEnqueue(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
now := time.Now()

tests := []struct {
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestClientEnqueueWithGroupOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("mytask", []byte("foo"))
task := NewTask("mytask", []byte("foo"), nil)
now := time.Now()

tests := []struct {
Expand Down Expand Up @@ -635,7 +635,7 @@ func TestClientEnqueueWithTaskIDOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("send_email", nil)
task := NewTask("send_email", nil, nil)
now := time.Now()

tests := []struct {
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestClientEnqueueWithConflictingTaskID(t *testing.T) {
defer client.Close()

const taskID = "custom_id"
task := NewTask("foo", nil)
task := NewTask("foo", nil, nil)

if _, err := client.Enqueue(task, TaskID(taskID)); err != nil {
t.Fatalf("First task: Enqueue failed: %v", err)
Expand All @@ -729,7 +729,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
now := time.Now()

tests := []struct {
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestClientEnqueueError(t *testing.T) {
client := NewClient(getRedisConnOpt(t))
defer client.Close()

task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)

tests := []struct {
desc string
Expand All @@ -873,27 +873,27 @@ func TestClientEnqueueError(t *testing.T) {
},
{
desc: "With empty task typename",
task: NewTask("", h.JSON(map[string]interface{}{})),
task: NewTask("", h.JSON(map[string]interface{}{}), nil),
opts: []Option{},
},
{
desc: "With blank task typename",
task: NewTask(" ", h.JSON(map[string]interface{}{})),
task: NewTask(" ", h.JSON(map[string]interface{}{}), nil),
opts: []Option{},
},
{
desc: "With empty task ID",
task: NewTask("foo", nil),
task: NewTask("foo", nil, nil),
opts: []Option{TaskID("")},
},
{
desc: "With blank task ID",
task: NewTask("foo", nil),
task: NewTask("foo", nil, nil),
opts: []Option{TaskID(" ")},
},
{
desc: "With unique option less than 1s",
task: NewTask("foo", nil),
task: NewTask("foo", nil, nil),
opts: []Option{Unique(300 * time.Millisecond)},
},
}
Expand Down Expand Up @@ -1015,7 +1015,7 @@ func TestClientWithDefaultOptions(t *testing.T) {
h.FlushDB(t, r)
c := NewClient(getRedisConnOpt(t))
defer c.Close()
task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...)
task := NewTask(tc.tasktype, tc.payload, nil, tc.defaultOpts...)
gotInfo, err := c.Enqueue(task, tc.opts...)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func TestClientEnqueueUnique(t *testing.T) {
ttl time.Duration
}{
{
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123}), nil),
time.Hour,
},
}
Expand Down Expand Up @@ -1096,7 +1096,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
ttl time.Duration
}{
{
NewTask("reindex", nil),
NewTask("reindex", nil, nil),
time.Hour,
10 * time.Minute,
},
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
ttl time.Duration
}{
{
NewTask("reindex", nil),
NewTask("reindex", nil, nil),
time.Now().Add(time.Hour),
10 * time.Minute,
},
Expand Down
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func ExampleScheduler() {
&asynq.SchedulerOpts{Location: time.Local},
)

if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil {
if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil, nil)); err != nil {
log.Fatal(err)
}
if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil {
if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil, nil)); err != nil {
log.Fatal(err)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ require (
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/stretchr/testify v1.8.4 // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
Expand Down
4 changes: 2 additions & 2 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// Inspector is a client interface to inspect and mutate the state of
Expand Down Expand Up @@ -890,7 +890,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err
}
for _, e := range res {
task := NewTask(e.Type, e.Payload)
task := NewTask(e.Type, e.Payload, nil)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
Expand Down
4 changes: 2 additions & 2 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3324,14 +3324,14 @@ func TestInspectorSchedulerEntries(t *testing.T) {
want: []*SchedulerEntry{
{
Spec: "* * * * *",
Task: NewTask("foo", nil),
Task: NewTask("foo", nil, nil),
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
{
Spec: "@every 20m",
Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})),
Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"}), nil),
Opts: []Option{Queue("bar"), MaxRetry(20)},
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
Expand Down
Loading
Loading