Skip to content

Commit

Permalink
Add completed state
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken authored Nov 6, 2021
1 parent ddb1798 commit 741a3c5
Show file tree
Hide file tree
Showing 36 changed files with 1,235 additions and 202 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ package-json.lock
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# main binary
# binaries
asynqmon
api
dist/

# Editor configs
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
.PHONY: assets build docker
.PHONY: api assets build docker

NODE_PATH ?= $(PWD)/ui/node_modules
assets:
@if [ ! -d "$(NODE_PATH)" ]; then cd ./ui && yarn install --modules-folder $(NODE_PATH); fi
cd ./ui && yarn build --modules-folder $(NODE_PATH)

# This target skips the overhead of building UI assets.
# Intended to be used during development.
api:
go build -o api ./cmd/asynqmon

# Build a release binary.
build: assets
go build -o asynqmon ./cmd/asynqmon
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func main() {
RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"},
})

http.Handle(h.RootPath(), h)
// Note: We need the tailing slash when using net/http.ServeMux.
http.Handle(h.RootPath()+"/", h)

// Go to http://localhost:8080/monitoring to see asynqmon homepage.
log.Fatal(http.ListenAndServe(":8080", nil))
Expand Down
8 changes: 8 additions & 0 deletions cmd/asynqmon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
flagRedisInsecureTLS bool
flagRedisClusterNodes string
flagMaxPayloadLength int
flagMaxResultLength int
)

func init() {
Expand All @@ -39,6 +40,7 @@ func init() {
flag.BoolVar(&flagRedisInsecureTLS, "redis-insecure-tls", false, "disable TLS certificate host checks")
flag.StringVar(&flagRedisClusterNodes, "redis-cluster-nodes", "", "comma separated list of host:port addresses of cluster nodes")
flag.IntVar(&flagMaxPayloadLength, "max-payload-length", 200, "maximum number of utf8 characters printed in the payload cell in the Web UI")
flag.IntVar(&flagMaxResultLength, "max-result-length", 200, "maximum number of utf8 characters printed in the result cell in the Web UI")
}

// TODO: Write test and refactor this code.
Expand Down Expand Up @@ -102,6 +104,7 @@ func main() {
h := asynqmon.New(asynqmon.Options{
RedisConnOpt: redisConnOpt,
PayloadFormatter: asynqmon.PayloadFormatterFunc(formatPayload),
ResultFormatter: asynqmon.ResultFormatterFunc(formatResult),
})
defer h.Close()

Expand All @@ -125,6 +128,11 @@ func formatPayload(taskType string, payload []byte) string {
return truncate(payloadStr, flagMaxPayloadLength)
}

func formatResult(taskType string, result []byte) string {
resultStr := asynqmon.DefaultResultFormatter.FormatResult(taskType, result)
return truncate(resultStr, flagMaxResultLength)
}

// truncates string s to limit length (in utf8).
func truncate(s string, limit int) string {
i := 0
Expand Down
82 changes: 79 additions & 3 deletions conversion_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,30 @@ import (
// - conversion function from an external type to an internal type
// ****************************************************************************

// PayloadFormatter is used to convert payload bytes to string shown in the UI.
// PayloadFormatter is used to convert payload bytes to a string shown in the UI.
type PayloadFormatter interface {
// FormatPayload takes the task's typename and payload and returns a string representation of the payload.
FormatPayload(taskType string, payload []byte) string
}

type PayloadFormatterFunc func(string, []byte) string

// FormatPayload returns a string representation of the payload of the given taskType.
func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string {
return f(taskType, payload)
}

// ResultFormatter is used to convert result bytes to a string shown in the UI.
type ResultFormatter interface {
// FormatResult takes the task's typename and result and returns a string representation of the result.
FormatResult(taskType string, result []byte) string
}

type ResultFormatterFunc func(string, []byte) string

func (f ResultFormatterFunc) FormatResult(taskType string, result []byte) string {
return f(taskType, result)
}

// DefaultPayloadFormatter is the PayloadFormater used by default.
// It prints the given payload bytes as is if the bytes are printable, otherwise it prints a message to indicate
// that the bytes are not printable.
Expand All @@ -37,6 +48,16 @@ var DefaultPayloadFormatter = PayloadFormatterFunc(func(_ string, payload []byte
return string(payload)
})

// DefaultResultFormatter is the ResultFormatter used by default.
// It prints the given result bytes as is if the bytes are printable, otherwise it prints a message to indicate
// that the bytes are not printable.
var DefaultResultFormatter = ResultFormatterFunc(func(_ string, result []byte) string {
if !isPrintable(result) {
return "non-printable bytes"
}
return string(result)
})

// isPrintable reports whether the given data is comprised of all printable runes.
func isPrintable(data []byte) bool {
if !utf8.Valid(data) {
Expand Down Expand Up @@ -67,6 +88,7 @@ type queueStateSnapshot struct {
Scheduled int `json:"scheduled"`
Retry int `json:"retry"`
Archived int `json:"archived"`
Completed int `json:"completed"`

// Total number of tasks processed during the given date.
// The number includes both succeeded and failed tasks.
Expand All @@ -91,6 +113,7 @@ func toQueueStateSnapshot(s *asynq.QueueInfo) *queueStateSnapshot {
Scheduled: s.Scheduled,
Retry: s.Retry,
Archived: s.Archived,
Completed: s.Completed,
Processed: s.Processed,
Succeeded: s.Processed - s.Failed,
Failed: s.Failed,
Expand Down Expand Up @@ -152,6 +175,22 @@ type taskInfo struct {
// NextProcessAt is the time the task is scheduled to be processed in RFC3339 format.
// If not applicable, empty string.
NextProcessAt string `json:"next_process_at"`
// CompletedAt is the time the task was successfully processed in RFC3339 format.
// If not applicable, empty string.
CompletedAt string `json:"completed_at"`
// Result is the result data associated with the task.
Result string `json:"result"`
// TTL is the number of seconds the task has left to be retained in the queue.
// This is calculated by (CompletedAt + ResultTTL) - Now.
TTL int64 `json:"ttl_seconds"`
}

// taskTTL calculates TTL for the given task.
func taskTTL(task *asynq.TaskInfo) time.Duration {
if task.State != asynq.TaskStateCompleted {
return 0 // N/A
}
return task.CompletedAt.Add(task.Retention).Sub(time.Now())
}

// formatTimeInRFC3339 formats t in RFC3339 if the value is non-zero.
Expand All @@ -163,7 +202,7 @@ func formatTimeInRFC3339(t time.Time) string {
return t.Format(time.RFC3339)
}

func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo {
func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *taskInfo {
return &taskInfo{
ID: info.ID,
Queue: info.Queue,
Expand All @@ -177,6 +216,9 @@ func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo {
Timeout: int(info.Timeout.Seconds()),
Deadline: formatTimeInRFC3339(info.Deadline),
NextProcessAt: formatTimeInRFC3339(info.NextProcessAt),
CompletedAt: formatTimeInRFC3339(info.CompletedAt),
Result: rf.FormatResult("", info.Result),
TTL: int64(taskTTL(info).Seconds()),
}
}

Expand Down Expand Up @@ -343,6 +385,40 @@ func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*archivedTask
return out
}

type completedTask struct {
*baseTask
CompletedAt time.Time `json:"completed_at"`
Result string `json:"result"`
// Number of seconds left for retention (i.e. (CompletedAt + ResultTTL) - Now)
TTL int64 `json:"ttl_seconds"`
}

func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *completedTask {
base := &baseTask{
ID: ti.ID,
Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &completedTask{
baseTask: base,
CompletedAt: ti.CompletedAt,
TTL: int64(taskTTL(ti).Seconds()),
Result: rf.FormatResult(ti.Type, ti.Result),
}
}

func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) []*completedTask {
out := make([]*completedTask, len(in))
for i, ti := range in {
out[i] = toCompletedTask(ti, pf, rf)
}
return out
}

type schedulerEntry struct {
ID string `json:"id"`
Spec string `json:"spec"`
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/go-redis/redis/v8 v8.11.3
github.com/gorilla/mux v1.8.0
github.com/hibiken/asynq v0.18.6
github.com/hibiken/asynq v0.19.0
github.com/rs/cors v1.7.0
)

2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hibiken/asynq v0.18.6 h1:pBjtGh2QhDe1+/0yaSc56ANpdQ77BQgVfMIrj+NJrUM=
github.com/hibiken/asynq v0.18.6/go.mod h1:tyc63ojaW8SJ5SBm8mvI4DDONsguP5HE85EEl4Qr5Ig=
github.com/hibiken/asynq v0.19.0 h1:AoJhoivymyFhF92ZAmVzxd7jr0RM264HdgkbjPc+x+M=
github.com/hibiken/asynq v0.19.0/go.mod h1:tyc63ojaW8SJ5SBm8mvI4DDONsguP5HE85EEl4Qr5Ig=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
38 changes: 27 additions & 11 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Options struct {
//
// This field is optional.
PayloadFormatter PayloadFormatter

// ResultFormatter is used to convert result bytes to string shown in the UI.
//
// This field is optional.
ResultFormatter ResultFormatter
}

// HTTPHandler is a http.Handler for asynqmon application.
Expand Down Expand Up @@ -78,8 +83,9 @@ func (h *HTTPHandler) Close() error {
}

// RootPath returns the root URL path used for asynqmon application.
// Returned path string does not have the trailing slash.
func (h *HTTPHandler) RootPath() string {
return h.rootPath + "/"
return h.rootPath
}

//go:embed ui/build/*
Expand All @@ -88,9 +94,14 @@ var staticContents embed.FS
func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspector) *mux.Router {
router := mux.NewRouter().PathPrefix(opts.RootPath).Subrouter()

var pf PayloadFormatter = DefaultPayloadFormatter
var payloadFmt PayloadFormatter = DefaultPayloadFormatter
if opts.PayloadFormatter != nil {
pf = opts.PayloadFormatter
payloadFmt = opts.PayloadFormatter
}

var resultFmt ResultFormatter = DefaultResultFormatter
if opts.ResultFormatter != nil {
resultFmt = opts.ResultFormatter
}

api := router.PathPrefix("/api").Subrouter()
Expand All @@ -105,20 +116,20 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")

// Task endpoints.
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
Expand All @@ -129,7 +140,7 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto
api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
Expand All @@ -140,21 +151,26 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto
api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/queues/{qname}/completed_tasks", newListCompletedTasksHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET")
api.HandleFunc("/queues/{qname}/completed_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/completed_tasks:delete_all", newDeleteAllCompletedTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")

api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET")

// Servers endpoints.
api.HandleFunc("/servers", newListServersHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/servers", newListServersHandlerFunc(inspector, payloadFmt)).Methods("GET")

// Scheduler Entry endpoints.
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, pf)).Methods("GET")
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, payloadFmt)).Methods("GET")
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")

// Redis info endpoint.
Expand Down
Loading

0 comments on commit 741a3c5

Please sign in to comment.