Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Memory leak] Remove dangling pointers in the blockchain subscription #194

Merged
merged 3 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 15 additions & 52 deletions blockchain/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func (m *MockSubscription) Close() {

// subscription is the Blockchain event subscription object
type subscription struct {
updateCh chan void // Channel for update information
closeCh chan void // Channel for close signals
elem *eventElem // Reference to the blockchain event wrapper
updateCh chan *Event // Channel for update information
closeCh chan void // Channel for close signals
}

// GetEventCh creates a new event channel, and returns it
Expand All @@ -72,17 +71,10 @@ func (s *subscription) GetEventCh() chan *Event {
// GetEvent returns the event from the subscription (BLOCKING)
func (s *subscription) GetEvent() *Event {
for {
if s.elem.next != nil {
s.elem = s.elem.next
evnt := s.elem.event

return evnt
}

// Wait for an update
select {
case <-s.updateCh:
continue
case ev := <-s.updateCh:
return ev
case <-s.closeCh:
return nil
}
Expand Down Expand Up @@ -117,7 +109,7 @@ type Event struct {
Type EventType

// Source is the source that generated the blocks for the event
// right now it can be either the Sealer or the Syncer. TODO
// right now it can be either the Sealer or the Syncer
Source string
}

Expand Down Expand Up @@ -160,73 +152,44 @@ func (b *Blockchain) SubscribeEvents() Subscription {
return b.stream.subscribe()
}

// eventElem contains the event, as well as the next list event
type eventElem struct {
event *Event
next *eventElem
}

// eventStream is the structure that contains the event list,
// as well as the update channel which it uses to notify of updates
type eventStream struct {
lock sync.Mutex
head *eventElem

// channel to notify updates
updateCh []chan void
updateCh []chan *Event
}

// subscribe Creates a new blockchain event subscription
func (e *eventStream) subscribe() *subscription {
head, updateCh := e.Head()
s := &subscription{
elem: head,
updateCh: updateCh,
return &subscription{
updateCh: e.newUpdateCh(),
closeCh: make(chan void),
}

return s
}

// Head returns the event list head
func (e *eventStream) Head() (*eventElem, chan void) {
// newUpdateCh returns the event update channel
func (e *eventStream) newUpdateCh() chan *Event {
e.lock.Lock()
head := e.head

ch := make(chan void)

if e.updateCh == nil {
e.updateCh = make([]chan void, 0)
}
defer e.lock.Unlock()

ch := make(chan *Event, 1)
e.updateCh = append(e.updateCh, ch)

e.lock.Unlock()

return head, ch
return ch
}

// push adds a new Event, and notifies listeners
func (e *eventStream) push(event *Event) {
e.lock.Lock()

newHead := &eventElem{
event: event,
}

if e.head != nil {
e.head.next = newHead
}

e.head = newHead
defer e.lock.Unlock()

// Notify the listeners
for _, update := range e.updateCh {
select {
case update <- void{}:
case update <- event:
default:
}
}

e.lock.Unlock()
}
87 changes: 31 additions & 56 deletions blockchain/subscription_test.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,53 @@
package blockchain

import (
"sync"
"testing"
"time"

"github.com/dogechain-lab/dogechain/types"
"github.com/stretchr/testify/assert"
)

func TestSubscriptionLinear(t *testing.T) {
e := &eventStream{}
func TestSubscription(t *testing.T) {
t.Parallel()

// add a genesis block to eventstream
e.push(&Event{
NewChain: []*types.Header{
{Number: 0},
},
})

sub := e.subscribe()
var (
e = &eventStream{}
sub = e.subscribe()
caughtEventNum = uint64(0)
event = &Event{
NewChain: []*types.Header{
{
Number: 100,
},
},
}

eventCh := make(chan *Event)
wg sync.WaitGroup
)

go func() {
for {
task := sub.GetEvent()
eventCh <- task
}
}()
defer sub.Close()

for i := 1; i < 10; i++ {
evnt := &Event{}
updateCh := sub.GetEventCh()

evnt.AddNewHeader(&types.Header{Number: uint64(i)})
e.push(evnt)
wg.Add(1)

timeoutDelay := time.NewTimer(1 * time.Second)
go func() {
defer wg.Done()

// it should fire updateCh
select {
case evnt := <-eventCh:
if evnt.NewChain[0].Number != uint64(i) {
t.Fatal("bad")
}
case <-timeoutDelay.C:
t.Fatal("timeout")
case ev := <-updateCh:
caughtEventNum = ev.NewChain[0].Number
case <-time.After(5 * time.Second):
}
}
}

func TestSubscriptionSlowConsumer(t *testing.T) {
e := &eventStream{}
}()

e.push(&Event{
NewChain: []*types.Header{
{Number: 0},
},
})
// Send the event to the channel
e.push(event)

sub := e.subscribe()
// Wait for the event to be parsed
wg.Wait()

// send multiple events
for i := 1; i < 10; i++ {
e.push(&Event{
NewChain: []*types.Header{
{Number: uint64(i)},
},
})
}

// consume events now
for i := 1; i < 10; i++ {
evnt := sub.GetEvent()
if evnt.NewChain[0].Number != uint64(i) {
t.Fatal("bad")
}
}
assert.Equal(t, event.NewChain[0].Number, caughtEventNum)
}