Skip to content

Commit

Permalink
preflights
Browse files Browse the repository at this point in the history
  • Loading branch information
marccampbell committed Jul 17, 2019
1 parent 279afcb commit a420e36
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 286 deletions.
281 changes: 0 additions & 281 deletions cmd/preflight/cli/run.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,11 @@
package cli

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"time"

troubleshootv1beta1 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta1"
"github.com/replicatedhq/troubleshoot/pkg/k8sutil"
preflightrunner "github.com/replicatedhq/troubleshoot/pkg/preflight"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func Run() *cobra.Command {
Expand Down Expand Up @@ -64,265 +42,6 @@ func Run() *cobra.Command {
return cmd
}

func runPreflightsCRD(v *viper.Viper) error {
troubleshootClient, err := createTroubleshootK8sClient()
if err != nil {
return err
}

preflightName := v.GetString("preflight")
if preflightName == "" {
preflights, err := troubleshootClient.Preflights(v.GetString("namespace")).List(metav1.ListOptions{})
if err != nil {
return err
}

if len(preflights.Items) == 1 {
preflightName = preflights.Items[0].Name
}
}

if preflightName == "" {
return errors.New("unable to preflight, try using the --preflight flags")
}

// generate a unique name
now := time.Now()
suffix := fmt.Sprintf("%d", now.Unix())

preflightJobName := fmt.Sprintf("%s-job-%s", preflightName, suffix[len(suffix)-4:])
preflightJob := troubleshootv1beta1.PreflightJob{
ObjectMeta: metav1.ObjectMeta{
Name: preflightJobName,
Namespace: v.GetString("namespace"),
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "preflightjob.troubleshoot.replicated.com",
},
Spec: troubleshootv1beta1.PreflightJobSpec{
Preflight: troubleshootv1beta1.PreflightRef{
Name: preflightName,
Namespace: v.GetString("namespace"),
},
Image: v.GetString("image"),
ImagePullPolicy: v.GetString("pullpolicy"),
CollectorImage: v.GetString("collector-image"),
CollectorImagePullPolicy: v.GetString("collector-pullpolicy"),
},
}
if _, err := troubleshootClient.PreflightJobs(v.GetString("namespace")).Create(&preflightJob); err != nil {
return err
}

// Poll the status of the Custom Resource for it to include a callback
var found *troubleshootv1beta1.PreflightJob
start := time.Now()
for {
current, err := troubleshootClient.PreflightJobs(v.GetString("namespace")).Get(preflightJobName, metav1.GetOptions{})
if err != nil && kuberneteserrors.IsNotFound(err) {
continue
} else if err != nil {
return err
}

if current.Status.IsServerReady {
found = current
break
}

if time.Now().Sub(start) > time.Duration(time.Second*10) {
return errors.New("preflightjob failed to start")
}

time.Sleep(time.Millisecond * 200)
}

// Connect to the callback
stopChan, err := k8sutil.PortForward(v.GetString("kubecontext"), 8000, 8000, found.Status.ServerPodNamespace, found.Status.ServerPodName)
if err != nil {
return err
}

if err := receivePreflightResults(found.Namespace, found.Name); err != nil {
return err
}

// Write

close(stopChan)
return nil
}

func runPreflightsNoCRD(v *viper.Viper, arg string) error {
preflightContent := ""
if !isURL(arg) {
if _, err := os.Stat(arg); os.IsNotExist(err) {
return fmt.Errorf("%s was not found", arg)
}

b, err := ioutil.ReadFile(arg)
if err != nil {
return err
}

preflightContent = string(b)
} else {
resp, err := http.Get(arg)
if err != nil {
return err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

preflightContent = string(body)
}

preflight := troubleshootv1beta1.Preflight{}
if err := yaml.Unmarshal([]byte(preflightContent), &preflight); err != nil {
return fmt.Errorf("unable to parse %s as a preflight", arg)
}

cfg, err := config.GetConfig()
if err != nil {
return err
}

client, err := client.New(cfg, client.Options{})
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
}
restClient := clientset.CoreV1().RESTClient()

// deploy an object that "owns" everything to aid in cleanup
owner := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("preflight-%s-owner", preflight.Name),
Namespace: v.GetString("namespace"),
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
Data: make(map[string]string),
}
if err := client.Create(context.Background(), &owner); err != nil {
return err
}
defer func() {
if err := client.Delete(context.Background(), &owner); err != nil {
fmt.Println("failed to clean up preflight.")
}
}()

// deploy all collectors
desiredCollectors := make([]*troubleshootv1beta1.Collect, 0, 0)
for _, definedCollector := range preflight.Spec.Collectors {
desiredCollectors = append(desiredCollectors, definedCollector)
}
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterInfo: &troubleshootv1beta1.ClusterInfo{}})
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterResources: &troubleshootv1beta1.ClusterResources{}})

podsCreated := make([]*corev1.Pod, 0, 0)
podsDeleted := make([]*corev1.Pod, 0, 0)

resyncPeriod := time.Second
ctx := context.Background()
watchList := cache.NewListWatchFromClient(restClient, "pods", "", fields.Everything())
_, controller := cache.NewInformer(watchList, &corev1.Pod{}, resyncPeriod,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
return
}
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
return
}
labels := newPod.Labels
troubleshootRole, ok := labels["troubleshoot-role"]
if !ok || troubleshootRole != "preflight" {
return
}
preflightName, ok := labels["preflight"]
if !ok || preflightName != preflight.Name {
return
}

if oldPod.Status.Phase == newPod.Status.Phase {
return
}

if newPod.Status.Phase != corev1.PodSucceeded {
return
}

podLogOpts := corev1.PodLogOptions{}

req := clientset.CoreV1().Pods(newPod.Namespace).GetLogs(newPod.Name, &podLogOpts)
podLogs, err := req.Stream()
if err != nil {
fmt.Println("get stream")
return
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
fmt.Println("copy logs")
return
}

fmt.Printf(buf.String())

if err := client.Delete(context.Background(), newPod); err != nil {
fmt.Println("delete pod")
}
podsDeleted = append(podsDeleted, newPod)
},
})
go func() {
controller.Run(ctx.Done())
}()

s := runtime.NewScheme()
s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.ConfigMap{})
for _, collector := range desiredCollectors {
_, pod, err := preflightrunner.CreateCollector(client, s, &owner, preflight.Name, v.GetString("namespace"), collector, v.GetString("image"), v.GetString("pullpolicy"))
if err != nil {
return err
}
podsCreated = append(podsCreated, pod)
}

start := time.Now()
for {
if start.Add(time.Second * 30).Before(time.Now()) {
fmt.Println("timeout running preflight")
return err
}

if len(podsDeleted) == len(podsCreated) {
break
}

time.Sleep(time.Millisecond * 200)
}

ctx.Done()
return nil
}

func ensureCollectorInList(list []*troubleshootv1beta1.Collect, collector troubleshootv1beta1.Collect) []*troubleshootv1beta1.Collect {
for _, inList := range list {
if collector.ClusterResources != nil && inList.ClusterResources != nil {
Expand Down
Loading

0 comments on commit a420e36

Please sign in to comment.