Skip to content

Commit

Permalink
Protocol leg work
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake Neyer committed Feb 3, 2021
1 parent 8023289 commit 2579ea6
Show file tree
Hide file tree
Showing 8 changed files with 883 additions and 49 deletions.
62 changes: 62 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module striveworks.us/stampede

go 1.15

require (
9fans.net/go v0.0.2 // indirect
github.com/Djarvur/go-err113 v0.1.0 // indirect
github.com/alecthomas/gometalinter v3.0.0+incompatible // indirect
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4 // indirect
github.com/ashanbrown/forbidigo v1.1.0 // indirect
github.com/esimonov/ifshort v1.0.1 // indirect
github.com/fatih/gomodifytags v1.13.0 // indirect
github.com/go-critic/go-critic v0.5.4 // indirect
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golangci/golangci-lint v1.36.0 // indirect
github.com/golangci/misspell v0.3.5 // indirect
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gostaticanalysis/analysisutil v0.6.1 // indirect
github.com/jingyugao/rowserrcheck v0.0.0-20210130005344-c6a0c12dd98d // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/magiconair/properties v1.8.4 // indirect
github.com/matoous/godox v0.0.0-20200801072554-4fb83dc2941e // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/mbilski/exhaustivestruct v1.2.0 // indirect
github.com/mdempsky/gocode v0.0.0-20200405233807-4acdcbdea79d // indirect
github.com/mgechev/revive v1.0.3 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/nicksnyder/go-i18n v1.10.1 // indirect
github.com/opennota/check v0.0.0-20180911053232-0c771f5545ff // indirect
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20200805063351-8f842688393c // indirect
github.com/ramya-rao-a/go-outline v0.0.0-20200117021646-2a048b4510eb // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/godef v1.1.2 // indirect
github.com/sirupsen/logrus v1.7.0
github.com/spf13/afero v1.5.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/stripe/safesql v0.2.0 // indirect
github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b // indirect
github.com/tetafro/godot v1.4.4 // indirect
github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94 // indirect
github.com/tomarrell/wrapcheck v0.0.0-20201130113247-1683564d9756 // indirect
github.com/tpng/gopkgs v0.0.0-20180428091733-81e90e22e204 // indirect
github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9 // indirect
github.com/zmb3/goaddimport v0.0.0-20170810013102-4ab94a07ab86 // indirect
github.com/zmb3/gogetdoc v0.0.0-20190228002656-b37376c5da6a // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.1.0 // indirect
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20191105091915-95d230a53780 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/tools v0.1.1 // indirect
mvdan.cc/unparam v0.0.0-20210104141923-aac4ce9116a7 // indirect
)
599 changes: 599 additions & 0 deletions go.sum

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package main

import (
"striveworks.us/stampede/pkg/listen"
"striveworks.us/stampede/pkg"
)

const (
sendAddress = "127.0.0.1:9999"
listenAddress = ":9999"
)

func main() {
Listen()
node := pkg.New(&pkg.NodeConfig{IsLeader: false})
node.Start()

}
24 changes: 0 additions & 24 deletions pkg/broadcast.go

This file was deleted.

40 changes: 40 additions & 0 deletions pkg/leader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pkg

import (
"net"
"time"
)

type LeaderMessage struct {
Type string `json:"type"`
Timestamp time.Time `json:"time"`
Message string `json:"message"`
Node Node `json:"node"`
}

type LeaderResponse struct {
LeaderMessage LeaderMessage `json:"leader"`
Address net.Addr `json:"address"`
Connection net.PacketConn `json:"connection"`
}

func BecomeLeader(node Node) {
m := LeaderMessage{Type: "Election", Message: "Win", Timestamp: time.Now(), Node: node}
Broadcast(m)
}

func LeaderAsk(node Node) {
m := LeaderMessage{Type: "Election", Message: "Vote", Timestamp: time.Now(), Node: node}
Broadcast(m)
}

func DenyElection(node Node) {
m := LeaderMessage{Type: "Election", Message: "Denied", Timestamp: time.Now(), Node: node}
Broadcast(m)

}

func HeartBeat(node Node) {
m := LeaderMessage{Type: "Heartbeat", Message: "Alive", Timestamp: time.Now(), Node: node}
Broadcast(m)
}
23 changes: 0 additions & 23 deletions pkg/listen.go

This file was deleted.

114 changes: 114 additions & 0 deletions pkg/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package pkg

import (
"time"

"github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
)

const (
waitInterval = 3
electionInterval = 3
)

type Node struct {
UUID string `json:"uuid"`
IsLeader bool `json:"isleader"`
Voting bool `json:"voting"`
ElectionTime time.Time `json:"election"`
}

//NodeConfig ...
type NodeConfig struct {
IsLeader bool
}

//New ...
func New(cfg *NodeConfig) Node {

return Node{
UUID: generateUUID().String(),
IsLeader: cfg.IsLeader,
}
}

func (node Node) Start() error {
listener := make(chan LeaderResponse)
//Use a nodePool map so lookup times are O(1)
nodePool := make(map[string]Node)
electionDenials := 0
startTime := time.Now()

go Listen(listener)

for {
// Create non-blocking channel to listen for UDP messages
select {
case response := <-listener:
log.Info(response)
//Only care about messages that aren't from myself
if response.LeaderMessage.Node.UUID != node.UUID {

//Add other nodes to NodePool if they do not exist
if _, ok := nodePool[node.UUID]; ok {
} else {
nodePool[node.UUID] = node
}

//Increment Election denial
if response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Denied" {
electionDenials++
}

//Reject other leader elections if I am the leader
if node.IsLeader && response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Vote" {
DenyElection(node)
}

//TODO
//If I get heartbeats from other nodes and I am the leader, send back shared key

}
default:
}

waitElapsed := time.Since(startTime).Seconds()
electionElapsed := time.Since(node.ElectionTime).Seconds()

//If I am voting and have not heard any denies in election interval, become Leader
if !node.IsLeader && node.Voting && electionElapsed > electionInterval && electionDenials == 0 {
BecomeLeader(node)
node.IsLeader = true
log.Info("Have heard ", electionDenials, " denials")
log.Info("Assuming leader role")
}

//If I haven't gotten any heartbeats from other nodes in specified interval and I haven't already voted, make leader election
if !node.IsLeader && waitElapsed > waitInterval && !node.Voting && len(nodePool) == 0 {
node.Voting = true
node.ElectionTime = time.Now()
LeaderAsk(node)
log.Info("Sending election request")
}

//TODO
//Case when multiple nodes are competing for leader role
HeartBeat(node)
time.Sleep(1 * time.Second)
}
}

var nsUUID = uuid.Must(uuid.FromString("34b13033-50e7-4083-97f5-d389cf3a1c0e"))

func generateUUID() uuid.UUID {
id, err := uuid.NewV1()
if err != nil {
id, err = uuid.NewV4()
if err != nil {
return uuid.NewV5(nsUUID, time.Now().String())
}
}

return id
}
59 changes: 59 additions & 0 deletions pkg/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pkg

import (
"encoding/json"
"log"
"net"
)

const (
sendAddress = "127.0.0.1:9999"
listenAddress = ":9999"
)

//Broadcast ..
func Broadcast(leaderMessage LeaderMessage) {
addr, err := net.ResolveUDPAddr("udp4", sendAddress)
if err != nil {
panic(err)
}

conn, err := net.DialUDP("udp4", nil, addr)

var jsonData []byte
jsonData, err = json.Marshal(leaderMessage)
if err != nil {
log.Println(err)
}
_, err = conn.Write(jsonData)
if err != nil {
panic(err)
}

}

//Listen ..
func Listen(c chan LeaderResponse) {
defer close(c)
conn, err := net.ListenPacket("udp4", listenAddress)
if err != nil {
panic(err)
}
defer conn.Close()

for {
buf := make([]byte, 1024)
n, addr, err := conn.ReadFrom(buf)
if err != nil {
panic(err)
}
data := buf[:n]
var response LeaderMessage
err = json.Unmarshal(data, &response)

result := LeaderResponse{LeaderMessage: response, Address: addr, Connection: conn}

c <- result

}
}

0 comments on commit 2579ea6

Please sign in to comment.