Skip to content

Commit

Permalink
Use a prefix for scheduler workflow ids (#3201)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored and yycptt committed Aug 12, 2022
1 parent dae8a51 commit 51817aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
37 changes: 25 additions & 12 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync/atomic"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -3012,8 +3013,9 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
return nil, errSchedulesNotAllowed
}

// a schedule id is a workflow id so validate it the same way
if err := wh.validateWorkflowID(request.ScheduleId); err != nil {
workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

if err := wh.validateWorkflowID(workflowID); err != nil {
return nil, err
}

Expand Down Expand Up @@ -3103,7 +3105,7 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
// Create StartWorkflowExecutionRequest
startReq := &workflowservice.StartWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowId: request.ScheduleId,
WorkflowId: workflowID,
WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: workercommon.PerNSWorkerTaskQueue},
Input: inputPayload,
Expand Down Expand Up @@ -3150,7 +3152,8 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return nil, err
}

execution := &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}
workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId
execution := &commonpb.WorkflowExecution{WorkflowId: workflowID}

// first describe to get memo and search attributes
describeResponse, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{
Expand Down Expand Up @@ -3314,6 +3317,8 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
return nil, errRequestIDTooLong
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3334,7 +3339,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.GetScheduleId(),
workflowID,
"", // don't have runid yet
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3347,7 +3352,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
NamespaceId: namespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
SignalName: scheduler.SignalNameUpdate,
Input: inputPayloads,
Identity: request.Identity,
Expand Down Expand Up @@ -3385,6 +3390,8 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
return nil, errRequestIDTooLong
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3404,7 +3411,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.GetScheduleId(),
workflowID,
"", // don't have runid yet
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3417,7 +3424,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
NamespaceId: namespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
SignalName: scheduler.SignalNamePatch,
Input: inputPayloads,
Identity: request.Identity,
Expand Down Expand Up @@ -3451,6 +3458,8 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
return nil, errSchedulesNotAllowed
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3468,7 +3477,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.ScheduleId,
workflowID,
"",
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3480,7 +3489,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
NamespaceId: namespaceID.String(),
Request: &workflowservice.QueryWorkflowRequest{
Namespace: request.Namespace,
Execution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
Query: &querypb.WorkflowQuery{
QueryType: scheduler.QueryNameListMatchingTimes,
QueryArgs: queryPayload,
Expand Down Expand Up @@ -3521,6 +3530,8 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow
return nil, errSchedulesNotAllowed
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3530,7 +3541,7 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow
NamespaceId: namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
Reason: "terminated by DeleteSchedule",
Identity: request.Identity,
},
Expand Down Expand Up @@ -3601,8 +3612,10 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows
info := wh.decodeScheduleListInfo(searchAttributes)
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)
memo := wh.cleanScheduleMemo(ex.GetMemo())
workflowID := ex.GetExecution().GetWorkflowId()
scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix)
schedules[i] = &schedpb.ScheduleListEntry{
ScheduleId: ex.GetExecution().GetWorkflowId(),
ScheduleId: scheduleID,
Memo: memo,
SearchAttributes: searchAttributes,
Info: info,
Expand Down
3 changes: 3 additions & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ import (
)

const (
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
WorkflowIDPrefix = "temporal-sys-scheduler:"

// This is an example of a timestamp that's appended to the workflow
// id, used for validation in the frontend.
AppendedTimestampForValidation = "-2009-11-10T23:00:00Z"
Expand Down

0 comments on commit 51817aa

Please sign in to comment.