Skip to content

Commit

Permalink
refactor(net:message) add NetMessage interface
Browse files Browse the repository at this point in the history
* Design Goal: reduce coupling
* NB: Slices hold references to an underlying array, and if you assign
  one slice to another, both refer to the same array. If a function
  takes a slice argument, changes it makes to the elements of the slice
  will be visible to the caller, analogous to passing a pointer to the
  underlying array.
  • Loading branch information
Brian Tiger Chow committed Sep 13, 2014
1 parent 71a19e1 commit 4c95eb1
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 57 deletions.
41 changes: 29 additions & 12 deletions net/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,55 @@ import (
proto "code.google.com/p/goprotobuf/proto"
)

// Message represents a packet of information sent to or received from a
type NetMessage interface {
Peer() *peer.Peer
Data() []byte
}

func New(p *peer.Peer, data []byte) NetMessage {
return &message{peer: p, data: data}
}

// message represents a packet of information sent to or received from a
// particular Peer.
type Message struct {
type message struct {
// To or from, depending on direction.
Peer *peer.Peer
peer *peer.Peer

// Opaque data
Data []byte
data []byte
}

func (m *message) Peer() *peer.Peer {
return m.peer
}

func (m *message) Data() []byte {
return m.data
}

// FromObject creates a message from a protobuf-marshallable message.
func FromObject(p *peer.Peer, data proto.Message) (*Message, error) {
func FromObject(p *peer.Peer, data proto.Message) (*message, error) {
bytes, err := proto.Marshal(data)
if err != nil {
return nil, err
}
return &Message{
Peer: p,
Data: bytes,
return &message{
peer: p,
data: bytes,
}, nil
}

// Pipe objects represent a bi-directional message channel.
type Pipe struct {
Incoming chan *Message
Outgoing chan *Message
Incoming chan NetMessage
Outgoing chan NetMessage
}

// NewPipe constructs a pipe with channels of a given buffer size.
func NewPipe(bufsize int) *Pipe {
return &Pipe{
Incoming: make(chan *Message, bufsize),
Outgoing: make(chan *Message, bufsize),
Incoming: make(chan NetMessage, bufsize),
Outgoing: make(chan NetMessage, bufsize),
}
}
12 changes: 6 additions & 6 deletions net/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) {
}

// handleIncomingMessage routes message to the appropriate protocol.
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 *msg.Message) {
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) {

data, pid, err := unwrapData(m1.Data)
data, pid, err := unwrapData(m1.Data())
if err != nil {
u.PErr("muxer de-serializing error: %v\n", err)
return
}

m2 := &msg.Message{Peer: m1.Peer, Data: data}
m2 := msg.New(m1.Peer(), data)
proto, found := m.Protocols[pid]
if !found {
u.PErr("muxer unknown protocol %v\n", pid)
Expand Down Expand Up @@ -125,14 +125,14 @@ func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, prot
}

// handleOutgoingMessage wraps out a message and sends it out the
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 *msg.Message) {
data, err := wrapData(m1.Data, pid)
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) {
data, err := wrapData(m1.Data(), pid)
if err != nil {
u.PErr("muxer serializing error: %v\n", err)
return
}

m2 := &msg.Message{Peer: m1.Peer, Data: data}
m2 := msg.New(m1.Peer(), data)
select {
case m.GetPipe().Outgoing <- m2:
case <-ctx.Done():
Expand Down
34 changes: 17 additions & 17 deletions net/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func newPeer(t *testing.T, id string) *peer.Peer {
return &peer.Peer{ID: peer.ID(mh)}
}

func testMsg(t *testing.T, m *msg.Message, data []byte) {
if !bytes.Equal(data, m.Data) {
t.Errorf("Data does not match: %v != %v", data, m.Data)
func testMsg(t *testing.T, m msg.NetMessage, data []byte) {
if !bytes.Equal(data, m.Data()) {
t.Errorf("Data does not match: %v != %v", data, m.Data())
}
}

func testWrappedMsg(t *testing.T, m *msg.Message, pid ProtocolID, data []byte) {
data2, pid2, err := unwrapData(m.Data)
func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) {
data2, pid2, err := unwrapData(m.Data())
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestSimpleMuxer(t *testing.T) {

// test outgoing p1
for _, s := range []string{"foo", "bar", "baz"} {
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
p1.Outgoing <- msg.New(peer1, []byte(s))
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
}

Expand All @@ -86,13 +86,13 @@ func TestSimpleMuxer(t *testing.T) {
if err != nil {
t.Error(err)
}
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
mux1.Incoming <- msg.New(peer1, d)
testMsg(t, <-p1.Incoming, []byte(s))
}

// test outgoing p2
for _, s := range []string{"foo", "bar", "baz"} {
p2.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
p2.Outgoing <- msg.New(peer1, []byte(s))
testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s))
}

Expand All @@ -102,7 +102,7 @@ func TestSimpleMuxer(t *testing.T) {
if err != nil {
t.Error(err)
}
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
mux1.Incoming <- msg.New(peer1, d)
testMsg(t, <-p2.Incoming, []byte(s))
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSimultMuxer(t *testing.T) {
for i := 0; i < size; i++ {
<-limiter
s := fmt.Sprintf("proto %v out %v", pid, i)
m := &msg.Message{Peer: peer1, Data: []byte(s)}
m := msg.New(peer1, []byte(s))
mux1.Protocols[pid].GetPipe().Outgoing <- m
counts[pid][0][0]++
u.DOut("sent %v\n", s)
Expand All @@ -156,7 +156,7 @@ func TestSimultMuxer(t *testing.T) {
t.Error(err)
}

m := &msg.Message{Peer: peer1, Data: d}
m := msg.New(peer1, d)
mux1.Incoming <- m
counts[pid][1][0]++
u.DOut("sent %v\n", s)
Expand All @@ -167,7 +167,7 @@ func TestSimultMuxer(t *testing.T) {
for {
select {
case m := <-mux1.Outgoing:
data, pid, err := unwrapData(m.Data)
data, pid, err := unwrapData(m.Data())
if err != nil {
t.Error(err)
}
Expand All @@ -186,7 +186,7 @@ func TestSimultMuxer(t *testing.T) {
select {
case m := <-mux1.Protocols[pid].GetPipe().Incoming:
counts[pid][0][1]++
u.DOut("got %v\n", string(m.Data))
u.DOut("got %v\n", string(m.Data()))
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestStopping(t *testing.T) {

// test outgoing p1
for _, s := range []string{"foo", "bar", "baz"} {
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
p1.Outgoing <- msg.New(peer1, []byte(s))
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
}

Expand All @@ -249,7 +249,7 @@ func TestStopping(t *testing.T) {
if err != nil {
t.Error(err)
}
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
mux1.Incoming <- msg.New(peer1, d)
testMsg(t, <-p1.Incoming, []byte(s))
}

Expand All @@ -260,7 +260,7 @@ func TestStopping(t *testing.T) {

// test outgoing p1
for _, s := range []string{"foo", "bar", "baz"} {
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
p1.Outgoing <- msg.New(peer1, []byte(s))
select {
case <-mux1.Outgoing:
t.Error("should not have received anything.")
Expand All @@ -274,7 +274,7 @@ func TestStopping(t *testing.T) {
if err != nil {
t.Error(err)
}
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
mux1.Incoming <- msg.New(peer1, d)
select {
case <-p1.Incoming:
t.Error("should not have received anything.")
Expand Down
4 changes: 2 additions & 2 deletions net/service/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Request struct {
PeerID peer.ID

// Response is the channel of incoming responses.
Response chan *msg.Message
Response chan msg.NetMessage
}

// NewRequest creates a request for given peer.ID
Expand All @@ -88,7 +88,7 @@ func NewRequest(pid peer.ID) (*Request, error) {
return &Request{
ID: id,
PeerID: pid,
Response: make(chan *msg.Message, 1),
Response: make(chan msg.NetMessage, 1),
}, nil
}

Expand Down
20 changes: 10 additions & 10 deletions net/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Handler interface {

// HandleMessage receives an incoming message, and potentially returns
// a response message to send back.
HandleMessage(context.Context, *msg.Message) (*msg.Message, error)
HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error)
}

// Service is a networking component that protocols can use to multiplex
Expand Down Expand Up @@ -69,16 +69,16 @@ func (s *Service) Stop() {
}

// SendMessage sends a message out
func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error {
func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {

// serialize ServiceMessage wrapper
data, err := wrapData(m.Data, rid)
data, err := wrapData(m.Data(), rid)
if err != nil {
return err
}

// send message
m2 := &msg.Message{Peer: m.Peer, Data: data}
m2 := msg.New(m.Peer(), data)
select {
case s.Outgoing <- m2:
case <-ctx.Done():
Expand All @@ -89,10 +89,10 @@ func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID
}

// SendRequest sends a request message out and awaits a response.
func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) {
func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {

// create a request
r, err := NewRequest(m.Peer.ID)
r, err := NewRequest(m.Peer().ID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,14 +145,14 @@ func (s *Service) handleIncomingMessages(ctx context.Context) {
}
}

func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {

// unwrap the incoming message
data, rid, err := unwrapData(m.Data)
data, rid, err := unwrapData(m.Data())
if err != nil {
u.PErr("de-serializing error: %v\n", err)
}
m2 := &msg.Message{Peer: m.Peer, Data: data}
m2 := msg.New(m.Peer(), data)

// if it's a request (or has no RequestID), handle it
if rid == nil || rid.IsRequest() {
Expand All @@ -177,7 +177,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
u.PErr("RequestID should identify a response here.\n")
}

key := RequestKey(m.Peer.ID, RequestID(rid))
key := RequestKey(m.Peer().ID, RequestID(rid))
s.RequestsLock.RLock()
r, found := s.Requests[key]
s.RequestsLock.RUnlock()
Expand Down
20 changes: 10 additions & 10 deletions net/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
// ReverseHandler reverses all Data it receives and sends it back.
type ReverseHandler struct{}

func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) (
*msg.Message, error) {
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) (
msg.NetMessage, error) {

d := m.Data
d := m.Data()
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
d[i], d[j] = d[j], d[i]
}

return &msg.Message{Peer: m.Peer, Data: d}, nil
return msg.New(m.Peer(), d), nil
}

func newPeer(t *testing.T, id string) *peer.Peer {
Expand All @@ -47,11 +47,11 @@ func TestServiceHandler(t *testing.T) {
t.Error(err)
}

m1 := &msg.Message{Peer: peer1, Data: d}
m1 := msg.New(peer1, d)
s.Incoming <- m1
m2 := <-s.Outgoing

d, rid, err := unwrapData(m2.Data)
d, rid, err := unwrapData(m2.Data())
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -85,14 +85,14 @@ func TestServiceRequest(t *testing.T) {
}
}()

m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
m1 := msg.New(peer1, []byte("beep"))
m2, err := s1.SendRequest(ctx, m1)
if err != nil {
t.Error(err)
}

if !bytes.Equal(m2.Data, []byte("peeb")) {
t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof")
if !bytes.Equal(m2.Data(), []byte("peeb")) {
t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof")
}
}

Expand All @@ -117,7 +117,7 @@ func TestServiceRequestTimeout(t *testing.T) {
}
}()

m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
m1 := msg.New(peer1, []byte("beep"))
m2, err := s1.SendRequest(ctx, m1)
if err == nil || m2 != nil {
t.Error("should've timed out")
Expand Down

1 comment on commit 4c95eb1

@jbenet
Copy link
Member

@jbenet jbenet commented on 4c95eb1 Sep 13, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea!

Please sign in to comment.