Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize alive message over other messages #159

Merged
merged 3 commits into from
Aug 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ multiple routes.
package memberlist

import (
"container/list"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -46,7 +47,11 @@ type Memberlist struct {
leaveLock sync.Mutex // Serializes calls to Leave

transport Transport
handoff chan msgHandoff

handoffCh chan struct{}
highPriorityMsgQueue *list.List
lowPriorityMsgQueue *list.List
msgQueueLock sync.Mutex

nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
Expand Down Expand Up @@ -161,17 +166,19 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
}

m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: transport,
handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: transport,
handoffCh: make(chan struct{}, 1),
highPriorityMsgQueue: list.New(),
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
}
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
Expand Down
33 changes: 16 additions & 17 deletions memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/miekg/dns"
"github.com/stretchr/testify/require"
)

var bindLock sync.Mutex
Expand Down Expand Up @@ -424,12 +425,15 @@ func TestMemberList_ResolveAddr_TCP_First(t *testing.T) {
}
port := uint16(m.config.BindPort)
expected := []ipPort{
ipPort{net.ParseIP("127.0.0.1"), port},
// Go now parses IPs like this and returns IP4-mapped IPv6 address.
// Confusingly if you print it you see the same as the input since
// IP.String converts IP4-mapped addresses back to dotted decimal notation
// but the underlying IP bytes don't compare as equal to the actual IPv4
// bytes the resolver will get from DNS.
ipPort{net.ParseIP("127.0.0.1").To4(), port},
ipPort{net.ParseIP("2001:db8:a0b:12f0::1"), port},
}
if !reflect.DeepEqual(ips, expected) {
t.Fatalf("bad: %#v expected: %#v", ips, expected)
}
require.Equal(t, expected, ips)
}
}

Expand Down Expand Up @@ -894,12 +898,14 @@ func TestMemberlist_UserData(t *testing.T) {
m1.schedule()
defer m1.Shutdown()

// Create a second delegate with things to send
d2 := &MockDelegate{}
d2.broadcasts = [][]byte{
bcasts := [][]byte{
[]byte("test"),
[]byte("foobar"),
}

// Create a second delegate with things to send
d2 := &MockDelegate{}
d2.broadcasts = bcasts
d2.state = []byte("my state")

// Create a second node
Expand Down Expand Up @@ -933,16 +939,9 @@ func TestMemberlist_UserData(t *testing.T) {
// Wait for a little while
time.Sleep(3 * time.Millisecond)

// Ensure we got the messages
if len(d1.msgs) != 2 {
t.Fatalf("should have 2 messages!")
}
if !reflect.DeepEqual(d1.msgs[0], []byte("test")) {
t.Fatalf("bad msg %v", d1.msgs[0])
}
if !reflect.DeepEqual(d1.msgs[1], []byte("foobar")) {
t.Fatalf("bad msg %v", d1.msgs[1])
}
// Ensure we got the messages. Ordering of messages is not guaranteed so just
// check we got them both in either order.
require.ElementsMatch(t, bcasts, d1.msgs)

// Check the push/pull state
if !reflect.DeepEqual(d1.remoteState, []byte("my state")) {
Expand Down
76 changes: 57 additions & 19 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,39 +369,77 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
case deadMsg:
fallthrough
case userMsg:
// Determine the message queue, prioritize alive
queue := m.lowPriorityMsgQueue
if msgType == aliveMsg {
queue = m.highPriorityMsgQueue
}

// Check for overflow and append if not full
m.msgQueueLock.Lock()
if queue.Len() >= m.config.HandoffQueueDepth {
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
} else {
queue.PushBack(msgHandoff{msgType, buf, from})
}
m.msgQueueLock.Unlock()

// Notify of pending message
select {
case m.handoff <- msgHandoff{msgType, buf, from}:
case m.handoffCh <- struct{}{}:
default:
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
}

default:
m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
}
}

// getNextMessage returns the next message to process in priority order, using LIFO
func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
m.msgQueueLock.Lock()
defer m.msgQueueLock.Unlock()

if el := m.highPriorityMsgQueue.Back(); el != nil {
m.highPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
m.lowPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
}
return msgHandoff{}, false
}

// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
func (m *Memberlist) packetHandler() {
for {
select {
case msg := <-m.handoff:
msgType := msg.msgType
buf := msg.buf
from := msg.from

switch msgType {
case suspectMsg:
m.handleSuspect(buf, from)
case aliveMsg:
m.handleAlive(buf, from)
case deadMsg:
m.handleDead(buf, from)
case userMsg:
m.handleUser(buf, from)
default:
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
case <-m.handoffCh:
for {
msg, ok := m.getNextMessage()
if !ok {
break
}
msgType := msg.msgType
buf := msg.buf
from := msg.from

switch msgType {
case suspectMsg:
m.handleSuspect(buf, from)
case aliveMsg:
m.handleAlive(buf, from)
case deadMsg:
m.handleDead(buf, from)
case userMsg:
m.handleUser(buf, from)
default:
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
}
}

case <-m.shutdownCh:
Expand Down Expand Up @@ -1106,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time
}

if ack.SeqNo != ping.SeqNo {
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
}

return true, nil
Expand Down
15 changes: 10 additions & 5 deletions transport_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package memberlist

import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestTransport_Join(t *testing.T) {
Expand Down Expand Up @@ -116,9 +117,13 @@ func TestTransport_Send(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)

received := bytes.Join(d1.msgs, []byte("|"))
expected := []byte("SendTo|SendToUDP|SendToTCP|SendBestEffort|SendReliable")
if !bytes.Equal(received, expected) {
t.Fatalf("bad: %s", received)
expected := []string{"SendTo", "SendToUDP", "SendToTCP", "SendBestEffort", "SendReliable"}

received := make([]string, len(d1.msgs))
for i, bs := range d1.msgs {
received[i] = string(bs)
}
// Some of these are UDP so often get re-ordered making the test flaky if we
// assert send ordering. Sort both slices to be tolerant of re-ordering.
require.ElementsMatch(t, expected, received)
}