Skip to content

Commit

Permalink
[#334]: fix: add additional message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Feb 16, 2023
2 parents a0939a5 + 90cbaa7 commit 517b6c4
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
5 changes: 5 additions & 0 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package aggregatedpool

import (
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -149,6 +150,10 @@ func (wp *Workflow) OnWorkflowTaskStarted(t time.Duration) {
wp.pipeline = wp.pipeline[1:]

if msg.IsCommand() {
if msg.UndefinedResponse() {
panic(fmt.Sprintf("undefined response: %s", msg.Command.(*internal.UndefinedResponse).Message))
}

err = wp.handleMessage(msg)
}

Expand Down
24 changes: 19 additions & 5 deletions internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
signalExternalWorkflowCommand = "SignalExternalWorkflow"
cancelExternalWorkflowCommand = "CancelExternalWorkflow"

undefinedResponse = "UndefinedResponse"

cancelCommand = "Cancel"
panicCommand = "Panic"
)
Expand All @@ -59,16 +61,12 @@ type Context struct {
type Message struct {
// ID contains ID of the command, response or error.
ID uint64 `json:"id"`

// Command of the message in unmarshalled form. Pointer.
Command any `json:"command,omitempty"`

// Failure associated with command id.
Failure *failure.Failure `json:"failure,omitempty"`

// Payloads contains message specific payloads in binary format.
Payloads *commonpb.Payloads `json:"payloads,omitempty"`

// Header
Header *commonpb.Header `json:"header,omitempty"`
}
Expand All @@ -83,6 +81,14 @@ func (msg *Message) IsCommand() bool {
return msg.Command != nil
}

func (msg *Message) UndefinedResponse() bool {
if _, ok := msg.Command.(*UndefinedResponse); ok {
return true
}

return false
}

func (msg *Message) Reset() {
msg.ID = 0
msg.Command = nil
Expand Down Expand Up @@ -251,6 +257,11 @@ type CancelExternalWorkflow struct {
RunID string `json:"runID"`
}

// UndefinedResponse indicates that we should panic the workflow
type UndefinedResponse struct {
Message string `json:"message"`
}

// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
type Cancel struct {
// CommandIDs to be canceled.
Expand Down Expand Up @@ -471,7 +482,10 @@ func InitCommand(name string) (any, error) {
case panicCommand:
return &Panic{}, nil

case undefinedResponse:
return &UndefinedResponse{}, nil

default:
return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
return nil, errors.E(op, errors.Errorf("undefined command name: %s, possible outdated RoadRunner version", name))
}
}
2 changes: 1 addition & 1 deletion proto/api
Submodule api updated 36 files
+2 −1 .github/CODEOWNERS
+1 −1 Makefile
+2 −2 temporal/api/batch/v1/message.proto
+7 −19 temporal/api/command/v1/message.proto
+2 −2 temporal/api/common/v1/message.proto
+2 −2 temporal/api/enums/v1/batch_operation.proto
+3 −8 temporal/api/enums/v1/command_type.proto
+3 −2 temporal/api/enums/v1/common.proto
+8 −8 temporal/api/enums/v1/event_type.proto
+25 −2 temporal/api/enums/v1/failed_cause.proto
+2 −2 temporal/api/enums/v1/namespace.proto
+2 −2 temporal/api/enums/v1/query.proto
+2 −2 temporal/api/enums/v1/reset.proto
+2 −2 temporal/api/enums/v1/schedule.proto
+2 −2 temporal/api/enums/v1/task_queue.proto
+24 −8 temporal/api/enums/v1/update.proto
+2 −2 temporal/api/enums/v1/workflow.proto
+2 −2 temporal/api/errordetails/v1/message.proto
+2 −2 temporal/api/failure/v1/message.proto
+2 −2 temporal/api/filter/v1/message.proto
+48 −21 temporal/api/history/v1/message.proto
+0 −87 temporal/api/interaction/v1/message.proto
+4 −2 temporal/api/namespace/v1/message.proto
+5 −2 temporal/api/operatorservice/v1/request_response.proto
+2 −2 temporal/api/operatorservice/v1/service.proto
+29 −11 temporal/api/protocol/v1/message.proto
+2 −2 temporal/api/query/v1/message.proto
+2 −2 temporal/api/replication/v1/message.proto
+2 −2 temporal/api/schedule/v1/message.proto
+63 −0 temporal/api/sdk/v1/task_complete_metadata.proto
+2 −2 temporal/api/taskqueue/v1/message.proto
+111 −0 temporal/api/update/v1/message.proto
+2 −2 temporal/api/version/v1/message.proto
+2 −2 temporal/api/workflow/v1/message.proto
+59 −30 temporal/api/workflowservice/v1/request_response.proto
+4 −4 temporal/api/workflowservice/v1/service.proto
2 changes: 1 addition & 1 deletion proto/protocol/v1/protocol.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 517b6c4

Please sign in to comment.