Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ipfs/go-ipfs into iss653
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesvp committed May 6, 2015
2 parents 50e3b15 + 0d521ff commit 99f15e6
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 162 deletions.
46 changes: 21 additions & 25 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package bitswap

import (
"errors"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -324,47 +325,32 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}

// TODO(brian): handle errors
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()

if p == "" {
log.Debug("Received message from nil peer!")
// TODO propagate the error upward
return "", nil
}
if incoming == nil {
log.Debug("Got nil bitswap message!")
// TODO propagate the error upward
return "", nil
}

// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

var keys []u.Key
for _, block := range incoming.Blocks() {
bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++
}
log.Debugf("got block %s from %s", block, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Debug(err)
return fmt.Errorf("ReceiveMessage HasBlock error: %s", err)
}
cancel()
}

var keys []u.Key
for _, block := range incoming.Blocks() {
keys = append(keys, block.Key())
}
bs.cancelBlocks(ctx, keys)

// TODO: consider changing this function to not return anything
return "", nil
bs.cancelBlocks(ctx, keys)
return nil
}

// Connected/Disconnected warns bitswap about peer connections
Expand All @@ -391,14 +377,24 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
message := bsmsg.New()
message.SetFull(false)
for _, k := range bkeys {
log.Debug("cancel block: %s", k)
message.Cancel(k)
}

wg := sync.WaitGroup{}
for _, p := range bs.engine.Peers() {
err := bs.send(ctx, p, message)
if err != nil {
log.Debugf("Error sending message: %s", err)
}
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := bs.send(ctx, p, message)
if err != nil {
log.Warningf("Error sending message: %s", err)
return
}
}(p)
}
wg.Wait()
return
}

func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
Expand Down
11 changes: 3 additions & 8 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ type BitSwapNetwork interface {
peer.ID,
bsmsg.BitSwapMessage) error

// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
peer.ID,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)

// SetDelegate registers the Reciver to handle messages received from the
// network.
SetDelegate(Receiver)
Expand All @@ -35,8 +29,9 @@ type BitSwapNetwork interface {
// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) (
destination peer.ID, outgoing bsmsg.BitSwapMessage)
ctx context.Context,
sender peer.ID,
incoming bsmsg.BitSwapMessage) error

ReceiveError(error)

Expand Down
70 changes: 8 additions & 62 deletions exchange/bitswap/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,6 @@ import (
testutil "github.com/ipfs/go-ipfs/util/testutil"
)

func TestSendRequestToCooperativePeer(t *testing.T) {
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))

recipientPeer := testutil.RandIdentityOrFatal(t)

t.Log("Get two network adapters")

initiator := net.Adapter(testutil.RandIdentityOrFatal(t))
recipient := net.Adapter(recipientPeer)

expectedStr := "response from recipient"
recipient.SetDelegate(lambda(func(
ctx context.Context,
from peer.ID,
incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {

t.Log("Recipient received a message from the network")

// TODO test contents of incoming message

m := bsmsg.New()
m.AddBlock(blocks.NewBlock([]byte(expectedStr)))

return from, m
}))

t.Log("Build a message and send a synchronous request to recipient")

message := bsmsg.New()
message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), recipientPeer.ID(), message)
if err != nil {
t.Fatal(err)
}

t.Log("Check the contents of the response from recipient")

if response == nil {
t.Fatal("Should have received a response")
}

for _, blockFromRecipient := range response.Blocks() {
if string(blockFromRecipient.Data) == expectedStr {
return
}
}
t.Fatal("Should have returned after finding expected block data")
}

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
responderPeer := testutil.RandIdentityOrFatal(t)
Expand All @@ -80,20 +29,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func(
ctx context.Context,
fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
msgFromWaiter bsmsg.BitSwapMessage) error {

msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter)

return fromWaiter, msgToWaiter
return nil
}))

waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder peer.ID,
msgFromResponder bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
msgFromResponder bsmsg.BitSwapMessage) error {

// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
Expand All @@ -108,7 +56,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")

}
return "", nil
return nil
}))

messageSentAsync := bsmsg.New()
Expand All @@ -123,7 +71,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}

type receiverFunc func(ctx context.Context, p peer.ID,
incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage)
incoming bsmsg.BitSwapMessage) error

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
Expand All @@ -133,13 +81,11 @@ func lambda(f receiverFunc) bsnet.Receiver {
}

type lambdaImpl struct {
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage)
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
p peer.ID, incoming bsmsg.BitSwapMessage) error {
return lam.f(ctx, p, incoming)
}

Expand Down
63 changes: 1 addition & 62 deletions exchange/bitswap/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,61 +72,7 @@ func (n *network) deliver(

n.delay.Wait()

nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
return errors.New("Malformed client request")
}

if nextPeer == "" && nextMsg == nil { // no response to send
return nil
}

nextReceiver, ok := n.clients[nextPeer]
if !ok {
return errors.New("Cannot locate peer on network")
}
go n.deliver(nextReceiver, nextPeer, nextMsg)
return nil
}

// TODO
func (n *network) SendRequest(
ctx context.Context,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error) {

r, ok := n.clients[to]
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

// TODO dedupe code
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}

// TODO dedupe code
if nextPeer == "" && nextMsg == nil {
return nil, nil
}

// TODO test when receiver doesn't immediately respond to the initiator of the request
if nextPeer != from {
go func() {
nextReceiver, ok := n.clients[nextPeer]
if !ok {
// TODO log the error?
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, nil
}
return nextMsg, nil
return r.ReceiveMessage(context.TODO(), from, message)
}

type networkClient struct {
Expand All @@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage(
return nc.network.SendMessage(ctx, nc.local, to, message)
}

func (nc *networkClient) SendRequest(
ctx context.Context,
to peer.ID,
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
return nc.network.SendRequest(ctx, nc.local, to, message)
}

// FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {

Expand Down
2 changes: 1 addition & 1 deletion importer/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var BlockSizeLimit = 1048576 // 1 MB
// rough estimates on expected sizes
var roughDataBlockSize = chunk.DefaultBlockSize
var roughLinkBlockSize = 1 << 13 // 8KB
var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing
var roughLinkSize = 34 + 8 + 5 // sha256 multihash + size + no name + protobuf framing

// DefaultLinksPerBlock governs how the importer decides how many links there
// will be per block. This calculation is based on expected distributions of:
Expand Down
6 changes: 3 additions & 3 deletions p2p/crypto/secio/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
// =============================================================================
// step 1.2 Selection -- select/agree on best encryption parameters

// to determine order, use cmp(H(lr||rpk), H(rr||lpk)).
// to determine order, use cmp(H(remote_pubkey||local_rand), H(local_pubkey||remote_rand)).
oh1 := u.Hash(append(proposeIn.GetPubkey(), nonceOut...))
oh2 := u.Hash(append(myPubKeyBytes, proposeIn.GetRand()...))
order := bytes.Compare(oh1, oh2)
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
return err
}

// Receive + Parse their Propose packet and generate an Exchange packet.
// Receive + Parse their Exchange packet.
exchangeIn := new(pb.Exchange)
if _, err := readMsgCtx(ctx, s.insecureM, exchangeIn); err != nil {
return err
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
// log.Debug("2.3 mac + cipher.")

// =============================================================================
// step 3. Finish -- send expected message (the nonces), verify encryption works
// step 3. Finish -- send expected message to verify encryption works (send local nonce)

// setup ETM ReadWriter
w := NewETMWriter(s.insecure, s.local.cipher, s.local.mac)
Expand Down
2 changes: 1 addition & 1 deletion test/sharness/t0040-add-and-cat.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ test_expect_success EXPENSIVE "ipfs add bigfile succeeds" '
'

test_expect_success EXPENSIVE "ipfs add bigfile output looks good" '
HASH="QmSVxWkYfbJ3cowQUUgF4iF4CQd92vubxw7bs2aZAVRUD9" &&
HASH="QmU9SWAPPmNEKZB8umYMmjYvN7VyHqABNvdA6GUi4MMEz3" &&
echo "added $HASH mountdir/bigfile" >expected &&
test_cmp expected actual
'
Expand Down

0 comments on commit 99f15e6

Please sign in to comment.