Skip to content

Commit

Permalink
feature: configurable janitor interval and deletion batch size (#715)
Browse files Browse the repository at this point in the history
* feature: configurable janitor interval and deletion batch size

* warn user when they set a big number of janitor batch size

* Update CHANGELOG.md

---------

Co-authored-by: Agung Hariadi Tedja <agung.tedja@kumparan.com>
  • Loading branch information
zhenqianz and Agung Hariadi Tedja authored May 6, 2024
1 parent 1740088 commit d04888e
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.25.0] - 2023-01-02

### Added
- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715)

## [0.24.1] - 2023-05-01

### Changed
Expand Down
2 changes: 1 addition & 1 deletion internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ type Broker interface {
ReclaimStaleAggregationSets(qname string) error

// Task retention related method
DeleteExpiredCompletedTasks(qname string) error
DeleteExpiredCompletedTasks(qname string, batchSize int) error

// Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
Expand Down
4 changes: 1 addition & 3 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,9 +1241,7 @@ return table.getn(ids)`)

// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and delete all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
// Note: Do this operation in fix batches to prevent long running script.
const batchSize = 100
func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
for {
n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2542,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) {
h.FlushDB(t, r.client)
h.SeedAllCompletedQueues(t, r.client, tc.completed)

if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err)
if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err)
continue
}

Expand Down
6 changes: 3 additions & 3 deletions internal/testbroker/testbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)

var errRedisDown = errors.New("testutil: redis is down")
Expand Down Expand Up @@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
return tb.real.ForwardIfReady(qnames...)
}

func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.DeleteExpiredCompletedTasks(qname)
return tb.real.DeleteExpiredCompletedTasks(qname, batchSize)
}

func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
Expand Down
15 changes: 10 additions & 5 deletions janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ type janitor struct {

// average interval between checks.
avgInterval time.Duration

// number of tasks to be deleted when janitor runs to delete the expired completed tasks.
batchSize int
}

type janitorParams struct {
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
batchSize int
}

func newJanitor(params janitorParams) *janitor {
Expand All @@ -43,6 +47,7 @@ func newJanitor(params janitorParams) *janitor {
done: make(chan struct{}),
queues: params.queues,
avgInterval: params.interval,
batchSize: params.batchSize,
}
}

Expand Down Expand Up @@ -73,7 +78,7 @@ func (j *janitor) start(wg *sync.WaitGroup) {

func (j *janitor) exec() {
for _, qname := range j.queues {
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil {
if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil {
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
qname, err)
}
Expand Down
10 changes: 6 additions & 4 deletions janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ func TestJanitor(t *testing.T) {
defer r.Close()
rdbClient := rdb.NewRDB(r)
const interval = 1 * time.Second
const batchSize = 100
janitor := newJanitor(janitorParams{
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "custom"},
interval: interval,
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "custom"},
interval: interval,
batchSize: batchSize,
})

now := time.Now()
Expand Down
38 changes: 34 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ type Config struct {
//
// If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregator GroupAggregator

// JanitorInterval specifies the average interval of janitor checks for expired completed tasks.
//
// If unset or zero, default interval of 8 seconds is used.
JanitorInterval time.Duration

// JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run.
//
// If unset or zero, default batch size of 100 is used.
// Make sure to not put a big number as the batch size to prevent a long-running script.
JanitorBatchSize int
}

// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
Expand Down Expand Up @@ -408,6 +419,10 @@ const (
defaultDelayedTaskCheckInterval = 5 * time.Second

defaultGroupGracePeriod = 1 * time.Minute

defaultJanitorInterval = 8 * time.Second

defaultJanitorBatchSize = 100
)

// NewServer returns a new Server given a redis connection option
Expand Down Expand Up @@ -547,11 +562,26 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc,
})

janitorInterval := cfg.JanitorInterval
if janitorInterval == 0 {
janitorInterval = defaultJanitorInterval
}

janitorBatchSize := cfg.JanitorBatchSize
if janitorBatchSize == 0 {
janitorBatchSize = defaultJanitorBatchSize
}
if janitorBatchSize > defaultJanitorBatchSize {
logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+
"This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize)
}
janitor := newJanitor(janitorParams{
logger: logger,
broker: rdb,
queues: qnames,
interval: 8 * time.Second,
logger: logger,
broker: rdb,
queues: qnames,
interval: janitorInterval,
batchSize: janitorBatchSize,
})
aggregator := newAggregator(aggregatorParams{
logger: logger,
Expand Down

0 comments on commit d04888e

Please sign in to comment.