Skip to content

Commit

Permalink
Passing ScheduledStartTimeStamp to backend (#60)
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
  • Loading branch information
shivamkm07 authored Feb 1, 2024
1 parent fb5c4c0 commit bac0292
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Cascading Terminate and Purge support ([#47](https://github.com/microsoft/durabletask-go/pull/47) and [#63](https://github.com/microsoft/durabletask-go/pull/63)) - by [@shivamkm07](https://github.com/shivamkm07)
- Support for scheduled orchestration starts ([#60](https://github.dev/microsoft/durabletask-go/pull/60)) - by [@shivamkm07](https://github.com/shivamkm07)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
defer span.End()

tc := helpers.TraceContextFromSpan(span)
e := helpers.NewExecutionStartedEvent(req.Name, req.InstanceId, req.Input, nil, tc)
e := helpers.NewExecutionStartedEvent(req.Name, req.InstanceId, req.Input, nil, tc, req.ScheduledStartTimestamp)
if err := c.be.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID)
defer span.End()

e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span), req.ScheduledStartTimestamp)
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions backend/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA
completedAction.Result,
s.startEvent.ParentInstance,
s.startEvent.ParentTraceContext,
nil,
),
)

Expand Down Expand Up @@ -188,6 +189,7 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA
createSO.Input,
helpers.NewParentInfo(action.Id, s.startEvent.Name, string(s.instanceID)),
currentTraceContext,
nil,
)
s.pendingMessages = append(s.pendingMessages, OrchestratorMessage{HistoryEvent: startEvent, TargetInstanceID: createSO.InstanceId})
} else if sendEvent := action.GetSendEvent(); sendEvent != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/helpers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func NewExecutionStartedEvent(
input *wrapperspb.StringValue,
parent *protos.ParentInstanceInfo,
parentTraceContext *protos.TraceContext,
scheduledStartTimeStamp *timestamppb.Timestamp,
) *protos.HistoryEvent {
return &protos.HistoryEvent{
EventId: -1,
Expand All @@ -32,7 +33,8 @@ func NewExecutionStartedEvent(
InstanceId: instanceId,
ExecutionId: wrapperspb.String(uuid.New().String()),
},
ParentTraceContext: parentTraceContext,
ParentTraceContext: parentTraceContext,
ScheduledStartTimestamp: scheduledStartTimeStamp,
},
},
}
Expand Down
18 changes: 9 additions & 9 deletions tests/runtimestate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func Test_CompletedSubOrchestration(t *testing.T) {

parentInfo := helpers.NewParentInfo(expectedTaskID, "Parent", "parent_id")
s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("Child", "child_id", nil, parentInfo, nil),
helpers.NewExecutionStartedEvent("Child", "child_id", nil, parentInfo, nil, nil),
})

actions := []*protos.OrchestratorAction{
Expand Down Expand Up @@ -160,7 +160,7 @@ func Test_RuntimeState_ContinueAsNew(t *testing.T) {
eventPayload := "MyEventPayload"

state := backend.NewOrchestrationRuntimeState(api.InstanceID(iid), []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent(expectedName, iid, nil, nil, nil),
helpers.NewExecutionStartedEvent(expectedName, iid, nil, nil, nil, nil),
})

carryoverEvents := []*protos.HistoryEvent{helpers.NewEventRaisedEvent(eventName, wrapperspb.String(eventPayload))}
Expand Down Expand Up @@ -207,7 +207,7 @@ func Test_CreateTimer(t *testing.T) {
expectedFireAt := time.Now().UTC().Add(72 * time.Hour)

s := backend.NewOrchestrationRuntimeState(iid, []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("MyOrchestration", iid, nil, nil, nil),
helpers.NewExecutionStartedEvent("MyOrchestration", iid, nil, nil, nil, nil),
})

var actions []*protos.OrchestratorAction
Expand Down Expand Up @@ -246,7 +246,7 @@ func Test_ScheduleTask(t *testing.T) {
expectedInput := "{\"Foo\":5}"

state := backend.NewOrchestrationRuntimeState(iid, []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("MyOrchestration", iid, wrapperspb.String(expectedInput), nil, nil),
helpers.NewExecutionStartedEvent("MyOrchestration", iid, wrapperspb.String(expectedInput), nil, nil, nil),
})

actions := []*protos.OrchestratorAction{
Expand Down Expand Up @@ -293,7 +293,7 @@ func Test_CreateSubOrchestration(t *testing.T) {
expectedTraceState := "trace_state"

state := backend.NewOrchestrationRuntimeState(api.InstanceID(iid), []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("Parent", iid, nil, nil, nil),
helpers.NewExecutionStartedEvent("Parent", iid, nil, nil, nil, nil),
})

actions := []*protos.OrchestratorAction{
Expand Down Expand Up @@ -349,7 +349,7 @@ func Test_SendEvent(t *testing.T) {
expectedInput := "foo"

s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("MyOrchestration", "abc", wrapperspb.String(expectedInput), nil, nil),
helpers.NewExecutionStartedEvent("MyOrchestration", "abc", wrapperspb.String(expectedInput), nil, nil, nil),
})

actions := []*protos.OrchestratorAction{
Expand Down Expand Up @@ -381,7 +381,7 @@ func Test_StateIsValid(t *testing.T) {
s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{})
assert.True(t, s.IsValid())
s = backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{
helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil),
helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil, nil),
})
assert.True(t, s.IsValid())
s = backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{
Expand All @@ -392,8 +392,8 @@ func Test_StateIsValid(t *testing.T) {

func Test_DuplicateEvents(t *testing.T) {
s := backend.NewOrchestrationRuntimeState("abc", []*protos.HistoryEvent{})
if err := s.AddEvent(helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil)); assert.NoError(t, err) {
err = s.AddEvent(helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil))
if err := s.AddEvent(helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil, nil)); assert.NoError(t, err) {
err = s.AddEvent(helpers.NewExecutionStartedEvent("MyOrchestration", "abc", nil, nil, nil, nil))
assert.ErrorIs(t, err, backend.ErrDuplicateEvent)
} else {
return
Expand Down
4 changes: 2 additions & 2 deletions tests/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Test_Executor_WaitForEventSchedulesTimer(t *testing.T) {
oldEvents := []*protos.HistoryEvent{}
newEvents := []*protos.HistoryEvent{
startEvent,
helpers.NewExecutionStartedEvent("Orchestration", string(iid), nil, nil, nil),
helpers.NewExecutionStartedEvent("Orchestration", string(iid), nil, nil, nil, nil),
}

// Execute the orchestrator function and expect to get back a single timer action
Expand Down Expand Up @@ -57,7 +57,7 @@ func Test_Executor_SuspendStopsAllActions(t *testing.T) {
oldEvents := []*protos.HistoryEvent{}
newEvents := []*protos.HistoryEvent{
helpers.NewOrchestratorStartedEvent(),
helpers.NewExecutionStartedEvent("SuspendResumeOrchestration", string(iid), nil, nil, nil),
helpers.NewExecutionStartedEvent("SuspendResumeOrchestration", string(iid), nil, nil, nil, nil),
helpers.NewSuspendOrchestrationEvent(""),
}

Expand Down
4 changes: 2 additions & 2 deletions tests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_TryProcessSingleOrchestrationWorkItem_BasicFlow(t *testing.T) {
ctx := context.Background()
wi := &backend.OrchestrationWorkItem{
InstanceID: "test123",
NewEvents: []*protos.HistoryEvent{helpers.NewExecutionStartedEvent("MyOrch", "test123", nil, nil, nil)},
NewEvents: []*protos.HistoryEvent{helpers.NewExecutionStartedEvent("MyOrch", "test123", nil, nil, nil, nil)},
}
state := &backend.OrchestrationRuntimeState{}
result := &backend.ExecutionResults{Response: &protos.OrchestratorResponse{}}
Expand Down Expand Up @@ -61,7 +61,7 @@ func Test_TryProcessSingleOrchestrationWorkItem_ExecutionStartedAndCompleted(t *
iid := api.InstanceID("test123")

// Simulate getting an ExecutionStarted message from the orchestration queue
startEvent := helpers.NewExecutionStartedEvent("MyOrchestration", string(iid), nil, nil, nil)
startEvent := helpers.NewExecutionStartedEvent("MyOrchestration", string(iid), nil, nil, nil, nil)
wi := &backend.OrchestrationWorkItem{
InstanceID: iid,
NewEvents: []*protos.HistoryEvent{startEvent},
Expand Down

0 comments on commit bac0292

Please sign in to comment.