diff --git a/queue/queue.go b/queue/queue.go index e32309952..34992f8df 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -103,6 +103,7 @@ type Opts struct { Ttl time.Duration // task time to live Ttr time.Duration // task time to execute Delay time.Duration // delayed execution + Utube string } func (opts Opts) toMap() map[string]interface{} { @@ -124,6 +125,10 @@ func (opts Opts) toMap() map[string]interface{} { ret["pri"] = opts.Pri } + if opts.Utube != "" { + ret["utube"] = opts.Utube + } + return ret } @@ -276,8 +281,8 @@ func (q *queue) Kick(count uint64) (uint64, error) { // Delete the task identified by its id. func (q *queue) Delete(taskId uint64) error { - _, err := q._delete(taskId) - return err + _, err := q._delete(taskId) + return err } // Return the number of tasks in a queue broken down by task_state, and the number of requests broken down by the type of request. diff --git a/queue/queue_test.go b/queue/queue_test.go index 535faec5f..d1a909120 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -2,6 +2,7 @@ package queue_test import ( "fmt" + "math" "testing" "time" @@ -747,3 +748,73 @@ func TestTtlQueue_Put(t *testing.T) { } } } + +func TestUtube_Put(t *testing.T) { + conn, err := Connect(server, opts) + if err != nil { + t.Errorf("Failed to connect: %s", err.Error()) + return + } + if conn == nil { + t.Errorf("conn is nil after Connect") + return + } + defer conn.Close() + + name := "test_utube" + cfg := queue.Cfg{ + Temporary: true, + Kind: queue.UTUBE, + IfNotExists: true, + } + q := queue.New(conn, name) + if err = q.Create(cfg); err != nil { + t.Errorf("Failed to create queue: %s", err.Error()) + return + } + defer func() { + //Drop + err := q.Drop() + if err != nil { + t.Errorf("Failed drop queue: %s", err.Error()) + } + }() + + data1 := &customData{"test-data-0"} + _, err = q.PutWithOpts(data1, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + data2 := &customData{"test-data-1"} + _, err = q.PutWithOpts(data2, queue.Opts{Utube: "test-utube-consumer-key"}) + if err != nil { + t.Fatalf("Failed put task to queue: %s", err.Error()) + } + + go func() { + t1, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + if err := t1.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + }() + + time.Sleep(100 * time.Millisecond) + // the queue should be blocked for ~2 seconds + start := time.Now() + t2, err := q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err.Error()) + } + if err := t2.Ack(); err != nil { + t.Fatalf("Failed to ack task: %s", err.Error()) + } + end := time.Now() + if math.Abs(float64(end.Sub(start)-2*time.Second)) > float64(200*time.Millisecond) { + t.Fatalf("Blocking time is less than expected: actual = %.2fs, expected = 1s", end.Sub(start).Seconds()) + } +}