Skip to content

Commit

Permalink
Add ttl for sticky task list
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Jan 20, 2018
1 parent 48e40e2 commit dc80840
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

188 changes: 185 additions & 3 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType {
return &t
}

// TaskListKindPtr makes a copy and returns the pointer to a TaskListKind.
func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind {
return &t
}

// DecisionTaskFailedCausePtr makes a copy and returns the pointer to a DecisionTaskFailedCause.
func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause {
return &t
Expand Down
56 changes: 53 additions & 3 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (

// minimum current execution retention TTL when current execution is deleted, in seconds
minCurrentExecutionRetentionTTL = int32(24 * time.Hour / time.Second)

stickyTaskListTTL = int32(86400) // if sticky task_list stopped being updated, remove it in one day
)

const (
Expand Down Expand Up @@ -208,7 +210,8 @@ const (
`domain_id: ?, ` +
`name: ?, ` +
`type: ?, ` +
`ack_level: ? ` +
`ack_level: ?, ` +
`kind: ? ` +
`}`

templateTaskType = `{` +
Expand Down Expand Up @@ -527,6 +530,16 @@ const (
`and type = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateUpdateTaskListQueryWithTTL = `INSERT INTO tasks (` +
`domain_id, ` +
`task_list_name, ` +
`task_list_type, ` +
`type, ` +
`task_id, ` +
`range_id, ` +
`task_list ` +
`) VALUES (?, ?, ?, ?, ?, ?, ` + templateTaskListType + `) USING TTL ?`
)

var (
Expand Down Expand Up @@ -1380,7 +1393,9 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
request.DomainID,
request.TaskList,
request.TaskType,
0)
0,
request.TaskListKind,
)
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
Expand All @@ -1394,12 +1409,14 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
}
} else {
ackLevel = tlDB["ack_level"].(int64)
taskListKind := tlDB["kind"].(int)
query = d.session.Query(templateUpdateTaskListQuery,
rangeID+1,
request.DomainID,
&request.TaskList,
request.TaskType,
ackLevel,
taskListKind,
request.DomainID,
&request.TaskList,
request.TaskType,
Expand All @@ -1426,20 +1443,51 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
Msg: fmt.Sprintf("LeaseTaskList failed to apply. db rangeID %v", previousRangeID),
}
}
tli := &TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel}
tli := &TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel, Kind: request.TaskListKind}
return &LeaseTaskListResponse{TaskListInfo: tli}, nil
}

// From TaskManager interface
func (d *cassandraPersistence) UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) {
tli := request.TaskListInfo

if tli.Kind == TaskListKindWorker { // if task_list is sticky, then update with TTL
fmt.Println("vancexu in updateTaskList with TTL")
query := d.session.Query(templateUpdateTaskListQueryWithTTL,
tli.DomainID,
&tli.Name,
tli.TaskType,
rowTypeTaskList,
taskListTaskID,
tli.RangeID,
tli.DomainID,
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
stickyTaskListTTL,
)
err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
}
}
return &UpdateTaskListResponse{}, nil
}

query := d.session.Query(templateUpdateTaskListQuery,
tli.RangeID,
tli.DomainID,
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
tli.DomainID,
&tli.Name,
tli.TaskType,
Expand Down Expand Up @@ -1482,6 +1530,7 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
domainID := request.TaskListInfo.DomainID
taskList := request.TaskListInfo.Name
taskListType := request.TaskListInfo.TaskType
taskListKind := request.TaskListInfo.Kind
ackLevel := request.TaskListInfo.AckLevel

for _, task := range request.Tasks {
Expand Down Expand Up @@ -1519,6 +1568,7 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
taskList,
taskListType,
ackLevel,
taskListKind,
domainID,
taskList,
taskListType,
Expand Down
Loading

0 comments on commit dc80840

Please sign in to comment.