Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FirstRunAt field to start workflow option logic #6178

Merged
merged 12 commits into from
Jul 26, 2024
Merged
208 changes: 200 additions & 8 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

767 changes: 385 additions & 382 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

767 changes: 385 additions & 382 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

469 changes: 235 additions & 234 deletions .gen/proto/shared/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0 h1:r4ZCsIfOVK06jnr8nBh9mR8Npxunh7aoldONrz6Kb9o=
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d h1:1dX3Pr0wEW0TQFhj0lwCJPuYUtd7pOhScbiiwNiL1Tw=
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
6 changes: 6 additions & 0 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,7 @@ func FromSignalWithStartWorkflowExecutionRequest(t *types.SignalWithStartWorkflo
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
},
SignalName: t.SignalName,
SignalInput: FromPayload(t.SignalInput),
Expand Down Expand Up @@ -3418,6 +3419,7 @@ func ToSignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowE
Header: ToHeader(t.StartRequest.Header),
DelayStartSeconds: durationToSeconds(t.StartRequest.DelayStart),
JitterStartSeconds: durationToSeconds(t.StartRequest.JitterStart),
FirstRunAtTimestamp: timeToUnixNano(t.StartRequest.FirstRunAt),
}
}

Expand Down Expand Up @@ -3568,6 +3570,7 @@ func FromStartChildWorkflowExecutionInitiatedEventAttributes(t *types.StartChild
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
}
}

Expand All @@ -3594,6 +3597,7 @@ func ToStartChildWorkflowExecutionInitiatedEventAttributes(t *apiv1.StartChildWo
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
FirstRunAtTimestamp: timeToUnixNano(t.FirstRunAt),
}
}

Expand Down Expand Up @@ -3743,6 +3747,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimeStamp),
}
}

Expand All @@ -3768,6 +3773,7 @@ func ToStartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *ty
Header: ToHeader(t.Header),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
FirstRunAtTimeStamp: timeToUnixNano(t.FirstRunAt),
}
}

Expand Down
6 changes: 6 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5046,6 +5046,7 @@ func FromSignalWithStartWorkflowExecutionRequest(t *types.SignalWithStartWorkflo
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
Header: FromHeader(t.Header),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5073,6 +5074,7 @@ func ToSignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflow
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
Header: ToHeader(t.Header),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5210,6 +5212,7 @@ func FromStartChildWorkflowExecutionInitiatedEventAttributes(t *types.StartChild
Header: FromHeader(t.Header),
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand All @@ -5235,6 +5238,7 @@ func ToStartChildWorkflowExecutionInitiatedEventAttributes(t *shared.StartChildW
Header: ToHeader(t.Header),
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5369,6 +5373,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
Header: FromHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
FirstRunAtTimestamp: t.FirstRunAtTimeStamp,
}
}

Expand All @@ -5395,6 +5400,7 @@ func ToStartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *t
Header: ToHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
FirstRunAtTimeStamp: t.FirstRunAtTimestamp,
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
11 changes: 11 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5932,6 +5932,7 @@ type SignalWithStartWorkflowExecutionRequest struct {
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimestamp *int64 `json:"firstRunAtTimestamp,omitempty"`
}

func (v *SignalWithStartWorkflowExecutionRequest) SerializeForLogging() (string, error) {
Expand Down Expand Up @@ -6226,6 +6227,7 @@ type StartChildWorkflowExecutionInitiatedEventAttributes struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimestamp *int64 `json:"firstRunAtTimestamp,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -6352,6 +6354,7 @@ type StartWorkflowExecutionRequest struct {
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimeStamp *int64 `json:"firstRunAtTimeStamp,omitempty"`
}

func (v *StartWorkflowExecutionRequest) SerializeForLogging() (string, error) {
Expand Down Expand Up @@ -6409,6 +6412,14 @@ func (v *StartWorkflowExecutionRequest) GetJitterStartSeconds() (o int32) {
return
}

// GetFirstRunAtTimeStamp is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetFirstRunAtTimeStamp() (o int64) {
if v != nil && v.FirstRunAtTimeStamp != nil {
return *v.FirstRunAtTimeStamp
}
return
}

// GetRequestID is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetRequestID() (o string) {
if v != nil {
Expand Down
24 changes: 16 additions & 8 deletions common/types/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSignalWithStartWorkflowExecutionRequestSerializeForLogging(t *testing.T
}{
"complete request without error": {
input: createNewSignalWithStartWorkflowExecutionRequest(),
expectedOutput: "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1}",
expectedOutput: "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1,\"firstRunAtTimestamp\":1}",
expectedErrorOutput: nil,
},

Expand Down Expand Up @@ -187,7 +187,13 @@ func TestSerializeRequest(t *testing.T) {
testReq := createNewSignalWithStartWorkflowExecutionRequest()
serializeRes, err := SerializeRequest(testReq)

expectRes := "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1}"
expectRes := "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"}," +
"\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1," +
"\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\"," +
"\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\"," +
"\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1," +
"\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1}," +
"\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1,\"firstRunAtTimestamp\":1}"
expectErr := error(nil)

assert.Equal(t, expectRes, serializeRes)
Expand All @@ -206,6 +212,7 @@ func createNewSignalWithStartWorkflowExecutionRequest() *SignalWithStartWorkflow
testWorkflowIDReusePolicy := WorkflowIDReusePolicy(1)
testDelayStartSeconds := int32(1)
testJitterStartSeconds := int32(1)
testFirstRunAtTimestamp := int64(1)
piiTestArray := []byte("testInputPII")
piiTestMap := make(map[string][]byte)
piiTestMap["PII"] = piiTestArray
Expand Down Expand Up @@ -235,12 +242,13 @@ func createNewSignalWithStartWorkflowExecutionRequest() *SignalWithStartWorkflow
NonRetriableErrorReasons: []string{"testArray"},
ExpirationIntervalInSeconds: 1,
},
CronSchedule: "testSchedule",
Memo: &Memo{Fields: piiTestMap},
SearchAttributes: &SearchAttributes{IndexedFields: piiTestMap},
Header: &Header{Fields: map[string][]byte{}},
DelayStartSeconds: &testDelayStartSeconds,
JitterStartSeconds: &testJitterStartSeconds,
CronSchedule: "testSchedule",
Memo: &Memo{Fields: piiTestMap},
SearchAttributes: &SearchAttributes{IndexedFields: piiTestMap},
Header: &Header{Fields: map[string][]byte{}},
DelayStartSeconds: &testDelayStartSeconds,
JitterStartSeconds: &testJitterStartSeconds,
FirstRunAtTimestamp: &testFirstRunAtTimestamp,
}
return testReq
}
Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ var (
Header: &Header,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
FirstRunAtTimestamp: &Timestamp1,
}
StartChildWorkflowExecutionFailedEventAttributes = types.StartChildWorkflowExecutionFailedEventAttributes{
Domain: DomainName,
Expand Down
2 changes: 2 additions & 0 deletions common/types/testdata/service_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ var (
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
FirstRunAtTimeStamp: &Timestamp1,
}
StartWorkflowExecutionResponse = types.StartWorkflowExecutionResponse{
RunID: RunID,
Expand Down Expand Up @@ -356,6 +357,7 @@ var (
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
FirstRunAtTimestamp: &Timestamp1,
}
SignalWithStartWorkflowExecutionAsyncRequest = types.SignalWithStartWorkflowExecutionAsyncRequest{
SignalWithStartWorkflowExecutionRequest: &SignalWithStartWorkflowExecutionRequest,
Expand Down
40 changes: 26 additions & 14 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,22 +547,34 @@ func CreateHistoryStartWorkflowRequest(

delayStartSeconds := startRequest.GetDelayStartSeconds()
jitterStartSeconds := startRequest.GetJitterStartSeconds()
firstRunAtTimestamp := startRequest.GetFirstRunAtTimeStamp()

firstDecisionTaskBackoffSeconds := delayStartSeconds
if len(startRequest.GetCronSchedule()) > 0 {
delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
var err error
firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
if err != nil {
return nil, err
}

// backoff seconds was calculated based on delayed start time, so we need to
// add the delayStartSeconds to that backoff.
firstDecisionTaskBackoffSeconds += delayStartSeconds
} else if jitterStartSeconds > 0 {
// Add a random jitter to start time, if requested.
firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
// if the user specified a timestamp for the first run, we will use that as the start time,
// ignoring the delayStartSeconds, jitterStartSeconds, and cronSchedule
// The following condition guarantees two things:
// - The logic is only triggered when the user specifies a first run timestamp
// - AND that timestamp is only triggered ONCE hence not interfering with other scheduling logic
if firstRunAtTimestamp > now.UnixNano() {
firstDecisionTaskBackoffSeconds = int32((firstRunAtTimestamp - now.UnixNano()) / int64(time.Second))
} else {
if len(startRequest.GetCronSchedule()) > 0 {
delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
var err error
firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
if err != nil {
return nil, err
}

// backoff seconds was calculated based on delayed start time, so we need to
// add the delayStartSeconds to that backoff.
firstDecisionTaskBackoffSeconds += delayStartSeconds
} else if jitterStartSeconds > 0 {
// Add a random jitter to start time, if requested.
firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
}
}

histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)
Expand Down
Loading
Loading