Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #207 from sergefdrv/message-log-extension
Browse files Browse the repository at this point in the history
Support obtaining and resetting the message log
  • Loading branch information
nhoriguchi authored Jun 22, 2021
2 parents 60297ae + c3ba51d commit ce22fdb
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 27 deletions.
73 changes: 53 additions & 20 deletions core/internal/messagelog/messagelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,45 @@ import (
// they appear in the log. Closing the channel passed to this function
// indicates the returned channel should be closed. Nil channel may be
// passed if there's no need to close the returned channel.
//
// Messages returns all messages currently in the log.
//
// Reset replaces messages stored in the log with the supplied ones.
type MessageLog interface {
Append(msg messages.Message)
Stream(done <-chan struct{}) <-chan messages.Message
Messages() []messages.Message
Reset(msgs []messages.Message)
}

type messageLog struct {
lock sync.RWMutex
sync.RWMutex

// Messages in order added
msgs []messages.Message

// Buffered channels to notify about new messages
newAdded []chan<- struct{}
// Channel to close and recreate when adding new messages
newAdded chan struct{}

// Channel to close and recreate on reset
resetChan chan struct{}
}

// New creates a new instance of the message log.
func New() MessageLog {
return &messageLog{}
return &messageLog{
newAdded: make(chan struct{}),
resetChan: make(chan struct{}),
}
}

func (log *messageLog) Append(msg messages.Message) {
log.lock.Lock()
defer log.lock.Unlock()
log.Lock()
defer log.Unlock()

log.msgs = append(log.msgs, msg)

for _, newAdded := range log.newAdded {
select {
case newAdded <- struct{}{}:
default:
}
}
close(log.newAdded)
log.newAdded = make(chan struct{})
}

func (log *messageLog) Stream(done <-chan struct{}) <-chan messages.Message {
Expand All @@ -80,30 +87,56 @@ func (log *messageLog) Stream(done <-chan struct{}) <-chan messages.Message {
func (log *messageLog) supplyMessages(ch chan<- messages.Message, done <-chan struct{}) {
defer close(ch)

newAdded := make(chan struct{}, 1)
log.lock.Lock()
log.newAdded = append(log.newAdded, newAdded)
log.lock.Unlock()
log.RLock()
resetChan := log.resetChan
log.RUnlock()

next := 0
var next int
loop:
for {
log.lock.RLock()
log.RLock()
select {
case <-resetChan:
resetChan = log.resetChan
next = 0
default:
}
newAdded := log.newAdded
msgs := log.msgs[next:]
next = len(log.msgs)
log.lock.RUnlock()
log.RUnlock()

for _, msg := range msgs {
select {
case ch <- msg:
case <-resetChan:
continue loop
case <-done:
return
}
}

select {
case <-newAdded:
case <-resetChan:
case <-done:
return
}
}
}

func (log *messageLog) Messages() []messages.Message {
log.RLock()
defer log.RUnlock()

return log.msgs
}

func (log *messageLog) Reset(msgs []messages.Message) {
log.Lock()
defer log.Unlock()

log.msgs = msgs
close(log.resetChan)
log.resetChan = make(chan struct{})
}
94 changes: 87 additions & 7 deletions core/internal/messagelog/messagelog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,86 @@ func TestStream(t *testing.T) {
assert.False(t, more, "Channel not closed")
}

func TestMessages(t *testing.T) {
const nrMessages = 5

log := New()
msgs := makeManyMsgs(nrMessages)

for _, msg := range msgs {
log.Append(msg)
}
assert.Equalf(t, msgs, log.Messages(), "Unexpected messages")
}

func TestReset(t *testing.T) {
const nrMessages = 23

log := New()
ch := log.Stream(nil)

msgs := makeManyMsgs(nrMessages)
log.Reset(msgs)

for i, m := range msgs {
assert.Equalf(t, m, <-ch, "Unexpected message %d", i)
}

msgs2 := makeManyMsgs(nrMessages)
log.Reset(msgs2)

ch2 := log.Stream(nil)
for i, m := range msgs2 {
assert.Equalf(t, m, <-ch, "Unexpected message %d", i)
assert.Equalf(t, m, <-ch2, "Unexpected message %d", i)
}
assert.Equalf(t, msgs2, log.Messages(), "Unexpected messages")
}

func TestResetConcurrent(t *testing.T) {
const nrStreams = 3
const nrMessages = 23

log := New()

msgs := makeManyMsgs(nrMessages)
msgs2 := makeManyMsgs(nrMessages)
log.Reset(msgs)

wg := new(sync.WaitGroup)
wg.Add(nrStreams)
for id := 0; id < nrStreams; id++ {
ch := log.Stream(nil)

go func(streamID int) {
defer wg.Done()

var i, j int
for j < len(msgs2) {
ok := assert.Conditionf(t, func() bool {
m := <-ch
switch {
case i < len(msgs) && m == msgs[i]:
i++
return assert.Zero(t, j)
case m == msgs2[j]:
j++
return true
default:
return false
}
}, "Unexpected message from stream %d", streamID)
if !ok {
break
}
}
}(id)
}

log.Reset(msgs2)
wg.Wait()
}

func TestConcurrent(t *testing.T) {
const nrStreams = 3
const nrMessages = 5
Expand All @@ -97,8 +177,8 @@ func TestConcurrent(t *testing.T) {
ch := log.Stream(done)

for i, msg := range msgs {
assert.Equalf(t, msg, <-ch,
"Unexpected message %d from stream %d", i, streamID)
assert.Equalf(t, msg, <-ch, "Unexpected message %d from stream %d", i, streamID)
assert.Equalf(t, msgs[:i], log.Messages()[:i], "Unexpected messages in the log")
}

close(done)
Expand Down Expand Up @@ -137,8 +217,8 @@ func TestWithFaulty(t *testing.T) {
}

for i, msg := range msgs {
assert.Equalf(t, msg, <-ch,
"Unexpected message %d from stream %d", i, streamID)
assert.Equalf(t, msg, <-ch, "Unexpected message %d from stream %d", i, streamID)
assert.Equalf(t, msgs[:i], log.Messages()[:i], "Unexpected messages in the log")
}

wg.Done()
Expand All @@ -152,15 +232,15 @@ func TestWithFaulty(t *testing.T) {
wg.Wait()
}

func makeManyMsgs(nrMessages int) []messages.ReplicaMessage {
msgs := make([]messages.ReplicaMessage, nrMessages)
func makeManyMsgs(nrMessages int) []messages.Message {
msgs := make([]messages.Message, nrMessages)
for i := 0; i < nrMessages; i++ {
msgs[i] = makeMsg()
}
return msgs
}

func makeMsg() messages.ReplicaMessage {
func makeMsg() messages.Message {
return struct {
messages.ReplicaMessage
i int
Expand Down
26 changes: 26 additions & 0 deletions core/internal/messagelog/mocks/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ce22fdb

Please sign in to comment.