diff --git a/aggregatedpool/workflow.go b/aggregatedpool/workflow.go index bee62a44..8e38c057 100644 --- a/aggregatedpool/workflow.go +++ b/aggregatedpool/workflow.go @@ -1,6 +1,7 @@ package aggregatedpool import ( + "fmt" "sync/atomic" "time" @@ -149,6 +150,10 @@ func (wp *Workflow) OnWorkflowTaskStarted(t time.Duration) { wp.pipeline = wp.pipeline[1:] if msg.IsCommand() { + if msg.UndefinedResponse() { + panic(fmt.Sprintf("undefined response: %s", msg.Command.(*internal.UndefinedResponse).Message)) + } + err = wp.handleMessage(msg) } diff --git a/go.mod b/go.mod index 38e63a5a..4204201e 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.3.0 github.com/roadrunner-server/endure/v2 v2.2.0 github.com/roadrunner-server/errors v1.2.0 - github.com/roadrunner-server/sdk/v4 v4.1.0 + github.com/roadrunner-server/sdk/v4 v4.2.0 github.com/stretchr/testify v1.8.1 github.com/uber-go/tally/v4 v4.1.6 go.temporal.io/api v1.17.0 @@ -40,7 +40,7 @@ require ( github.com/prometheus/common v0.39.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/roadrunner-server/goridge/v3 v3.6.2 // indirect - github.com/roadrunner-server/tcplisten v1.2.1 // indirect + github.com/roadrunner-server/tcplisten v1.3.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -51,7 +51,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/net v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect diff --git a/go.sum b/go.sum index 19ed1272..e4ec1664 100644 --- a/go.sum +++ b/go.sum @@ -642,10 +642,10 @@ github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM0 github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY= github.com/roadrunner-server/goridge/v3 v3.6.2 h1:LH5HXfCygDp05KnOaXpa4fqVPWTsH7V3lfvPtMwFU3k= github.com/roadrunner-server/goridge/v3 v3.6.2/go.mod h1:3B95k/wM5GGAD0h2hZlJagS9PlTDGs5jh8MpZYC12vA= -github.com/roadrunner-server/sdk/v4 v4.1.0 h1:akICxF9GQOHggvHW01Try5QiRmr2IrCN0RYWHyGdjDQ= -github.com/roadrunner-server/sdk/v4 v4.1.0/go.mod h1:aIzXmg8DZBJ4Tbtvihp/s6VH4e2oSdivOqm/8V+HuUc= -github.com/roadrunner-server/tcplisten v1.2.1 h1:9hVVMlCRvMPewnJCnfSe/kKAqn2ZOF3wHy+ji0M/NKU= -github.com/roadrunner-server/tcplisten v1.2.1/go.mod h1:TRJLGwIruiJ7QhmGVRgJFY5Ch72mPoLhLAxuxLnavpU= +github.com/roadrunner-server/sdk/v4 v4.2.0 h1:hqNlqJV2MXZ8DF1wJnouUdV/55Hae6VL37fVXT1aIr8= +github.com/roadrunner-server/sdk/v4 v4.2.0/go.mod h1:aIzXmg8DZBJ4Tbtvihp/s6VH4e2oSdivOqm/8V+HuUc= +github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY= +github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -831,8 +831,9 @@ golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/internal/protocol.go b/internal/protocol.go index cb4c041b..a06aa818 100644 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -39,6 +39,8 @@ const ( signalExternalWorkflowCommand = "SignalExternalWorkflow" cancelExternalWorkflowCommand = "CancelExternalWorkflow" + undefinedResponse = "UndefinedResponse" + cancelCommand = "Cancel" panicCommand = "Panic" ) @@ -59,16 +61,12 @@ type Context struct { type Message struct { // ID contains ID of the command, response or error. ID uint64 `json:"id"` - // Command of the message in unmarshalled form. Pointer. Command any `json:"command,omitempty"` - // Failure associated with command id. Failure *failure.Failure `json:"failure,omitempty"` - // Payloads contains message specific payloads in binary format. Payloads *commonpb.Payloads `json:"payloads,omitempty"` - // Header Header *commonpb.Header `json:"header,omitempty"` } @@ -83,6 +81,14 @@ func (msg *Message) IsCommand() bool { return msg.Command != nil } +func (msg *Message) UndefinedResponse() bool { + if _, ok := msg.Command.(*UndefinedResponse); ok { + return true + } + + return false +} + func (msg *Message) Reset() { msg.ID = 0 msg.Command = nil @@ -251,6 +257,11 @@ type CancelExternalWorkflow struct { RunID string `json:"runID"` } +// UndefinedResponse indicates that we should panic the workflow +type UndefinedResponse struct { + Message string `json:"message"` +} + // Cancel one or multiple internal promises (activities, local activities, timers, child workflows). type Cancel struct { // CommandIDs to be canceled. @@ -471,7 +482,10 @@ func InitCommand(name string) (any, error) { case panicCommand: return &Panic{}, nil + case undefinedResponse: + return &UndefinedResponse{}, nil + default: - return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) + return nil, errors.E(op, errors.Errorf("undefined command name: %s, possible outdated RoadRunner version", name)) } } diff --git a/proto/api b/proto/api index e381e518..81ffad23 160000 --- a/proto/api +++ b/proto/api @@ -1 +1 @@ -Subproject commit e381e51864ec8f43a90750ef936705258b8f64b2 +Subproject commit 81ffad237fd9805a2216d22559aaf7acdc8ced17 diff --git a/proto/protocol/v1/protocol.pb.go b/proto/protocol/v1/protocol.pb.go index c7bc7b34..a2495a7b 100644 --- a/proto/protocol/v1/protocol.pb.go +++ b/proto/protocol/v1/protocol.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.10 +// protoc v3.21.12 // source: protocol/v1/protocol.proto package protocolV1