Use approximate memory usage for QueueInfo #310

Merged 1 commit on Aug 9, 2021.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -12,6 +5,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Changed `Queue` function to not convert the provided queue name to lowercase. Queue names are now case-sensitive.
- `QueueInfo.MemoryUsage` is now an approximate usage value.

### Fixed

- Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309).

## [0.18.1] - 2020-07-04

1 change: 1 addition & 0 deletions inspector.go
@@ -50,6 +50,7 @@ type QueueInfo struct {
Queue string

// Total number of bytes that the queue and its tasks require to be stored in redis.
// It is an approximate memory usage value in bytes since the value is computed by sampling.
MemoryUsage int64

// Size is the total number of tasks in the queue.
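For context on how this field reaches library users, here is a minimal sketch of reading the approximate value through the public Inspector API. The method names (NewInspector, GetQueueInfo) and the Redis address are assumptions based on the asynq API around this release; they are not part of the diff.

package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	// Assumption: the Inspector lives in the asynq package and exposes
	// GetQueueInfo, returning *QueueInfo with the MemoryUsage field shown above.
	inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})
	info, err := inspector.GetQueueInfo("default")
	if err != nil {
		log.Fatal(err)
	}
	// MemoryUsage is an approximation computed by sampling, not an exact byte count.
	fmt.Printf("queue %q uses roughly %d bytes in redis\n", info.Queue, info.MemoryUsage)
}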
98 changes: 75 additions & 23 deletions internal/rdb/inspect.go
@@ -26,7 +26,8 @@ type Stats struct {
// Name of the queue (e.g. "default", "critical").
Queue string
// MemoryUsage is the total number of bytes the queue and its tasks require
// to be stored in redis.
// to be stored in redis. It is an approximate memory usage value in bytes
// since the value is computed by sampling.
MemoryUsage int64
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
@@ -172,31 +173,82 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
	return stats, nil
}
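The body of CurrentStats that consumes the new helper is collapsed in this diff view; presumably it assigns the script's result to Stats.MemoryUsage, roughly as in the sketch below. The exact surrounding error handling is an assumption, not shown in the diff.

	// Hypothetical excerpt from the collapsed part of CurrentStats:
	memusg, err := r.memoryUsage(qname)
	if err != nil {
		return nil, err
	}
	stats.MemoryUsage = memusg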

// Computes memory usage for the given queue by sampling tasks
// from each redis list/zset. Returns approximate memory usage value
// in bytes.
//
// KEYS[1] -> asynq:{qname}:active
// KEYS[2] -> asynq:{qname}:pending
// KEYS[3] -> asynq:{qname}:scheduled
// KEYS[4] -> asynq:{qname}:retry
// KEYS[5] -> asynq:{qname}:archived
//
// ARGV[1] -> asynq:{qname}:t:
// ARGV[2] -> sample_size (e.g. 20)
var memoryUsageCmd = redis.NewScript(`
local sample_size = tonumber(ARGV[2])
if sample_size <= 0 then
	return redis.error_reply("sample size must be a positive number")
end
local memusg = 0
for i=1,2 do
	local ids = redis.call("LRANGE", KEYS[i], 0, sample_size - 1)
	local sample_total = 0
	if (table.getn(ids) > 0) then
		for _, id in ipairs(ids) do
			local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
			sample_total = sample_total + bytes
		end
		local n = redis.call("LLEN", KEYS[i])
		local avg = sample_total / table.getn(ids)
		memusg = memusg + (avg * n)
	end
	local m = redis.call("MEMORY", "USAGE", KEYS[i])
	if (m) then
		memusg = memusg + m
	end
end
for i=3,5 do
	local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1)
	local sample_total = 0
	if (table.getn(ids) > 0) then
		for _, id in ipairs(ids) do
			local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
			sample_total = sample_total + bytes
		end
		local n = redis.call("ZCARD", KEYS[i])
		local avg = sample_total / table.getn(ids)
		memusg = memusg + (avg * n)
	end
	local m = redis.call("MEMORY", "USAGE", KEYS[i])
	if (m) then
		memusg = memusg + m
	end
end
return memusg
`)
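To make the estimate concrete (the numbers here are illustrative, not from the PR): if the script samples 20 tasks from asynq:{default}:pending, MEMORY USAGE averages 1,000 bytes per sampled task key, and LLEN reports 5,000 pending tasks, the pending list contributes roughly 1,000 * 5,000 = 5,000,000 bytes, plus the MEMORY USAGE of the list key itself. The same per-key estimate is summed over the active list and the scheduled, retry, and archived sorted sets to produce the approximate total.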

func (r *RDB) memoryUsage(qname string) (int64, error) {
	var op errors.Op = "rdb.memoryUsage"
-	var (
-		keys   []string
-		data   []string
-		cursor uint64
-		err    error
-	)
-	for {
-		data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
-		if err != nil {
-			return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "scan", Err: err})
-		}
-		keys = append(keys, data...)
-		if cursor == 0 {
-			break
-		}
-	}
-	var usg int64
-	for _, k := range keys {
-		n, err := r.client.MemoryUsage(k).Result()
-		if err != nil {
-			return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "memory usage", Err: err})
-		}
-		usg += n
-	}
+	const sampleSize = 20
+	keys := []string{
+		base.ActiveKey(qname),
+		base.PendingKey(qname),
+		base.ScheduledKey(qname),
+		base.RetryKey(qname),
+		base.ArchivedKey(qname),
+	}
+	argv := []interface{}{
+		base.TaskKeyPrefix(qname),
+		sampleSize,
+	}
+	res, err := memoryUsageCmd.Run(r.client, keys, argv...).Result()
+	if err != nil {
+		return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+	}
+	usg, err := cast.ToInt64E(res)
+	if err != nil {
+		return 0, errors.E(op, errors.Internal, fmt.Sprintf("could not cast script return value to int64"))
+	}
	return usg, nil
}