Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Same task received/processed by more than one worker #90

Closed
gunnsth opened this issue Feb 18, 2020 · 4 comments · Fixed by #170
Closed

[BUG] Same task received/processed by more than one worker #90

gunnsth opened this issue Feb 18, 2020 · 4 comments · Fixed by #170
Assignees
Labels
bug Something isn't working

Comments

@gunnsth
Copy link

gunnsth commented Feb 18, 2020

Describe the bug
The problem is that I spawned a taskqueuer that queued tasks ranging from now - 10 minutes to now + 10 minutes. With 4 workers running (each at concurrency 1).

The output I got was:

taskrunner2_1  | Creating redis worker
taskrunner2_1  | asynq: pid=1 2020/02/18 22:46:41.351359 INFO: Starting processing
taskrunner_1   | Creating redis worker
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.359319 INFO: Starting processing
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.443077 INFO: Send signal TSTP to stop processing new tasks
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.443085 INFO: Send signal TERM or INT to terminate the process
taskrunner_1   | Got task: task 3
taskrunner_1   | (*asynq.Task)(0xc00000e220)({
taskrunner_1   |  Type: (string) (len=6) "task 3",
taskrunner_1   |  Payload: (asynq.Payload) {
taskrunner_1   |   data: (map[string]interface {}) (len=1) {
taskrunner_1   |    (string) (len=1) "i": (float64) -8
taskrunner_1   |   }
taskrunner_1   |  }
taskrunner_1   | })
taskrunner_1   | Got task: task 4
taskrunner_1   | (*asynq.Task)(0xc00000e440)({
taskrunner_1   |  Type: (string) (len=6) "task 4",
taskrunner_1   |  Payload: (asynq.Payload) {
taskrunner_1   |   data: (map[string]interface {}) (len=1) {
taskrunner_1   |    (string) (len=1) "i": (float64) -7
taskrunner_1   |   }
taskrunner_1   |  }
taskrunner_1   | })
taskrunner2_1  | Got task: task 1
taskrunner2_1  | (*asynq.Task)(0xc00000e380)({
taskrunner2_1  |  Type: (string) (len=6) asynq: pid=1 2020/02/18 22:46:41.443166 INFO: Send signal TSTP to stop processing new tasks
taskrunner2_1  | asynq: pid=1 2020/02/18 22:46:41.443173 INFO: Send signal TERM or INT to terminate the process
taskrunner2_1  | "task 1",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -10
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 2
taskrunner2_1  | (*asynq.Task)(0xc00000e540)({
taskrunner2_1  |  Type: (string) (len=6) "task 2",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -9
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 3
taskrunner2_1  | (*asynq.Task)(0xc00007c900)({
taskrunner2_1  |  Type: (string) (len=6) "task 3",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -8
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 4
taskrunner2_1  | (*asynq.Task)(0xc00000e640)({
taskrunner2_1  |  Type: (string) (len=6) "task 4",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -7
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 6
taskrunner2_1  | (*asynq.Task)(0xc00007ca00)({
taskrunner2_1  |  Type: (string) (len=6) "task 6",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -5
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 7
taskrunner2_1  | (*asynq.Task)(0xc00000e740)({
taskrunner2_1  |  Type: (string) (len=6) "task 7",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -4
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
...

So tasks 3 and 4 were received twice, which could lead to problems, although I admit the case I am working with is a bit strange. I.e. minutes in the past etc.

To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):

  1. Setup background processing ...
    Start 4 taskrunners
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 1,
	})

	bg.Run(asynq.HandlerFunc(handler))
}

func handler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("Got task: %s\n", t.Type)
	spew.Dump(t)
	return nil
}
  1. Enqueue tasks ...
	fmt.Printf("Creating redis client\n")
	client := asynq.NewClient(r)

	n := 1
	for i := -10; i < 10; i++ {
		name := fmt.Sprintf("task %d", n)
		t := asynq.NewTask(name, map[string]interface{}{"i": i})
		err := client.Schedule(t, time.Now().Add(time.Duration(i)*time.Minute))
		if err != nil {
			fmt.Printf("Error scheduling\n")
			panic(err)
		}

		n+
}
  1. See Error ...
    As per the output above, tasks 3 and 4 were received by two workers. Would be good if we can guarantee that each task can only get processed once.

Note the way I am spawning the workers and queuer, it is possible that the tasks are queued before some workers start. It is all starting at the same time. Ideally that would not matter.

Expected behavior
Expected that each task could only be received by one worker / processed once.

Screenshots
N/A

Environment (please complete the following information):

  • Running on Ubuntu Linux
  • Spawning up redis, taskqueuer, 4x taskrunners in a single docker compose file.
  • Version asynq 0.4

Additional context
If needed, I can clean up my docker compose environment and provide a fully contained example.

@gunnsth gunnsth added the bug Something isn't working label Feb 18, 2020
@hibiken
Copy link
Owner

hibiken commented Feb 19, 2020

Thanks for filing this bug report!

I ran the same code on my machine with 4 worker processes reading from the same redis instance but could not reproduce the bug.

It could be that you've run the client code multiple times and there were duplicate tasks in Redis. Would you mind trying this again with clean Redis DB?

You can asynqmon stats to make sure that there's no tasks in Redis.
You can flush redis by running redis-cli flushdb

@gunnsth
Copy link
Author

gunnsth commented Feb 19, 2020

@hibiken I prepared an environment where this can be reproduced (uses docker, docker-compose):
https://github.com/hibiken/asynq/compare/master...gunnsth:issue90-reproduce?expand=1
to reproduce it:

$ cd testdata
$ docker-compose up

It will not always give exactly the same results, I guess there's some stochastic/random factor that determines which taskrunner catches tasks.

Example outputs:

taskrunner_1   | Tasks processed
taskrunner_1   | (map[int]int) (len=5) {
taskrunner_1   |  (int) 10: (int) 1,
taskrunner_1   |  (int) 2: (int) 1,
taskrunner_1   |  (int) 4: (int) 1,
taskrunner_1   |  (int) 6: (int) 1,
taskrunner_1   |  (int) 8: (int) 1
taskrunner_1   | }
taskrunner3_1  | Tasks processed
taskrunner3_1  | (map[int]int) (len=3) {
taskrunner3_1  |  (int) 1: (int) 1,
taskrunner3_1  |  (int) 2: (int) 1,
taskrunner3_1  |  (int) 20: (int) 1
taskrunner3_1  | }
taskrunner2_1  | Tasks processed
taskrunner2_1  | (map[int]int) (len=7) {
taskrunner2_1  |  (int) 1: (int) 1,
taskrunner2_1  |  (int) 3: (int) 1,
taskrunner2_1  |  (int) 5: (int) 1,
taskrunner2_1  |  (int) 7: (int) 1,
taskrunner2_1  |  (int) 9: (int) 1,
taskrunner2_1  |  (int) 11: (int) 1,
taskrunner2_1  |  (int) 14: (int) 1
taskrunner2_1  | }

We see that both taskrunner1 and 3 have processed task 2. And taskrunner3 and 2 have both processed task1.

Is there any way we can ensure that this does not happen?
There should not be any duplicate tasks since this creates a fresh redis instance.
(to be sure, it's easy to clean all the docker images).

Would it be possible to clear all tasks prior to start creating tasks (programmatically)? Just to make sure?

@hibiken
Copy link
Owner

hibiken commented Feb 20, 2020

@gunnsth
Could you try adding this to your taskqueue.go to flush DB to start from clean slate?

import "github.com/go-redis/redis/v7"

func main() {
    // Flush DB first to start from a clean slate.
    rdb := redis.NewClient(&redis.Options{
       Addr: "redis:6379",
       Password: "xxxx",
    })
    if err := rdb.FlushDB().Err(); err != nil {
        log.Fatalln(err)
    }
 
    // ... create asynq.Client and schedule tasks (your existing code)
}

Let me know if you are still seeing duplicate tasks.
Otherwise, we can close this issue 👍

@hibiken
Copy link
Owner

hibiken commented Jun 13, 2020

@gunnsth I finally figured out what's causing this. Fix is in #170

@hibiken hibiken reopened this Jun 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants