Skip to content

Commit

Permalink
[#151]: feature: support API v4
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jan 23, 2023
2 parents 30b078d + dfbc197 commit 7c46348
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 42 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/aws/smithy-go v1.13.5
github.com/goccy/go-json v0.10.0
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v3 v3.1.2
github.com/roadrunner-server/api/v4 v4.0.0
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.0.0
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v3 v3.1.2 h1:IgBUajBktfH9hyNFxdPQ3bXgzU8SCurTx9LUFf95kns=
github.com/roadrunner-server/api/v3 v3.1.2/go.mod h1:hlv546SS3tZVIDu68YekPCLS/82Y/AoS8Eu5xysvwBQ=
github.com/roadrunner-server/api/v4 v4.0.0 h1:4zAnlMHp2BKgxxPSuPGQSVCMtPKX/R+/czWewpDkPak=
github.com/roadrunner-server/api/v4 v4.0.0/go.mod h1:tbk/rqlNiLFAchTKrXvsJ4boAg0qZmxyK8vWH2PlV8U=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/roadrunner-server/sdk/v4 v4.0.0 h1:obdmZLtSA1g/1zsklMcGyRpR5q7tCx+wLiLHupVR8zc=
Expand Down
10 changes: 5 additions & 5 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"time"

"github.com/roadrunner-server/api/v3/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v3/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/sqs/v4/sqsjobs"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -59,13 +59,13 @@ func (p *Plugin) Name() string {
return pluginName
}

func (p *Plugin) ConsumerFromConfig(configKey string, pq pq.Queue) (jobs.Consumer, error) {
func (p *Plugin) DriverFromConfig(configKey string, pq pq.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return sqsjobs.NewSQSConsumer(configKey, p.insideAWS, p.log, p.cfg, pq)
return sqsjobs.FromConfig(configKey, p.insideAWS, pipeline, p.log, p.cfg, pq)
}

func (p *Plugin) ConsumerFromPipeline(pipe jobs.Pipeline, pq pq.Queue) (jobs.Consumer, error) {
func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq pq.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return sqsjobs.FromPipeline(pipe, p.insideAWS, p.log, p.cfg, pq)
Expand Down
60 changes: 30 additions & 30 deletions sqsjobs/consumer.go → sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,24 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"github.com/roadrunner-server/api/v3/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v3/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

const pluginName string = "sqs"

var _ jobs.Driver = (*Driver)(nil)

type Configurer interface {
// UnmarshalKey takes a single key and unmarshal it into a Struct.
UnmarshalKey(name string, out any) error
// Has checks if config section exists.
Has(name string) bool
}

type Consumer struct {
type Driver struct {
mu sync.Mutex
cond sync.Cond
msgInFlight *int64
Expand Down Expand Up @@ -64,7 +66,7 @@ type Consumer struct {
pauseCh chan struct{}
}

func NewSQSConsumer(configKey string, insideAWS bool, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Consumer, error) {
func FromConfig(configKey string, insideAWS bool, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

// if no such key - error
Expand Down Expand Up @@ -94,8 +96,8 @@ func NewSQSConsumer(configKey string, insideAWS bool, log *zap.Logger, cfg Confi

conf.InitDefault()

// initialize job Consumer
jb := &Consumer{
// initialize job Driver
jb := &Driver{
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
Expand Down Expand Up @@ -125,6 +127,8 @@ func NewSQSConsumer(configKey string, insideAWS bool, log *zap.Logger, cfg Confi
return nil, errors.E(op, err)
}

jb.pipeline.Store(&pipe)

// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to queues
// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
Expand All @@ -136,7 +140,7 @@ func NewSQSConsumer(configKey string, insideAWS bool, log *zap.Logger, cfg Confi
return jb, nil
}

func FromPipeline(pipe jobs.Pipeline, insideAWS bool, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Consumer, error) {
func FromPipeline(pipe jobs.Pipeline, insideAWS bool, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
const op = errors.Op("new_sqs_consumer")

// if no global section
Expand Down Expand Up @@ -167,8 +171,8 @@ func FromPipeline(pipe jobs.Pipeline, insideAWS bool, log *zap.Logger, cfg Confi
return nil, errors.E(op, err)
}

// initialize job Consumer
jb := &Consumer{
// initialize job Driver
jb := &Driver{
cond: sync.Cond{L: &sync.Mutex{}},
pq: pq,
log: log,
Expand Down Expand Up @@ -199,6 +203,7 @@ func FromPipeline(pipe jobs.Pipeline, insideAWS bool, log *zap.Logger, cfg Confi
return nil, errors.E(op, err)
}

jb.pipeline.Store(&pipe)
// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to queues
// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
Expand All @@ -210,7 +215,7 @@ func FromPipeline(pipe jobs.Pipeline, insideAWS bool, log *zap.Logger, cfg Confi
return jb, nil
}

func (c *Consumer) Push(ctx context.Context, jb jobs.Job) error {
func (c *Driver) Push(ctx context.Context, jb jobs.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered

Expand All @@ -234,7 +239,7 @@ func (c *Consumer) Push(ctx context.Context, jb jobs.Job) error {
return nil
}

func (c *Consumer) State(ctx context.Context) (*jobs.State, error) {
func (c *Driver) State(ctx context.Context) (*jobs.State, error) {
const op = errors.Op("sqs_state")
attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
QueueUrl: c.queueURL,
Expand Down Expand Up @@ -277,12 +282,7 @@ func (c *Consumer) State(ctx context.Context) (*jobs.State, error) {
return out, nil
}

func (c *Consumer) Register(_ context.Context, p jobs.Pipeline) error {
c.pipeline.Store(&p)
return nil
}

func (c *Consumer) Run(_ context.Context, p jobs.Pipeline) error {
func (c *Driver) Run(_ context.Context, p jobs.Pipeline) error {
start := time.Now()
const op = errors.Op("sqs_run")

Expand All @@ -305,7 +305,7 @@ func (c *Consumer) Run(_ context.Context, p jobs.Pipeline) error {
return nil
}

func (c *Consumer) Stop(context.Context) error {
func (c *Driver) Stop(context.Context) error {
start := time.Now()

if atomic.LoadUint32(&c.listeners) > 0 {
Expand All @@ -323,20 +323,18 @@ func (c *Consumer) Stop(context.Context) error {
return nil
}

func (c *Consumer) Pause(_ context.Context, p string) {
func (c *Driver) Pause(_ context.Context, p string) error {
start := time.Now()
// load atomic value
pipe := *c.pipeline.Load()
if pipe.Name() != p {
c.log.Error("no such pipeline", zap.String("requested", p), zap.String("actual", pipe.Name()))
return
return errors.Errorf("no such pipeline: %s", pipe.Name())
}

l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
c.log.Warn("no active listeners, nothing to pause")
return
return errors.Str("no active listeners, nothing to pause")
}

atomic.AddUint32(&c.listeners, ^uint32(0))
Expand All @@ -351,22 +349,22 @@ func (c *Consumer) Pause(_ context.Context, p string) {
c.cond.Signal()

c.log.Debug("pipeline was paused", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now()), zap.Duration("elapsed", time.Since(start)))

return nil
}

func (c *Consumer) Resume(_ context.Context, p string) {
func (c *Driver) Resume(_ context.Context, p string) error {
start := time.Now()
// load atomic value
pipe := *c.pipeline.Load()
if pipe.Name() != p {
c.log.Error("no such pipeline", zap.String("requested", p), zap.String("actual", pipe.Name()))
return
return errors.Errorf("no such pipeline: %s", pipe.Name())
}

l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
c.log.Warn("sqs listener already in the active state")
return
return errors.Str("sqs listener is already in the active state")
}

var ctx context.Context
Expand All @@ -377,9 +375,11 @@ func (c *Consumer) Resume(_ context.Context, p string) {
// increase num of listeners
atomic.AddUint32(&c.listeners, 1)
c.log.Debug("pipeline was resumed", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", time.Now()), zap.Duration("elapsed", time.Since(start)))

return nil
}

func (c *Consumer) handleItem(ctx context.Context, msg *Item) error {
func (c *Driver) handleItem(ctx context.Context, msg *Item) error {
d, err := msg.pack(c.queueURL, c.queue, c.messageGroupID)
if err != nil {
return err
Expand Down Expand Up @@ -429,7 +429,7 @@ func checkEnv(insideAWS bool, key, secret, sessionToken, endpoint, region string
return client, nil
}

func manageQueue(jb *Consumer) error {
func manageQueue(jb *Driver) error {
var err error
switch jb.skipDeclare {
case true:
Expand Down
6 changes: 3 additions & 3 deletions sqsjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/roadrunner-server/api/v3/plugins/v1/jobs"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -242,7 +242,7 @@ func (i *Item) pack(queueURL, origQueue *string, mg string) (*sqs.SendMessageInp
}, nil
}

func (c *Consumer) fromMsg(msg *types.Message) (*Item, error) {
func (c *Driver) fromMsg(msg *types.Message) (*Item, error) {
item, err := c.unpack(msg)
if err == nil {
return item, nil
Expand Down Expand Up @@ -314,7 +314,7 @@ func (c *Consumer) fromMsg(msg *types.Message) (*Item, error) {
}
}

func (c *Consumer) unpack(msg *types.Message) (*Item, error) {
func (c *Driver) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion sqsjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
auto string = "deduced_by_rr"
)

func (c *Consumer) listen(ctx context.Context) { //nolint:gocognit
func (c *Driver) listen(ctx context.Context) { //nolint:gocognit
go func() {
for {
select {
Expand Down

0 comments on commit 7c46348

Please sign in to comment.