From f9016d8cc37084a1a8efae03016772343719e894 Mon Sep 17 00:00:00 2001 From: Jake Neyer Date: Thu, 4 Feb 2021 16:39:18 -0500 Subject: [PATCH] systemd setup --- .gitignore | 2 + Makefile | 5 + Vagrantfile | 26 +++++ main.go | 7 +- pkg/leader.go | 32 +++--- pkg/node.go | 218 ++++++++++++++++++++++++----------------- pkg/udp.go | 8 +- scripts/init.sh | 49 --------- scripts/install.sh | 216 +++++----------------------------------- scripts/pre-install.sh | 109 --------------------- 10 files changed, 200 insertions(+), 472 deletions(-) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 Vagrantfile delete mode 100644 scripts/init.sh delete mode 100644 scripts/pre-install.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..626569e --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.vagrant +*console.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..991e9c9 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +test: + vagrant up + +clean: + vagrant destroy --force diff --git a/Vagrantfile b/Vagrantfile new file mode 100644 index 0000000..9f71309 --- /dev/null +++ b/Vagrantfile @@ -0,0 +1,26 @@ +Vagrant.configure("2") do |config| + + config.vm.provision "shell", path: "scripts/install.sh" + + config.vm.synced_folder ".", "/opt/stampede" + + config.vm.define "k8s1" do |k8s1| + k8s1.vm.hostname = "k8s1" + k8s1.vm.box = "ubuntu/bionic64" + k8s1.vm.network "private_network", type: "dhcp" + end + + config.vm.define "k8s2" do |k8s2| + k8s2.vm.hostname = "k8s2" + k8s2.vm.box = "ubuntu/bionic64" + k8s2.vm.network "private_network", type: "dhcp" + end + + config.vm.define "k8s3" do |k8s3| + k8s3.vm.hostname = "k8s3" + k8s3.vm.box = "ubuntu/bionic64" + k8s3.vm.network "private_network", type: "dhcp" + + end + +end diff --git a/main.go b/main.go index 49f0d04..a5043a4 100644 --- a/main.go +++ b/main.go @@ -4,13 +4,8 @@ import ( "striveworks.us/stampede/pkg" ) -const ( - sendAddress = "127.0.0.1:9999" - listenAddress = ":9999" -) - func main() { - node := pkg.New(&pkg.NodeConfig{IsLeader: false}) + node := pkg.CreateNode() node.Start() } diff --git a/pkg/leader.go b/pkg/leader.go index d2d4aac..9dc3c89 100644 --- a/pkg/leader.go +++ b/pkg/leader.go @@ -2,44 +2,40 @@ package pkg import ( "net" + "strings" "time" ) -type LeaderMessage struct { +type Message struct { Type string `json:"type"` Timestamp time.Time `json:"time"` + Recipient string `json:"recipient"` Message string `json:"message"` Node Node `json:"node"` } -type LeaderResponse struct { - LeaderMessage LeaderMessage `json:"leader"` - Address net.Addr `json:"address"` - Connection net.PacketConn `json:"connection"` -} - -func BecomeLeader(node Node) { - m := LeaderMessage{Type: "Election", Message: "Win", Timestamp: time.Now(), Node: node} - Broadcast(m) +type MessageResponse struct { + Message Message `json:"message"` + Address net.Addr `json:"address"` + Connection net.PacketConn `json:"connection"` } func LeaderAsk(node Node) { - m := LeaderMessage{Type: "Election", Message: "Vote", Timestamp: time.Now(), Node: node} + m := Message{Type: "Election", Message: "Vote", Timestamp: time.Now(), Node: node} Broadcast(m) } -func DenyElection(node Node) { - m := LeaderMessage{Type: "Election", Message: "Denied", Timestamp: time.Now(), Node: node} +func HeartBeat(node Node) { + m := Message{Type: "Heartbeat", Message: "Alive", Timestamp: time.Now(), Node: node} Broadcast(m) - } -func HeartBeat(node Node) { - m := LeaderMessage{Type: "Heartbeat", Message: "Alive", Timestamp: time.Now(), Node: node} +func JoinRequest(node Node) { + m := Message{Type: "JoinRequest", Message: "", Timestamp: time.Now(), Node: node} Broadcast(m) } -func LeaderEnforce(node Node) { - m := LeaderMessage{Type: "Heartbeat", Message: "Leader", Timestamp: time.Now(), Node: node} +func JoinResponse(uuid string, keys []string) { + m := Message{Type: "JoinResponse", Recipient: uuid, Message: strings.Join(keys, " "), Timestamp: time.Now()} Broadcast(m) } diff --git a/pkg/node.go b/pkg/node.go index a209fca..f538f19 100644 --- a/pkg/node.go +++ b/pkg/node.go @@ -1,6 +1,9 @@ package pkg import ( + "os" + "os/exec" + "strings" "time" "github.com/gofrs/uuid" @@ -9,126 +12,157 @@ import ( const ( voteCount = 10 + stateFile = "/opt/stampede/is-joined" ) -type Node struct { - UUID string `json:"uuid"` - IsLeader bool `json:"isleader"` - Voting bool `json:"voting"` - ElectionTime time.Time `json:"election"` - LastHeartBeat time.Time `json:"hearbeat"` -} +var ( + nodePool map[string]Node + currentNode Node + votes int +) -//NodeConfig ... -type NodeConfig struct { - IsLeader bool +type Node struct { + UUID string `json:"uuid"` + IsLeader bool `json:"isleader"` + IsJoined bool `json:"isjoined"` + LastJoinRequest time.Time `json:"lastjoinrequest"` + Voting bool `json:"voting"` + ElectionTime time.Time `json:"election"` + LastHeartBeat time.Time `json:"hearbeat"` } -//New ... -func New(cfg *NodeConfig) Node { - +//CreateNode ... +func CreateNode() Node { return Node{ - UUID: generateUUID().String(), - IsLeader: cfg.IsLeader, + UUID: generateUUID().String(), + IsLeader: false, + LastJoinRequest: time.Now(), } } -func (node Node) Start() error { - listener := make(chan LeaderResponse) - //Use a nodePool map so lookup times are O(1) - nodePool := make(map[string]Node) - // electionDenials := 0 - votes := 0 +func (node Node) Start() { + if _, err := os.Stat(stateFile); err == nil { + log.Info("Already a cluster member") + os.Exit(0) + } - go Listen(listener) + currentNode = node + nodePool = make(map[string]Node) + votes = 0 + + go recieve() for { - // Create non-blocking channel to listen for UDP messages - select { - case response := <-listener: - //Only care about messages that aren't from myself - if response.LeaderMessage.Node.UUID != node.UUID { - log.Info(response) - //Reset Node Heartbeat time - response.LeaderMessage.Node.LastHeartBeat = time.Now() - - //Add other nodes to NodePool - nodePool[response.LeaderMessage.Node.UUID] = response.LeaderMessage.Node - - // //Increment Election denial - // if response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Denied" { - // electionDenials++ - // } - // - // //Reject other leader elections if I am the leader - // if node.IsLeader && response.LeaderMessage.Type == "Election" && response.LeaderMessage.Message == "Vote" { - // log.Info("Blocking the election") - // DenyElection(node) - // } - - //TODO - //If I get heartbeats from other nodes and I am the leader, send back shared key + go cleanNodePool() + + if !currentNode.IsLeader && currentNode.Voting && votes >= voteCount { + currentNode.IsLeader = true + log.Info("I am the captain now!") + + } + + if !currentNode.IsLeader { + log.Info("Following") + if !currentNode.Voting { + currentNode.ElectionTime = time.Now() + } + electNode() + + if time.Since(currentNode.LastJoinRequest).Seconds() > 20 && !currentNode.IsJoined { + currentNode.LastJoinRequest = time.Now() + JoinRequest(currentNode) } - default: } + HeartBeat(currentNode) + + time.Sleep(1 * time.Second) + } +} + +func electNode() { + latest, err := latestElection(nodePool) + if err != nil { + log.Error(err) + } + if len(nodePool) == 0 || latest.ElectionTime.After(currentNode.ElectionTime) { + currentNode.Voting = true + LeaderAsk(currentNode) + votes++ + log.Info(votes, "/", voteCount, " votes") + } +} - //If I am voting and have not heard any denies in election period, become Leader - if !node.IsLeader && node.Voting && votes >= voteCount { - BecomeLeader(node) - node.IsLeader = true - // log.Info("Have heard ", electionDenials, " denials") - log.Info("Assuming leader role") +func cleanNodePool() { + for _, v := range nodePool { + if time.Since(v.LastHeartBeat).Seconds() > 30 { + delete(nodePool, v.UUID) + log.Info("Deleted ", v.UUID, " from nodes") } + } +} - //If I haven't already voted and it has been longer than the waitInterval, make leader election - if !node.IsLeader { +func recieve() { + listener := make(chan MessageResponse) + go Listen(listener) - //Set intial vote time - if !node.Voting { - node.ElectionTime = time.Now() - } + // Create non-blocking channel to listen for UDP messages + for { + select { + case response := <-listener: + if response.Message.Node.UUID != currentNode.UUID { + response.Message.Node.LastHeartBeat = time.Now() - //If I haven't gotten any heartbeats from other nodes - if len(nodePool) == 0 { - node.Voting = true - LeaderAsk(node) - votes++ - log.Info(votes, "/", voteCount, " votes") - } else { - //If I have gotten heartbeats, but the ALL other node's election - // time came after mine still send a vote - latest, err := latestElection(nodePool) - if err != nil { - log.Error(err) - } - log.Info(latest.UUID) - if latest.ElectionTime.After(node.ElectionTime) { - node.Voting = true - LeaderAsk(node) - votes++ - log.Info(votes, "/", voteCount, " votes") + nodePool[response.Message.Node.UUID] = response.Message.Node + + if response.Message.Type == "JoinRequest" && currentNode.IsLeader { + addNode(response) } + if response.Message.Type == "JoinResponse" && response.Message.Recipient == currentNode.UUID { + joinCluster(response) + } } + default: } + time.Sleep(100 * time.Millisecond) + } +} - HeartBeat(node) +func addNode(response MessageResponse) { + app := "microk8s" + arg := "add-node" - log.Info(len(nodePool)) - for _, v := range nodePool { - if time.Since(v.LastHeartBeat).Seconds() > 10 { - delete(nodePool, v.UUID) - log.Info("Deleted ", v.UUID, " from nodes") - } - } + cmd := exec.Command(app, arg) + stdout, err := cmd.Output() + if err != nil { + log.Error(err) + } + msg := strings.Split(string(stdout), "\n") + log.Info(msg) + log.Info("Allowing ", response.Address, ": ", response.Message.Node.UUID, " to join. Sending keys") + JoinResponse(response.Message.Node.UUID, msg[len(msg)-5:]) +} - if node.IsLeader { - log.Info("I am the captain now!") - } else { - log.Info("Following") +func joinCluster(response MessageResponse) { + for _, key := range strings.Split(response.Message.Message, " microk8s join ") { + app := "microk8s" + arg := "join" + + cmd := exec.Command(app, arg, key) + _, err := cmd.Output() + if err == nil { + currentNode.IsJoined = true + _, err := os.Create(stateFile) + if err != nil { + log.Fatal(err) + } + log.Info("Joined cluster, shutting down...") + os.Exit(0) } - time.Sleep(1 * time.Second) + } + if !currentNode.IsJoined { + log.Error("Failed to join cluster") } } diff --git a/pkg/udp.go b/pkg/udp.go index 01cb1c8..aef8eee 100644 --- a/pkg/udp.go +++ b/pkg/udp.go @@ -11,7 +11,7 @@ const ( ) //Broadcast .. -func Broadcast(leaderMessage LeaderMessage) { +func Broadcast(leaderMessage Message) { addr, err := net.ResolveUDPAddr("udp4", address) if err != nil { panic(err) @@ -32,7 +32,7 @@ func Broadcast(leaderMessage LeaderMessage) { } //Listen .. -func Listen(c chan LeaderResponse) { +func Listen(c chan MessageResponse) { defer close(c) addr, err := net.ResolveUDPAddr("udp4", address) if err != nil { @@ -50,10 +50,10 @@ func Listen(c chan LeaderResponse) { panic(err) } data := buf[:n] - var response LeaderMessage + var response Message err = json.Unmarshal(data, &response) - result := LeaderResponse{LeaderMessage: response, Address: addr, Connection: conn} + result := MessageResponse{Message: response, Address: addr, Connection: conn} c <- result diff --git a/scripts/init.sh b/scripts/init.sh deleted file mode 100644 index e6a7471..0000000 --- a/scripts/init.sh +++ /dev/null @@ -1,49 +0,0 @@ -#/bin/env bash -set -ex - -source install.sh -source pre-install.sh - -host1="chariotmaster01.america.striveworks.us" -host2="chariotmaster02.america.striveworks.us" -host3="chariotmaster03.america.striveworks.us" - -RKE_USER="rke" - -if [ ! -f /root/${RKE_USER}-password.txt]; then - RKE=$(cat "/root/${RKE_USER}-password.txt") -else - RKE_PASS=$(tr -dc A-Za-z0-9 "/root/${RKE_USER}-password.txt" -fi - - -# Setup RKE user -setup_user - -# Disable swap -disable_swap - -# Kernel param config -kernel_setup - -# Install docker -install_docker - -# Allow tcp forwarding -allow_tcp_ssh_forwarding - - -if [ "$CONTROLLER" = "1" ]; - then - - # Create SSH keys for RKE/cluster - create_ssh_keys $host1 $host2 $host3 - # Get RKE binary - get_rke_binary - # Build RKE config - build_cluster_config $host1 $host2 $host3 - # Init RKE - cd /opt - rke_up -fi diff --git a/scripts/install.sh b/scripts/install.sh index 8f16954..a689e40 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -1,203 +1,31 @@ #/bin/env bash set -ex +mkdir -p /opt/stampede -function get_rke_binary { - wget https://github.com/rancher/rke/releases/download/v1.0.16/rke_linux-amd64 -O rke - chmod +x rke - cp rke /usr/local/bin -} +#Install Go +snap install go --classic +#Build binary +cd /opt/stampede && go build -o /usr/local/bin/stampede . -function build_cluster_config () { -cat << EOF > /home/$RKE_USER/cluster.yml -nodes: -- address: "$1" - port: "22" - internal_address: "" - role: - - controlplane - - etcd - hostname_override: "" - user: $RKE_USER - docker_socket: /var/run/docker.sock - ssh_key: "" - ssh_key_path: /home/$RKE_USER/.ssh/id_rsa - ssh_cert: "" - ssh_cert_path: "" - labels: {} - taints: [] -- address: "$2" - port: "22" - internal_address: "" - role: - - controlplane - - etcd - hostname_override: "" - user: $RKE_USER - docker_socket: /var/run/docker.sock - ssh_key: "" - ssh_key_path: /home/$RKE_USER/.ssh/id_rsa - ssh_cert: "" - ssh_cert_path: "" - labels: {} - taints: [] -- address: "$3" - port: "22" - internal_address: "" - role: - - controlplane - - etcd - hostname_override: "" - user: $RKE_USER - docker_socket: /var/run/docker.sock - ssh_key: "" - ssh_key_path: /home/$RKE_USER/.ssh/id_rsa - ssh_cert: "" - ssh_cert_path: "" - labels: {} - taints: [] -services: - etcd: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] - external_urls: [] - ca_cert: "" - cert: "" - key: "" - path: "" - uid: 0 - gid: 0 - snapshot: null - retention: "" - creation: "" - backup_config: null - kube-api: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] - service_cluster_ip_range: 10.43.0.0/16 - service_node_port_range: "" - pod_security_policy: false - always_pull_images: false - secrets_encryption_config: null - audit_log: null - admission_configuration: null - event_rate_limit: null - kube-controller: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] - cluster_cidr: 10.42.0.0/16 - service_cluster_ip_range: 10.43.0.0/16 - scheduler: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] - kubelet: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] - cluster_domain: cluster.local - infra_container_image: "" - cluster_dns_server: 10.43.0.10 - fail_swap_on: false - generate_serving_certificate: false - kubeproxy: - image: "" - extra_args: {} - extra_binds: [] - extra_env: [] -network: - plugin: none -authentication: - strategy: x509 - sans: [] - webhook: null -addons: "" -addons_include: [] -system_images: - etcd: rancher/coreos-etcd:v3.4.3-rancher1 - alpine: rancher/rke-tools:v0.1.69 - nginx_proxy: rancher/rke-tools:v0.1.69 - cert_downloader: rancher/rke-tools:v0.1.69 - kubernetes_services_sidecar: rancher/rke-tools:v0.1.69 - kubedns: rancher/k8s-dns-kube-dns:1.15.0 - dnsmasq: rancher/k8s-dns-dnsmasq-nanny:1.15.0 - kubedns_sidecar: rancher/k8s-dns-sidecar:1.15.0 - kubedns_autoscaler: rancher/cluster-proportional-autoscaler:1.7.1 - coredns: rancher/coredns-coredns:1.6.5 - coredns_autoscaler: rancher/cluster-proportional-autoscaler:1.7.1 - nodelocal: rancher/k8s-dns-node-cache:1.15.7 - kubernetes: rancher/hyperkube:v1.17.17-rancher1 - flannel: rancher/coreos-flannel:v0.12.0 - flannel_cni: rancher/flannel-cni:v0.3.0-rancher6 - calico_node: rancher/calico-node:v3.13.4 - calico_cni: rancher/calico-cni:v3.13.4 - calico_controllers: rancher/calico-kube-controllers:v3.13.4 - calico_ctl: rancher/calico-ctl:v3.13.4 - calico_flexvol: rancher/calico-pod2daemon-flexvol:v3.13.4 - canal_node: rancher/calico-node:v3.13.4 - canal_cni: rancher/calico-cni:v3.13.4 - canal_flannel: rancher/coreos-flannel:v0.12.0 - canal_flexvol: rancher/calico-pod2daemon-flexvol:v3.13.4 - weave_node: weaveworks/weave-kube:2.6.4 - weave_cni: weaveworks/weave-npc:2.6.4 - pod_infra_container: rancher/pause:3.1 - ingress: rancher/nginx-ingress-controller:nginx-0.35.0-rancher2 - ingress_backend: rancher/nginx-ingress-controller-defaultbackend:1.5-rancher1 - metrics_server: rancher/metrics-server:v0.3.6 - windows_pod_infra_container: rancher/kubelet-pause:v0.1.4 -ssh_key_path: ~/.ssh/id_rsa -ssh_cert_path: "" -ssh_agent_auth: false -authorization: - mode: rbac - options: {} -ignore_docker_version: false -kubernetes_version: "" -private_registries: [] -ingress: - provider: "" - options: {} - node_selector: {} - extra_args: {} - dns_policy: "" - extra_envs: [] - extra_volumes: [] - extra_volume_mounts: [] -cluster_name: "" -cloud_provider: - name: "" -prefix_path: "" -addon_job_timeout: 0 -bastion_host: - address: "" - port: "" - user: "" - ssh_key: "" - ssh_key_path: "" - ssh_cert: "" - ssh_cert_path: "" -monitoring: - provider: "" - options: {} - node_selector: {} -restore: - restore: false - snapshot_name: "" -dns: null +#Install microk8s +snap install microk8s --classic --channel=1.18/stable -EOF -} +#Send multicast traffic through default interface +ip route add 224.0.0.0/4 dev $(route | grep '^default' | grep -o '[^ ]*$') + +#Setup stampede systemd service +cat << EOF > /lib/systemd/system/stampede.service +[Unit] +Description=stampede is a microk8s bootstrapping utility to elect a leader and add nodes +[Service] +Type=simple +Restart=always +RestartSec=5s +ExecStart=/usr/local/bin/stampede -function rke_up { - sudo -H -u rke bash -c 'cd $HOME && rke up --debug' -} +[Install] +WantedBy=multi-user.target +EOF diff --git a/scripts/pre-install.sh b/scripts/pre-install.sh deleted file mode 100644 index 5a60c4d..0000000 --- a/scripts/pre-install.sh +++ /dev/null @@ -1,109 +0,0 @@ -#/bin/env bash -set -ex - - -function setup_user { - # Setup Docker for non-root user - groupadd -f docker - - # Create RKE User - useradd $RKE_USER \ - --groups docker \ - --create-home \ - --password $RKE_PASS \ - || true - - passwd $RKE_USER --stdin <<< "$RKE_PASS" - # Add RKE User to sudoers - grep -qxF "$RKE_USER ALL=(ALL:ALL) ALL" /etc/sudoers || echo "$RKE_USER ALL=(ALL:ALL) ALL" >> /etc/sudoers - -} - -function create_ssh_keys { - apt-get install sshpass -y - rm -rf /home/$RKE_USER/.ssh/ - mkdir -p /home/$RKE_USER/.ssh/ - mkdir -p $HOME/.ssh - ssh-keygen -b 2048 -t rsa -f /home/$RKE_USER/.ssh/id_rsa -q -N "" - cat /home/$RKE_USER/.ssh/id_rsa.pub >> /home/$RKE_USER/.ssh/authorized_keys - chown $RKE_USER:$RKE_USER /home/$RKE_USER/.ssh/id_rsa - chown $RKE_USER:$RKE_USER /home/$RKE_USER/.ssh/id_rsa.pub - chown $RKE_USER:$RKE_USER /home/$RKE_USER/.ssh/authorized_keys - - for host in $1 $2 $3 - do - if [ "$(hostname -f)" != "$host" ]; - then - sshpass -f /root/${RKE_USER}-password.txt ssh-copy-id -i /home/$RKE_USER/.ssh/id_rsa.pub $RKE_USER@$host - fi - done -} - -function disable_swap { - # Disable Swap - swapoff -a - sed -i '/ swap / s/^/#/' /etc/fstab - -} - - -function kernel_setup { - apt-get update - apt-get dist-upgrade -y - - # Check all kernel modules are present - for module in br_netfilter ip6_udp_tunnel ip_set ip_set_hash_ip ip_set_hash_net iptable_filter iptable_nat iptable_mangle iptable_raw nf_conntrack_netlink nf_conntrack nf_conntrack_ipv4 nf_defrag_ipv4 nf_nat nf_nat_ipv4 nf_nat_masquerade_ipv4 nfnetlink udp_tunnel veth vxlan x_tables xt_addrtype xt_conntrack xt_comment xt_mark xt_multiport xt_nat xt_recent xt_set xt_statistic xt_tcpudp; - do - modprobe $module - if ! lsmod | grep -q $module; then - echo "module $module is not present"; - exit 1 - fi; - done - - - echo "br_netfilter" > /etc/modules-load.d/bridge.conf - # Ensure net.bridge.bridge-nf-call-iptables is enabled in the kernel - sysctl net.bridge.bridge-nf-call-iptables=1 - sysctl net.bridge.bridge-nf-call-ip6tables=1 - sysctl net.bridge.bridge-nf-call-arptables=1 - -} - - -function install_docker { - # Remove any old versions of Docker - apt-get remove docker docker-engine docker.io containerd runc - - # Instal deps - apt-get update - apt-get install -y \ - apt-transport-https \ - ca-certificates \ - curl \ - gnupg-agent \ - software-properties-common - - - # Add docker CE repo - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - - add-apt-repository \ - "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ - $(lsb_release -cs) \ - stable" - - apt-get update - #Instal docker - apt-get -y install docker-ce=18.06.1~ce~3-0~ubuntu containerd.io --allow-downgrades - - #Enable docker on startup - systemctl restart docker.service - systemctl enable docker.service - systemctl enable containerd.service - -} - - -function allow_tcp_ssh_forwarding { - grep -qxF "AllowTcpForwarding yes" /etc/ssh/sshd_config || echo "AllowTcpForwarding yes" >> /etc/ssh/sshd_config -}