diff --git a/model/event_state.go b/model/event_state.go new file mode 100644 index 0000000..ede11f9 --- /dev/null +++ b/model/event_state.go @@ -0,0 +1,86 @@ +// Copyright 2022 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "encoding/json" +) + +// EventState used to wait for events from event sources, then consumes them and invoke one or more actions to run in sequence or parallel +type EventState struct { + // TODO: EventState doesn't have usedForCompensation field. + BaseState + + // If true consuming one of the defined events causes its associated actions to be performed. If false all of the defined events must be consumed in order for actions to be performed + // Defaults to true + Exclusive bool `json:"exclusive,omitempty"` + // Define the events to be consumed and optional actions to be performed + OnEvents []OnEvents `json:"onEvents" validate:"required,min=1,dive"` + // State specific timeouts + Timeout *EventStateTimeout `json:"timeouts,omitempty"` +} + +type eventStateForUnmarshal EventState + +// UnmarshalJSON unmarshal EventState object from json bytes +func (e *EventState) UnmarshalJSON(data []byte) error { + v := eventStateForUnmarshal{ + Exclusive: true, + } + err := json.Unmarshal(data, &v) + if err != nil { + return err + } + + *e = EventState(v) + return nil +} + +// OnEvents define which actions are be be performed for the one or more events. +type OnEvents struct { + // References one or more unique event names in the defined workflow events + EventRefs []string `json:"eventRefs" validate:"required,min=1"` + // Specifies how actions are to be performed (in sequence or parallel) + // Defaults to sequential + ActionMode ActionMode `json:"actionMode,omitempty" validate:"required,oneof=sequential parallel"` + // Actions to be performed if expression matches + Actions []Action `json:"actions,omitempty" validate:"omitempty,dive"` + // Event data filter + EventDataFilter EventDataFilter `json:"eventDataFilter,omitempty"` +} + +type onEventsForUnmarshal OnEvents + +// UnmarshalJSON unmarshal OnEvents object from json bytes +func (o *OnEvents) UnmarshalJSON(data []byte) error { + v := onEventsForUnmarshal{ + ActionMode: ActionModeSequential, + } + + err := json.Unmarshal(data, &v) + if err != nil { + return err + } + + *o = OnEvents(v) + return nil +} + +// EventStateTimeout defines timeout settings for event state +type EventStateTimeout struct { + StateExecTimeout StateExecTimeout `json:"stateExecTimeout,omitempty"` + ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,iso8601duration"` + EventTimeout string `json:"eventTimeout,omitempty" validate:"omitempty,iso8601duration"` +} diff --git a/model/event_state_test.go b/model/event_state_test.go new file mode 100644 index 0000000..b203373 --- /dev/null +++ b/model/event_state_test.go @@ -0,0 +1,148 @@ +// Copyright 2022 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEventStateUnmarshalJSON(t *testing.T) { + type testCase struct { + desp string + data string + expect EventState + err string + } + testCases := []testCase{ + { + desp: "all fields set", + data: `{"name": "1", "Type": "event", "exclusive": false, "onEvents": [{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}], "timeouts": {"actionExecTimeout": "PT5M", "eventTimeout": "PT5M", "stateExecTimeout": "PT5M"}}`, + expect: EventState{ + BaseState: BaseState{ + Name: "1", + Type: StateTypeEvent, + }, + Exclusive: false, + OnEvents: []OnEvents{ + { + EventRefs: []string{"E1", "E2"}, + ActionMode: "parallel", + }, + }, + Timeout: &EventStateTimeout{ + EventTimeout: "PT5M", + ActionExecTimeout: "PT5M", + StateExecTimeout: StateExecTimeout{ + Total: "PT5M", + }, + }, + }, + err: ``, + }, + { + desp: "default exclusive", + data: `{"name": "1", "Type": "event", "onEvents": [{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}], "timeouts": {"actionExecTimeout": "PT5M", "eventTimeout": "PT5M", "stateExecTimeout": "PT5M"}}`, + expect: EventState{ + BaseState: BaseState{ + Name: "1", + Type: StateTypeEvent, + }, + Exclusive: true, + OnEvents: []OnEvents{ + { + EventRefs: []string{"E1", "E2"}, + ActionMode: "parallel", + }, + }, + Timeout: &EventStateTimeout{ + EventTimeout: "PT5M", + ActionExecTimeout: "PT5M", + StateExecTimeout: StateExecTimeout{ + Total: "PT5M", + }, + }, + }, + err: ``, + }, + } + for _, tc := range testCases { + t.Run(tc.desp, func(t *testing.T) { + v := EventState{} + err := json.Unmarshal([]byte(tc.data), &v) + + if tc.err != "" { + assert.Error(t, err) + assert.Regexp(t, tc.err, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tc.expect, v) + }) + } +} + +func TestOnEventsUnmarshalJSON(t *testing.T) { + type testCase struct { + desp string + data string + expect OnEvents + err string + } + testCases := []testCase{ + { + desp: "all fields set", + data: `{"eventRefs": ["E1", "E2"], "actionMode": "parallel"}`, + expect: OnEvents{ + EventRefs: []string{"E1", "E2"}, + ActionMode: ActionModeParallel, + }, + err: ``, + }, + { + desp: "default action mode", + data: `{"eventRefs": ["E1", "E2"]}`, + expect: OnEvents{ + EventRefs: []string{"E1", "E2"}, + ActionMode: ActionModeSequential, + }, + err: ``, + }, + { + desp: "invalid object format", + data: `"eventRefs": ["E1", "E2"], "actionMode": "parallel"}`, + expect: OnEvents{}, + err: `invalid character ':' after top-level value`, + }, + } + for _, tc := range testCases { + t.Run(tc.desp, func(t *testing.T) { + v := OnEvents{} + err := json.Unmarshal([]byte(tc.data), &v) + + if tc.err != "" { + assert.Error(t, err) + assert.Regexp(t, tc.err, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tc.expect, v) + }) + } +} diff --git a/model/states.go b/model/states.go index c82e141..b4e876f 100644 --- a/model/states.go +++ b/model/states.go @@ -126,58 +126,6 @@ func (s *BaseState) GetStateDataFilter() *StateDataFilter { return s.StateDataFi // GetMetadata ... func (s *BaseState) GetMetadata() *Metadata { return s.Metadata } -// EventState This state is used to wait for events from event sources, then consumes them and invoke one or more actions to run in sequence or parallel -type EventState struct { - BaseState - // If true consuming one of the defined events causes its associated actions to be performed. If false all of the defined events must be consumed in order for actions to be performed - Exclusive bool `json:"exclusive,omitempty"` - // Define the events to be consumed and optional actions to be performed - OnEvents []OnEvents `json:"onEvents" validate:"required,min=1,dive"` - // State specific timeouts - Timeout *EventStateTimeout `json:"timeouts,omitempty"` -} - -// UnmarshalJSON ... -func (e *EventState) UnmarshalJSON(data []byte) error { - if err := json.Unmarshal(data, &e.BaseState); err != nil { - return err - } - - eventStateMap := make(map[string]interface{}) - if err := json.Unmarshal(data, &eventStateMap); err != nil { - return err - } - - e.Exclusive = true - - if eventStateMap["exclusive"] != nil { - exclusiveVal, ok := eventStateMap["exclusive"].(bool) - if ok { - e.Exclusive = exclusiveVal - } - } - - eventStateRaw := make(map[string]json.RawMessage) - if err := json.Unmarshal(data, &eventStateRaw); err != nil { - return err - } - if err := json.Unmarshal(eventStateRaw["onEvents"], &e.OnEvents); err != nil { - return err - } - if err := unmarshalKey("timeouts", eventStateRaw, &e.Timeout); err != nil { - return err - } - - return nil -} - -// EventStateTimeout ... -type EventStateTimeout struct { - StateExecTimeout StateExecTimeout `json:"stateExecTimeout,omitempty"` - ActionExecTimeout string `json:"actionExecTimeout,omitempty"` - EventTimeout string `json:"eventTimeout,omitempty"` -} - // OperationState Defines actions be performed. Does not wait for incoming events type OperationState struct { BaseState @@ -244,6 +192,7 @@ type ForEachState struct { // State specific timeout Timeouts *ForEachStateTimeout `json:"timeouts,omitempty"` // Mode Specifies how iterations are to be performed (sequentially or in parallel) + // Defaults to parallel Mode ForEachModeType `json:"mode,omitempty"` } diff --git a/model/workflow.go b/model/workflow.go index 4d49590..7d237cd 100644 --- a/model/workflow.go +++ b/model/workflow.go @@ -17,9 +17,10 @@ package model import ( "encoding/json" "fmt" + "reflect" + "github.com/go-playground/validator/v10" val "github.com/serverlessworkflow/sdk-go/v2/validator" - "reflect" ) // InvokeKind defines how the target is invoked. @@ -33,13 +34,21 @@ const ( InvokeKindAsync InvokeKind = "async" ) +// ActionMode specifies how actions are to be performed. +type ActionMode string + const ( - // DefaultExpressionLang ... - DefaultExpressionLang = "jq" - // ActionModeSequential ... + // ActionModeSequential specifies actions should be performed in sequence ActionModeSequential ActionMode = "sequential" - // ActionModeParallel ... + + // ActionModeParallel specifies actions should be performed in parallel ActionModeParallel ActionMode = "parallel" +) + +const ( + // DefaultExpressionLang ... + DefaultExpressionLang = "jq" + // UnlimitedTimeout description for unlimited timeouts UnlimitedTimeout = "unlimited" ) @@ -75,9 +84,6 @@ func continueAsStructLevelValidation(structLevel validator.StructLevel) { } } -// ActionMode ... -type ActionMode string - // BaseWorkflow describes the partial Workflow definition that does not rely on generic interfaces // to make it easy for custom unmarshalers implementations to unmarshal the common data structure. type BaseWorkflow struct { @@ -462,18 +468,6 @@ type OnError struct { End *End `json:"end,omitempty"` } -// OnEvents ... -type OnEvents struct { - // References one or more unique event names in the defined workflow events - EventRefs []string `json:"eventRefs" validate:"required,min=1"` - // Specifies how actions are to be performed (in sequence of parallel) - ActionMode ActionMode `json:"actionMode,omitempty"` - // Actions to be performed if expression matches - Actions []Action `json:"actions,omitempty" validate:"omitempty,dive"` - // Event data filter - EventDataFilter EventDataFilter `json:"eventDataFilter,omitempty"` -} - // End definition type End struct { // If true, completes all execution flows in the given workflow instance