Skip to content

Commit

Permalink
Merge pull request #198 from jbenet/fix/bitswap-races
Browse files Browse the repository at this point in the history
fix(bitswap) data races
  • Loading branch information
Brian Tiger Chow committed Oct 24, 2014
2 parents bd5a1c0 + c848202 commit f73d632
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 49 deletions.
4 changes: 2 additions & 2 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ type Blockstore interface {
Put(*blocks.Block) error
}

func NewBlockstore(d ds.Datastore) Blockstore {
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
return &blockstore{
datastore: d,
}
}

type blockstore struct {
datastore ds.Datastore
datastore ds.ThreadSafeDatastore
}

func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
Expand Down
7 changes: 4 additions & 3 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"testing"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)

// TODO(brian): TestGetReturnsNil

func TestGetWhenKeyNotPresent(t *testing.T) {
bs := NewBlockstore(ds.NewMapDatastore())
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
_, err := bs.Get(u.Key("not present"))

if err != nil {
Expand All @@ -23,7 +24,7 @@ func TestGetWhenKeyNotPresent(t *testing.T) {
}

func TestPutThenGetBlock(t *testing.T) {
bs := NewBlockstore(ds.NewMapDatastore())
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(block)
Expand All @@ -46,7 +47,7 @@ func TestValueTypeMismatch(t *testing.T) {
datastore := ds.NewMapDatastore()
datastore.Put(block.Key().DsKey(), "data that isn't a block!")

blockstore := NewBlockstore(datastore)
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))

_, err := blockstore.Get(block.Key())
if err != ValueTypeMismatch {
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var log = u.Logger("bitswap")
// provided NetMessage service
func NetMessageSession(parent context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {
d ds.ThreadSafeDatastore, nice bool) exchange.Interface {

networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
bs := &bitswap{
Expand Down
3 changes: 2 additions & 1 deletion exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
bstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
Expand Down Expand Up @@ -279,7 +280,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
adapter := net.Adapter(p)
htc := rs.Client(p)

blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
const alwaysSendToPeer = true
bs := &bitswap{
blockstore: blockstore,
Expand Down
21 changes: 1 addition & 20 deletions exchange/bitswap/strategy/ledger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package strategy

import (
"sync"
"time"

peer "github.com/jbenet/go-ipfs/peer"
Expand All @@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
}

// ledger stores the data exchange relationship between two peers.
// NOT threadsafe
type ledger struct {
lock sync.RWMutex

// Partner is the remote Peer.
Partner peer.Peer

Expand All @@ -46,48 +44,31 @@ type ledger struct {
}

func (l *ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()

return l.Strategy(l)
}

func (l *ledger) SentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}

func (l *ledger) ReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()

l.wantList[k] = struct{}{}
}

func (l *ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()

_, ok := l.wantList[k]
return ok
}

func (l *ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
22 changes: 0 additions & 22 deletions exchange/bitswap/strategy/ledger_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1 @@
package strategy

import (
"sync"
"testing"
)

func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}
26 changes: 26 additions & 0 deletions exchange/bitswap/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package strategy

import (
"errors"
"sync"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
Expand All @@ -26,6 +27,7 @@ func New(nice bool) Strategy {
}

type strategist struct {
lock sync.RWMutex
ledgerMap
strategyFunc
}
Expand All @@ -38,6 +40,9 @@ type peerKey u.Key

// Peers returns a list of peers
func (s *strategist) Peers() []peer.Peer {
s.lock.RLock()
defer s.lock.RUnlock()

response := make([]peer.Peer, 0)
for _, ledger := range s.ledgerMap {
response = append(response, ledger.Partner)
Expand All @@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer {
}

func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
s.lock.RLock()
defer s.lock.RUnlock()

ledger := s.ledger(p)
return ledger.WantListContains(k)
}

func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
s.lock.RLock()
defer s.lock.RUnlock()

ledger := s.ledger(p)
return ledger.ShouldSend()
}

func (s *strategist) Seed(int64) {
s.lock.Lock()
defer s.lock.Unlock()

// TODO
}

func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
s.lock.Lock()
defer s.lock.Unlock()

// TODO find a more elegant way to handle this check
if p == nil {
return errors.New("Strategy received nil peer")
Expand All @@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
// send happen atomically

func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
s.lock.Lock()
defer s.lock.Unlock()

l := s.ledger(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
Expand All @@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
}

func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.ledger(p).Accounting.BytesSent
}

func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.ledger(p).Accounting.BytesRecv
}

Expand Down

0 comments on commit f73d632

Please sign in to comment.