Skip to content

Commit

Permalink
Improve logs extension
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Loskutnikov <anatoly.loskutnikov@gmail.com>
  • Loading branch information
Tiberivs committed Mar 21, 2021
1 parent 6ee23ec commit 093df8b
Showing 1 changed file with 109 additions and 70 deletions.
179 changes: 109 additions & 70 deletions extensions/logs/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,128 +18,192 @@ package logs

import (
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/kelseyhightower/envconfig"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

var workerCount int = runtime.NumCPU()
var excludeNamespaceFlag = flag.String("log-exclude-k8s-ns", "kube-system,local-path-storage",
"comma separated list of excluded kubernetes namespaces")
var contextTimeoutFlag = flag.String("log-context-timeout", "15s",
"log context timeout")
var logDirFlag = flag.String("log-dir", envOrDefault("ARTIFACTS_DIR", "logs"),
"container logs directory")
var config Config

type Config struct {
ArtifactsDir string `default:"logs" desc:"Directory for storing container logs" split_words:"true"`
ExcludeK8sNs string `default:"kube-system,local-path-storage" desc:"Comma-separated list of excluded kubernetes namespaces" split_words:"true"`
ContextTimeout time.Duration `default:"15s" desc:"Context timeout for kubernetes queries" split_words:"true"`
LogWorkerCount int `default:"4" desc:"Number of log collector workers" split_words:"true"`
}

type Suite struct {
suite.Suite

kubeClient kubernetes.Interface

testStartTime time.Time
ctxTimeout time.Duration
kubeClient kubernetes.Interface
nsmContainers map[types.UID]bool
logQueue chan logItem
waitGroup sync.WaitGroup
}

type logSource struct {
type logItem struct {
namespace string
pod string
logDir string
logOptions *corev1.PodLogOptions
}

func init() {
if err := envconfig.Usage("", &config); err != nil {
panic(err)
}

if err := envconfig.Process("", &config); err != nil {
panic(err)
}
}

func (s *Suite) SetupSuite() {
var err error

s.ctxTimeout, err = time.ParseDuration(*contextTimeoutFlag)
require.NoError(s.T(), err)

s.kubeClient, err = newKubeClient()
require.NoError(s.T(), err)

s.logQueue = make(chan logItem)
for i := 0; i < config.LogWorkerCount; i++ {
s.waitGroup.Add(1)
go func() {
defer s.waitGroup.Done()
for src := range s.logQueue {
s.saveLog(src)
}
}()
}
}

func (s *Suite) TearDownSuite() {
close(s.logQueue)
s.waitGroup.Wait()
}

func (s *Suite) SetupTest() {
s.testStartTime = time.Now()
s.nsmContainers = make(map[types.UID]bool)

ctx, cancel := context.WithTimeout(context.Background(), config.ContextTimeout)
defer cancel()

pods, err := s.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
require.NoError(s.T(), err)

for podIdx := range pods.Items {
pod := &pods.Items[podIdx]

if !isExcludedNamespace(pod.Namespace) {
s.nsmContainers[pod.UID] = true
}
}
}

func (s *Suite) AfterTest(suiteName, testName string) {
logPath := fmt.Sprintf("%s/%s/%s", *logDirFlag, suiteName, testName)
require.NoError(s.T(), os.MkdirAll(logPath, os.ModePerm))
logDir := filepath.Join(config.ArtifactsDir, suiteName, testName)
require.NoError(s.T(), os.MkdirAll(logDir, os.ModePerm))

logOptions := corev1.PodLogOptions{
Timestamps: true,
SinceTime: &metav1.Time{Time: s.testStartTime},
}

ctx, cancel := context.WithTimeout(context.Background(), s.ctxTimeout)
ctx, cancel := context.WithTimeout(context.Background(), config.ContextTimeout)
defer cancel()

list, err := s.kubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
pods, err := s.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
require.NoError(s.T(), err)

var waitGroup sync.WaitGroup
var logSources = make(chan logSource)
for i := 0; i < workerCount; i++ {
for podIdx := range pods.Items {
pod := &pods.Items[podIdx]

if isExcludedNamespace(pod.Namespace) {
continue
}

nextLogItem := logItem{
namespace: pod.Namespace,
pod: pod.Name,
logDir: logDir,
logOptions: &logOptions,
}

if _, ok := s.nsmContainers[pod.UID]; ok {
s.logQueue <- nextLogItem
continue
}

waitGroup.Add(1)
go func() {
defer waitGroup.Done()
s.worker(ctx, logPath, logSources)
s.saveLog(nextLogItem)
}()
}

for nsIdx := range list.Items {
ns := &list.Items[nsIdx]
waitGroup.Wait()
}

if isExcludedNamespace(ns.Name) {
continue
}
func (s *Suite) saveLog(src logItem) {
ctx, cancel := context.WithTimeout(context.Background(), config.ContextTimeout)
defer cancel()

pods, err := s.kubeClient.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
require.NoError(s.T(), err)
data, err := s.kubeClient.CoreV1().
Pods(src.namespace).
GetLogs(src.pod, src.logOptions).
DoRaw(ctx)

for podIdx := range pods.Items {
pod := &pods.Items[podIdx]
logSources <- logSource{
namespace: ns.Name,
pod: pod.Name,
logOptions: &logOptions,
}
}
require.NoError(s.T(), err)

if len(data) > 0 {
logFile := filepath.Join(src.logDir, src.pod+".log")
require.NoError(s.T(), ioutil.WriteFile(logFile, data, os.ModePerm))
}
}

close(logSources)
waitGroup.Wait()
func envOrDefault(key, defaultValue string) string {
value, ok := os.LookupEnv(key)
if !ok {
value = defaultValue
}

return value
}

// newKubeClient creates new k8s client
func newKubeClient() (kubernetes.Interface, error) {
defaultPath := filepath.Join(os.Getenv("HOME"), ".kube", "config")
path := envOrDefault("KUBECONFIG", defaultPath)

config, err := clientcmd.BuildConfigFromFlags("", path)
kubeconfig, err := clientcmd.BuildConfigFromFlags("", path)
if err != nil {
return nil, err
}

config.Burst = workerCount * 100
config.QPS = float32(workerCount) * 100
kubeconfig.Burst = config.LogWorkerCount * 100
kubeconfig.QPS = float32(config.LogWorkerCount) * 100

return kubernetes.NewForConfig(config)
return kubernetes.NewForConfig(kubeconfig)
}

func isExcludedNamespace(ns string) bool {
excludeList := strings.Split(*excludeNamespaceFlag, `,`)
excludeList := strings.Split(config.ExcludeK8sNs, `,`)
for _, excluded := range excludeList {
if ns == excluded {
return true
Expand All @@ -148,28 +212,3 @@ func isExcludedNamespace(ns string) bool {

return false
}

func (s *Suite) worker(ctx context.Context, logPath string, sources <-chan logSource) {
for src := range sources {
data, err := s.kubeClient.CoreV1().
Pods(src.namespace).
GetLogs(src.pod, src.logOptions).
DoRaw(ctx)

require.NoError(s.T(), err)

if len(data) > 0 {
logFile := fmt.Sprintf("%s/%s.log", logPath, src.pod)
require.NoError(s.T(), ioutil.WriteFile(logFile, data, os.ModePerm))
}
}
}

func envOrDefault(key, defaultValue string) string {
value, ok := os.LookupEnv(key)
if !ok {
value = defaultValue
}

return value
}

0 comments on commit 093df8b

Please sign in to comment.