diff --git a/vendor/github.com/hashicorp/memberlist/Makefile b/vendor/github.com/hashicorp/memberlist/Makefile index 891e8364ae06..e34a0818d7e9 100644 --- a/vendor/github.com/hashicorp/memberlist/Makefile +++ b/vendor/github.com/hashicorp/memberlist/Makefile @@ -16,4 +16,4 @@ deps: go get -d -v ./... echo $(DEPS) | xargs -n1 go get -d -.PNONY: test cov integ +.PHONY: test cov integ diff --git a/vendor/github.com/hashicorp/memberlist/README.md b/vendor/github.com/hashicorp/memberlist/README.md index 0adc075e815e..e6fed4e64981 100644 --- a/vendor/github.com/hashicorp/memberlist/README.md +++ b/vendor/github.com/hashicorp/memberlist/README.md @@ -65,7 +65,7 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c ## Protocol -memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways: +memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways: * Several extensions are made to increase propagation speed and convergence rate. diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index bdf333b436a4..bd8abd23f858 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -15,6 +15,7 @@ multiple routes. package memberlist import ( + "container/list" "fmt" "log" "net" @@ -34,6 +35,7 @@ type Memberlist struct { sequenceNum uint32 // Local sequence number incarnation uint32 // Local incarnation number numNodes uint32 // Number of known nodes (estimate) + pushPullReq uint32 // Number of push/pull requests config *Config shutdown int32 // Used as an atomic boolean value @@ -45,7 +47,11 @@ type Memberlist struct { leaveLock sync.Mutex // Serializes calls to Leave transport Transport - handoff chan msgHandoff + + handoffCh chan struct{} + highPriorityMsgQueue *list.List + lowPriorityMsgQueue *list.List + msgQueueLock sync.Mutex nodeLock sync.RWMutex nodes []*nodeState // Known nodes @@ -160,17 +166,19 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } m := &Memberlist{ - config: conf, - shutdownCh: make(chan struct{}), - leaveBroadcast: make(chan struct{}, 1), - transport: transport, - handoff: make(chan msgHandoff, conf.HandoffQueueDepth), - nodeMap: make(map[string]*nodeState), - nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), - ackHandlers: make(map[uint32]*ackHandler), - broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, - logger: logger, + config: conf, + shutdownCh: make(chan struct{}), + leaveBroadcast: make(chan struct{}, 1), + transport: transport, + handoffCh: make(chan struct{}, 1), + highPriorityMsgQueue: list.New(), + lowPriorityMsgQueue: list.New(), + nodeMap: make(map[string]*nodeState), + nodeTimers: make(map[string]*suspicion), + awareness: newAwareness(conf.AwarenessMaxMultiplier), + ackHandlers: make(map[uint32]*ackHandler), + broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, + logger: logger, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() @@ -639,7 +647,9 @@ func (m *Memberlist) Shutdown() error { // Shut down the transport first, which should block until it's // completely torn down. If we kill the memberlist-side handlers // those I/O handlers might get stuck. - m.transport.Shutdown() + if err := m.transport.Shutdown(); err != nil { + m.logger.Printf("[ERR] Failed to shutdown transport: %v", err) + } // Now tear down everything else. atomic.StoreInt32(&m.shutdown, 1) diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index 58e1fce20884..f6a0d45fedb9 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -8,9 +8,10 @@ import ( "hash/crc32" "io" "net" + "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" ) @@ -71,7 +72,8 @@ const ( compoundOverhead = 2 // Assumed overhead per entry in compoundHeader userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process - maxPushStateBytes = 10 * 1024 * 1024 + maxPushStateBytes = 20 * 1024 * 1024 + maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests ) // ping request sent directly to node @@ -238,6 +240,16 @@ func (m *Memberlist) handleConn(conn net.Conn) { m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) } case pushPullMsg: + // Increment counter of pending push/pulls + numConcurrent := atomic.AddUint32(&m.pushPullReq, 1) + defer atomic.AddUint32(&m.pushPullReq, ^uint32(0)) + + // Check if we have too many open push/pull requests + if numConcurrent >= maxPushPullRequests { + m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests") + return + } + join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) @@ -357,10 +369,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim case deadMsg: fallthrough case userMsg: + // Determine the message queue, prioritize alive + queue := m.lowPriorityMsgQueue + if msgType == aliveMsg { + queue = m.highPriorityMsgQueue + } + + // Check for overflow and append if not full + m.msgQueueLock.Lock() + if queue.Len() >= m.config.HandoffQueueDepth { + m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + } else { + queue.PushBack(msgHandoff{msgType, buf, from}) + } + m.msgQueueLock.Unlock() + + // Notify of pending message select { - case m.handoff <- msgHandoff{msgType, buf, from}: + case m.handoffCh <- struct{}{}: default: - m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) } default: @@ -368,28 +395,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim } } +// getNextMessage returns the next message to process in priority order, using LIFO +func (m *Memberlist) getNextMessage() (msgHandoff, bool) { + m.msgQueueLock.Lock() + defer m.msgQueueLock.Unlock() + + if el := m.highPriorityMsgQueue.Back(); el != nil { + m.highPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } else if el := m.lowPriorityMsgQueue.Back(); el != nil { + m.lowPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } + return msgHandoff{}, false +} + // packetHandler is a long running goroutine that processes messages received // over the packet interface, but is decoupled from the listener to avoid // blocking the listener which may cause ping/ack messages to be delayed. func (m *Memberlist) packetHandler() { for { select { - case msg := <-m.handoff: - msgType := msg.msgType - buf := msg.buf - from := msg.from - - switch msgType { - case suspectMsg: - m.handleSuspect(buf, from) - case aliveMsg: - m.handleAlive(buf, from) - case deadMsg: - m.handleDead(buf, from) - case userMsg: - m.handleUser(buf, from) - default: - m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + case <-m.handoffCh: + for { + msg, ok := m.getNextMessage() + if !ok { + break + } + msgType := msg.msgType + buf := msg.buf + from := msg.from + + switch msgType { + case suspectMsg: + m.handleSuspect(buf, from) + case aliveMsg: + m.handleAlive(buf, from) + case deadMsg: + m.handleDead(buf, from) + case userMsg: + m.handleUser(buf, from) + default: + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + } } case <-m.shutdownCh: @@ -1059,7 +1109,7 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { // operations, given the deadline. The bool return parameter is true if we // we able to round trip a ping to the other node. func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) { - conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout) + conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now())) if err != nil { // If the node is actually dead we expect this to fail, so we // shouldn't spam the logs with it. After this point, errors @@ -1094,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) } return true, nil diff --git a/vendor/github.com/hashicorp/serf/serf/delegate.go b/vendor/github.com/hashicorp/serf/serf/delegate.go index 871b72e5030c..567c7fe4abe7 100644 --- a/vendor/github.com/hashicorp/serf/serf/delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/delegate.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/memberlist" ) // delegate is the memberlist.Delegate implementation that Serf uses. @@ -13,6 +14,8 @@ type delegate struct { serf *Serf } +var _ memberlist.Delegate = &delegate{} + func (d *delegate) NodeMeta(limit int) []byte { roleBytes := d.serf.encodeTags(d.serf.config.Tags) if len(roleBytes) > limit { diff --git a/vendor/github.com/hashicorp/serf/serf/keymanager.go b/vendor/github.com/hashicorp/serf/serf/keymanager.go index fd53182fc53c..bea038cd2488 100644 --- a/vendor/github.com/hashicorp/serf/serf/keymanager.go +++ b/vendor/github.com/hashicorp/serf/serf/keymanager.go @@ -189,4 +189,4 @@ func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, defer k.l.RUnlock() return k.handleKeyRequest("", listKeysQuery, opts) -} \ No newline at end of file +} diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 548807a9d807..bb6c22fe7b67 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -1331,7 +1331,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { // handleNodeConflict is invoked when a join detects a conflict over a name. // This means two different nodes (IP/Port) are claiming the same name. Memberlist -// will reject the "new" node mapping, but we can still be notified +// will reject the "new" node mapping, but we can still be notified. func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) { // Log a basic warning if the node is not us... if existing.Name != s.config.NodeName { diff --git a/vendor/github.com/hashicorp/serf/serf/snapshot.go b/vendor/github.com/hashicorp/serf/serf/snapshot.go index 9f5adebe6252..d2eda0ea239a 100644 --- a/vendor/github.com/hashicorp/serf/serf/snapshot.go +++ b/vendor/github.com/hashicorp/serf/serf/snapshot.go @@ -25,10 +25,34 @@ nodes to re-join, as well as restore our clock values to avoid replaying old events. */ -const flushInterval = 500 * time.Millisecond -const clockUpdateInterval = 500 * time.Millisecond -const tmpExt = ".compact" -const snapshotErrorRecoveryInterval = 30 * time.Second +const ( + // flushInterval is how often we force a flush of the snapshot file + flushInterval = 500 * time.Millisecond + + // clockUpdateInterval is how often we fetch the current lamport time of the cluster and write to the snapshot file + clockUpdateInterval = 500 * time.Millisecond + + // tmpExt is the extention we use for the temporary file during compaction + tmpExt = ".compact" + + // snapshotErrorRecoveryInterval is how often we attempt to recover from + // errors writing to the snapshot file. + snapshotErrorRecoveryInterval = 30 * time.Second + + // eventChSize is the size of the event buffers between Serf and the + // consuming application. If this is exhausted we will block Serf and Memberlist. + eventChSize = 2048 + + // shutdownFlushTimeout is the time limit to write pending events to the snapshot during a shutdown + shutdownFlushTimeout = 250 * time.Millisecond + + // snapshotBytesPerNode is an estimated bytes per node to snapshot + snapshotBytesPerNode = 128 + + // snapshotCompactionThreshold is the threshold we apply to + // the snapshot size estimate (nodes * bytes per node) before compacting. + snapshotCompactionThreshold = 2 +) // Snapshotter is responsible for ingesting events and persisting // them to disk, and providing a recovery mechanism at start time. @@ -38,6 +62,7 @@ type Snapshotter struct { fh *os.File buffered *bufio.Writer inCh <-chan Event + streamCh chan Event lastFlush time.Time lastClock LamportTime lastEventClock LamportTime @@ -45,7 +70,7 @@ type Snapshotter struct { leaveCh chan struct{} leaving bool logger *log.Logger - maxSize int64 + minCompactSize int64 path string offset int64 outCh chan<- Event @@ -72,13 +97,14 @@ func (p PreviousNode) String() string { // Setting rejoinAfterLeave makes leave not clear the state, and can be used // if you intend to rejoin the same cluster after a leave. func NewSnapshotter(path string, - maxSize int, + minCompactSize int, rejoinAfterLeave bool, logger *log.Logger, clock *LamportClock, outCh chan<- Event, shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) { - inCh := make(chan Event, 1024) + inCh := make(chan Event, eventChSize) + streamCh := make(chan Event, eventChSize) // Try to open the file fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644) @@ -101,12 +127,13 @@ func NewSnapshotter(path string, fh: fh, buffered: bufio.NewWriter(fh), inCh: inCh, + streamCh: streamCh, lastClock: 0, lastEventClock: 0, lastQueryClock: 0, leaveCh: make(chan struct{}), logger: logger, - maxSize: int64(maxSize), + minCompactSize: int64(minCompactSize), path: path, offset: offset, outCh: outCh, @@ -122,6 +149,7 @@ func NewSnapshotter(path string, } // Start handling new commands + go snap.teeStream() go snap.stream() return inCh, snap, nil } @@ -171,11 +199,69 @@ func (s *Snapshotter) Leave() { } } +// teeStream is a long running routine that is used to copy events +// to the output channel and the internal event handler. +func (s *Snapshotter) teeStream() { + flushEvent := func(e Event) { + // Forward to the internal stream, do not block + select { + case s.streamCh <- e: + default: + } + + // Forward the event immediately, do not block + if s.outCh != nil { + select { + case s.outCh <- e: + default: + } + } + } + +OUTER: + for { + select { + case e := <-s.inCh: + flushEvent(e) + case <-s.shutdownCh: + break OUTER + } + } + + // Drain any remaining events before exiting + for { + select { + case e := <-s.inCh: + flushEvent(e) + default: + return + } + } +} + // stream is a long running routine that is used to handle events func (s *Snapshotter) stream() { clockTicker := time.NewTicker(clockUpdateInterval) defer clockTicker.Stop() + // flushEvent is used to handle writing out an event + flushEvent := func(e Event) { + // Stop recording events after a leave is issued + if s.leaving { + return + } + switch typed := e.(type) { + case MemberEvent: + s.processMemberEvent(typed) + case UserEvent: + s.processUserEvent(typed) + case *Query: + s.processQuery(typed) + default: + s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e) + } + } + for { select { case <-s.leaveCh: @@ -193,31 +279,32 @@ func (s *Snapshotter) stream() { s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err) } - case e := <-s.inCh: - // Forward the event immediately - if s.outCh != nil { - s.outCh <- e - } - - // Stop recording events after a leave is issued - if s.leaving { - continue - } - switch typed := e.(type) { - case MemberEvent: - s.processMemberEvent(typed) - case UserEvent: - s.processUserEvent(typed) - case *Query: - s.processQuery(typed) - default: - s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e) - } + case e := <-s.streamCh: + flushEvent(e) case <-clockTicker.C: s.updateClock() case <-s.shutdownCh: + // Setup a timeout + flushTimeout := time.After(shutdownFlushTimeout) + + // Snapshot the clock + s.updateClock() + + // Clear out the buffers + FLUSH: + for { + select { + case e := <-s.streamCh: + flushEvent(e) + case <-flushTimeout: + break FLUSH + default: + break FLUSH + } + } + if err := s.buffered.Flush(); err != nil { s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err) } @@ -321,12 +408,25 @@ func (s *Snapshotter) appendLine(l string) error { // Check if a compaction is necessary s.offset += int64(n) - if s.offset > s.maxSize { + if s.offset > s.snapshotMaxSize() { return s.compact() } return nil } +// snapshotMaxSize computes the maximum size and is used to force periodic compaction. +func (s *Snapshotter) snapshotMaxSize() int64 { + nodes := int64(len(s.aliveNodes)) + estSize := nodes * snapshotBytesPerNode + threshold := estSize * snapshotCompactionThreshold + + // Apply a minimum threshold to avoid frequent compaction + if threshold < s.minCompactSize { + threshold = s.minCompactSize + } + return threshold +} + // Compact is used to compact the snapshot once it is too large func (s *Snapshotter) compact() error { defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now()) diff --git a/vendor/vendor.json b/vendor/vendor.json index 2d9b6d89834b..74ab6d9a4c61 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -99,12 +99,12 @@ {"path":"github.com/hashicorp/hil","checksumSHA1":"kqCMCHy2b+RBMKC+ER+OPqp8C3E=","revision":"1e86c6b523c55d1fa6c6e930ce80b548664c95c2","revisionTime":"2016-07-11T23:18:37Z"}, {"path":"github.com/hashicorp/hil/ast","checksumSHA1":"UICubs001+Q4MsUf9zl2vcMzWQQ=","revision":"1e86c6b523c55d1fa6c6e930ce80b548664c95c2","revisionTime":"2016-07-11T23:18:37Z"}, {"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883","revisionTime":"2015-06-09T07:04:31Z"}, - {"path":"github.com/hashicorp/memberlist","checksumSHA1":"88DoUaWD6hS1KTt57RMQ7wxHu/k=","revision":"9bdd37bfb26bd039c08b0f36be6f80ceede4aaf3","revisionTime":"2017-11-17T04:34:18Z"}, + {"path":"github.com/hashicorp/memberlist","checksumSHA1":"q6yTL5vSGnWxUtcocVU3YIG/HNc=","revision":"b195c8e4fcc6284fff1583fd6ab09e68ca207551","revisionTime":"2018-08-09T14:04:54Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"}, {"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, - {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"4b67f2c2b2bb5b748d934a6d48221062e43d2274","revisionTime":"2018-05-04T20:06:40Z"}, - {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QrT+nzyXsD/MmhTjjhcPdnALZ1I=","revision":"4b67f2c2b2bb5b748d934a6d48221062e43d2274","revisionTime":"2018-05-04T20:06:40Z"}, + {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"}, + {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"axdQxCEwvUr1AygfYIMMxPkS1pY=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"}, {"path":"github.com/hashicorp/vault/api","checksumSHA1":"LYQZ+o7zJCda/6LibdN0spFco34=","revision":"533003e27840d9646cb4e7d23b3a113895da1dd0","revisionTime":"2018-06-20T14:55:40Z","version":"v0.10.3","versionExact":"v0.10.3"}, {"path":"github.com/hashicorp/vault/audit","checksumSHA1":"2JOC+Ur0S3U8Gqv2cfNB3zxgSBk=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"}, {"path":"github.com/hashicorp/vault/builtin/logical/database/dbplugin","checksumSHA1":"RCwWixWwKG6j2vF9iVoxbCzo6p4=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"},