Skip to content

Commit

Permalink
refactor: job handler api
Browse files Browse the repository at this point in the history
  • Loading branch information
smsunarto committed Jun 30, 2024
1 parent 9c322a4 commit 0e7736d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
16 changes: 6 additions & 10 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import (

const jobDBKeyPrefix = "job-"

// JobType is a user-defined job struct that is processed by the job queue.
// The struct must implement the Process method.
type JobType interface {
Process(JobContext) error
}
type JobHandler = func(JobContext, struct{}) error

// JobContext provides context for a job which is injected into the job Process method.
type JobContext interface {
Expand All @@ -20,17 +16,17 @@ type JobContext interface {
}

// Type job must implement the JobContext interface
var _ JobContext = (*job[JobType])(nil)
var _ JobContext = (*job[struct{}])(nil)

// job is an internal representation of a job in the job queue.
type job[T JobType] struct {
type job[T any] struct {
ID uint64 `json:"id"`
Payload T `json:"payload"`
Status JobStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
}

func newJob[T JobType](id uint64, payload T) *job[T] {
func newJob[T any](id uint64, payload T) *job[T] {
return &job[T]{
ID: id,
Payload: payload,
Expand All @@ -47,9 +43,9 @@ func (j *job[T]) JobCreatedAt() time.Time {
return j.CreatedAt
}

func (j *job[T]) Process() error {
func (j *job[T]) Process(handler func(JobContext, T) error) error {
// Attempt to process the job
if err := j.Payload.Process(j); err != nil {
if err := handler(j, j.Payload); err != nil {
return err
}

Expand Down
24 changes: 14 additions & 10 deletions jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ const defaultFetchInterval = 100 * time.Millisecond
const defaultJobBufferSize = 1000
const defaultJobIDSequenceSize = 100

type JobQueue[T JobType] struct {
db *badger.DB
wg sync.WaitGroup
logger zerolog.Logger
cancel context.CancelFunc
type JobQueue[T any] struct {
db *badger.DB
wg sync.WaitGroup
logger zerolog.Logger
cancel context.CancelFunc
handler func(JobContext, T) error

jobID *badger.Sequence
isJobIDInQueue *xsync.MapOf[uint64, bool]
Expand All @@ -47,7 +48,9 @@ type JobQueue[T JobType] struct {
// NewJobQueue creates a new JobQueue with the specified database, name, and number
// of worker goroutines. It initializes the job queue, starts the worker goroutines,
// and returns the JobQueue instance and an error, if any.
func NewJobQueue[T JobType](dbPath string, name string, workers int, opts ...Option[T]) (*JobQueue[T], error) {
func NewJobQueue[T any](
dbPath string, name string, workers int, handler func(JobContext, T) error, opts ...Option[T],
) (*JobQueue[T], error) {
if workers < 0 {
return nil, errors.New("invalid number of workers")
} else if workers == 0 {
Expand All @@ -60,9 +63,10 @@ func NewJobQueue[T JobType](dbPath string, name string, workers int, opts ...Opt
}

jq := &JobQueue[T]{
db: db,
wg: sync.WaitGroup{},
logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(),
db: db,
wg: sync.WaitGroup{},
logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(),
handler: handler,

isJobIDInQueue: xsync.NewMapOf[uint64, bool](),
jobs: make(chan *job[T], defaultJobBufferSize),
Expand Down Expand Up @@ -150,7 +154,7 @@ func (jq *JobQueue[T]) processJob(job *job[T]) error {
logger.Info().Msg("Processing job")
}

if err := job.Process(); err != nil {
if err := job.Process(jq.handler); err != nil {
return fmt.Errorf("failed to process job: %w", err)
}

Expand Down
31 changes: 16 additions & 15 deletions jobqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ func init() { //nolint:gochecknoinits // for testing
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}

type TestJob struct {
type testJob struct {
Msg string
}

func (j TestJob) Process(ctx JobContext) error {
fmt.Println("Test job processed:", j.Msg, ctx.JobID(), ctx.JobCreatedAt().Unix()) //nolint:forbidigo // for testing
func testJobHandler(ctx JobContext, job testJob) error {
fmt.Println("Test job processed:", job.Msg, ctx.JobID(),

Check failure on line 29 in jobqueue_test.go

View workflow job for this annotation

GitHub Actions / Go

use of `fmt.Println` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
ctx.JobCreatedAt().Unix()) //nolint:forbidigo // for testing

Check failure on line 30 in jobqueue_test.go

View workflow job for this annotation

GitHub Actions / Go

directive `//nolint:forbidigo // for testing` is unused for linter "forbidigo" (nolintlint)
return nil
}

Expand All @@ -39,7 +40,7 @@ func TestNewJobQueue(t *testing.T) {
dbPath string
queueName string
workers int
options []Option[TestJob]
options []Option[testJob]
expectedError bool
cleanupNeeded bool
}{
Expand Down Expand Up @@ -88,7 +89,7 @@ func TestNewJobQueue(t *testing.T) {
}

// Act
jq, err := NewJobQueue[TestJob](tc.dbPath, tc.queueName, tc.workers, tc.options...)
jq, err := NewJobQueue[testJob](tc.dbPath, tc.queueName, tc.workers, testJobHandler, tc.options...)

// Assert
if tc.expectedError {
Expand All @@ -114,7 +115,7 @@ func TestNewJobQueue(t *testing.T) {
func TestJobQueue_Enqueue(t *testing.T) {
cleanupBadgerDB(t)

jq, err := NewJobQueue[TestJob](BadgerDBPath, "test-job", 0)
jq, err := NewJobQueue[testJob](BadgerDBPath, "test-job", 0, testJobHandler)
assert.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -123,7 +124,7 @@ func TestJobQueue_Enqueue(t *testing.T) {
})

for i := 0; i < 10; i++ {
j := TestJob{Msg: fmt.Sprintf("hello %d", i)}
j := testJob{Msg: fmt.Sprintf("hello %d", i)}

id, err := jq.Enqueue(j)
assert.NoError(t, err)
Expand All @@ -132,7 +133,7 @@ func TestJobQueue_Enqueue(t *testing.T) {
value, err := readJob(jq.db, id)
assert.NoError(t, err)

var dbJob job[TestJob]
var dbJob job[testJob]
err = json.Unmarshal(value, &dbJob)
assert.NoError(t, err)

Expand All @@ -147,7 +148,7 @@ func TestJobQueue_Enqueue(t *testing.T) {
func TestJobQueue_ProcessJob(t *testing.T) {
cleanupBadgerDB(t)

jq, err := NewJobQueue[TestJob](BadgerDBPath, "test-job", 0)
jq, err := NewJobQueue[testJob](BadgerDBPath, "test-job", 0, testJobHandler)
assert.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -158,7 +159,7 @@ func TestJobQueue_ProcessJob(t *testing.T) {
// Queue a bunch of jobs
ids := make([]uint64, 0)
for i := 0; i < 10; i++ {
j := TestJob{Msg: fmt.Sprintf("hello %d", i)}
j := testJob{Msg: fmt.Sprintf("hello %d", i)}

id, err := jq.Enqueue(j)
assert.NoError(t, err)
Expand All @@ -172,7 +173,7 @@ func TestJobQueue_ProcessJob(t *testing.T) {

// Check that the job is what we're expecting
assert.Equal(t, ids[i], j.ID)
assert.Equal(t, TestJob{Msg: fmt.Sprintf("hello %d", i)}, j.Payload)
assert.Equal(t, testJob{Msg: fmt.Sprintf("hello %d", i)}, j.Payload)
assert.Equal(t, JobStatusPending, j.Status)
assert.WithinDuration(t, time.Now(), j.CreatedAt, time.Second)

Expand All @@ -194,29 +195,29 @@ func TestJobQueue_Recovery(t *testing.T) {
cleanupBadgerDB(t)

// Create initial job queue
jq, err := NewJobQueue[TestJob]("/tmp/badger", "test-job", 0)
jq, err := NewJobQueue[testJob]("/tmp/badger", "test-job", 0, testJobHandler)
assert.NoError(t, err)

t.Cleanup(func() {
cleanupBadgerDB(t)
})

// Enqueue job to initial job queue
id, err := jq.Enqueue(TestJob{Msg: "hello"})
id, err := jq.Enqueue(testJob{Msg: "hello"})
assert.NoError(t, err)

// Stop initial job queue
assert.NoError(t, jq.Stop())

// Create recovered job queue
recoveredJq, err := NewJobQueue[TestJob]("/tmp/badger", "test-job", 0)
recoveredJq, err := NewJobQueue[testJob]("/tmp/badger", "test-job", 0, testJobHandler)
assert.NoError(t, err)

j := <-recoveredJq.jobs

// Verify that the job is recovered correctly
assert.Equal(t, id, j.ID)
assert.Equal(t, j.Payload, TestJob{Msg: "hello"})
assert.Equal(t, j.Payload, testJob{Msg: "hello"})

// Process the job in recovered job queue
assert.NoError(t, recoveredJq.processJob(j))
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"time"
)

type Option[T JobType] func(*JobQueue[T])
type Option[T any] func(*JobQueue[T])

// WithFetchInterval sets the interval at which the job queue fetches jobs from BadgerDB.
func WithFetchInterval[T JobType](interval time.Duration) Option[T] {
func WithFetchInterval[T any](interval time.Duration) Option[T] {
return func(jq *JobQueue[T]) {
jq.logger.Debug().Msg(fmt.Sprintf("Fetch interval set to %vms", interval.Milliseconds()))
jq.fetchInterval = interval
}
}

// WithJobBufferSize sets the size of the job channel.
func WithJobBufferSize[T JobType](size int) Option[T] {
func WithJobBufferSize[T any](size int) Option[T] {
return func(jq *JobQueue[T]) {
jq.logger.Debug().Msg(fmt.Sprintf("Job buffer size set to %d", size))
jq.jobs = make(chan *job[T], size)
Expand Down

0 comments on commit 0e7736d

Please sign in to comment.