Skip to content

Commit

Permalink
Merge from main of the test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
pconstantinou committed Feb 9, 2024
1 parent 94d7d68 commit af450bf
Show file tree
Hide file tree
Showing 14 changed files with 663 additions and 193 deletions.
41 changes: 26 additions & 15 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e
}

// Enqueue queues jobs to be executed asynchronously
func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job) (jobID string, err error) {
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}

var queueChan chan *jobs.Job
var qc any
var ok bool
Expand Down Expand Up @@ -96,28 +101,34 @@ func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job) (jobID string, er
job.RunAfter = now
}

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
}

err = jobs.FingerprintJob(job)
if err != nil {
err = errors.Join(jobs.ErrCantGenerateFingerprint, err)
return
}

// if the job fingerprint is already known, don't queue the job
if _, found := m.fingerprints.Load(job.Fingerprint); found {
return jobs.DuplicateJobID, nil
}

m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
m.jobCount++
m.mu.Unlock()
if !options.Override {
return jobs.DuplicateJobID, jobs.ErrJobFingerprintConflict
}
oldJob, found := m.fingerprints.Swap(job.Fingerprint, job)
if found {
// Return the same JobID to make it the same as posgres
job.ID = oldJob.(*jobs.Job).ID
} else {
m.logger.Info("Expected to get job but none was returned for fingerprint %s", job.Fingerprint)
}
jobID = fmt.Sprint(job.ID)

job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)
} else {
m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
m.jobCount++
m.mu.Unlock()
job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)
}

if job.RunAfter.Equal(now) {
queueChan <- job
Expand Down
19 changes: 13 additions & 6 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends"
"github.com/acaloiaro/neoq/backends/memory"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
Expand All @@ -21,9 +22,9 @@ import (
"golang.org/x/exp/slog"
)

const (
queue = "testing"
)
var queue = "testing"
var q1 = "queue1"
var q2 = "queue2"

var (
errPeriodicTimeout = errors.New("timed out waiting for periodic job")
Expand Down Expand Up @@ -231,9 +232,6 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
jobsProcessed1 := 0
jobsProcessed2 := 0

q1 := "queue1"
q2 := "queue2"

done1 := make(chan bool)
done2 := make(chan bool)

Expand Down Expand Up @@ -427,3 +425,12 @@ result_loop:
t.Error(err)
}
}

func TestSuite(t *testing.T) {
ctx := context.Background()
n, err := neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug))
if err != nil {
t.Fatal(err)
}
backends.NewNeoQTestSuite(n).Run(t)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP CONSTRAINT neoq_jobs_fingerprint_run_unique_idx;
DROP INDEX neoq_jobs_fingerprint_unique_idx;
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (fingerprint, status) WHERE NOT (status = 'processed');
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_run_unique_idx ON neoq_jobs (queue, fingerprint, status, ran_at);
DROP INDEX neoq_jobs_fingerprint_unique_idx;
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (queue, fingerprint, status) WHERE NOT (status = 'processed');
42 changes: 32 additions & 10 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ func (p *PgBackend) initializeDB() (err error) {
sslMode)
m, err := migrate.NewWithSourceInstance("iofs", migrations, pqConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", slog.Any("error", err))
pqConnectionString = fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s&x-migrations-table=neoq_schema_migrations",
pgxCfg.User,
"*******",
pgxCfg.Host,
pgxCfg.Database,
sslMode)
p.logger.Error("unable to run migrations", slog.Any("error", err), slog.Any("url", pqConnectionString))
return
}
// We don't need the migration tooling to hold it's connections to the DB once it has been completed.
Expand All @@ -377,7 +383,11 @@ func (p *PgBackend) initializeDB() (err error) {
}

// Enqueue adds jobs to the specified queue
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}
if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
Expand All @@ -403,18 +413,17 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed
jobID, err = p.enqueueJob(ctx, tx, job)
jobID, err = p.enqueueJob(ctx, tx, job, options)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == pgerrcode.UniqueViolation {
err = ErrDuplicateJob
err = jobs.ErrJobFingerprintConflict
return
}
}
p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err))
err = fmt.Errorf("error enqueuing job: %w", err)
return
}

err = tx.Commit(ctx)
Expand Down Expand Up @@ -544,16 +553,29 @@ func (p *PgBackend) Shutdown(ctx context.Context) {
//
// Jobs that are not already fingerprinted are fingerprinted before being added
// Duplicate jobs are not added to the queue. Any two unprocessed jobs with the same fingerprint are duplicates
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID string, err error) {
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, options neoq.JobOptions) (jobID string, err error) {
err = jobs.FingerprintJob(j)
if err != nil {
return
return jobID,
fmt.Errorf("%w: %v", jobs.ErrCantGenerateFingerprint, err)
}

p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
if !options.Override {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
} else {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (queue, status, fingerprint, ran_at) DO
UPDATE SET
payload=$3, run_after=$4, deadline=$5, max_retries=$6
RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if err != nil {
p.logger.Error("error enqueueing override job", slog.Any("error", err))
}
}

if err != nil {
err = fmt.Errorf("unable add job to queue: %w", err)
return
Expand Down
19 changes: 18 additions & 1 deletion backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends"
"github.com/acaloiaro/neoq/backends/postgres"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
Expand All @@ -24,6 +25,7 @@ import (

const (
ConcurrentWorkers = 8
queue1 = "queue1"
)

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
Expand Down Expand Up @@ -245,7 +247,7 @@ func TestDuplicateJobRejection(t *testing.T) {
})

// we submitted two duplicate jobs; the error should be a duplicate job error
if !errors.Is(e2, postgres.ErrDuplicateJob) {
if !errors.Is(e2, jobs.ErrJobFingerprintConflict) {
err = e2
}

Expand Down Expand Up @@ -773,3 +775,18 @@ func Test_ConnectionTimeout(t *testing.T) {
t.Error(err)
}
}

func SetupSuite(t *testing.T) (neoq.Neoq, context.Context) {
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
}
return nq, ctx
}
func TestSuite(t *testing.T) {
n, _ := SetupSuite(t)
backends.NewNeoQTestSuite(n).Run(t)
}
13 changes: 12 additions & 1 deletion backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,12 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption {
}

// Enqueue queues jobs to be executed asynchronously
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
Expand All @@ -195,6 +200,12 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string
if err != nil {
return
}
if options.Override {
err = b.inspector.DeleteTask(job.Queue, job.Fingerprint)
if errors.Is(err, asynq.ErrTaskNotFound) {
b.logger.Debug("Overriding a task that does not exists queue:[%s] Fingerprint: [%s]", job.Queue, job.Fingerprint)
}
}
task := asynq.NewTask(job.Queue, payload)
_, err = b.client.EnqueueContext(ctx, task, jobToTaskOptions(job)...)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions backends/redis/redis_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
Expand Down Expand Up @@ -441,3 +442,25 @@ result_loop:
t.Error(err)
}
}

func TestSuite(t *testing.T) {
connString := os.Getenv("TEST_REDIS_URL")
if connString == "" {
t.Skip("Skipping: TEST_REDIS_URL not set")
return
}

password := os.Getenv("REDIS_PASSWORD")
ctx := context.Background()
nq, err := neoq.New(
ctx,
neoq.WithBackend(Backend),
WithAddr(connString),
WithPassword(password),
WithShutdownTimeout(500*time.Millisecond))
if err != nil {
t.Fatal(err)
}

backends.NewNeoQTestSuite(nq).Run(t)
}
30 changes: 30 additions & 0 deletions backends/suite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package backends

import (
"context"
"testing"

"github.com/acaloiaro/neoq"
"github.com/stretchr/testify/suite"
)

type NeoQTestSuite struct {
suite.Suite
NeoQ neoq.Neoq
}

// NewNeoQTestSuite constructs a new NeoQ test suite that can be used to test
// any impementation of the queue
func NewNeoQTestSuite(q neoq.Neoq) *NeoQTestSuite {
n := new(NeoQTestSuite)
n.NeoQ = q
return n
}

func (s *NeoQTestSuite) Run(t *testing.T) {
suite.Run(t, s)
}

func (s *NeoQTestSuite) TearDownSuite() {
s.NeoQ.Shutdown(context.Background())
}
Loading

0 comments on commit af450bf

Please sign in to comment.