[BUG] Archived tasks that are trimmed are not deleted #730

Open
Harrison-Miller opened this issue Aug 31, 2023 · 1 comment
Labels
bug Something isn't working

Comments

Harrison-Miller (Contributor) commented Aug 31, 2023

Describe the bug
Currently, the IDs of archived tasks are stored in a sorted set, asynq:{qname}:archived, and each task's data lives under its own key, asynq:{qname}:t:<id>.
Every time a task is archived, the sorted set is trimmed, but trimming only removes entries from the set; the task keys those entries refer to are never deleted.
Once the archive reaches maxArchiveSize (10k tasks), this makes Redis memory usage grow without bound.

Trimmed task keys are orphaned: the set no longer references them, so they can't be deleted with inspector.DeleteAllArchivedTasks.
You can confirm that these orphaned archived tasks exist by running the following against a Redis instance whose asynq server is not currently receiving tasks:
KEYS asynq:{qname}:t*
and comparing the count to
ZCARD asynq:{qname}:archived
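The count comparison above can also be done per ID, which shows exactly which task keys were left behind. A minimal Go sketch, assuming you have already fetched the key names (from KEYS asynq:{qname}:t*) and the set members (for example with ZRANGE asynq:{qname}:archived 0 -1); findOrphanedTaskIDs is a hypothetical helper, not an asynq API, and the asynq:{qname}:t:<id> key layout is assumed from the pattern above:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// findOrphanedTaskIDs is a hypothetical helper, not part of asynq.
// taskKeys: key names returned by KEYS asynq:{qname}:t*
// archivedIDs: members of asynq:{qname}:archived (e.g. ZRANGE key 0 -1)
// It assumes task keys look like asynq:{qname}:t:<id> and that the queue is
// idle, so every task key left in Redis should belong to an archived task.
func findOrphanedTaskIDs(qname string, taskKeys, archivedIDs []string) []string {
	prefix := "asynq:{" + qname + "}:t:"
	inSet := make(map[string]struct{}, len(archivedIDs))
	for _, id := range archivedIDs {
		inSet[id] = struct{}{}
	}
	orphans := []string{}
	for _, key := range taskKeys {
		if !strings.HasPrefix(key, prefix) {
			continue // skip anything that is not a task key for this queue
		}
		id := strings.TrimPrefix(key, prefix)
		if _, ok := inSet[id]; !ok {
			orphans = append(orphans, id)
		}
	}
	sort.Strings(orphans)
	return orphans
}

func main() {
	keys := []string{
		"asynq:{default}:t:a",
		"asynq:{default}:t:b",
		"asynq:{default}:t:c",
	}
	archived := []string{"c"}
	fmt.Println(findOrphanedTaskIDs("default", keys, archived)) // prints [a b]
}
```

Because the check simply subtracts set members from task keys, it is only meaningful while the queue is idle; otherwise pending, active, or scheduled tasks would also be reported as orphans.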

Other issues have already pointed out the lack of configuration support for:
maxArchiveSize
archivedExpirationInDays
I believe being able to configure these settings is crucial for production environments. That said, the current values would be fine defaults if the intended behavior worked.

This bug, however, is very serious: it can cause Redis clusters with limited memory to crash.

To Reproduce

  1. Submit 10k tasks.
  2. Archive the tasks.
  3. Submit and archive one more task.
  4. Observe that the number of entries in asynq:{qname}:archived is no longer equal to the total number of task keys in Redis.

Test Case

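// Intended for package rdb (internal/rdb). It relies on setup and maxArchiveSize
// from that package and on the test file's existing imports: context, testing,
// time, uuid (github.com/google/uuid), base, timeutil, and h (internal/testutil).
func TestArchiveTrim(t *testing.T) {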
	r := setup(t)
	defer r.Close()
	now := time.Now()
	r.SetClock(timeutil.NewSimulatedClock(now))

	errMsg := "SMTP server not responding"

	// create maxArchiveSize-1 (9,999) archived tasks, one short of the limit
	taskCount := maxArchiveSize - 1
	archivedTasks := make([]base.Z, 0, taskCount)
	for i := 0; i < taskCount; i++ {
		id := uuid.NewString()
		task := base.TaskMessage{
			ID:      id,
			Type:    "send_email",
			Payload: nil,
			Queue:   "default",
		}
		archivedTasks = append(archivedTasks, base.Z{
			Message: h.TaskMessageWithError(task, errMsg, now),
			Score:   now.Add(-1 * time.Hour).Unix(),
		})
	}

	h.FlushDB(t, r.client) // clean up db before each test case
	h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{
		"default": archivedTasks,
	})

	archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default")
	if len(archivedEntriesBefore) != taskCount {
		t.Fatalf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), taskCount)
	}

	// set up task that will cause archive queue to be trimmed
	id := uuid.NewString()
	target := &base.TaskMessage{
		ID:      id,
		Type:    "send_email",
		Payload: nil,
		Queue:   "default",
	}

	h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{
		"default": {target},
	})
	h.SeedAllLease(t, r.client, map[string][]base.Z{
		"default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}},
	})

	err := r.Archive(context.Background(), target, errMsg)
	if err != nil {
		t.Fatalf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
	}

	// archiving the 10,000th task trims the oldest entry, so the set is back to taskCount
	archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default")
	if len(archivedEntriesInSet) != taskCount {
		t.Fatalf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount)
	}

	// check that the target task is the newest entry in the archived set
	newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message
	if newestTask.ID != target.ID {
		t.Fatalf("newest task in archive set = %v, want %v", newestTask.ID, target.ID)
	}

	// check whether the trim also deleted the trimmed task's key: the number
	// of task keys should equal taskCount (this fails while the bug exists)
	vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val()
	if len(vals) != taskCount {
		t.Errorf("len of task keys = %v, want %v", len(vals), taskCount)
	}
}
Harrison-Miller added the bug label on Aug 31, 2023.
Harrison-Miller (Contributor, Author) commented:

I am working on a PR to fix this issue and will submit it for review in a day or two.
