Skip to content

Commit

Permalink
refactor(dht/pb) move proto to pb package
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Tiger Chow committed Oct 24, 2014
1 parent bd5a1c0 commit 3bf7462
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 84 deletions.
11 changes: 0 additions & 11 deletions routing/dht/Makefile

This file was deleted.

44 changes: 22 additions & 22 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"

"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)

var log = u.Logger("dht")
Expand Down Expand Up @@ -128,7 +128,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
}

// deserialize msg
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Error unmarshaling data")
Expand All @@ -140,7 +140,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// Print out diagnostic
log.Debug("[peer: %s] Got message type: '%s' [from = %s]\n",
dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)

// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
Expand Down Expand Up @@ -174,7 +174,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {

mes, err := msg.FromObject(p, pmes)
if err != nil {
Expand All @@ -185,7 +185,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)

// Print out diagnostic
log.Debug("Sent message type: '%s' [to = %s]",
Message_MessageType_name[int32(pmes.GetType())], p)
pb.Message_MessageType_name[int32(pmes.GetType())], p)

rmes, err := dht.sender.SendRequest(ctx, mes)
if err != nil {
Expand All @@ -198,7 +198,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)

rpmes := new(Message)
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
Expand All @@ -210,7 +210,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {

pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand All @@ -225,10 +225,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,

func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {

pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)

// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})

rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand Down Expand Up @@ -290,9 +290,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
key u.Key, level int) (*pb.Message, error) {

pmes := newMessage(Message_GET_VALUE, string(key), level)
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -301,7 +301,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_Peer, level int) ([]byte, error) {
peerlist []*pb.Message_Peer, level int) ([]byte, error) {

for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(pinfo)
Expand Down Expand Up @@ -379,8 +379,8 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
return nil, nil
}

func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -390,13 +390,13 @@ func (dht *IpfsDHT) printTables() {
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

// TODO: Could be done async
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
Expand All @@ -420,7 +420,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]

Expand All @@ -430,7 +430,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -469,7 +469,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {

id := peer.ID(pbp.GetId())

Expand All @@ -492,7 +492,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err
Expand Down
27 changes: 14 additions & 13 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"

"time"
Expand Down Expand Up @@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
// u.POut("NotFound Test\n")
// Reply with failures to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

resp := &Message{
resp := &pb.Message{
Type: pmes.Type,
}
m, err := msg.FromObject(mes.Peer(), resp)
Expand All @@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {

fs.handlers = nil
// Now we test this DHT's handleGetValue failure
typ := Message_GET_VALUE
typ := pb.Message_GET_VALUE
str := "hello"
req := Message{
req := pb.Message{
Type: &typ,
Key: &str,
Value: []byte{0},
Expand All @@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {

mes = d.HandleMessage(ctx, mes)

pmes := new(Message)
pmes := new(pb.Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}

peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
resp.CloserPeers = peersToPBPeers(peers)
resp.CloserPeers = pb.PeersToPBPeers(peers)
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{
case pb.Message_GET_VALUE:
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]peer.Peer{other}),
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
}

mes, err := msg.FromObject(mes.Peer(), resp)
Expand Down
Loading

0 comments on commit 3bf7462

Please sign in to comment.