Skip to content

Commit

Permalink
systemd setup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake Neyer committed Feb 4, 2021
1 parent a014293 commit f9016d8
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 472 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.vagrant
*console.log
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
test:
vagrant up

clean:
vagrant destroy --force
26 changes: 26 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Vagrant.configure("2") do |config|

config.vm.provision "shell", path: "scripts/install.sh"

config.vm.synced_folder ".", "/opt/stampede"

config.vm.define "k8s1" do |k8s1|
k8s1.vm.hostname = "k8s1"
k8s1.vm.box = "ubuntu/bionic64"
k8s1.vm.network "private_network", type: "dhcp"
end

config.vm.define "k8s2" do |k8s2|
k8s2.vm.hostname = "k8s2"
k8s2.vm.box = "ubuntu/bionic64"
k8s2.vm.network "private_network", type: "dhcp"
end

config.vm.define "k8s3" do |k8s3|
k8s3.vm.hostname = "k8s3"
k8s3.vm.box = "ubuntu/bionic64"
k8s3.vm.network "private_network", type: "dhcp"

end

end
7 changes: 1 addition & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ import (
"striveworks.us/stampede/pkg"
)

const (
sendAddress = "127.0.0.1:9999"
listenAddress = ":9999"
)

func main() {
node := pkg.New(&pkg.NodeConfig{IsLeader: false})
node := pkg.CreateNode()
node.Start()

}
32 changes: 14 additions & 18 deletions pkg/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,40 @@ package pkg

import (
"net"
"strings"
"time"
)

type LeaderMessage struct {
type Message struct {
Type string `json:"type"`
Timestamp time.Time `json:"time"`
Recipient string `json:"recipient"`
Message string `json:"message"`
Node Node `json:"node"`
}

type LeaderResponse struct {
LeaderMessage LeaderMessage `json:"leader"`
Address net.Addr `json:"address"`
Connection net.PacketConn `json:"connection"`
}

func BecomeLeader(node Node) {
m := LeaderMessage{Type: "Election", Message: "Win", Timestamp: time.Now(), Node: node}
Broadcast(m)
type MessageResponse struct {
Message Message `json:"message"`
Address net.Addr `json:"address"`
Connection net.PacketConn `json:"connection"`
}

func LeaderAsk(node Node) {
m := LeaderMessage{Type: "Election", Message: "Vote", Timestamp: time.Now(), Node: node}
m := Message{Type: "Election", Message: "Vote", Timestamp: time.Now(), Node: node}
Broadcast(m)
}

func DenyElection(node Node) {
m := LeaderMessage{Type: "Election", Message: "Denied", Timestamp: time.Now(), Node: node}
func HeartBeat(node Node) {
m := Message{Type: "Heartbeat", Message: "Alive", Timestamp: time.Now(), Node: node}
Broadcast(m)

}

func HeartBeat(node Node) {
m := LeaderMessage{Type: "Heartbeat", Message: "Alive", Timestamp: time.Now(), Node: node}
func JoinRequest(node Node) {
m := Message{Type: "JoinRequest", Message: "", Timestamp: time.Now(), Node: node}
Broadcast(m)
}

func LeaderEnforce(node Node) {
m := LeaderMessage{Type: "Heartbeat", Message: "Leader", Timestamp: time.Now(), Node: node}
func JoinResponse(uuid string, keys []string) {
m := Message{Type: "JoinResponse", Recipient: uuid, Message: strings.Join(keys, " "), Timestamp: time.Now()}
Broadcast(m)
}
218 changes: 126 additions & 92 deletions pkg/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pkg

import (
"os"
"os/exec"
"strings"
"time"

"github.com/gofrs/uuid"
Expand All @@ -9,126 +12,157 @@ import (

const (
voteCount = 10
stateFile = "/opt/stampede/is-joined"
)

type Node struct {
UUID string `json:"uuid"`
IsLeader bool `json:"isleader"`
Voting bool `json:"voting"`
ElectionTime time.Time `json:"election"`
LastHeartBeat time.Time `json:"hearbeat"`
}
var (
nodePool map[string]Node
currentNode Node
votes int
)

//NodeConfig ...
type NodeConfig struct {
IsLeader bool
type Node struct {
UUID string `json:"uuid"`
IsLeader bool `json:"isleader"`
IsJoined bool `json:"isjoined"`
LastJoinRequest time.Time `json:"lastjoinrequest"`
Voting bool `json:"voting"`
ElectionTime time.Time `json:"election"`
LastHeartBeat time.Time `json:"hearbeat"`
}

//New ...
func New(cfg *NodeConfig) Node {

//CreateNode ...
func CreateNode() Node {
return Node{
UUID: generateUUID().String(),
IsLeader: cfg.IsLeader,
UUID: generateUUID().String(),
IsLeader: false,
LastJoinRequest: time.Now(),
}
}

func (node Node) Start() error {
listener := make(chan LeaderResponse)
//Use a nodePool map so lookup times are O(1)
nodePool := make(map[string]Node)
// electionDenials := 0
votes := 0
func (node Node) Start() {
if _, err := os.Stat(stateFile); err == nil {
log.Info("Already a cluster member")
os.Exit(0)
}

go Listen(listener)
currentNode = node
nodePool = make(map[string]Node)
votes = 0

go recieve()

for {
// Create non-blocking channel to listen for UDP messages
select {
case response := <-listener:
//Only care about messages that aren't from myself
if response.LeaderMessage.Node.UUID != node.UUID {
log.Info(response)
//Reset Node Heartbeat time
response.LeaderMessage.Node.LastHeartBeat = time.Now()

//Add other nodes to NodePool
nodePool[response.LeaderMessage.Node.UUID] = response.LeaderMessage.Node

// //Increment Election denial
// if response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Denied" {
// electionDenials++
// }
//
// //Reject other leader elections if I am the leader
// if node.IsLeader && response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Vote" {
// log.Info("Blocking the election")
// DenyElection(node)
// }

//TODO
//If I get heartbeats from other nodes and I am the leader, send back shared key
go cleanNodePool()

if !currentNode.IsLeader && currentNode.Voting && votes >= voteCount {
currentNode.IsLeader = true
log.Info("I am the captain now!")

}

if !currentNode.IsLeader {
log.Info("Following")

if !currentNode.Voting {
currentNode.ElectionTime = time.Now()
}
electNode()

if time.Since(currentNode.LastJoinRequest).Seconds() > 20 && !currentNode.IsJoined {
currentNode.LastJoinRequest = time.Now()
JoinRequest(currentNode)
}
default:
}
HeartBeat(currentNode)

time.Sleep(1 * time.Second)
}
}

func electNode() {
latest, err := latestElection(nodePool)
if err != nil {
log.Error(err)
}
if len(nodePool) == 0 || latest.ElectionTime.After(currentNode.ElectionTime) {
currentNode.Voting = true
LeaderAsk(currentNode)
votes++
log.Info(votes, "/", voteCount, " votes")
}
}

//If I am voting and have not heard any denies in election period, become Leader
if !node.IsLeader && node.Voting && votes >= voteCount {
BecomeLeader(node)
node.IsLeader = true
// log.Info("Have heard ", electionDenials, " denials")
log.Info("Assuming leader role")
func cleanNodePool() {
for _, v := range nodePool {
if time.Since(v.LastHeartBeat).Seconds() > 30 {
delete(nodePool, v.UUID)
log.Info("Deleted ", v.UUID, " from nodes")
}
}
}

//If I haven't already voted and it has been longer than the waitInterval, make leader election
if !node.IsLeader {
func recieve() {
listener := make(chan MessageResponse)
go Listen(listener)

//Set intial vote time
if !node.Voting {
node.ElectionTime = time.Now()
}
// Create non-blocking channel to listen for UDP messages
for {
select {
case response := <-listener:
if response.Message.Node.UUID != currentNode.UUID {
response.Message.Node.LastHeartBeat = time.Now()

//If I haven't gotten any heartbeats from other nodes
if len(nodePool) == 0 {
node.Voting = true
LeaderAsk(node)
votes++
log.Info(votes, "/", voteCount, " votes")
} else {
//If I have gotten heartbeats, but the ALL other node's election
// time came after mine still send a vote
latest, err := latestElection(nodePool)
if err != nil {
log.Error(err)
}
log.Info(latest.UUID)
if latest.ElectionTime.After(node.ElectionTime) {
node.Voting = true
LeaderAsk(node)
votes++
log.Info(votes, "/", voteCount, " votes")
nodePool[response.Message.Node.UUID] = response.Message.Node

if response.Message.Type == "JoinRequest" && currentNode.IsLeader {
addNode(response)
}

if response.Message.Type == "JoinResponse" && response.Message.Recipient == currentNode.UUID {
joinCluster(response)
}
}
default:
}
time.Sleep(100 * time.Millisecond)
}
}

HeartBeat(node)
func addNode(response MessageResponse) {
app := "microk8s"
arg := "add-node"

log.Info(len(nodePool))
for _, v := range nodePool {
if time.Since(v.LastHeartBeat).Seconds() > 10 {
delete(nodePool, v.UUID)
log.Info("Deleted ", v.UUID, " from nodes")
}
}
cmd := exec.Command(app, arg)
stdout, err := cmd.Output()
if err != nil {
log.Error(err)
}
msg := strings.Split(string(stdout), "\n")
log.Info(msg)
log.Info("Allowing ", response.Address, ": ", response.Message.Node.UUID, " to join. Sending keys")
JoinResponse(response.Message.Node.UUID, msg[len(msg)-5:])
}

if node.IsLeader {
log.Info("I am the captain now!")
} else {
log.Info("Following")
func joinCluster(response MessageResponse) {
for _, key := range strings.Split(response.Message.Message, " microk8s join ") {
app := "microk8s"
arg := "join"

cmd := exec.Command(app, arg, key)
_, err := cmd.Output()
if err == nil {
currentNode.IsJoined = true
_, err := os.Create(stateFile)
if err != nil {
log.Fatal(err)
}
log.Info("Joined cluster, shutting down...")
os.Exit(0)
}
time.Sleep(1 * time.Second)
}
if !currentNode.IsJoined {
log.Error("Failed to join cluster")
}
}

Expand Down
Loading

0 comments on commit f9016d8

Please sign in to comment.