Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jan 12, 2018
1 parent 86d437a commit 5ced7dd
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 54 deletions.
7 changes: 4 additions & 3 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ type Iterator interface {
// Close closes the iterator
// and releases any allocated resources
Close()
// Entries returns a channel of MapEntry
// objects that can be used in a range loop
Entries() <-chan Entry
// HasNext return true if there is more items to be returned
HasNext() bool
// Next return the next item
Next() Entry
}

// Entry represents a key-value entry within the map
Expand Down
75 changes: 38 additions & 37 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type (
}

iteratorImpl struct {
stopCh chan struct{}
dataCh chan Entry
lru *lru
nextItem *list.Element
}

entryImpl struct {
Expand All @@ -59,51 +59,52 @@ type (

// Close closes the iterator
func (it *iteratorImpl) Close() {
close(it.stopCh)
it.lru.mut.Unlock()
}

// Entries returns a channel of map entries
func (it *iteratorImpl) Entries() <-chan Entry {
return it.dataCh
// HasNext return true if there is more items to be returned
func (it *iteratorImpl) HasNext() bool {
for it.nextItem != nil {
entry := it.nextItem.Value.(*entryImpl)
if !it.lru.isEntryExpired(entry) {
// Entry is valid
return true
}

nextItem := it.nextItem.Next()
it.lru.deleteInternal(it.nextItem)
it.nextItem = nextItem
}

return false
}

// Next return the next item
func (it *iteratorImpl) Next() Entry {
if !it.HasNext() {
panic("LRU cache iterator Next called when there is no next item")
}

entry := it.nextItem.Value.(*entryImpl)
it.nextItem = it.nextItem.Next()
// make a copy of the entry so there will be no concurrent access to this entry
entry = &entryImpl{
key: entry.key,
value: entry.value,
timestamp: entry.timestamp,
}
return entry
}

// Iterator returns an iterator to the map. This map
// does not use re-entrant locks, so access or modification
// to the map during iteration can cause a dead lock.
func (c *lru) Iterator() Iterator {

c.mut.Lock()
iterator := &iteratorImpl{
dataCh: make(chan Entry, 8),
stopCh: make(chan struct{}),
lru: c,
nextItem: c.byAccess.Front(),
}

go func(iterator *iteratorImpl) {
c.mut.Lock()
for _, element := range c.byKey {
entry := element.Value.(*entryImpl)
if c.isEntryExpired(entry) {
// Entry has expired
c.deleteInternal(element)
continue
}
// make a copy of the entry so there will be no concurrent access to this entry
entry = &entryImpl{
key: entry.key,
value: entry.value,
timestamp: entry.timestamp,
}
select {
case iterator.dataCh <- entry:
case <-iterator.stopCh:
c.mut.Unlock()
close(iterator.dataCh)
return
}
}
c.mut.Unlock()
close(iterator.dataCh)
}(iterator)

return iterator
}

Expand Down
12 changes: 7 additions & 5 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ func TestLRUCacheConcurrentAccess(t *testing.T) {
for j := 0; j < 50; j++ {
result := []Entry{}
it := cache.Iterator()
defer it.Close()
for entry := range it.Entries() {
for it.HasNext() {
entry := it.Next()
result = append(result, entry)
}
it.Close()
}
}()
}
Expand Down Expand Up @@ -215,9 +216,10 @@ func TestIterator(t *testing.T) {

actual := map[string]string{}

ite := cache.Iterator()
defer ite.Close()
for entry := range ite.Entries() {
it := cache.Iterator()
defer it.Close()
for it.HasNext() {
entry := it.Next()
actual[entry.Key().(string)] = entry.Value().(string)
}

Expand Down
6 changes: 3 additions & 3 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func BoolPtr(v bool) *bool {
return &v
}

// TimePtr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format.
func TimePtr(v time.Time) *int64 {
time := v.UTC().UnixNano()
// TimeInt64Ptr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format.
func TimeInt64Ptr(v time.Time) *int64 {
time := v.UnixNano()
return &time
}

Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (e *matchingEngineImpl) DescribeTaskList(ctx context.Context, request *m.De
for _, poller := range tlMgr.GetAllPollerInfo() {
pollers = append(pollers, &workflow.PollerInfo{
Identity: common.StringPtr(poller.identity),
Timestamp: common.TimePtr(poller.timestamp),
Timestamp: common.TimeInt64Ptr(poller.timestamp),
})
}
return &workflow.DescribeTaskListResponse{Pollers: pollers}, nil
Expand Down
12 changes: 7 additions & 5 deletions service/matching/pollerHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type pollerHistory struct {
}

func newPollerHistory() *pollerHistory {
opts := &cache.Options{}
opts.InitialCapacity = pollerHistoryInitSize
opts.TTL = pollerHistoryTTL
opts.Pin = false
opts := &cache.Options{
InitialCapacity: pollerHistoryInitSize,
TTL: pollerHistoryTTL,
Pin: false,
}

return &pollerHistory{
history: cache.New(pollerHistoryInitMaxSize, opts),
Expand All @@ -71,7 +72,8 @@ func (pollers *pollerHistory) getAllPollerInfo() []*pollerInfo {

ite := pollers.history.Iterator()
defer ite.Close()
for entry := range ite.Entries() {
for ite.HasNext() {
entry := ite.Next()
key := entry.Key().(pollerIdentity)
timestamp := entry.Timestamp()
result = append(result, &pollerInfo{
Expand Down

0 comments on commit 5ced7dd

Please sign in to comment.