Skip to content

refactor: refactor message handling and encoding in queue system #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func BenchmarkQueue(b *testing.B) {
m := job.NewMessage(&mockMessage{
message: "foo",
})
m.Encode()

for n := 0; n < b.N; n++ {
if err := q.queue(&m); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ type Worker interface {
type QueuedMessage interface {
Bytes() []byte
}

type TaskMessage interface {
QueuedMessage
Payload() []byte
}
40 changes: 30 additions & 10 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Message struct {
Timeout time.Duration `json:"timeout" msgpack:"timeout"`

// Payload is the payload data of the task.
Payload []byte `json:"body" msgpack:"body"`
Body []byte `json:"body" msgpack:"body"`

// RetryCount set count of retry
// default is 0, no retry.
Expand All @@ -48,19 +48,30 @@ type Message struct {

// Jitter eases contention by randomizing backoff steps
Jitter bool `json:"jitter" msgpack:"jitter"`
}

// Data to save Unsafe cast
Data []byte
// Payload returns the payload data of the Message.
// It returns the byte slice of the payload.
//
// Returns:
// - A byte slice containing the payload data.
func (m *Message) Payload() []byte {
return m.Body
}

// Bytes get internal data
// Bytes returns the byte slice of the Message struct.
// If the marshalling process encounters an error, the function will panic.
// It returns the marshalled byte slice.
//
// Returns:
// - A byte slice containing the msgpack-encoded data.
func (m *Message) Bytes() []byte {
return m.Data
}
b, err := json.Marshal(m)
if err != nil {
panic(err)
}

// Encode for encoding the structure
func (m *Message) Encode() {
m.Data = Encode(m)
return b
}

// NewMessage create new message
Expand All @@ -74,7 +85,7 @@ func NewMessage(m core.QueuedMessage, opts ...AllowOption) Message {
RetryMin: o.retryMin,
RetryMax: o.retryMax,
Timeout: o.timeout,
Payload: m.Bytes(),
Body: m.Bytes(),
}
}

Expand All @@ -92,6 +103,15 @@ func NewTask(task TaskFunc, opts ...AllowOption) Message {
}
}

// Encode takes a Message struct and marshals it into a byte slice using msgpack.
// If the marshalling process encounters an error, the function will panic.
// It returns the marshalled byte slice.
//
// Parameters:
// - m: A pointer to the Message struct to be encoded.
//
// Returns:
// - A byte slice containing the msgpack-encoded data.
func Encode(m *Message) []byte {
b, err := json.Marshal(m)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ func TestMessageEncodeDecode(t *testing.T) {
},
)

m.Encode()
out := Decode(m.Bytes())

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
assert.Equal(t, 3*time.Millisecond, out.Timeout)
assert.Equal(t, "foo", string(out.Payload))
assert.Equal(t, "foo", string(out.Payload()))
assert.Equal(t, 200*time.Millisecond, out.RetryMin)
assert.Equal(t, 20*time.Second, out.RetryMax)
assert.Equal(t, 4.0, out.RetryFactor)
Expand Down
14 changes: 6 additions & 8 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (q *Queue) Wait() {
// Queue to queue single job with binary
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
data := job.NewMessage(message, opts...)
data.Encode()

return q.queue(&data)
}
Expand Down Expand Up @@ -160,7 +159,7 @@ func (q *Queue) work(task core.QueuedMessage) {
q.metric.DecBusyWorker()
e := recover()
if e != nil {
q.logger.Errorf("panic error: %v", e)
q.logger.Fatalf("panic error: %v", e)
}
q.schedule()

Expand All @@ -182,13 +181,12 @@ func (q *Queue) work(task core.QueuedMessage) {
}

func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
data = job.Decode(task.Bytes())
data.Data = data.Payload
switch t := task.(type) {
case *job.Message:
return q.handle(t)
default:
return errors.New("invalid task type")
}

return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
Expand Down
6 changes: 3 additions & 3 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
func TestHandleTimeout(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestHandleTimeout(t *testing.T) {
func TestJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
Expand All @@ -132,7 +132,7 @@ func TestJobComplete(t *testing.T) {

m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
Body: []byte("foo"),
}

w = NewRing(
Expand Down
15 changes: 14 additions & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Ring)(nil)
Expand All @@ -25,9 +26,21 @@ type Ring struct {
stopFlag int32
}

type Data struct {
Payload []byte `json:"payload"`
}

func (d *Data) Bytes() []byte {
return d.Payload
}

// Run to execute new task
func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error {
return s.runFunc(ctx, task)
v, _ := task.(*job.Message)
data := &Data{
Payload: v.Body,
}
return s.runFunc(ctx, data)
}

// Shutdown the worker
Expand Down
Loading