Skip to content

Commit

Permalink
Merge pull request #786 from dgageot/fix-missing-logs
Browse files Browse the repository at this point in the history
Fix missing logs
  • Loading branch information
r2d4 authored Jul 11, 2018
2 parents cf68b39 + 3e99b88 commit f614956
Showing 1 changed file with 17 additions and 27 deletions.
44 changes: 17 additions & 27 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"os/exec"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -85,6 +86,7 @@ func (a *LogAggregator) Start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
watcher.Stop()
return
case evt, ok := <-watcher.ResultChan():
if !ok {
Expand All @@ -102,7 +104,7 @@ func (a *LogAggregator) Start(ctx context.Context) error {
}

if a.podSelector.Select(pod) {
a.streamLogs(ctx, client, pod)
go a.streamLogs(ctx, client, pod)
}
}
}
Expand All @@ -113,11 +115,9 @@ func (a *LogAggregator) Start(ctx context.Context) error {
}

func (a *LogAggregator) streamLogs(ctx context.Context, client corev1.PodsGetter, pod *v1.Pod) error {
pods := client.Pods(pod.Namespace)

for _, container := range pod.Status.ContainerStatuses {
containerID := container.ContainerID
if containerID == "" {
if containerID == "" || !container.Ready {
continue
}

Expand All @@ -128,36 +128,26 @@ func (a *LogAggregator) streamLogs(ctx context.Context, client corev1.PodsGetter

logrus.Infof("Stream logs from pod: %s container: %s", pod.Name, container.Name)

sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5)
// 0s means all the logs
if sinceSeconds == 0 {
sinceSeconds = 1
}

req := pods.GetLogs(pod.Name, &v1.PodLogOptions{
Follow: true,
Container: container.Name,
SinceSeconds: &sinceSeconds,
})
tr, tw := io.Pipe()
go func() {
sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5)
// 0s means all the logs
if sinceSeconds == 0 {
sinceSeconds = 1
}

rc, err := req.Stream()
if err != nil {
a.trackedContainers.remove(containerID)
return errors.Wrap(err, "setting up container log stream")
}
cmd := exec.CommandContext(ctx, "kubectl", "logs", fmt.Sprintf("--since=%ds", sinceSeconds), "-f", pod.Name, "-c", container.Name)
cmd.Stdout = tw
cmd.Run()
}()

color := a.colorPicker.Pick(pod)
prefix := color.Sprint(prefix(pod, container))

go func() {
defer func() {
a.trackedContainers.remove(containerID)
rc.Close()
}()

if err := a.streamRequest(ctx, prefix, rc); err != nil {
if err := a.streamRequest(ctx, prefix, tr); err != nil {
logrus.Errorf("streaming request %s", err)
}
a.trackedContainers.remove(containerID)
}()
}

Expand Down

0 comments on commit f614956

Please sign in to comment.