Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve deadlock caused by fetching snapshot during update #126

Merged
merged 3 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,57 +304,74 @@ func (c *Conn) receivedOrderBook(ob orderBook) error {
}

func (c *Conn) receivedUpdate(u Update) error {
valid, err := c.processUpdate(u)
if err != nil {
return err
}

// If update is not valid, ignore
if !valid {
return nil
neilgarb marked this conversation as resolved.
Show resolved Hide resolved
}

if c.updateCallback != nil {
c.updateCallback(u)
}

return nil
}

// Validate and process update into orderbook.
// Return bool indicating if update is valid.
func (c *Conn) processUpdate(u Update) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.seq == 0 {
// State not initialized so we can't update it.
return nil
return false, nil
}

if u.Sequence <= c.seq {
// Old update. We can just discard it.
return nil
return false, nil
}

if u.Sequence != c.seq+1 {
return errors.New("streaming: update received out of sequence")
return false, errors.New("streaming: update received out of sequence")
}

// Process trades
for _, t := range u.TradeUpdates {
if err := c.processTrade(*t); err != nil {
return err
return false, err
}
}

// Process create
if u.CreateUpdate != nil {
if err := c.processCreate(*u.CreateUpdate); err != nil {
return err
return false, err
}
}

// Process delete
if u.DeleteUpdate != nil {
if err := c.processDelete(*u.DeleteUpdate); err != nil {
return err
return false, err
}
}

// Process status
if u.StatusUpdate != nil {
if err := c.processStatus(*u.StatusUpdate); err != nil {
return err
return false, err
}
}

c.seq = u.Sequence

if c.updateCallback != nil {
c.updateCallback(u)
}

return nil
return true, nil
}

func decTrade(m map[string]order, id string, base decimal.Decimal) (
Expand Down Expand Up @@ -483,7 +500,7 @@ func (c *Conn) Snapshot() Snapshot {
}
}

// Status returns the currenct status of the streaming connection.
// Status returns the current status of the streaming connection.
func (c *Conn) Status() luno.Status {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
50 changes: 50 additions & 0 deletions streaming/streaming_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streaming

import (
"testing"
"time"

"github.com/luno/luno-go"
"github.com/luno/luno-go/decimal"
Expand Down Expand Up @@ -583,3 +584,52 @@ func compareOrderBookEntry(t *testing.T, want, got luno.OrderBookEntry) {
t.Errorf("got = %v, want %v", got, want)
}
}

func TestReceiveUpdateSnapshot(t *testing.T) {
chDone := make(chan struct{})

c := &Conn{
asks: asksMap(),
bids: bidsMap(),
seq: 1,
status: luno.StatusActive,
}

onUpdate := func(up Update) {
// Get snapshot to confirm mutex does not create deadlock
_ = c.Snapshot()
chDone <- struct{}{}
}

c.updateCallback = onUpdate

tu := []*TradeUpdate{
{
Sequence: 2,
Base: decimal.NewFromFloat64(0.02, 2),
Counter: decimal.NewFromFloat64(0.002, 2),
MakerOrderID: "1",
TakerOrderID: "32",
},
{
Sequence: 3,
Base: decimal.NewFromFloat64(0.01, 2),
Counter: decimal.NewFromFloat64(0.001, 2),
MakerOrderID: "1",
TakerOrderID: "34",
},
}

go func() {
err := c.receivedUpdate(Update{Sequence: 2, TradeUpdates: tu})
if err != nil {
t.Errorf("Expected success got: %v", err)
}
}()

select {
case <-chDone:
case <-time.After(time.Second):
t.Errorf("timeout trying to retrieve snapshot on update")
}
}