From aa841354af027b3a92eb7b2dcdcc287641909496 Mon Sep 17 00:00:00 2001 From: Gennady Laventman Date: Tue, 14 Mar 2017 17:04:12 +0200 Subject: [PATCH] [FAB-2778] Msg store update Update message store to automatically expire messages. Includes new go routine and expiration callback functions Change-Id: I7c4ec11c7392015e65de5d0553bd1e4c8aca3e5c Signed-off-by: Gennady Laventman --- gossip/gossip/msgstore/msgs.go | 164 ++++++++++++++++++++++++++-- gossip/gossip/msgstore/msgs_test.go | 141 ++++++++++++++++++++++++ 2 files changed, 294 insertions(+), 11 deletions(-) diff --git a/gossip/gossip/msgstore/msgs.go b/gossip/gossip/msgstore/msgs.go index 8d730813ceb..285d7876553 100644 --- a/gossip/gossip/msgstore/msgs.go +++ b/gossip/gossip/msgstore/msgs.go @@ -19,9 +19,13 @@ package msgstore import ( "sync" + "time" + "github.com/hyperledger/fabric/gossip/common" ) +var noopLock = func() {} + // invalidationTrigger is invoked on each message that was invalidated because of a message addition // i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations // then the invalidation trigger on 0 was called when 1 was added. @@ -30,7 +34,49 @@ type invalidationTrigger func(message interface{}) // NewMessageStore returns a new MessageStore with the message replacing // policy and invalidation trigger passed. func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore { - return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger} + return newMsgStore(pol, trigger) +} + +// NewMessageStoreExpirable returns a new MessageStore with the message replacing +// policy and invalidation trigger passed. It supports old message expiration after msgTTL, during expiration first external +// lock taken, expiration callback invoked and external lock released. Callback and external lock can be nil. +func NewMessageStoreExpirable(pol common.MessageReplacingPolicy, trigger invalidationTrigger, msgTTL time.Duration, externalLock func(), externalUnlock func(), externalExpire func(interface{})) MessageStore { + store := newMsgStore(pol, trigger) + + store.expirable = true + store.msgTTL = msgTTL + + if externalLock != nil { + store.externalLock = externalLock + } + + if externalUnlock != nil { + store.externalUnlock = externalUnlock + } + + if externalExpire != nil { + store.expireMsgCallback = externalExpire + } + + go store.expirationRoutine() + return store +} + +func newMsgStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) *messageStoreImpl { + return &messageStoreImpl{ + pol: pol, + messages: make([]*msg, 0), + invTrigger: trigger, + + expirable: false, + externalLock: noopLock, + externalUnlock: noopLock, + expireMsgCallback: func(m interface{}) {}, + expiredCount: 0, + + doneCh: make(chan struct{}), + } + } // MessageStore adds messages to an internal buffer. @@ -44,22 +90,42 @@ type MessageStore interface { // returns true or false whether the message was added to the store Add(msg interface{}) bool + // Checks if message is valid for insertion to store + // returns true or false whether the message can be added to the store + CheckValid(msg interface{}) bool + // size returns the amount of messages in the store Size() int // get returns all messages in the store Get() []interface{} + + // Stop all associated go routines + Stop() } type messageStoreImpl struct { pol common.MessageReplacingPolicy - lock *sync.RWMutex + lock sync.RWMutex messages []*msg invTrigger invalidationTrigger + + expirable bool + msgTTL time.Duration + expiredCount int + + externalLock func() + externalUnlock func() + expireMsgCallback func(msg interface{}) + + doneCh chan struct{} + stopOnce sync.Once } type msg struct { - data interface{} + data interface{} + created time.Time + expired bool } // add adds a message to the store @@ -78,13 +144,23 @@ func (s *messageStoreImpl) Add(message interface{}) bool { s.messages = append(s.messages[:i], s.messages[i+1:]...) n-- i-- - break - default: - break } } - s.messages = append(s.messages, &msg{data: message}) + s.messages = append(s.messages, &msg{data: message, created: time.Now()}) + return true +} + +// Checks if message is valid for insertion to store +func (s *messageStoreImpl) CheckValid(message interface{}) bool { + s.lock.RLock() + defer s.lock.RUnlock() + + for _, m := range s.messages { + if s.pol(message, m.data) == common.MessageInvalidated { + return false + } + } return true } @@ -92,18 +168,84 @@ func (s *messageStoreImpl) Add(message interface{}) bool { func (s *messageStoreImpl) Size() int { s.lock.RLock() defer s.lock.RUnlock() - return len(s.messages) + return len(s.messages) - s.expiredCount } // get returns all messages in the store func (s *messageStoreImpl) Get() []interface{} { + res := make([]interface{}, 0) + s.lock.RLock() defer s.lock.RUnlock() + for _, msg := range s.messages { + if !msg.expired { + res = append(res, msg.data) + } + } + return res +} + +func (s *messageStoreImpl) expireMessages() { + s.externalLock() + s.lock.Lock() + defer s.lock.Unlock() + defer s.externalUnlock() + n := len(s.messages) - res := make([]interface{}, n) for i := 0; i < n; i++ { - res[i] = s.messages[i].data + m := s.messages[i] + if !m.expired { + if time.Since(m.created) > s.msgTTL { + m.expired = true + s.expireMsgCallback(m.data) + s.expiredCount++ + } + } else { + if time.Since(m.created) > (s.msgTTL * 2) { + s.messages = append(s.messages[:i], s.messages[i+1:]...) + n-- + i-- + s.expiredCount-- + } + + } } - return res +} + +func (s *messageStoreImpl) needToExpire() bool { + s.lock.RLock() + defer s.lock.RUnlock() + for _, msg := range s.messages { + if !msg.expired && time.Since(msg.created) > s.msgTTL { + return true + } else if time.Since(msg.created) > (s.msgTTL * 2) { + return true + } + } + return false +} + +func (s *messageStoreImpl) expirationRoutine() { + for { + select { + case <-s.doneCh: + return + case <-time.After(s.expirationCheckInterval()): + if s.needToExpire() { + s.expireMessages() + } + } + } +} + +func (s *messageStoreImpl) Stop() { + stopFunc := func() { + close(s.doneCh) + } + s.stopOnce.Do(stopFunc) +} + +func (s *messageStoreImpl) expirationCheckInterval() time.Duration { + return s.msgTTL / 100 } diff --git a/gossip/gossip/msgstore/msgs_test.go b/gossip/gossip/msgstore/msgs_test.go index e7bd8759adc..c629f660d67 100644 --- a/gossip/gossip/msgstore/msgs_test.go +++ b/gossip/gossip/msgstore/msgs_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "sync" + "github.com/hyperledger/fabric/gossip/common" "github.com/stretchr/testify/assert" ) @@ -51,6 +53,16 @@ func compareInts(this interface{}, that interface{}) common.InvalidationResult { return common.MessageInvalidated } +func nonReplaceInts(this interface{}, that interface{}) common.InvalidationResult { + a := this.(int) + b := that.(int) + if a == b { + return common.MessageInvalidated + } + + return common.MessageNoAction +} + func TestSize(t *testing.T) { msgStore := NewMessageStore(alwaysNoAction, noopTrigger) msgStore.Add(0) @@ -108,6 +120,7 @@ func TestNewMessagesInvalidated(t *testing.T) { } func TestConcurrency(t *testing.T) { + t.Parallel() stopFlag := int32(0) msgStore := NewMessageStore(compareInts, noopTrigger) looper := func(f func()) func() { @@ -141,3 +154,131 @@ func TestConcurrency(t *testing.T) { atomic.CompareAndSwapInt32(&stopFlag, 0, 1) } + +func TestExpiration(t *testing.T) { + t.Parallel() + expired := make([]int, 0) + msgTTL := time.Second * 3 + + msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) { + expired = append(expired, m.(int)) + }) + + for i := 0; i < 10; i++ { + assert.True(t, msgStore.Add(i), "Adding", i) + } + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch") + + time.Sleep(time.Second * 2) + + for i := 0; i < 10; i++ { + assert.False(t, msgStore.CheckValid(i)) + assert.False(t, msgStore.Add(i)) + } + + for i := 10; i < 20; i++ { + assert.True(t, msgStore.CheckValid(i)) + assert.True(t, msgStore.Add(i)) + assert.False(t, msgStore.CheckValid(i)) + } + assert.Equal(t, 20, msgStore.Size(), "Wrong number of items in store - second batch") + + time.Sleep(time.Second * 2) + + for i := 0; i < 20; i++ { + assert.False(t, msgStore.Add(i)) + } + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration") + assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration") + + time.Sleep(time.Second * 4) + + assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after second batch expiration") + assert.Equal(t, 20, len(expired), "Wrong number of expired msgs - after second batch expiration") + + for i := 0; i < 10; i++ { + assert.True(t, msgStore.CheckValid(i)) + assert.True(t, msgStore.Add(i)) + assert.False(t, msgStore.CheckValid(i)) + } + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after second batch expiration and first banch re-added") + +} + +func TestExpirationConcurrency(t *testing.T) { + t.Parallel() + expired := make([]int, 0) + msgTTL := time.Second * 3 + lock := &sync.RWMutex{} + + msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, + func() { + lock.Lock() + }, + func() { + lock.Unlock() + }, + func(m interface{}) { + expired = append(expired, m.(int)) + }) + + lock.Lock() + for i := 0; i < 10; i++ { + assert.True(t, msgStore.Add(i), "Adding", i) + } + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch") + lock.Unlock() + + time.Sleep(time.Second * 2) + + lock.Lock() + time.Sleep(time.Second * 2) + + for i := 0; i < 10; i++ { + assert.False(t, msgStore.Add(i)) + } + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, external lock taken") + assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, external lock taken") + lock.Unlock() + + time.Sleep(time.Second * 1) + + lock.Lock() + for i := 0; i < 10; i++ { + assert.False(t, msgStore.Add(i)) + } + + assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after first batch expiration, expiration should run") + assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration, expiration should run") + + lock.Unlock() +} + +func TestStop(t *testing.T) { + t.Parallel() + expired := make([]int, 0) + msgTTL := time.Second * 3 + + msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) { + expired = append(expired, m.(int)) + }) + + for i := 0; i < 10; i++ { + assert.True(t, msgStore.Add(i), "Adding", i) + } + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch") + + msgStore.Stop() + + time.Sleep(time.Second * 4) + + assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, but store was stopped, so no expiration") + assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, but store was stopped, so no expiration") + + msgStore.Stop() +}