Skip to content

Commit

Permalink
Add ttl for sticky task list
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Jan 17, 2018
1 parent 48e40e2 commit 7e87da1
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

40 changes: 38 additions & 2 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 53 additions & 16 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (

// minimum current execution retention TTL when current execution is deleted, in seconds
minCurrentExecutionRetentionTTL = int32(24 * time.Hour / time.Second)

stickyTaskListTTL = int32(86400) // if sticky task_list stopped being updated, remove it in one day
)

const (
Expand Down Expand Up @@ -208,7 +210,8 @@ const (
`domain_id: ?, ` +
`name: ?, ` +
`type: ?, ` +
`ack_level: ? ` +
`ack_level: ?, ` +
`kind: ? ` +
`}`

templateTaskType = `{` +
Expand Down Expand Up @@ -527,6 +530,16 @@ const (
`and type = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateUpdateTaskListQueryWithTTL = `INSERT INTO tasks (` +
`domain_id, ` +
`task_list_name, ` +
`task_list_type, ` +
`type, ` +
`task_id, ` +
`range_id, ` +
`task_list ` +
`) VALUES (?, ?, ?, ?, ?, ?, ` + templateTaskListType + `) USING TTL ?`
)

var (
Expand Down Expand Up @@ -1380,7 +1393,9 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
request.DomainID,
request.TaskList,
request.TaskType,
0)
0,
request.TaskListKind,
)
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
Expand All @@ -1394,19 +1409,38 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
}
} else {
ackLevel = tlDB["ack_level"].(int64)
query = d.session.Query(templateUpdateTaskListQuery,
rangeID+1,
request.DomainID,
&request.TaskList,
request.TaskType,
ackLevel,
request.DomainID,
&request.TaskList,
request.TaskType,
rowTypeTaskList,
taskListTaskID,
rangeID,
)
taskListKind := tlDB["kind"].(int)
if taskListKind == TaskListKindWorker { // if task_list is sticky, then update with TTL
query = d.session.Query(templateUpdateTaskListQueryWithTTL,
request.DomainID,
request.TaskList,
request.TaskType,
rowTypeTaskList,
taskListTaskID,
rangeID+1,
request.DomainID,
request.TaskList,
request.TaskType,
ackLevel,
TaskListKindWorker,
stickyTaskListTTL,
)
} else {
query = d.session.Query(templateUpdateTaskListQuery,
rangeID+1,
request.DomainID,
&request.TaskList,
request.TaskType,
ackLevel,
TaskListKindNormal,
request.DomainID,
&request.TaskList,
request.TaskType,
rowTypeTaskList,
taskListTaskID,
rangeID,
)
}
}
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
Expand All @@ -1426,7 +1460,7 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
Msg: fmt.Sprintf("LeaseTaskList failed to apply. db rangeID %v", previousRangeID),
}
}
tli := &TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel}
tli := &TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel, Kind: request.TaskListKind}
return &LeaseTaskListResponse{TaskListInfo: tli}, nil
}

Expand All @@ -1440,6 +1474,7 @@ func (d *cassandraPersistence) UpdateTaskList(request *UpdateTaskListRequest) (*
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
tli.DomainID,
&tli.Name,
tli.TaskType,
Expand Down Expand Up @@ -1482,6 +1517,7 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
domainID := request.TaskListInfo.DomainID
taskList := request.TaskListInfo.Name
taskListType := request.TaskListInfo.TaskType
taskListKind := request.TaskListInfo.Kind
ackLevel := request.TaskListInfo.AckLevel

for _, task := range request.Tasks {
Expand Down Expand Up @@ -1519,6 +1555,7 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
taskList,
taskListType,
ackLevel,
taskListKind,
domainID,
taskList,
taskListType,
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,22 @@ func (s *cassandraPersistenceSuite) TestLeaseTaskList() {
s.EqualValues(0, tli.AckLevel)
}

func (s *cassandraPersistenceSuite) TestLeaseTaskList_Sticky() {
domainID := "00136543-72ad-4615-b7e9-44bca9775b45"
taskList := "aaaaaaa"
response, err := s.TaskMgr.LeaseTaskList(&LeaseTaskListRequest{
DomainID: domainID,
TaskList: taskList,
TaskType: TaskListTypeDecision,
TaskListKind: TaskListKindWorker,
})
s.NoError(err)
tli := response.TaskListInfo
s.EqualValues(1, tli.RangeID)
s.EqualValues(0, tli.AckLevel)
s.EqualValues(TaskListKindWorker, tli.Kind)
}

func (s *cassandraPersistenceSuite) TestTimerTasks() {
domainID := "8bfb47be-5b57-4d66-9109-5fb35e20b1d7"
workflowExecution := gen.WorkflowExecution{
Expand Down
14 changes: 11 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ const (
TaskListTypeActivity
)

// Kinds of task lists
const (
TaskListKindNormal = iota
TaskListKindWorker
)

// Transfer task types
const (
TransferTaskTypeDecisionTask = iota
Expand Down Expand Up @@ -191,6 +197,7 @@ type (
TaskType int
RangeID int64
AckLevel int64
Kind int
}

// TaskInfo describes either activity or decision task
Expand Down Expand Up @@ -472,9 +479,10 @@ type (

// LeaseTaskListRequest is used to request lease of a task list
LeaseTaskListRequest struct {
DomainID string
TaskList string
TaskType int
DomainID string
TaskList string
TaskType int
TaskListKind int
}

// LeaseTaskListResponse is response to LeaseTaskListRequest
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ struct ActivityType {

struct TaskList {
10: optional string name
20: optional i32 kind
}

struct TaskListMetadata {
Expand Down
1 change: 1 addition & 0 deletions schema/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ CREATE TYPE task_list (
name text,
type int, -- enum TaskRowType {ActivityTask, DecisionTask}
ack_level bigint, -- task_id of the last acknowledged message
kind int, -- enum TaskListKind {Normal, Worker}
);

CREATE TYPE domain (
Expand Down
1 change: 1 addition & 0 deletions schema/cadence/versioned/v0.3/add_tasklist_kind.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE task_list ADD kind int ;
5 changes: 3 additions & 2 deletions schema/cadence/versioned/v0.3/manifest.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"CurrVersion": "0.3",
"MinCompatibleVersion": "0.3",
"Description": "add client_library_version, client_feature_version and client_impl to mutable state",
"Description": "add client_library_version, client_feature_version and client_impl to mutable state; add kind to task_list",
"SchemaUpdateCqlFiles": [
"add_client_version.cql",
"add_last_first_event_id.cql"
"add_last_first_event_id.cql",
"add_tasklist_kind.cql"
]
}
1 change: 1 addition & 0 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans
startTimestamp := mb.executionInfo.StartTimestamp
if mb.isStickyTaskListEnabled() {
taskList.Name = common.StringPtr(mb.executionInfo.StickyTaskList)
taskList.Kind = common.Int32Ptr(persistence.TaskListKindWorker)
timeout = mb.executionInfo.StickyScheduleToStartTimeout
}
release()
Expand Down
Loading

0 comments on commit 7e87da1

Please sign in to comment.