Skip to content

Commit

Permalink
Small refactoring and add missing comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
stefannegrea committed Dec 22, 2020
1 parent 221ccfa commit 04070bb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
13 changes: 10 additions & 3 deletions middleware.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
package workers

// JobFunc is a message processor
type JobFunc func(message *Msg) error

// MiddlewareFunc is an extra function on the processing pipeline
type MiddlewareFunc func(queue string, m *Manager, next JobFunc) JobFunc

// Middlewares contains the lists of all configured middleware functions
type Middlewares []MiddlewareFunc

// Append adds middleware to the end of the processing pipeline
func (m Middlewares) Append(mid MiddlewareFunc) Middlewares {
return append(m, mid)
}

// Prepend adds middleware to the front of the processing pipeline
func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares {
return append(Middlewares{mid}, m...)
}

func (ms Middlewares) build(queue string, mgr *Manager, final JobFunc) JobFunc {
for i := len(ms) - 1; i >= 0; i-- {
final = ms[i](queue, mgr, final)
func (m Middlewares) build(queue string, mgr *Manager, final JobFunc) JobFunc {
for i := len(m) - 1; i >= 0; i-- {
final = m[i](queue, mgr, final)
}
return final
}

// NewMiddlewares creates the processing pipeline given the list of middleware funcs
func NewMiddlewares(mids ...MiddlewareFunc) Middlewares {
return Middlewares(mids)
}
Expand All @@ -32,6 +38,7 @@ var defaultMiddlewares = NewMiddlewares(
StatsMiddleware,
)

// DefaultMiddlewares creates the default middleware pipeline
func DefaultMiddlewares() Middlewares {
return defaultMiddlewares
}
Expand Down
21 changes: 15 additions & 6 deletions msg.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,54 @@
package workers

import (
"github.com/bitly/go-simplejson"
"reflect"

"github.com/bitly/go-simplejson"
)

type data struct {
*simplejson.Json
}

// Msg is the struct for job data (parameters and metadata)
type Msg struct {
*data
original string
ack bool
startedAt int64
}

// Args is the set of parameters for a message
type Args struct {
*data
}

// Class returns class attribute of a message
func (m *Msg) Class() string {
return m.Get("class").MustString()
}

// Jid returns job id attribute of a message
func (m *Msg) Jid() string {
return m.Get("jid").MustString()
}

// Args returns arguments attribute of a message
func (m *Msg) Args() *Args {
if args, ok := m.CheckGet("args"); ok {
return &Args{&data{args}}
} else {
d, _ := newData("[]")
return &Args{d}
}

d, _ := newData("[]")
return &Args{d}
}

// OriginalJson returns the original JSON message
func (m *Msg) OriginalJson() string {
return m.original
}

// ToJson return data in JSON format th message
func (d *data) ToJson() string {
json, err := d.Encode()

Expand All @@ -52,10 +60,11 @@ func (d *data) ToJson() string {
}

func (d *data) Equals(other interface{}) bool {
otherJson := reflect.ValueOf(other).MethodByName("ToJson").Call([]reflect.Value{})
return d.ToJson() == otherJson[0].String()
otherJSON := reflect.ValueOf(other).MethodByName("ToJson").Call([]reflect.Value{})
return d.ToJson() == otherJSON[0].String()
}

// NewMsg returns a new message
func NewMsg(content string) (*Msg, error) {
d, err := newData(content)
if err != nil {
Expand Down
21 changes: 11 additions & 10 deletions storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type redisStore struct {
// Compile-time check to ensure that Redis store does in fact implement the Store interface
var _ Store = &redisStore{}

// NewRedisStore returns a new Redis store with the given namespace and preconfigured client
func NewRedisStore(namespace string, client *redis.Client) Store {
return &redisStore{
namespace: namespace,
Expand All @@ -44,9 +45,9 @@ func (r *redisStore) DequeueMessage(queue string, inprogressQueue string, timeou
time.Sleep(1 * time.Second)

return "", err
} else {
return message, nil
}

return message, nil
}

func (r *redisStore) EnqueueMessage(queue string, priority float64, message string) error {
Expand Down Expand Up @@ -253,17 +254,17 @@ func (r *redisStore) getRetryJson(retryStats []string) ([]RetryJobStats, error)
return nil, err
}

error_msg, err := allRetryStats.Get("error_message").String()
errorMsg, err := allRetryStats.Get("error_message").String()
if err != nil {
return nil, err
}

failed_at, err := allRetryStats.Get("failed_at").String()
failedAt, err := allRetryStats.Get("failed_at").String()
if err != nil {
return nil, err
}

job_id, err := allRetryStats.Get("jid").String()
jobID, err := allRetryStats.Get("jid").String()
if err != nil {
return nil, err
}
Expand All @@ -273,7 +274,7 @@ func (r *redisStore) getRetryJson(retryStats []string) ([]RetryJobStats, error)
return nil, err
}

retry_count, err := allRetryStats.Get("retry_count").Int64()
retryCount, err := allRetryStats.Get("retry_count").Int64()
if err != nil {
return nil, err
}
Expand All @@ -282,11 +283,11 @@ func (r *redisStore) getRetryJson(retryStats []string) ([]RetryJobStats, error)
for i := 0; i < len(retryStats[0]); i++ {
retryJobStats = append(retryJobStats, RetryJobStats{
Class: class,
ErrorMessage: error_msg,
FailedAt: failed_at,
JobID: job_id,
ErrorMessage: errorMsg,
FailedAt: failedAt,
JobID: jobID,
Queue: queue,
RetryCount: retry_count,
RetryCount: retryCount,
})
}

Expand Down
6 changes: 5 additions & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ type StorageError string

func (e StorageError) Error() string { return string(e) }

const NoMessage = StorageError("no message")
// list of known errors
const (
NoMessage = StorageError("no message")
)

type Stats struct {
Processed int64
Expand All @@ -37,6 +40,7 @@ type RetryJobStats struct {
RetryCount int64
}

// Store is the interface for storing and retrieving data
type Store interface {

// General queue operations
Expand Down

0 comments on commit 04070bb

Please sign in to comment.