diff --git a/Makefile b/Makefile
index bdb1581b7..a51c98ec4 100644
--- a/Makefile
+++ b/Makefile
@@ -80,7 +80,7 @@ endif
 test: receptor
 	PATH=${PWD}:${PATH} \
-	go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1 -race
+	go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1 -race -v
 
 receptorctl-test: receptorctl/.VERSION
 	@cd receptorctl && tox -e py3
diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index 8cb995e5e..96efdacf2 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -4,11 +4,11 @@ package workceptor
 
 import (
+	"bufio"
 	"context"
 	"errors"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net"
 	"os"
 	"strings"
@@ -24,6 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -94,11 +95,13 @@ func podRunningAndReady() func(event watch.Event) (bool, error) {
 				conditions[i].Status == corev1.ConditionFalse {
 				statuses := t.Status.ContainerStatuses
 				for j := range statuses {
-					if statuses[j].State.Waiting.Reason == "ImagePullBackOff" {
-						if imagePullBackOffRetries == 0 {
-							return false, ErrImagePullBackOff
+					if statuses[j].State.Waiting != nil {
+						if statuses[j].State.Waiting.Reason == "ImagePullBackOff" {
+							if imagePullBackOffRetries == 0 {
+								return false, ErrImagePullBackOff
+							}
+							imagePullBackOffRetries--
 						}
-						imagePullBackOffRetries--
 					}
 				}
 			}
@@ -193,15 +196,19 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 		}
 		pod.Spec.Containers[0].Env = evs
 	}
+
+	// create the pod and store it in kw.pod
 	kw.pod, err = kw.clientset.CoreV1().Pods(ked.KubeNamespace).Create(kw.ctx, pod, metav1.CreateOptions{})
 	if err != nil {
 		return err
 	}
+
 	select {
 	case <-kw.ctx.Done():
 		return fmt.Errorf("cancelled")
 	default:
 	}
+
 	kw.UpdateFullStatus(func(status *StatusFileData) {
 		status.State = WorkStatePending
 		status.Detail = "Pod created"
@@ -228,26 +235,31 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 	if kw.podPendingTimeout != time.Duration(0) {
 		ctxPodReady, _ = context.WithTimeout(kw.ctx, kw.podPendingTimeout)
 	}
+
 	ev, err := watch2.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady())
 	if ev == nil || ev.Object == nil {
 		return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID())
 	}
+
 	var ok bool
 	kw.pod, ok = ev.Object.(*corev1.Pod)
 	if !ok {
 		return fmt.Errorf("watch did not return a pod")
 	}
+
 	if err == ErrPodCompleted {
 		if len(kw.pod.Status.ContainerStatuses) != 1 {
 			return fmt.Errorf("expected 1 container in pod but there were %d", len(kw.pod.Status.ContainerStatuses))
 		}
+
 		cstat := kw.pod.Status.ContainerStatuses[0]
 		if cstat.State.Terminated != nil && cstat.State.Terminated.ExitCode != 0 {
 			return fmt.Errorf("container failed with exit code %d: %s", cstat.State.Terminated.ExitCode, cstat.State.Terminated.Message)
 		}
 
 		return err
-	} else if err != nil {
+	} else if err != nil { // any other error besides ErrPodCompleted
 		kw.Cancel()
 		if len(kw.pod.Status.ContainerStatuses) == 1 {
 			if kw.pod.Status.ContainerStatuses[0].State.Waiting != nil {
@@ -262,165 +274,377 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 }
 
 func (kw *kubeUnit) runWorkUsingLogger() {
-	skipStdin := false
+	skipStdin := true
+
 	status := kw.Status()
 	ked := status.ExtraData.(*kubeExtraData)
-	var err error
-	var errMsg string
-	if ked.PodName == "" {
-		// Create the pod
-		err := kw.createPod(nil)
-		if err == ErrPodCompleted {
-			skipStdin = true
-		} else if err != nil {
-			errMsg = fmt.Sprintf("Error creating pod: %s", err)
+
+	podName := ked.PodName
+	podNamespace := ked.KubeNamespace
+
+	if podName == "" {
+		// create a new pod if ked.PodName is empty
+		if err := kw.createPod(nil); err != nil {
+			if err != ErrPodCompleted {
+				errMsg := fmt.Sprintf("Error creating pod: %s", err)
+				kw.Error(errMsg)
+				kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+
+				return
+			}
+		} else {
+			// for a newly created pod we need to stream stdin
+			skipStdin = false
 		}
+
+		podName = kw.pod.Name
+		podNamespace = kw.pod.Namespace
 	} else {
-		skipStdin = true
-		kw.pod, err = kw.clientset.CoreV1().Pods(ked.KubeNamespace).Get(kw.ctx, ked.PodName, metav1.GetOptions{})
-		if err != nil {
-			errMsg = fmt.Sprintf("Error getting pod: %s", err)
-		}
-	}
-	if errMsg != "" {
-		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
-		logger.Error(errMsg)
+		if podNamespace == "" {
+			errMsg := fmt.Sprintf("Error resuming pod: pod namespace is empty for pod %s",
+				podName,
+			)
+			kw.Error(errMsg)
+			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
-		return
-	}
+			return
+		}
 
-	// Open the pod log for stdout
-	logreq := kw.clientset.CoreV1().Pods(kw.pod.ObjectMeta.Namespace).GetLogs(kw.pod.Name, &corev1.PodLogOptions{
-		Container: "worker",
-		Follow:    true,
-	})
-	logStream, err := logreq.Stream(kw.ctx)
-	if err != nil {
-		errMsg := fmt.Sprintf("Error opening pod stream: %s", err)
-		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
-		logger.Error(errMsg)
+		// resuming from a previously created pod
+		var err error
+		for retries := 5; retries > 0; retries-- {
+			kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.ctx, podName, metav1.GetOptions{})
+			if err == nil {
+				break
+			} else {
+				time.Sleep(100 * time.Millisecond)
+			}
+		}
+		if err != nil {
+			errMsg := fmt.Sprintf("Error getting pod %s/%s: %s", podNamespace, podName, err)
+			kw.Error(errMsg)
+			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
-		return
+			return
+		}
 	}
-	defer logStream.Close()
 
 	// Attach stdin stream to the pod
 	var exec remotecommand.Executor
 	if !skipStdin {
 		req := kw.clientset.CoreV1().RESTClient().Post().
 			Resource("pods").
-			Name(kw.pod.Name).
-			Namespace(kw.pod.Namespace).
+			Name(podName).
+			Namespace(podNamespace).
 			SubResource("attach")
-		req.VersionedParams(&corev1.PodExecOptions{
-			Container: "worker",
-			Stdin:     true,
-			Stdout:    false,
-			Stderr:    false,
-			TTY:       false,
-		}, scheme.ParameterCodec)
+
+		req.VersionedParams(
+			&corev1.PodExecOptions{
+				Container: "worker",
+				Stdin:     true,
+				Stdout:    false,
+				Stderr:    false,
+				TTY:       false,
+			},
+			scheme.ParameterCodec,
+		)
+
+		var err error
 		exec, err = remotecommand.NewSPDYExecutor(kw.config, "POST", req.URL())
 		if err != nil {
-			kw.UpdateBasicStatus(WorkStateFailed, fmt.Sprintf("Error attaching to pod: %s", err), 0)
+			errMsg := fmt.Sprintf("Error creating SPDY executor: %s", err)
+			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
 			return
 		}
 	}
 
-	// Check if we were cancelled before starting the streams
-	select {
-	case <-kw.ctx.Done():
-		kw.UpdateBasicStatus(WorkStateFailed, "Cancelled", 0)
+	var stdinErr error
+	var stdoutErr error
 
-		return
-	default:
-	}
-	// Open stdin reader
+	// finishedChan signals the stdin and stdout monitoring goroutines to stop
+	finishedChan := make(chan struct{})
+
+	// closing finishedChan when this function returns signals both monitoring goroutines to stop
+	defer close(finishedChan)
+
+	stdinErrChan := make(chan struct{}) // signals that the stdin goroutine has errored, so the stdout goroutine should stop
+
+	// open stdin reader that reads from the work unit's data directory
 	var stdin *stdinReader
 	if !skipStdin {
+		var err error
 		stdin, err = newStdinReader(kw.UnitDir())
-		if errors.Is(err, errFileSizeZero) {
-			skipStdin = true
-		} else if err != nil {
-			errMsg := fmt.Sprintf("Error opening stdin file: %s", err)
-			logger.Error(errMsg)
-			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+		if err != nil {
+			if errors.Is(err, errFileSizeZero) {
+				skipStdin = true
+			} else {
+				errMsg := fmt.Sprintf("Error opening stdin file: %s", err)
+				kw.Error(errMsg)
+				kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
-			return
+				return
+			}
+		} else {
+			// goroutine to cancel stdin reader
+			go func() {
+				select {
+				case <-kw.ctx.Done():
+					stdin.reader.Close()
+
+					return
+				case <-finishedChan:
+				case <-stdin.Done():
+					return
+				}
+			}()
 		}
 	}
 
-	// Open stdout writer
+	// open stdout writer that writes to the work unit's data directory
 	stdout, err := newStdoutWriter(kw.UnitDir())
 	if err != nil {
 		errMsg := fmt.Sprintf("Error opening stdout file: %s", err)
-		logger.Error(errMsg)
+		kw.Error(errMsg)
 		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
 		return
 	}
 
-	// Goroutine to update status when stdin is fully sent to the pod, which is when we
-	// update from WorkStatePending to WorkStateRunning.
-	finishedChan := make(chan struct{})
-	if !skipStdin {
-		kw.UpdateFullStatus(func(status *StatusFileData) {
-			status.State = WorkStatePending
-			status.Detail = "Sending stdin to pod"
-		})
-		go func() {
-			select {
-			case <-kw.ctx.Done():
-				return
-			case <-finishedChan:
-				return
-			case <-stdin.Done():
-				err := stdin.Error()
-				if err == io.EOF {
-					kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
-				} else {
-					kw.UpdateBasicStatus(WorkStateFailed, fmt.Sprintf("Error reading stdin: %s", err), stdout.Size())
-				}
-			}
-		}()
-	} else {
-		kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
-	}
+	// goroutine to cancel stdout stream
+	go func() {
+		select {
+		case <-kw.ctx.Done():
+			stdout.writer.Close()
+
+			return
+		case <-stdinErrChan:
+			stdout.writer.Close()
+
+			return
+		case <-finishedChan:
+			return
+		}
+	}()
 
-	// Actually run the streams. This blocks until the pod finishes.
-	var errStdin error
-	var errStdout error
 	streamWait := sync.WaitGroup{}
 	streamWait.Add(2)
+
 	if skipStdin {
+		kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
 		streamWait.Done()
 	} else {
 		go func() {
-			errStdin = exec.Stream(remotecommand.StreamOptions{
-				Stdin: stdin,
-				Tty:   false,
+			defer streamWait.Done()
+
+			kw.UpdateFullStatus(func(status *StatusFileData) {
+				status.State = WorkStatePending
+				status.Detail = "Sending stdin to pod"
 			})
-			if errStdin != nil {
-				logStream.Close()
+
+			var err error
+			for retries := 5; retries > 0; retries-- {
+				err = exec.Stream(remotecommand.StreamOptions{
+					Stdin: stdin,
+					Tty:   false,
+				})
+				if err != nil {
+					// NOTE: io.EOF for stdin is handled by remotecommand and will not trigger this
+					kw.Warning("Error streaming stdin to pod %s/%s. Retrying: %s",
+						podNamespace,
+						podName,
+						err,
+					)
+					time.Sleep(100 * time.Millisecond)
+				} else {
+					break
+				}
+			}
+
+			if err != nil {
+				stdinErr = err
+				errMsg := fmt.Sprintf("Error streaming stdin to pod %s/%s: %s",
+					podNamespace,
+					podName,
+					err,
+				)
+				kw.Error(errMsg)
+				kw.UpdateBasicStatus(WorkStateFailed, errMsg, stdout.Size())
+
+				close(stdinErrChan) // signal the stdout goroutine to stop
+			} else {
+				if stdin.Error() == io.EOF {
+					kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
+				} else {
+					// this is probably not possible...
+					errMsg := fmt.Sprintf("Error reading stdin: %s", stdin.Error())
+					kw.Error(errMsg)
+					kw.UpdateBasicStatus(WorkStateFailed, errMsg, stdout.Size())
+
+					close(stdinErrChan) // signal the stdout goroutine to stop
+				}
 			}
-			streamWait.Done()
 		}()
 	}
-	go func() {
-		_, errStdout = io.Copy(stdout, logStream)
-		streamWait.Done()
-	}()
+
+	noReconnect := func() {
+		// Legacy method, for use on k8s < v1.23.14.
+		// Uses io.Copy to stream data from the pod to the stdout file.
+		// Known issue: the log stream can terminate early due to log rotation
+		// or the 4 hr timeout, and this method cannot resume it.
+		defer streamWait.Done()
+		var logStream io.ReadCloser
+		logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs(
+			podName, &corev1.PodLogOptions{
+				Container: "worker",
+				Follow:    true,
+			},
+		)
+		// get logstream, with retry
+		for retries := 5; retries > 0; retries-- {
+			logStream, err = logReq.Stream(kw.ctx)
+			if err == nil {
+				break
+			} else {
+				errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Will retry %d more times.", podNamespace, podName, retries)
+				kw.Warning(errMsg)
+				time.Sleep(time.Second)
+			}
+		}
+		if err != nil {
+			errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s: %s", podNamespace, podName, err)
+			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+			kw.Error(errMsg)
+
+			return
+		}
+
+		_, stdoutErr = io.Copy(stdout, logStream)
+	}
+
+	withReconnect := func() {
+		// Preferred method, for use on k8s >= v1.23.14.
+		defer streamWait.Done()
+		var sinceTime time.Time
+		var logStream io.ReadCloser
+		eofRetries := 5
+		successfulWrite := false
+		remainingEOFAttempts := eofRetries // resets on each successful read from pod stdout
+
+		for {
+			if stdinErr != nil {
+				break
+			}
+
+			// get pod, with retry
+			for retries := 5; retries > 0; retries-- {
+				kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.ctx, podName, metav1.GetOptions{})
+				if err == nil {
+					break
+				} else {
+					errMsg := fmt.Sprintf("Error getting pod %s/%s. Will retry %d more times.", podNamespace, podName, retries)
+					kw.Warning(errMsg)
+					time.Sleep(time.Second)
+				}
+			}
+			if err != nil {
+				errMsg := fmt.Sprintf("Error getting pod %s/%s: %s", podNamespace, podName, err)
+				kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+				kw.Error(errMsg)
+
+				break
+			}
+
+			logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs(
+				podName, &corev1.PodLogOptions{
+					Container:  "worker",
+					Follow:     true,
+					Timestamps: true,
+					SinceTime:  &metav1.Time{Time: sinceTime},
+				},
+			)
+			// get logstream, with retry
+			for retries := 5; retries > 0; retries-- {
+				logStream, err = logReq.Stream(kw.ctx)
+				if err == nil {
+					break
+				} else {
+					errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Will retry %d more times.", podNamespace, podName, retries)
+					kw.Warning(errMsg)
+					time.Sleep(time.Second)
+				}
+			}
+			if err != nil {
+				errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s: %s", podNamespace, podName, err)
+				kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+				kw.Error(errMsg)
+
+				break
+			}
+
+			// read from logstream
+			streamReader := bufio.NewReader(logStream)
+			for stdinErr == nil { // check between every line read to see if we need to stop reading
+				line, err := streamReader.ReadString('\n')
+				if err == io.EOF {
+					kw.Debug("Detected EOF for pod %s/%s. Will retry %d more times.", podNamespace, podName, remainingEOFAttempts)
+					successfulWrite = false
+					remainingEOFAttempts--
+					if remainingEOFAttempts > 0 {
+						time.Sleep(100 * time.Millisecond)
+
+						break
+					}
+
+					return
+				}
+				if err != nil {
+					stdoutErr = err
+
+					return
+				}
+				split := strings.SplitN(line, " ", 2)
+				timeStamp := parseTime(split[0])
+				if timeStamp == nil {
+					// parseTime returns nil on a malformed timestamp; skip the line rather than dereference nil
+					continue
+				}
+				if !timeStamp.After(sinceTime) && !successfulWrite {
+					continue
+				}
+				msg := split[1]
+
+				_, err = stdout.Write([]byte(msg))
+				if err != nil {
+					stdoutErr = fmt.Errorf("writing to stdout: %s", err)
+
+					return
+				}
+				remainingEOFAttempts = eofRetries // each time we read successfully, reset this counter
+				sinceTime = *timeStamp
+				successfulWrite = true
+			}
+
+			logStream.Close()
+		}
+	}
+
+	stdoutWithReconnect := shouldUseReconnect(kw)
+	if stdoutWithReconnect && stdoutErr == nil {
+		kw.Debug("streaming stdout with reconnect support")
+		go withReconnect()
+	} else {
+		kw.Debug("streaming stdout with no reconnect support")
+		go noReconnect()
+	}
+
 	streamWait.Wait()
-	close(finishedChan)
-	if errStdin != nil || errStdout != nil {
+
+	if stdinErr != nil || stdoutErr != nil {
 		var errDetail string
 		switch {
-		case errStdin == nil:
-			errDetail = fmt.Sprintf("%s", errStdout)
-		case errStdout == nil:
-			errDetail = fmt.Sprintf("%s", errStdin)
+		case stdinErr == nil:
+			errDetail = fmt.Sprintf("%s", stdoutErr)
+		case stdoutErr == nil:
+			errDetail = fmt.Sprintf("%s", stdinErr)
 		default:
-			errDetail = fmt.Sprintf("stdin: %s, stdout: %s", errStdin, errStdout)
+			errDetail = fmt.Sprintf("stdin: %s, stdout: %s", stdinErr, stdoutErr)
 		}
 		kw.UpdateBasicStatus(WorkStateFailed, fmt.Sprintf("Stream error running pod: %s", errDetail), stdout.Size())
@@ -429,6 +653,89 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 	kw.UpdateBasicStatus(WorkStateSucceeded, "Finished", stdout.Size())
 }
 
+func shouldUseReconnect(kw *kubeUnit) bool {
+	// Attempt to detect support for streaming from the pod with timestamps,
+	// based on the Kubernetes server version.
+	// In order to use the reconnect method, the Kubernetes server must be at least
+	//   v1.23.14
+	//   v1.24.8
+	//   v1.25.4
+	// These versions contain a critical patch that permits connecting to the
+	// logstream with timestamps enabled.
+	// Without the patch, stdout lines longer than 4K characters would be split
+	// onto a new line, which causes issues in Receptor.
+	// https://github.com/kubernetes/kubernetes/issues/77603
+	// Detection can be overridden with the RECEPTOR_KUBE_SUPPORT_RECONNECT
+	// environment variable. Accepted values: "enabled", "disabled", "auto",
+	// with "auto" being the default. Any invalid value is treated as "auto".
+
+	env, ok := os.LookupEnv("RECEPTOR_KUBE_SUPPORT_RECONNECT")
+	if ok {
+		switch env {
+		case "enabled":
+			return true
+		case "disabled":
+			return false
+		case "auto":
+			// continue to version detection
+		default:
+			// treat invalid values as "auto"
+		}
+	}
+
+	serverVerInfo, err := kw.clientset.ServerVersion()
+	if err != nil {
+		logger.Warning("could not detect Kubernetes server version, will not use reconnect support")
+
+		return false
+	}
+
+	semver, err := version.ParseSemantic(serverVerInfo.String())
+	if err != nil {
+		logger.Warning("could not parse Kubernetes server version %s, will not use reconnect support", serverVerInfo.String())
+
+		return false
+	}
+
+	// The patch was backported to minor versions 23, 24 and 25, so the z-stream
+	// to compare against depends on the minor version:
+	//   if minor version == 24, compare with v1.24.8
+	//   if minor version == 25, compare with v1.25.4
+	//   all other minor versions compare with v1.23.14
+	var compatibleVer string
+	switch serverVerInfo.Minor {
+	case "24":
+		compatibleVer = "v1.24.8"
+	case "25":
+		compatibleVer = "v1.25.4"
+	default:
+		compatibleVer = "v1.23.14"
+	}
+
+	if semver.AtLeast(version.MustParseSemantic(compatibleVer)) {
+		logger.Debug("Kubernetes version %s is at least %s, using reconnect support", serverVerInfo.GitVersion, compatibleVer)
+
+		return true
+	}
+	logger.Debug("Kubernetes version %s is not at least %s, not using reconnect support", serverVerInfo.GitVersion, compatibleVer)
+
+	return false
+}
+
+func parseTime(s string) *time.Time {
+	t, err := time.Parse(time.RFC3339, s)
+	if err == nil {
+		return &t
+	}
+
+	t, err = time.Parse(time.RFC3339Nano, s)
+	if err == nil {
+		return &t
+	}
+
+	return nil
+}
+
 func getDefaultInterface() (string, error) {
 	nifs, err := net.Interfaces()
 	if err != nil {
@@ -675,6 +982,7 @@ func (kw *kubeUnit) connectToKube() error {
 	if err != nil {
 		return err
 	}
+
 	kw.clientset, err = kubernetes.NewForConfig(kw.config)
 	if err != nil {
 		return err
@@ -688,7 +996,7 @@ func readFileToString(filename string) (string, error) {
 	if filename == "" {
 		return "", nil
 	}
-	content, err := ioutil.ReadFile(filename)
+	content, err := os.ReadFile(filename)
 	if err != nil {
 		return "", err
 	}
diff --git a/pkg/workceptor/stdio_utils.go b/pkg/workceptor/stdio_utils.go
index 8a1452477..48def5241 100644
--- a/pkg/workceptor/stdio_utils.go
+++ b/pkg/workceptor/stdio_utils.go
@@ -24,7 +24,7 @@ func saveStdoutSize(unitdir string, stdoutSize int64) error {
 
 // stdoutWriter writes to a stdout file while also updating the status file.
 type stdoutWriter struct {
 	unitdir      string
-	writer       io.Writer
+	writer       io.WriteCloser
 	bytesWritten int64
 }
 
@@ -64,7 +64,7 @@ func (sw *stdoutWriter) Size() int64 {
 
 // stdinReader reads from a stdin file and provides a Done function.
 type stdinReader struct {
-	reader   io.Reader
+	reader   io.ReadCloser
 	lasterr  error
 	doneChan chan struct{}
 	doneOnce sync.Once
diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go
index 6e7b77618..305d6a13d 100644
--- a/pkg/workceptor/workunitbase.go
+++ b/pkg/workceptor/workunitbase.go
@@ -91,6 +91,30 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
 	bwu.ctx, bwu.cancel = context.WithCancel(w.ctx)
 }
 
+// Error logs message with unitID prepended.
+func (bwu *BaseWorkUnit) Error(format string, v ...interface{}) {
+	format = fmt.Sprintf("[%s] %s", bwu.unitID, format)
+	logger.Error(format, v...)
+}
+
+// Warning logs message with unitID prepended.
+func (bwu *BaseWorkUnit) Warning(format string, v ...interface{}) {
+	format = fmt.Sprintf("[%s] %s", bwu.unitID, format)
+	logger.Warning(format, v...)
+}
+
+// Info logs message with unitID prepended.
+func (bwu *BaseWorkUnit) Info(format string, v ...interface{}) {
+	format = fmt.Sprintf("[%s] %s", bwu.unitID, format)
+	logger.Info(format, v...)
+}
+
+// Debug logs message with unitID prepended.
+func (bwu *BaseWorkUnit) Debug(format string, v ...interface{}) {
+	format = fmt.Sprintf("[%s] %s", bwu.unitID, format)
+	logger.Debug(format, v...)
+}
+
 // SetFromParams sets the in-memory state from parameters.
 func (bwu *BaseWorkUnit) SetFromParams(params map[string]string) error {
 	return nil
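
A note on the goroutine coordination in runWorkUsingLogger, which the diff relies on in several places: closing a channel acts as a broadcast, which is why a single deferred close(finishedChan) stops both monitoring goroutines, and why the stdin goroutine closes stdinErrChan rather than sending on it. The following is a minimal runnable sketch of that pattern, not part of the diff; the channel names mirror the diff but the program is illustrative only.

package main

import (
	"fmt"
	"sync"
)

func main() {
	finishedChan := make(chan struct{}) // closed when the work function returns
	stdinErrChan := make(chan struct{}) // closed if the stdin goroutine fails

	var wg sync.WaitGroup
	wg.Add(2)

	// stands in for the "goroutine to cancel stdout stream"
	go func() {
		defer wg.Done()
		select {
		case <-stdinErrChan:
			fmt.Println("stdout goroutine: stdin failed, closing writer")
		case <-finishedChan:
			fmt.Println("stdout goroutine: work finished")
		}
	}()

	// stands in for the "goroutine to cancel stdin reader"
	go func() {
		defer wg.Done()
		<-finishedChan
		fmt.Println("stdin goroutine: work finished")
	}()

	// close is a broadcast: every goroutine selecting on finishedChan wakes up
	close(finishedChan)
	wg.Wait()
}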
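
On the withReconnect closure: with PodLogOptions{Timestamps: true}, the kubelet prefixes every log line with an RFC3339Nano timestamp, and the loop uses that prefix both to resume the stream (SinceTime) and to drop lines it already wrote before a reconnect. A self-contained sketch of the split/parse/dedup step, using only the standard library; the sample line and sinceTime value are invented for illustration.

package main

import (
	"fmt"
	"strings"
	"time"
)

// parseTime mirrors the helper added in this diff: it returns nil on
// malformed input instead of returning an error.
func parseTime(s string) *time.Time {
	if t, err := time.Parse(time.RFC3339, s); err == nil {
		return &t
	}
	if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
		return &t
	}

	return nil
}

func main() {
	// With Timestamps enabled, each line arrives as
	// "<RFC3339Nano timestamp> <message>\n".
	line := "2023-01-02T15:04:05.123456789Z some worker output\n"

	split := strings.SplitN(line, " ", 2)
	ts := parseTime(split[0])
	if ts == nil {
		fmt.Println("malformed timestamp, skipping line")

		return
	}

	// Lines at or before sinceTime were already written before the
	// reconnect, so they are skipped to avoid duplicate output.
	sinceTime := time.Date(2023, 1, 2, 15, 4, 5, 0, time.UTC)
	if !ts.After(sinceTime) {
		fmt.Println("duplicate line from before reconnect, skipping")

		return
	}
	fmt.Print(split[1])
}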
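
On shouldUseReconnect: the version gate uses k8s.io/apimachinery/pkg/util/version for semantic-version comparison. Below is a hedged sketch of the same comparison. Note the diff switches on serverVerInfo.Minor (a string from the discovery client's version.Info), while this sketch derives the minor from the parsed version instead, and "v1.24.9" is a stand-in for a real ServerVersion() result.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/version"
)

// compatibleVersionFor mirrors the backport table in shouldUseReconnect:
// the timestamp fix landed in v1.23.14, v1.24.8, and v1.25.4.
func compatibleVersionFor(minor uint) string {
	switch minor {
	case 24:
		return "v1.24.8"
	case 25:
		return "v1.25.4"
	default:
		return "v1.23.14"
	}
}

func main() {
	server, err := version.ParseSemantic("v1.24.9")
	if err != nil {
		fmt.Println("could not parse server version:", err)

		return
	}

	compatible := version.MustParseSemantic(compatibleVersionFor(server.Minor()))
	fmt.Println("reconnect supported:", server.AtLeast(compatible)) // true: v1.24.9 >= v1.24.8
}

The RECEPTOR_KUBE_SUPPORT_RECONNECT environment variable short-circuits this check entirely, e.g. running Receptor with RECEPTOR_KUBE_SUPPORT_RECONNECT=disabled forces the legacy io.Copy path regardless of server version.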