Add API GetPollerHistory #483
At a high level, one question is how we add the task list queue size and queue waiting time.
struct PollerInfo {
  // ISO 8601 format
  10: optional string timestamp
we are using UnixNano (i64) as timestamp in our APIs.
done
  10: optional list<PollerInfo> pollers
}

enum TaskListType {
Can we use TaskType here? We are going to use TaskListType for the type of task list like REGULAR, STICKY, and HOST_SPECIFIC.
At least right now, task_list_type is already used in Cassandra:
-- Stores activity or workflow tasks
CREATE TABLE tasks (
domain_id uuid,
task_list_name text,
task_list_type int, -- enum TaskListType {ActivityTask, DecisionTask}
type int, -- enum rowType {Task, TaskList}
task_id bigint, -- unique identifier for tasks, monotonically increasing
range_id bigint, -- Used to ensure that only one process can write to the table
task frozen<task>,
task_list frozen<task_list>,
PRIMARY KEY ((domain_id, task_list_name, task_list_type), type, task_id)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};
/**
* GetPollerHistory returns pollers which poll from given tasklist in last few minutes.
**/
shared.GetPollerHistoryResponse GetPollerHistory(1: shared.GetPollerHistoryRequest request)
I thought we wanted to expose something like DescTaskList. For now it only includes the poller info, but soon we are going to expose the task list queue size and task list wait time.
I just gave the API and parameters some arbitrary names; let us wait until @molteanu comes back.
)

type (
	pollerIdentity struct {
The name is confusing. The identity is supplied by the worker and can be customized by the user to any string. I think the idea here is to guarantee uniqueness. Using a different name here to make that distinction would help in understanding the code.
Well, there is a pollerID, which is actually a one-time request ID; identity (which is specified in the API) is to some degree too generic...
Let us wait for more comments about the names.
Overall the poller info part looks good. I have a concern about the implementation of the cache iterator; see inline comments.
common/cache/cache.go
Outdated
	Close()
	// Entries returns a channel of MapEntry
	// objects that can be used in a range loop
	Entries() <-chan Entry
If the user forgets to call Close(), it will leak resources. Why use a goroutine/channel instead of the HasNext() and Next() pattern?
OK, but the iterator must have a Close method.
This is because the iterator holds the cache lock, and the caller can choose not to iterate over all items in the LRU cache, so the lock cannot simply be released when iteration reaches the end.
type Iterator interface {
HasNext() bool
Next() Entry
Close()
}
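A minimal sketch of why Close matters with this interface, assuming the iterator holds the cache lock for its whole lifetime. The store type, the Entry fields, and firstN are hypothetical stand-ins, not the PR's code; the real cache.Entry may look different.

```go
package main

import (
	"fmt"
	"sync"
)

// Entry is a hypothetical key/value pair; the real cache.Entry may differ.
type Entry struct {
	Key   string
	Value int
}

// Iterator mirrors the interface proposed in the review.
type Iterator interface {
	HasNext() bool
	Next() Entry
	Close()
}

// store is a stand-in for the LRU cache: a slice guarded by a mutex.
type store struct {
	mu      sync.Mutex
	entries []Entry
}

type storeIterator struct {
	s    *store
	pos  int
	once sync.Once
}

// Iterator takes the lock and hands it to the iterator; Close releases it.
func (s *store) Iterator() Iterator {
	s.mu.Lock()
	return &storeIterator{s: s}
}

func (it *storeIterator) HasNext() bool { return it.pos < len(it.s.entries) }

func (it *storeIterator) Next() Entry {
	e := it.s.entries[it.pos]
	it.pos++
	return e
}

// Close may be called more than once; the lock is released exactly once.
func (it *storeIterator) Close() { it.once.Do(it.s.mu.Unlock) }

// firstN stops before the end of the collection, so only Close can
// tell the cache that the caller is finished.
func firstN(s *store, n int) []Entry {
	it := s.Iterator()
	defer it.Close()
	var out []Entry
	for it.HasNext() && len(out) < n {
		out = append(out, it.Next())
	}
	return out
}

func main() {
	s := &store{entries: []Entry{{"a", 1}, {"b", 2}, {"c", 3}}}
	fmt.Println(firstN(s, 2))
	// Close released the lock, so a second iteration does not deadlock.
	fmt.Println(len(firstN(s, 10)))
}
```

Without the deferred Close, the early exit in firstN would leave the mutex held and the second call would block forever.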
common/cache/lru.go
Outdated
		stopCh: make(chan struct{}),
	}

	go func(iterator *iteratorImpl) {
I'm not sure we should use this approach for the iterator. You are creating a goroutine whenever an iterator is needed. Goroutines are cheap but not free. You also context switch back and forth while iterating through the collection, which is much slower.
Why not just use the HasNext() and Next() pattern?
common/cache/lru_test.go
Outdated
it := cache.Iterator()
defer it.Close()
for entry := range it.Entries() {
	result = append(result, entry)
Not clear what we are testing here. I don't see a validation part.
This test exercises the concurrent execution of put / get / iteration, so as long as go test -race does not report an error and there is no SIGSEGV, we are fine.
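A rough sketch of that kind of stress run, assuming a plain mutex-guarded map in place of the LRU cache. safeCache and hammer are hypothetical names. The value of such a run comes from executing it under the race detector (go run -race or go test -race), not from the final count it prints.

```go
package main

import (
	"fmt"
	"sync"
)

// safeCache is a hypothetical stand-in for the LRU cache: a map guarded by a mutex.
type safeCache struct {
	mu sync.Mutex
	m  map[int]int
}

func (c *safeCache) Put(k, v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = v
}

func (c *safeCache) Get(k int) (int, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[k]
	return v, ok
}

// Len walks every entry under the lock, standing in for iteration.
func (c *safeCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for range c.m {
		n++
	}
	return n
}

// hammer runs put / get / iteration from several goroutines at once
// and returns the number of entries left when all of them finish.
func hammer(c *safeCache, workers, ops int) int {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < ops; i++ {
				c.Put(i%16, w)
				c.Get(i % 16)
				c.Len()
			}
		}(w)
	}
	wg.Wait()
	return c.Len()
}

func main() {
	c := &safeCache{m: make(map[int]int)}
	fmt.Println(hammer(c, 8, 100))
}
```

Removing the mutex from any one of the three methods is enough for the race detector to flag the run, which is the property the reply relies on.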
common/convert.go
Outdated
@@ -57,6 +61,12 @@ func BoolPtr(v bool) *bool {
	return &v
}

// TimePtr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format.
func TimePtr(v time.Time) *int64 {
	time := v.UTC().UnixNano()
no need to call UTC().
fine
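A quick check of the point above: UnixNano is measured from the Unix epoch and does not depend on the location attached to the time.Time value, so converting to UTC first changes nothing. The helper names demoTime and sameUnixNano are made up for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// demoTime returns a fixed instant in a non-UTC zone.
func demoTime() time.Time {
	return time.Date(2017, time.December, 1, 12, 0, 0, 0, time.FixedZone("UTC+8", 8*60*60))
}

// sameUnixNano reports whether calling UTC() first changes the result.
func sameUnixNano(t time.Time) bool {
	return t.UnixNano() == t.UTC().UnixNano()
}

func main() {
	fmt.Println(sameUnixNano(demoTime())) // true
}
```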
service/matching/matchingEngine.go
Outdated
for _, poller := range tlMgr.GetAllPollerInfo() {
	pollers = append(pollers, &workflow.PollerInfo{
		Identity:  common.StringPtr(poller.identity),
		Timestamp: common.TimePtr(poller.timestamp),
This TimePtr is misleading; it is still an int64 pointer.
Cannot use Int64Ptr since Go does not support overloading; use TimeInt64Ptr.
service/matching/pollerHistory.go
Outdated
opts := &cache.Options{}
opts.InitialCapacity = pollerHistoryInitSize
opts.TTL = pollerHistoryTTL
opts.Pin = false
Initialize the fields in the struct literal when you create the struct.
ok
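What the suggestion amounts to, as a sketch. Options here is a hypothetical local stand-in for cache.Options with only the three fields seen in the snippet, and the constant values are placeholders, since the PR's actual values are not shown in this thread.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a hypothetical stand-in for cache.Options with the three
// fields set in the snippet under review.
type Options struct {
	InitialCapacity int
	TTL             time.Duration
	Pin             bool
}

// Placeholder values; the PR's real constants are not visible here.
const (
	pollerHistoryInitSize = 1024
	pollerHistoryTTL      = 5 * time.Minute
)

// newPollerHistoryOptions sets every field in the literal instead of
// assigning them one by one after construction.
func newPollerHistoryOptions() *Options {
	return &Options{
		InitialCapacity: pollerHistoryInitSize,
		TTL:             pollerHistoryTTL,
		Pin:             false,
	}
}

func main() {
	fmt.Printf("%+v\n", *newPollerHistoryOptions())
}
```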
I think there is a bug in the iterator.
common/cache/lru.go
Outdated
// Next return the next item
func (it *iteratorImpl) Next() Entry {
	if !it.HasNext() {
		panic("LRU cache iterator Next called when there is no next item")
This is problematic because HasNext() checks TTL so it could return false even if you checked HasNext() before you call Next().
fixed
I think a better solution is to move the code in HasNext() to a new method, say prepareNext(). Then HasNext() just returns nextItem != nil, and Next() simply returns the value from nextItem; before it returns, it moves to the next item and calls prepareNext() for subsequent calls. That way you only do validation once, in prepareNext(), and you don't need an extra timestamp field in your iterator. (But you do need an initial call to prepareNext() when you create the iterator.)
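The prepareNext() idea can be sketched roughly as follows, assuming entries live in a container/list and carry an absolute expiration time. The entry type, newIterator, demoList, and liveKeys are hypothetical names for illustration and are not taken from the PR.

```go
package main

import (
	"container/list"
	"fmt"
	"time"
)

// entry is a hypothetical cache entry with an absolute expiration time.
type entry struct {
	key        string
	expiration time.Time
}

// iterator skips expired entries. The TTL check happens only in
// prepareNext, so HasNext and Next can never disagree.
type iterator struct {
	elem     *list.Element // next element to examine
	nextItem *entry        // next valid entry, or nil when exhausted
}

func newIterator(l *list.List) *iterator {
	it := &iterator{elem: l.Front()}
	it.prepareNext() // initial call so HasNext is correct before the first Next
	return it
}

// prepareNext advances to the next unexpired entry and stores it in nextItem.
func (it *iterator) prepareNext() {
	for it.elem != nil {
		e := it.elem.Value.(*entry)
		it.elem = it.elem.Next()
		if e.expiration.After(time.Now()) {
			it.nextItem = e
			return
		}
	}
	it.nextItem = nil
}

func (it *iterator) HasNext() bool { return it.nextItem != nil }

func (it *iterator) Next() *entry {
	if it.nextItem == nil {
		panic("Next called when there is no next item")
	}
	e := it.nextItem
	it.prepareNext()
	return e
}

// demoList builds a list whose second and last entries are already expired.
func demoList() *list.List {
	now := time.Now()
	l := list.New()
	l.PushBack(&entry{"a", now.Add(time.Hour)})
	l.PushBack(&entry{"b", now.Add(-time.Hour)})
	l.PushBack(&entry{"c", now.Add(time.Hour)})
	l.PushBack(&entry{"d", now.Add(-time.Hour)})
	return l
}

func liveKeys(l *list.List) []string {
	var keys []string
	for it := newIterator(l); it.HasNext(); {
		keys = append(keys, it.Next().key)
	}
	return keys
}

func main() {
	fmt.Println(liveKeys(demoList())) // [a c]
}
```

Because an entry is validated once and then held in nextItem, an entry that expires between HasNext() and Next() is still returned instead of triggering the panic.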
common/convert.go
Outdated
@@ -57,6 +61,12 @@ func BoolPtr(v bool) *bool {
	return &v
}

// TimeInt64Ptr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format.
func TimeInt64Ptr(v time.Time) *int64 {
remove this function and just use Int64Ptr(v.UnixNano()) instead.
i think this helper function is useful
We already have many places using Int64Ptr for timestamps. This new method only adds confusion to the codebase, and its doc comment is not correct. If you feel strongly about this, you might want to send a separate diff for this refactor that updates all the other places as well.
fine
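For reference, the agreed form looks roughly like this at a call site. Int64Ptr is assumed here to have the same shape as the BoolPtr helper visible in the diff; its real definition is not shown in this thread.

```go
package main

import (
	"fmt"
	"time"
)

// Int64Ptr is assumed to mirror BoolPtr: copy the value, return its address.
func Int64Ptr(v int64) *int64 { return &v }

func main() {
	polled := time.Unix(1500000000, 0)
	// The conversion to nanoseconds stays visible at the call site.
	ts := Int64Ptr(polled.UnixNano())
	fmt.Println(*ts)
	// The original instant can be recovered from the int64.
	fmt.Println(time.Unix(0, *ts).Equal(polled))
}
```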
common/cache/lru.go
Outdated
existing := entry.value
if allowUpdate {
	entry.value = value
}
if c.ttl != 0 {
	entry.expiration = time.Now().Add(c.ttl)
	entry.timestamp = time.Now()
Why are we changing expiration to timestamp? I think expiration is better because it is clearer what it means.
In terms of functionality, either timestamp or expiration works.
For getting the last time an entry was refreshed, timestamp works and expiration does not. That is the reason.
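A small sketch of that trade-off, assuming the cache has one TTL for all entries and that a TTL of zero means entries never expire (inferred from the c.ttl != 0 guard in the diff, not confirmed). Storing the refresh time still lets the expiration be derived, while the reverse needs the TTL subtracted back out. The entry type and demoExpiry helper are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// entry keeps the last refresh time rather than a precomputed expiration.
type entry struct {
	timestamp time.Time
}

// expiration is derived from the refresh time and the cache-wide TTL.
func (e entry) expiration(ttl time.Duration) time.Time {
	return e.timestamp.Add(ttl)
}

// expired treats ttl == 0 as "never expires" (an assumption, see above).
func (e entry) expired(now time.Time, ttl time.Duration) bool {
	return ttl != 0 && now.After(e.expiration(ttl))
}

// demoExpiry checks one entry 30 seconds and 2 minutes after its refresh,
// with a 1 minute TTL.
func demoExpiry() (early, late bool) {
	refreshed := time.Unix(1000, 0)
	e := entry{timestamp: refreshed}
	ttl := time.Minute
	return e.expired(refreshed.Add(30*time.Second), ttl),
		e.expired(refreshed.Add(2*time.Minute), ttl)
}

func main() {
	early, late := demoExpiry()
	fmt.Println(early, late) // false true
}
```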
DescribeTaskList returns poller information, which contains the poller identity and the last time this poller polled a given task list.
Solves #467, #365, #144