Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disable activity workers #592

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions aggregatedpool/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type Activity struct {
seqID uint64
running sync.Map

pldPool *sync.Pool
pldPool *sync.Pool
disableActivityWorkers bool
}

func NewActivityDefinition(ac common.Codec, p common.Pool, log *zap.Logger) *Activity {
func NewActivityDefinition(ac common.Codec, p common.Pool, log *zap.Logger, disableActivityWorkers bool) *Activity {
return &Activity{
log: log,
codec: ac,
Expand All @@ -43,6 +44,7 @@ func NewActivityDefinition(ac common.Codec, p common.Pool, log *zap.Logger) *Act
return new(payload.Payload)
},
},
disableActivityWorkers: disableActivityWorkers,
}
}

Expand Down
15 changes: 15 additions & 0 deletions aggregatedpool/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
log.Debug("workflow registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Workflows[j].Name))
}

if actDef.disableActivityWorkers {
log.Debug("activity workers disabled", zap.String(tq, wi[i].TaskQueue))
goto RegisterWorkflows
}

for j := 0; j < len(wi[i].Activities); j++ {
rustatian marked this conversation as resolved.
Show resolved Hide resolved
wrk.RegisterActivityWithOptions(actDef.execute, tActivity.RegisterOptions{
Name: wi[i].Activities[j].Name,
Expand All @@ -95,6 +100,16 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
log.Debug("activity registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Activities[j].Name))
}

RegisterWorkflows:
for j := 0; j < len(wi[i].Workflows); j++ {
wrk.RegisterWorkflowWithOptions(wDef, workflow.RegisterOptions{
Name: wi[i].Workflows[j].Name,
DisableAlreadyRegisteredCheck: false,
})

log.Debug("workflow registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Workflows[j].Name))
}

workers = append(workers, wrk)
}

Expand Down
4 changes: 3 additions & 1 deletion common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (

"github.com/roadrunner-server/pool/payload"
"github.com/roadrunner-server/pool/pool"
staticPool "github.com/roadrunner-server/pool/pool/static_pool"
"github.com/roadrunner-server/pool/state/process"
"github.com/roadrunner-server/pool/worker"
"github.com/temporalio/roadrunner-temporal/v5/internal"
"go.temporal.io/sdk/interceptor"
"go.uber.org/zap"

staticPool "github.com/roadrunner-server/pool/pool/static_pool"
)

type Interceptor interface {
Expand Down Expand Up @@ -64,4 +65,5 @@ type Configurer interface {
// Server creates workers for the application.
type Server interface {
NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
NewPoolWithOptions(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger, options ...staticPool.Options) (*staticPool.Pool, error)
}
7 changes: 4 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (

// Config of the temporal client and dependent services.
type Config struct {
Metrics *Metrics `mapstructure:"metrics"`
Activities *pool.Config `mapstructure:"activities"`
TLS *TLS `mapstructure:"tls, omitempty"`
Metrics *Metrics `mapstructure:"metrics"`
Activities *pool.Config `mapstructure:"activities"`
TLS *TLS `mapstructure:"tls, omitempty"`
DisableActivityWorkers bool `mapstructure:"disable_activity_workers"`

Address string `mapstructure:"address"`
Namespace string `mapstructure:"namespace"`
Expand Down
19 changes: 12 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@ go 1.23
toolchain go1.23.2

require (
github.com/goccy/go-json v0.10.3
github.com/goccy/go-json v0.10.4
github.com/google/uuid v1.6.0
github.com/roadrunner-server/api/v4 v4.16.0
github.com/roadrunner-server/endure/v2 v2.6.1
github.com/roadrunner-server/errors v1.4.1
github.com/roadrunner-server/events v1.0.1
github.com/roadrunner-server/pool v1.1.1
github.com/roadrunner-server/pool v1.1.2
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally/v4 v4.1.17-0.20240412215630-22fe011f5ff0
go.temporal.io/api v1.43.0
go.temporal.io/sdk v1.31.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.25.2
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.35.2
google.golang.org/protobuf v1.36.0
)

require (
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/sdk v1.33.0 // indirect
)

replace github.com/uber-go/tally/v4 => github.com/uber-go/tally/v4 v4.1.10
Expand Down Expand Up @@ -56,14 +61,14 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/grpc v1.68.1
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
36 changes: 22 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.4.1/go.mod h1:2lpHqI5OcWCtVElxXnPt+s8oJvMpySlOyM6xDCrzib4=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -179,8 +181,8 @@ github.com/roadrunner-server/events v1.0.1 h1:waCkKhxhzdK3VcI1xG22l+h+0J+Nfdpxjh
github.com/roadrunner-server/events v1.0.1/go.mod h1:WZRqoEVaFm209t52EuoT7ISUtvX6BrCi6bI/7pjkVC0=
github.com/roadrunner-server/goridge/v3 v3.8.3 h1:XmjrOFnI6ZbQTPaP39DEk8KwLUNTgjluK3pcZaW6ixQ=
github.com/roadrunner-server/goridge/v3 v3.8.3/go.mod h1:4TZU8zgkKIZCsH51qwGMpvyXCT59u/8z6q8sCe4ZGAQ=
github.com/roadrunner-server/pool v1.1.1 h1:ThMGASxLV/GvVXk/J80E2yY1nozfPwFlv93fH0xYDMg=
github.com/roadrunner-server/pool v1.1.1/go.mod h1:p6t1w4hcNgZGTgap+RYqoPIQ9jDXJwsr217+ROsVFro=
github.com/roadrunner-server/pool v1.1.2 h1:jdXGh/WG2YUp7SrlvXP+ZDYbCWvHdAaecYOHL4+ub3c=
github.com/roadrunner-server/pool v1.1.2/go.mod h1:KSdUYPMXHtCp4Qky31DCEV7n6ArETv9y1GyvzroY9mM=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -222,6 +224,12 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
Expand Down Expand Up @@ -255,8 +263,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0=
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 h1:1UoZQm6f0P/ZO0w1Ri+f+ifG/gXhegadRdwBIXEFWDo=
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -361,10 +369,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E=
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -374,8 +382,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0=
google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw=
google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI=
google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand All @@ -388,8 +396,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading
Loading