diff --git a/networkdb/broadcast.go b/networkdb/broadcast.go index 2e07729569..faaf642948 100644 --- a/networkdb/broadcast.go +++ b/networkdb/broadcast.go @@ -1,10 +1,15 @@ package networkdb import ( + "fmt" + "time" + "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" ) +const broadcastTimeout = 5 * time.Second + type networkEventMessage struct { id string node string @@ -44,6 +49,53 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim return nil } +type nodeEventMessage struct { + msg []byte + notify chan<- struct{} +} + +func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool { + return false +} + +func (m *nodeEventMessage) Message() []byte { + return m.msg +} + +func (m *nodeEventMessage) Finished() { + if m.notify != nil { + close(m.notify) + } +} + +func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error { + nEvent := NodeEvent{ + Type: event, + LTime: nDB.networkClock.Increment(), + NodeName: nDB.config.NodeName, + } + + raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent) + if err != nil { + return err + } + + notifyCh := make(chan struct{}) + nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{ + msg: raw, + notify: notifyCh, + }) + + // Wait for the broadcast + select { + case <-notifyCh: + case <-time.After(broadcastTimeout): + return fmt.Errorf("timed out broadcasting node event") + } + + return nil +} + type tableEventMessage struct { id string tname string diff --git a/networkdb/cluster.go b/networkdb/cluster.go index 17563589dc..0186f22399 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" rnd "math/rand" + "net" "strings" "time" @@ -111,6 +112,13 @@ func (nDB *NetworkDB) clusterInit() error { RetransmitMult: config.RetransmitMult, } + nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(nDB.nodes) + }, + RetransmitMult: config.RetransmitMult, + } + mlist, err := memberlist.Create(config) if err != nil { return fmt.Errorf("failed to create memberlist: %v", err) @@ -127,6 +135,7 @@ func (nDB *NetworkDB) clusterInit() error { {reapInterval, nDB.reapState}, {config.GossipInterval, nDB.gossip}, {config.PushPullInterval, nDB.bulkSyncTables}, + {1 * time.Second, nDB.reconnectNode}, } { t := time.NewTicker(trigger.interval) go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn) @@ -143,12 +152,20 @@ func (nDB *NetworkDB) clusterJoin(members []string) error { return fmt.Errorf("could not join node to memberlist: %v", err) } + if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { + return fmt.Errorf("failed to send node join: %v", err) + } + return nil } func (nDB *NetworkDB) clusterLeave() error { mlist := nDB.memberlist + if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil { + return fmt.Errorf("failed to send node leave: %v", err) + } + if err := mlist.Leave(time.Second); err != nil { return err } @@ -180,6 +197,42 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto } } +func (nDB *NetworkDB) reconnectNode() { + nDB.RLock() + if len(nDB.failedNodes) == 0 { + nDB.RUnlock() + return + } + + nodes := make([]*node, 0, len(nDB.failedNodes)) + for _, n := range nDB.failedNodes { + nodes = append(nodes, n) + } + nDB.RUnlock() + + // Update all the local state to a new time to force update on + // the node we are trying to rejoin, just in case that node + // has these in leaving/deleting state still. This is + // facilitate fast convergence after recovering from a gossip + // failure. + nDB.updateLocalStateTime() + + node := nodes[randomOffset(len(nodes))] + addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)} + + if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil { + return + } + + if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { + logrus.Errorf("failed to send node join during reconnect: %v", err) + return + } + + logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name) + nDB.bulkSync([]string{node.Name}, true) +} + func (nDB *NetworkDB) reapState() { nDB.reapNetworks() nDB.reapTableEntries() @@ -288,7 +341,7 @@ func (nDB *NetworkDB) gossip() { } // Send the compound message - if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil { + if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil { logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err) } } @@ -323,7 +376,7 @@ func (nDB *NetworkDB) bulkSyncTables() { continue } - completed, err := nDB.bulkSync(nid, nodes, false) + completed, err := nDB.bulkSync(nodes, false) if err != nil { logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err) continue @@ -350,7 +403,7 @@ func (nDB *NetworkDB) bulkSyncTables() { } } -func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) { +func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { if !all { // If not all, then just pick one. nodes = nDB.mRandomNodes(1, nodes) @@ -388,7 +441,12 @@ func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error { var msgs [][]byte - logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node) + var unsolMsg string + if unsolicited { + unsolMsg = "unsolicited" + } + + logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node) nDB.RLock() mnode := nDB.nodes[node] @@ -454,7 +512,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b nDB.bulkSyncAckTbl[node] = ch nDB.Unlock() - err = nDB.memberlist.SendToTCP(mnode, buf) + err = nDB.memberlist.SendToTCP(&mnode.Node, buf) if err != nil { nDB.Lock() delete(nDB.bulkSyncAckTbl, node) diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 35c126a847..3e96384465 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -17,6 +17,56 @@ func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } +func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node { + nDB.Lock() + defer nDB.Unlock() + + for _, nodes := range []map[string]*node{ + nDB.failedNodes, + nDB.leftNodes, + nDB.nodes, + } { + if n, ok := nodes[nEvent.NodeName]; ok { + if n.ltime >= nEvent.LTime { + return nil + } + + delete(nDB.failedNodes, n.Name) + return n + } + } + + return nil +} + +func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { + // Update our local clock if the received messages has newer + // time. + nDB.networkClock.Witness(nEvent.LTime) + + n := nDB.checkAndGetNode(nEvent) + if n == nil { + return false + } + + n.ltime = nEvent.LTime + + switch nEvent.Type { + case NodeEventTypeJoin: + nDB.Lock() + nDB.nodes[n.Name] = n + nDB.Unlock() + return true + case NodeEventTypeLeave: + nDB.Lock() + nDB.leftNodes[n.Name] = n + nDB.Unlock() + return true + } + + return false +} + func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { // Update our local clock if the received messages has newer // time. @@ -188,6 +238,27 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } } +func (nDB *NetworkDB) handleNodeMessage(buf []byte) { + var nEvent NodeEvent + if err := proto.Unmarshal(buf, &nEvent); err != nil { + logrus.Errorf("Error decoding node event message: %v", err) + return + } + + if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast { + var err error + buf, err = encodeRawMessage(MessageTypeNodeEvent, buf) + if err != nil { + logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err) + return + } + + nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{ + msg: buf, + }) + } +} + func (nDB *NetworkDB) handleNetworkMessage(buf []byte) { var nEvent NetworkEvent if err := proto.Unmarshal(buf, &nEvent); err != nil { @@ -256,6 +327,8 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) { } switch mType { + case MessageTypeNodeEvent: + nDB.handleNodeMessage(data) case MessageTypeNetworkEvent: nDB.handleNetworkMessage(data) case MessageTypeTableEvent: @@ -278,7 +351,9 @@ func (d *delegate) NotifyMsg(buf []byte) { } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) + msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) + msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...) + return msgs } func (d *delegate) LocalState(join bool) []byte { @@ -286,7 +361,8 @@ func (d *delegate) LocalState(join bool) []byte { defer d.nDB.RUnlock() pp := NetworkPushPull{ - LTime: d.nDB.networkClock.Time(), + LTime: d.nDB.networkClock.Time(), + NodeName: d.nDB.config.NodeName, } for name, nn := range d.nDB.networks { @@ -336,6 +412,13 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { d.nDB.networkClock.Witness(pp.LTime) } + nodeEvent := &NodeEvent{ + LTime: pp.LTime, + NodeName: pp.NodeName, + Type: NodeEventTypeJoin, + } + d.nDB.handleNodeEvent(nodeEvent) + for _, n := range pp.Networks { nEvent := &NetworkEvent{ LTime: n.LTime, diff --git a/networkdb/event_delegate.go b/networkdb/event_delegate.go index 7dfea84f6e..019cafbd06 100644 --- a/networkdb/event_delegate.go +++ b/networkdb/event_delegate.go @@ -6,17 +6,31 @@ type eventDelegate struct { nDB *NetworkDB } -func (e *eventDelegate) NotifyJoin(n *memberlist.Node) { +func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) { e.nDB.Lock() - e.nDB.nodes[n.Name] = n + // In case the node is rejoining after a failure or leave, + // wait until an explicit join message arrives before adding + // it to the nodes just to make sure this is not a stale + // join. If you don't know about this node add it immediately. + _, fOk := e.nDB.failedNodes[mn.Name] + _, lOk := e.nDB.leftNodes[mn.Name] + if fOk || lOk { + e.nDB.Unlock() + return + } + + e.nDB.nodes[mn.Name] = &node{Node: *mn} e.nDB.Unlock() } -func (e *eventDelegate) NotifyLeave(n *memberlist.Node) { - e.nDB.deleteNodeTableEntries(n.Name) - e.nDB.deleteNetworkNodeEntries(n.Name) +func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { + e.nDB.deleteNodeTableEntries(mn.Name) + e.nDB.deleteNetworkEntriesForNode(mn.Name) e.nDB.Lock() - delete(e.nDB.nodes, n.Name) + if n, ok := e.nDB.nodes[mn.Name]; ok { + delete(e.nDB.nodes, mn.Name) + e.nDB.failedNodes[mn.Name] = n + } e.nDB.Unlock() } diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index ffb2d4690c..c452a90835 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -41,7 +41,13 @@ type NetworkDB struct { // List of all peer nodes in the cluster not-limited to any // network. - nodes map[string]*memberlist.Node + nodes map[string]*node + + // List of all peer nodes which have failed + failedNodes map[string]*node + + // List of all peer nodes which have left + leftNodes map[string]*node // A multi-dimensional map of network/node attachmemts. The // first key is a node name and the second key is a network ID @@ -66,6 +72,9 @@ type NetworkDB struct { // Broadcast queue for network event gossip. networkBroadcasts *memberlist.TransmitLimitedQueue + // Broadcast queue for node event gossip. + nodeBroadcasts *memberlist.TransmitLimitedQueue + // A central stop channel to stop all go routines running on // behalf of the NetworkDB instance. stopCh chan struct{} @@ -82,6 +91,11 @@ type NetworkDB struct { keyring *memberlist.Keyring } +type node struct { + memberlist.Node + ltime serf.LamportTime +} + // network describes the node/network attachment. type network struct { // Network ID @@ -146,7 +160,9 @@ func New(c *Config) (*NetworkDB, error) { config: c, indexes: make(map[int]*radix.Tree), networks: make(map[string]map[string]*network), - nodes: make(map[string]*memberlist.Node), + nodes: make(map[string]*node), + failedNodes: make(map[string]*node), + leftNodes: make(map[string]*node), networkNodes: make(map[string][]string), bulkSyncAckTbl: make(map[string]chan struct{}), broadcaster: events.NewBroadcaster(), @@ -203,10 +219,9 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { // table, key) tuple and if the NetworkDB is part of the cluster // propogates this event to the cluster. It is an error to create an // entry for the same tuple for which there is already an existing -// entry unless the current entry is deleting state. +// entry. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { - e, _ := nDB.getEntry(tname, nid, key) - if e != nil && !e.deleting { + if _, err := nDB.GetEntry(tname, nid, key); err == nil { return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key) } @@ -287,7 +302,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { return nil } -func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) { +func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { nDB.Lock() for nid, nodes := range nDB.networkNodes { updatedNodes := make([]string, 0, len(nodes)) @@ -301,6 +316,8 @@ func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) { nDB.networkNodes[nid] = updatedNodes } + + delete(nDB.networks, deletedNode) nDB.Unlock() } @@ -390,7 +407,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { } logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid) - if _, err := nDB.bulkSync(nid, networkNodes, true); err != nil { + if _, err := nDB.bulkSync(networkNodes, true); err != nil { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } @@ -493,10 +510,41 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { var networks []string for nid := range nDB.networks[nDB.config.NodeName] { - if _, ok := nDB.networks[nodeName][nid]; ok { - networks = append(networks, nid) + if n, ok := nDB.networks[nodeName][nid]; ok { + if !n.leaving { + networks = append(networks, nid) + } } } return networks } + +func (nDB *NetworkDB) updateLocalStateTime() { + nDB.Lock() + defer nDB.Unlock() + + ltime := nDB.networkClock.Increment() + for _, n := range nDB.networks[nDB.config.NodeName] { + n.ltime = ltime + } + + ltime = nDB.tableClock.Increment() + nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { + entry := v.(*entry) + if entry.node != nDB.config.NodeName { + return false + } + + params := strings.Split(path[1:], "/") + tname := params[0] + nid := params[1] + key := params[2] + entry.ltime = ltime + + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + + return false + }) +} diff --git a/networkdb/networkdb.pb.go b/networkdb/networkdb.pb.go index 86177cf315..dfbc7131fb 100644 --- a/networkdb/networkdb.pb.go +++ b/networkdb/networkdb.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: GossipMessage + NodeEvent NetworkEvent NetworkEntry NetworkPushPull @@ -67,6 +68,9 @@ const ( // which is a pack of many message of above types, packed into // a single compound message. MessageTypeCompound MessageType = 5 + // NodeEvent message type is used to communicare node + // join/leave events in the cluster + MessageTypeNodeEvent MessageType = 6 ) var MessageType_name = map[int32]string{ @@ -76,6 +80,7 @@ var MessageType_name = map[int32]string{ 3: "PUSH_PULL", 4: "BULK_SYNC", 5: "COMPOUND", + 6: "NODE_EVENT", } var MessageType_value = map[string]int32{ "INVALID": 0, @@ -84,6 +89,7 @@ var MessageType_value = map[string]int32{ "PUSH_PULL": 3, "BULK_SYNC": 4, "COMPOUND": 5, + "NODE_EVENT": 6, } func (x MessageType) String() string { @@ -91,6 +97,32 @@ func (x MessageType) String() string { } func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} } +type NodeEvent_Type int32 + +const ( + NodeEventTypeInvalid NodeEvent_Type = 0 + // Join event is generated when this node joins the cluster. + NodeEventTypeJoin NodeEvent_Type = 1 + // Leave event is generated when this node leaves the cluster. + NodeEventTypeLeave NodeEvent_Type = 2 +) + +var NodeEvent_Type_name = map[int32]string{ + 0: "INVALID", + 1: "JOIN", + 2: "LEAVE", +} +var NodeEvent_Type_value = map[string]int32{ + "INVALID": 0, + "JOIN": 1, + "LEAVE": 2, +} + +func (x NodeEvent_Type) String() string { + return proto.EnumName(NodeEvent_Type_name, int32(x)) +} +func (NodeEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} } + type NetworkEvent_Type int32 const ( @@ -115,7 +147,7 @@ var NetworkEvent_Type_value = map[string]int32{ func (x NetworkEvent_Type) String() string { return proto.EnumName(NetworkEvent_Type_name, int32(x)) } -func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} } +func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2, 0} } type TableEvent_Type int32 @@ -148,7 +180,7 @@ var TableEvent_Type_value = map[string]int32{ func (x TableEvent_Type) String() string { return proto.EnumName(TableEvent_Type_name, int32(x)) } -func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4, 0} } +func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5, 0} } // GossipMessage is a basic message header used by all messages types. type GossipMessage struct { @@ -160,6 +192,21 @@ func (m *GossipMessage) Reset() { *m = GossipMessage{} } func (*GossipMessage) ProtoMessage() {} func (*GossipMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} } +// NodeEvent message payload definition. +type NodeEvent struct { + Type NodeEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NodeEvent_Type" json:"type,omitempty"` + // Lamport time using a network lamport clock indicating the + // time this event was generated on the node where it was + // generated. + LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,2,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"` + // Source node name. + NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` +} + +func (m *NodeEvent) Reset() { *m = NodeEvent{} } +func (*NodeEvent) ProtoMessage() {} +func (*NodeEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} } + // NetworkEvent message payload definition. type NetworkEvent struct { Type NetworkEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NetworkEvent_Type" json:"type,omitempty"` @@ -175,7 +222,7 @@ type NetworkEvent struct { func (m *NetworkEvent) Reset() { *m = NetworkEvent{} } func (*NetworkEvent) ProtoMessage() {} -func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} } +func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} } // NetworkEntry for push pull of networks. type NetworkEntry struct { @@ -192,18 +239,20 @@ type NetworkEntry struct { func (m *NetworkEntry) Reset() { *m = NetworkEntry{} } func (*NetworkEntry) ProtoMessage() {} -func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} } +func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} } // NetworkPushpull message payload definition. type NetworkPushPull struct { // Lamport time when this push pull was initiated. LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,1,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"` Networks []*NetworkEntry `protobuf:"bytes,2,rep,name=networks" json:"networks,omitempty"` + // Name of the node sending this push pull payload. + NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` } func (m *NetworkPushPull) Reset() { *m = NetworkPushPull{} } func (*NetworkPushPull) ProtoMessage() {} -func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} } +func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} } func (m *NetworkPushPull) GetNetworks() []*NetworkEntry { if m != nil { @@ -231,7 +280,7 @@ type TableEvent struct { func (m *TableEvent) Reset() { *m = TableEvent{} } func (*TableEvent) ProtoMessage() {} -func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} } +func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} } // BulkSync message payload definition. type BulkSyncMessage struct { @@ -251,7 +300,7 @@ type BulkSyncMessage struct { func (m *BulkSyncMessage) Reset() { *m = BulkSyncMessage{} } func (*BulkSyncMessage) ProtoMessage() {} -func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} } +func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} } // Compound message payload definition. type CompoundMessage struct { @@ -261,7 +310,7 @@ type CompoundMessage struct { func (m *CompoundMessage) Reset() { *m = CompoundMessage{} } func (*CompoundMessage) ProtoMessage() {} -func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} } +func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{7} } func (m *CompoundMessage) GetMessages() []*CompoundMessage_SimpleMessage { if m != nil { @@ -279,11 +328,12 @@ type CompoundMessage_SimpleMessage struct { func (m *CompoundMessage_SimpleMessage) Reset() { *m = CompoundMessage_SimpleMessage{} } func (*CompoundMessage_SimpleMessage) ProtoMessage() {} func (*CompoundMessage_SimpleMessage) Descriptor() ([]byte, []int) { - return fileDescriptorNetworkdb, []int{6, 0} + return fileDescriptorNetworkdb, []int{7, 0} } func init() { proto.RegisterType((*GossipMessage)(nil), "networkdb.GossipMessage") + proto.RegisterType((*NodeEvent)(nil), "networkdb.NodeEvent") proto.RegisterType((*NetworkEvent)(nil), "networkdb.NetworkEvent") proto.RegisterType((*NetworkEntry)(nil), "networkdb.NetworkEntry") proto.RegisterType((*NetworkPushPull)(nil), "networkdb.NetworkPushPull") @@ -292,6 +342,7 @@ func init() { proto.RegisterType((*CompoundMessage)(nil), "networkdb.CompoundMessage") proto.RegisterType((*CompoundMessage_SimpleMessage)(nil), "networkdb.CompoundMessage.SimpleMessage") proto.RegisterEnum("networkdb.MessageType", MessageType_name, MessageType_value) + proto.RegisterEnum("networkdb.NodeEvent_Type", NodeEvent_Type_name, NodeEvent_Type_value) proto.RegisterEnum("networkdb.NetworkEvent_Type", NetworkEvent_Type_name, NetworkEvent_Type_value) proto.RegisterEnum("networkdb.TableEvent_Type", TableEvent_Type_name, TableEvent_Type_value) } @@ -306,6 +357,18 @@ func (this *GossipMessage) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *NodeEvent) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&networkdb.NodeEvent{") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n") + s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *NetworkEvent) GoString() string { if this == nil { return "nil" @@ -336,12 +399,13 @@ func (this *NetworkPushPull) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&networkdb.NetworkPushPull{") s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n") if this.Networks != nil { s = append(s, "Networks: "+fmt.Sprintf("%#v", this.Networks)+",\n") } + s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -451,6 +515,40 @@ func (m *GossipMessage) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *NodeEvent) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *NodeEvent) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Type != 0 { + data[i] = 0x8 + i++ + i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + } + if m.LTime != 0 { + data[i] = 0x10 + i++ + i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + } + if len(m.NodeName) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) + i += copy(data[i:], m.NodeName) + } + return i, nil +} + func (m *NetworkEvent) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -568,6 +666,12 @@ func (m *NetworkPushPull) MarshalTo(data []byte) (int, error) { i += n } } + if len(m.NodeName) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) + i += copy(data[i:], m.NodeName) + } return i, nil } @@ -783,6 +887,22 @@ func (m *GossipMessage) Size() (n int) { return n } +func (m *NodeEvent) Size() (n int) { + var l int + _ = l + if m.Type != 0 { + n += 1 + sovNetworkdb(uint64(m.Type)) + } + if m.LTime != 0 { + n += 1 + sovNetworkdb(uint64(m.LTime)) + } + l = len(m.NodeName) + if l > 0 { + n += 1 + l + sovNetworkdb(uint64(l)) + } + return n +} + func (m *NetworkEvent) Size() (n int) { var l int _ = l @@ -835,6 +955,10 @@ func (m *NetworkPushPull) Size() (n int) { n += 1 + l + sovNetworkdb(uint64(l)) } } + l = len(m.NodeName) + if l > 0 { + n += 1 + l + sovNetworkdb(uint64(l)) + } return n } @@ -942,6 +1066,18 @@ func (this *GossipMessage) String() string { }, "") return s } +func (this *NodeEvent) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodeEvent{`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `LTime:` + fmt.Sprintf("%v", this.LTime) + `,`, + `NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`, + `}`, + }, "") + return s +} func (this *NetworkEvent) String() string { if this == nil { return "nil" @@ -975,6 +1111,7 @@ func (this *NetworkPushPull) String() string { s := strings.Join([]string{`&NetworkPushPull{`, `LTime:` + fmt.Sprintf("%v", this.LTime) + `,`, `Networks:` + strings.Replace(fmt.Sprintf("%v", this.Networks), "NetworkEntry", "NetworkEntry", 1) + `,`, + `NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`, `}`, }, "") return s @@ -1137,6 +1274,123 @@ func (m *GossipMessage) Unmarshal(data []byte) error { } return nil } +func (m *NodeEvent) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeEvent: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeEvent: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (NodeEvent_Type(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LTime", wireType) + } + m.LTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthNetworkdb + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeName = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNetworkdb(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNetworkdb + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *NetworkEvent) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 @@ -1509,6 +1763,35 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthNetworkdb + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeName = string(data[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipNetworkdb(data[iNdEx:]) @@ -2211,56 +2494,61 @@ var ( ) var fileDescriptorNetworkdb = []byte{ - // 812 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x95, 0x4d, 0x6f, 0xe2, 0x46, - 0x18, 0xc7, 0x31, 0x18, 0x02, 0x0f, 0xd0, 0x20, 0x27, 0x4d, 0x5c, 0xa7, 0x25, 0x91, 0x9b, 0x46, - 0x14, 0x55, 0x4e, 0x95, 0x7c, 0x02, 0x5e, 0xac, 0x96, 0xc4, 0x31, 0xc8, 0x40, 0xaa, 0x9e, 0x90, - 0xc1, 0x53, 0xb0, 0x62, 0x6c, 0x0b, 0x9b, 0x54, 0xdc, 0xaa, 0x9e, 0xa2, 0xde, 0x7a, 0xad, 0xd4, - 0x53, 0x7b, 0xee, 0x07, 0xe8, 0xa1, 0xe7, 0xa8, 0xa7, 0xf6, 0xb6, 0xda, 0x43, 0xb4, 0xc9, 0x27, - 0xd8, 0x8f, 0xb0, 0xe3, 0xc1, 0x86, 0x81, 0x44, 0xb9, 0xec, 0x6a, 0xb5, 0x07, 0xc3, 0xbc, 0xfc, - 0xe6, 0xd1, 0xff, 0x79, 0xe6, 0x3f, 0x33, 0xb0, 0x69, 0x23, 0xff, 0x47, 0x67, 0x72, 0x65, 0xf4, - 0x25, 0x77, 0xe2, 0xf8, 0x0e, 0x97, 0x59, 0x0c, 0x08, 0xdb, 0x43, 0x67, 0xe8, 0x90, 0xd1, 0xe3, - 0xa0, 0x35, 0x07, 0xc4, 0x26, 0xe4, 0xbf, 0x71, 0x3c, 0xcf, 0x74, 0x2f, 0x90, 0xe7, 0xe9, 0x43, - 0xc4, 0x95, 0x81, 0xf5, 0x67, 0x2e, 0xe2, 0x99, 0x03, 0xa6, 0xf4, 0xd1, 0xc9, 0x8e, 0xb4, 0x8c, - 0x18, 0x12, 0x1d, 0x3c, 0xab, 0x11, 0x86, 0xe3, 0x80, 0x35, 0x74, 0x5f, 0xe7, 0xe3, 0x98, 0xcd, - 0x69, 0xa4, 0x2d, 0xde, 0xc7, 0x21, 0xa7, 0xce, 0xd7, 0xc8, 0xd7, 0xc8, 0xf6, 0xb9, 0xaf, 0x57, - 0x02, 0x7e, 0x4a, 0x05, 0xa4, 0x31, 0x89, 0x0a, 0xdb, 0x80, 0x94, 0xd5, 0xf3, 0xcd, 0x31, 0x22, - 0x81, 0xd9, 0xea, 0xc9, 0xed, 0xdd, 0x7e, 0xec, 0xe5, 0xdd, 0x7e, 0x79, 0x68, 0xfa, 0xa3, 0x69, - 0x5f, 0x1a, 0x38, 0xe3, 0xe3, 0x91, 0xee, 0x8d, 0xcc, 0x81, 0x33, 0x71, 0x8f, 0x3d, 0x34, 0xf9, - 0x81, 0xfc, 0x48, 0x8a, 0x3e, 0x76, 0x9d, 0x89, 0xdf, 0xc1, 0x2b, 0xb5, 0xa4, 0x15, 0xfc, 0x71, - 0x7b, 0x90, 0xb1, 0x1d, 0x03, 0xf5, 0x6c, 0x1d, 0x47, 0x4b, 0xe0, 0x68, 0x19, 0x2d, 0x1d, 0x0c, - 0xa8, 0xb8, 0xcf, 0x7d, 0x05, 0x10, 0x8a, 0xe9, 0x99, 0x06, 0xcf, 0x06, 0xb3, 0xd5, 0xfc, 0xc3, - 0xdd, 0x7e, 0x26, 0x14, 0xd6, 0xa8, 0x6b, 0x51, 0xfd, 0x1a, 0x86, 0x78, 0xc3, 0x00, 0x1b, 0x88, - 0xe4, 0x4a, 0xb0, 0xd1, 0x50, 0x2f, 0x2b, 0x4a, 0xa3, 0x5e, 0x88, 0x09, 0x7b, 0xbf, 0xfc, 0x7e, - 0xb0, 0x4b, 0x27, 0x12, 0x20, 0x0d, 0xfb, 0x5a, 0xb7, 0x4c, 0x83, 0x13, 0x81, 0x3d, 0x6b, 0x36, - 0xd4, 0x02, 0x23, 0xf0, 0x18, 0xdb, 0x5e, 0xc7, 0xce, 0x1c, 0xd3, 0xe6, 0x0e, 0x21, 0xa9, 0xc8, - 0x95, 0x4b, 0xb9, 0x10, 0x17, 0x3e, 0xc1, 0xd0, 0xc7, 0xeb, 0x90, 0x82, 0xf4, 0x6b, 0x24, 0xe4, - 0x6e, 0xfe, 0x28, 0xc6, 0xfe, 0xfe, 0xb3, 0x48, 0x14, 0x88, 0xff, 0x30, 0xcb, 0x1a, 0xdb, 0xfe, - 0x64, 0xb6, 0x96, 0x09, 0xf3, 0x7c, 0x26, 0xef, 0xad, 0xbe, 0x3c, 0x6c, 0x58, 0x58, 0xbd, 0x69, - 0x0f, 0x49, 0x71, 0xd3, 0x5a, 0xd4, 0x15, 0x7f, 0x65, 0x60, 0x33, 0x94, 0xd6, 0x9a, 0x7a, 0xa3, - 0xd6, 0xd4, 0xb2, 0x28, 0x55, 0xcc, 0xdb, 0xaa, 0x3a, 0x85, 0x74, 0x98, 0xad, 0x87, 0x53, 0x4c, - 0x94, 0xb2, 0x27, 0xbb, 0x4f, 0xd8, 0x2e, 0xa8, 0x9c, 0xb6, 0x00, 0xc5, 0x7f, 0x13, 0x00, 0x1d, - 0xbd, 0x6f, 0xa1, 0xb9, 0x6d, 0xa5, 0x15, 0xdb, 0x0a, 0xd4, 0xfa, 0x25, 0xf4, 0xc1, 0x9b, 0x96, - 0xfb, 0x0c, 0xc0, 0x0f, 0xe4, 0xce, 0x63, 0x25, 0x49, 0xac, 0x0c, 0x19, 0x21, 0xc1, 0x0a, 0x90, - 0xb8, 0x42, 0x33, 0x3e, 0x45, 0xc6, 0x83, 0x26, 0xb7, 0x0d, 0x49, 0xec, 0xdd, 0x29, 0xe2, 0x37, - 0xc8, 0x99, 0x9e, 0x77, 0xc4, 0xbf, 0x22, 0xef, 0x1f, 0xd1, 0xde, 0x27, 0x7e, 0x5d, 0x56, 0x83, - 0x76, 0xfe, 0x21, 0xa4, 0x6a, 0x9a, 0x5c, 0xe9, 0xc8, 0x91, 0xf7, 0x57, 0xb1, 0xda, 0x04, 0xe9, - 0x3e, 0x0a, 0xa8, 0x6e, 0xab, 0x1e, 0x50, 0xf1, 0xa7, 0xa8, 0xae, 0x6b, 0x84, 0x54, 0x5d, 0x56, - 0x64, 0x4c, 0x25, 0x9e, 0xa2, 0xea, 0xc8, 0x42, 0xfe, 0xfa, 0x09, 0xf9, 0x1f, 0x1b, 0xac, 0x3a, - 0xb5, 0xae, 0xda, 0x33, 0x7b, 0x10, 0xdd, 0x6c, 0xef, 0xd0, 0x60, 0x07, 0x90, 0x9d, 0xda, 0x9e, - 0x63, 0x99, 0x03, 0xd3, 0x47, 0x06, 0xd9, 0xf1, 0xb4, 0x46, 0x0f, 0x3d, 0xbf, 0x87, 0x02, 0xe5, - 0x4f, 0x16, 0xfb, 0x33, 0xb3, 0xb4, 0x61, 0x70, 0x68, 0x5c, 0x7d, 0x66, 0x39, 0xba, 0x41, 0xb6, - 0x2b, 0xa7, 0x45, 0x5d, 0xf1, 0x67, 0x9c, 0x53, 0xcd, 0xc1, 0x5a, 0xa6, 0xb6, 0x11, 0xe5, 0x54, - 0x87, 0xf4, 0x78, 0xde, 0xf4, 0x70, 0x56, 0x81, 0xd3, 0x4b, 0x94, 0x53, 0xd7, 0x68, 0xa9, 0x6d, - 0x8e, 0x5d, 0x0b, 0x85, 0x3d, 0x6d, 0xb1, 0x52, 0xf8, 0x12, 0xf2, 0x2b, 0x53, 0x81, 0x88, 0x56, - 0x28, 0x82, 0x59, 0x11, 0x51, 0xfe, 0x2d, 0x0e, 0x59, 0xea, 0x21, 0xe0, 0x3e, 0xa7, 0x0d, 0xb1, - 0x83, 0x77, 0x87, 0xa3, 0x66, 0x23, 0x37, 0x48, 0x90, 0x57, 0xe5, 0xce, 0x77, 0x4d, 0xed, 0xbc, - 0x27, 0x5f, 0xca, 0x6a, 0x07, 0x9b, 0x82, 0xdc, 0x9b, 0x14, 0xba, 0xf2, 0x64, 0x94, 0x21, 0xdb, - 0xa9, 0x54, 0x15, 0x39, 0xa4, 0xc3, 0x9b, 0x91, 0xa2, 0xa9, 0x73, 0x7a, 0x04, 0x99, 0x56, 0xb7, - 0xfd, 0x6d, 0xaf, 0xd5, 0x55, 0x14, 0x6c, 0x90, 0x5d, 0x4c, 0x6e, 0x51, 0xe4, 0xe2, 0x7a, 0xc1, - 0x5c, 0xb5, 0xab, 0x9c, 0xf7, 0xda, 0xdf, 0xab, 0xb5, 0x02, 0xfb, 0x88, 0x8b, 0xcc, 0xc2, 0x7d, - 0x01, 0xe9, 0x5a, 0xf3, 0xa2, 0xd5, 0xec, 0xaa, 0xf5, 0x42, 0xf2, 0x11, 0x16, 0x55, 0x54, 0xd8, - 0x0a, 0xed, 0x46, 0x17, 0xa3, 0xca, 0xbf, 0xb8, 0x2f, 0xc6, 0x5e, 0xdf, 0x17, 0x99, 0x9f, 0x1e, - 0x8a, 0xcc, 0x2d, 0xfe, 0xfe, 0xc3, 0xdf, 0x2b, 0xfc, 0xf5, 0x53, 0xe4, 0xb5, 0x3d, 0x7d, 0x13, - 0x00, 0x00, 0xff, 0xff, 0x7d, 0x9c, 0x5f, 0x56, 0xa1, 0x07, 0x00, 0x00, + // 887 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x96, 0xc1, 0x6e, 0xe3, 0x44, + 0x18, 0xc7, 0xeb, 0xc4, 0x49, 0xe3, 0xaf, 0x0d, 0x1b, 0xbc, 0xdd, 0xad, 0xd7, 0x0b, 0x49, 0x31, + 0xcb, 0x2a, 0x44, 0xe0, 0xa2, 0xee, 0x13, 0x24, 0xb1, 0x05, 0xd9, 0xf5, 0x3a, 0x91, 0x93, 0x14, + 0x71, 0x8a, 0x9c, 0x78, 0x48, 0xac, 0x3a, 0xb6, 0x15, 0x3b, 0x45, 0x39, 0x81, 0x38, 0xad, 0x78, + 0x07, 0x4e, 0xcb, 0x99, 0x07, 0xe0, 0xc0, 0x89, 0xc3, 0x8a, 0x13, 0xdc, 0x10, 0x87, 0x8a, 0xee, + 0x13, 0xf0, 0x08, 0x8c, 0xc7, 0x76, 0x32, 0x4e, 0xa3, 0x5e, 0x40, 0xc0, 0xc1, 0xad, 0x67, 0xe6, + 0xe7, 0xcf, 0xdf, 0xf7, 0x9f, 0xff, 0xe7, 0x09, 0xdc, 0x71, 0x51, 0xf8, 0x85, 0xb7, 0xb8, 0xb0, + 0xc6, 0xb2, 0xbf, 0xf0, 0x42, 0x8f, 0xe7, 0xd6, 0x13, 0xe2, 0xd1, 0xd4, 0x9b, 0x7a, 0x64, 0xf6, + 0x34, 0xba, 0x8b, 0x01, 0xa9, 0x0b, 0xe5, 0x8f, 0xbd, 0x20, 0xb0, 0xfd, 0xe7, 0x28, 0x08, 0xcc, + 0x29, 0xe2, 0x1b, 0xc0, 0x86, 0x2b, 0x1f, 0x09, 0xcc, 0x09, 0x53, 0x7f, 0xe3, 0xec, 0xbe, 0xbc, + 0x89, 0x98, 0x10, 0x03, 0xbc, 0x6a, 0x10, 0x86, 0xe7, 0x81, 0xb5, 0xcc, 0xd0, 0x14, 0x72, 0x98, + 0x3d, 0x34, 0xc8, 0xbd, 0xf4, 0x32, 0x07, 0x9c, 0xee, 0x59, 0x48, 0xbd, 0x44, 0x6e, 0xc8, 0x7f, + 0x98, 0x89, 0xf6, 0x80, 0x8a, 0xb6, 0x66, 0x64, 0x2a, 0x60, 0x07, 0x8a, 0xce, 0x28, 0xb4, 0xe7, + 0x88, 0x84, 0x64, 0x5b, 0x67, 0xaf, 0xae, 0x6a, 0x7b, 0xbf, 0x5f, 0xd5, 0x1a, 0x53, 0x3b, 0x9c, + 0x2d, 0xc7, 0xf2, 0xc4, 0x9b, 0x9f, 0xce, 0xcc, 0x60, 0x66, 0x4f, 0xbc, 0x85, 0x7f, 0x1a, 0xa0, + 0xc5, 0xe7, 0xe4, 0x8f, 0xac, 0x99, 0x73, 0xdf, 0x5b, 0x84, 0x03, 0xfc, 0xa4, 0x51, 0x70, 0xa2, + 0x7f, 0xfc, 0x43, 0xe0, 0x5c, 0xfc, 0x8a, 0x91, 0x6b, 0xe2, 0x68, 0x79, 0x1c, 0x8d, 0x33, 0x4a, + 0xd1, 0x84, 0x8e, 0xc7, 0xd2, 0x97, 0xc0, 0x46, 0x6f, 0xe5, 0xdf, 0x83, 0xfd, 0x8e, 0x7e, 0xde, + 0xd4, 0x3a, 0x4a, 0x65, 0x4f, 0x14, 0xbe, 0xf9, 0xf6, 0xe4, 0x68, 0x9d, 0x56, 0xb4, 0xde, 0x71, + 0x2f, 0x4d, 0xc7, 0xb6, 0xf8, 0x1a, 0xb0, 0x4f, 0xbb, 0x1d, 0xbd, 0xc2, 0x88, 0xf7, 0x30, 0xf3, + 0x66, 0x86, 0x79, 0xea, 0xd9, 0x2e, 0xff, 0x0e, 0x14, 0x34, 0xb5, 0x79, 0xae, 0x56, 0x72, 0xe2, + 0x7d, 0x4c, 0xf0, 0x19, 0x42, 0x43, 0xe6, 0x25, 0x12, 0x0f, 0x5f, 0xbc, 0xac, 0xee, 0xfd, 0xf0, + 0x5d, 0x95, 0xbc, 0x58, 0xba, 0xce, 0xc1, 0xa1, 0x1e, 0x6b, 0x11, 0x0b, 0xf5, 0x51, 0x46, 0xa8, + 0xb7, 0x68, 0xa1, 0x28, 0xec, 0x3f, 0xd0, 0x8a, 0xff, 0x00, 0x20, 0x49, 0x66, 0x64, 0x5b, 0x02, + 0x1b, 0xad, 0xb6, 0xca, 0xaf, 0xaf, 0x6a, 0x5c, 0x92, 0x58, 0x47, 0x31, 0x52, 0x97, 0x75, 0x2c, + 0xe9, 0x05, 0x93, 0x48, 0x5b, 0xa7, 0xa5, 0x7d, 0x88, 0x45, 0x39, 0xa6, 0x0b, 0xa1, 0xd5, 0x95, + 0xd6, 0xea, 0xc6, 0x3b, 0xb0, 0x85, 0x11, 0x81, 0x1f, 0x6d, 0x04, 0x7e, 0x80, 0xa1, 0x7b, 0xdb, + 0xd0, 0x2e, 0x8d, 0x7f, 0x64, 0x36, 0x1a, 0xbb, 0xe1, 0x62, 0xb5, 0x55, 0x09, 0x73, 0x7b, 0x25, + 0xff, 0x9a, 0xbe, 0x02, 0xec, 0x3b, 0x38, 0x7b, 0xdb, 0x9d, 0x12, 0x71, 0x4b, 0x46, 0x3a, 0x94, + 0xbe, 0x67, 0xe0, 0x4e, 0x92, 0x5a, 0x6f, 0x19, 0xcc, 0x7a, 0x4b, 0xc7, 0xa1, 0xb2, 0x62, 0xfe, + 0x6e, 0x56, 0x4f, 0xa0, 0x94, 0x54, 0x1b, 0xe0, 0x12, 0xf3, 0xf5, 0x83, 0xb3, 0xe3, 0x1d, 0xb6, + 0x8b, 0x94, 0x33, 0xd6, 0xe0, 0xed, 0x6d, 0xf5, 0x73, 0x1e, 0x60, 0x60, 0x8e, 0x9d, 0xa4, 0xf9, + 0xe5, 0x8c, 0xa7, 0x45, 0x2a, 0xf8, 0x06, 0xfa, 0xdf, 0x3b, 0x9a, 0x7f, 0x1b, 0x20, 0x8c, 0xd2, + 0x8d, 0x63, 0x15, 0x48, 0x2c, 0x8e, 0xcc, 0x90, 0x60, 0x15, 0xc8, 0x5f, 0xa0, 0x95, 0x50, 0x24, + 0xf3, 0xd1, 0x2d, 0x7f, 0x04, 0x05, 0x6c, 0xec, 0x25, 0x12, 0xf6, 0xc9, 0x67, 0x31, 0x1e, 0x44, + 0x9b, 0x19, 0x37, 0xc6, 0x63, 0xba, 0x31, 0x88, 0x99, 0x37, 0x6a, 0xd0, 0x6d, 0xf1, 0x08, 0x8a, + 0x6d, 0x43, 0x6d, 0x0e, 0xd4, 0xb4, 0x31, 0xb2, 0x58, 0x7b, 0x81, 0xcc, 0x10, 0x45, 0xd4, 0xb0, + 0xa7, 0x44, 0x54, 0x6e, 0x17, 0x35, 0xf4, 0xad, 0x84, 0x52, 0x54, 0x4d, 0xc5, 0x54, 0x7e, 0x17, + 0xa5, 0x20, 0x07, 0x85, 0xdb, 0xed, 0xf3, 0x2b, 0x76, 0x5f, 0x6b, 0xe9, 0x5c, 0xf4, 0x57, 0xee, + 0x24, 0x3d, 0x1c, 0xfe, 0x41, 0xf7, 0x9d, 0xc0, 0xc1, 0xd2, 0x0d, 0x3c, 0xc7, 0x9e, 0xd8, 0x21, + 0xb2, 0xc8, 0x8e, 0x97, 0x0c, 0x7a, 0xea, 0xf6, 0x3d, 0x14, 0x29, 0xf3, 0xb2, 0xd8, 0xbc, 0x1c, + 0xe5, 0x51, 0xdc, 0x51, 0xbe, 0xb9, 0x72, 0x3c, 0xd3, 0x22, 0xdb, 0x75, 0x68, 0xa4, 0x43, 0xe9, + 0x6b, 0x5c, 0x53, 0xdb, 0xc3, 0xb9, 0x2c, 0x5d, 0x2b, 0xad, 0x49, 0x81, 0xd2, 0x3c, 0xbe, 0x0d, + 0x70, 0x55, 0x51, 0x1b, 0xd4, 0x29, 0xa7, 0x6e, 0xd1, 0x72, 0xdf, 0x9e, 0xfb, 0x0e, 0x4a, 0x46, + 0xc6, 0xfa, 0x49, 0xf1, 0x7d, 0x28, 0x67, 0x96, 0xa2, 0x24, 0x7a, 0x49, 0x12, 0x4c, 0x26, 0x89, + 0xc6, 0x4f, 0x39, 0x38, 0xa0, 0xce, 0x52, 0xfe, 0x5d, 0xda, 0x10, 0xe4, 0xf8, 0xa0, 0x56, 0x53, + 0x37, 0xc8, 0x50, 0xd6, 0xd5, 0xc1, 0xa7, 0x5d, 0xe3, 0xd9, 0x48, 0x3d, 0x57, 0xf5, 0x01, 0x36, + 0x05, 0xf9, 0xa8, 0x52, 0x68, 0xe6, 0x3c, 0x69, 0xc0, 0xc1, 0xa0, 0xd9, 0xd2, 0xd4, 0x84, 0x4e, + 0x3e, 0x9b, 0x14, 0x4d, 0xf5, 0xe9, 0x63, 0xe0, 0x7a, 0xc3, 0xfe, 0x27, 0xa3, 0xde, 0x50, 0xd3, + 0xb0, 0x41, 0x8e, 0x31, 0x79, 0x97, 0x22, 0xd7, 0xdf, 0x1e, 0xcc, 0xb5, 0x86, 0xda, 0xb3, 0x51, + 0xff, 0x33, 0xbd, 0x5d, 0x61, 0x6f, 0x70, 0xa9, 0x59, 0xf0, 0xa9, 0x5a, 0x6a, 0x77, 0x9f, 0xf7, + 0xba, 0x43, 0x5d, 0xa9, 0x14, 0x6e, 0x60, 0xa9, 0xa2, 0xf8, 0x84, 0x00, 0xbd, 0xab, 0xa4, 0x19, + 0x16, 0x63, 0x63, 0xd2, 0xf5, 0xa4, 0x87, 0xa8, 0x78, 0x37, 0x31, 0x26, 0x2d, 0x5b, 0x4b, 0xf8, + 0xed, 0xba, 0xba, 0xf7, 0xe7, 0x75, 0x95, 0xf9, 0xea, 0x75, 0x95, 0x79, 0x85, 0xaf, 0x5f, 0xf0, + 0xf5, 0x07, 0xbe, 0xc6, 0x45, 0xf2, 0xd3, 0xe6, 0xc9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21, + 0x78, 0x72, 0xc3, 0x0e, 0x09, 0x00, 0x00, } diff --git a/networkdb/networkdb.proto b/networkdb/networkdb.proto index dbc7a921d0..7df1b42dca 100644 --- a/networkdb/networkdb.proto +++ b/networkdb/networkdb.proto @@ -41,6 +41,10 @@ enum MessageType { // which is a pack of many message of above types, packed into // a single compound message. COMPOUND = 5 [(gogoproto.enumvalue_customname) = "MessageTypeCompound"]; + + // NodeEvent message type is used to communicare node + // join/leave events in the cluster + NODE_EVENT = 6 [(gogoproto.enumvalue_customname) = "MessageTypeNodeEvent"]; } // GossipMessage is a basic message header used by all messages types. @@ -49,6 +53,29 @@ message GossipMessage { bytes data = 2; // Payload of the message of any type defined here. } +// NodeEvent message payload definition. +message NodeEvent { + enum Type { + option (gogoproto.goproto_enum_prefix) = false; + option (gogoproto.enum_customname) = "Type"; + + INVALID = 0 [(gogoproto.enumvalue_customname) = "NodeEventTypeInvalid"]; + // Join event is generated when this node joins the cluster. + JOIN = 1 [(gogoproto.enumvalue_customname) = "NodeEventTypeJoin"];; + // Leave event is generated when this node leaves the cluster. + LEAVE = 2 [(gogoproto.enumvalue_customname) = "NodeEventTypeLeave"];; + } + + Type type = 1; + + // Lamport time using a network lamport clock indicating the + // time this event was generated on the node where it was + // generated. + uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false]; + // Source node name. + string node_name = 3; +} + // NetworkEvent message payload definition. message NetworkEvent { enum Type {