Skip to content

Commit

Permalink
test/e2e,cmd/test: scrape metrics for test servers and e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
s-urbaniak committed Feb 10, 2023
1 parent 09cbd2f commit d05581a
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 10 deletions.
23 changes: 21 additions & 2 deletions cmd/sharded-test-server/frontproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/fatih/color"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -102,7 +103,10 @@ func startFrontProxy(
}

// write root shard kubeconfig
configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{ExplicitPath: filepath.Join(workDirPath, ".kcp-0/admin.kubeconfig")}, nil)
configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: filepath.Join(workDirPath, ".kcp-0/admin.kubeconfig")},
&clientcmd.ConfigOverrides{CurrentContext: "system:admin"},
)
raw, err := configLoader.RawConfig()
if err != nil {
return err
Expand Down Expand Up @@ -232,8 +236,23 @@ func startFrontProxy(
writer.StopOut()
}
fmt.Fprintf(successOut, "kcp-front-proxy is ready\n")
cfg, err := configLoader.ClientConfig()
if err != nil {
return err
}
return scrapeMetrics(ctx, cfg, workDirPath)
}

func scrapeMetrics(ctx context.Context, cfg *rest.Config, workDir string) error {
promUrl, set := os.LookupEnv("PROMETHEUS_URL")
if !set {
return nil
}

return nil
jobName := fmt.Sprintf("kcp-front-proxy-%d", time.Now().Unix())
return framework.ScrapeMetrics(ctx, cfg, promUrl, workDir, jobName, map[string]string{
"server": "kcp-front-proxy",
})
}

func writeAdminKubeConfig(hostIP string, workDirPath string) error {
Expand Down
5 changes: 3 additions & 2 deletions cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb

// Wait for shards to be ready
shardsErrCh := make(chan indexErrTuple)
for i, shard := range shards {
terminatedCh, err := shard.WaitForReady(ctx)
for i, s := range shards {
terminatedCh, err := s.WaitForReady(ctx)
if err != nil {
return err
}
shard.ScrapeMetrics(ctx, s, workDirPath)
go func(i int, terminatedCh <-chan error) {
err := <-terminatedCh
shardsErrCh <- indexErrTuple{i, err}
Expand Down
26 changes: 26 additions & 0 deletions cmd/test-server/kcp/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,32 @@ func (s *Shard) GatherMetrics(ctx context.Context) {
logger.Info("wrote metrics file", "path", metricsFile)
}

func ScrapeMetrics(ctx context.Context, s *Shard, workDir string) error {
logger := klog.FromContext(ctx).WithValues("shard", s.name)
promUrl, set := os.LookupEnv("PROMETHEUS_URL")
if !set {
logger.Info("PROMETHEUS_URL environment variable unset, skipping Prometheus scrape config generation")
return nil
}

logger.Info("scraping shard metrics using Prometheus")
kubeconfigPath := filepath.Join(s.runtimeDir, "admin.kubeconfig")
configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&clientcmd.ConfigOverrides{CurrentContext: "system:admin"},
)
config, err := configLoader.ClientConfig()
if err != nil {
logger.Error(err, "unable to collect metrics: error getting client config")
return err
}

jobName := fmt.Sprintf("%s-%d", s.name, time.Now().Unix())
return framework.ScrapeMetrics(ctx, config, promUrl, workDir, jobName, map[string]string{
"server": s.name,
})
}

// there doesn't seem to be any simple way to get a metav1.Status from the Go client, so we get
// the content in a string-formatted error, unfortunately.
func unreadyComponentsFromError(err error) sets.String {
Expand Down
13 changes: 9 additions & 4 deletions cmd/test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func start(shardFlags []string, quiet bool) error {
}

logFilePath := flag.Lookup("log-file-path").Value.String()
shard := shard.NewShard(
s := shard.NewShard(
"kcp",
".kcp",
logFilePath,
Expand All @@ -108,11 +108,16 @@ func start(shardFlags []string, quiet bool) error {
"--client-ca-file", filepath.Join(".kcp", "client-ca.crt"),
),
)
if err := shard.Start(ctx, quiet); err != nil {
if err := s.Start(ctx, quiet); err != nil {
return err
}

errCh, err := shard.WaitForReady(ctx)
errCh, err := s.WaitForReady(ctx)
if err != nil {
return err
}

err = shard.ScrapeMetrics(ctx, s, ".")
if err != nil {
return err
}
Expand All @@ -134,7 +139,7 @@ func start(shardFlags []string, quiet bool) error {
metricsCtx, metricsCancel := context.WithTimeout(ctx, wait.ForeverTestTimeout)
defer metricsCancel()

shard.GatherMetrics(metricsCtx)
s.GatherMetrics(metricsCtx)

return nil
}
170 changes: 168 additions & 2 deletions test/e2e/framework/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
Expand All @@ -39,6 +42,7 @@ import (
"github.com/kcp-dev/logicalcluster/v3"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
gopkgyaml "gopkg.in/yaml.v2"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -207,6 +211,164 @@ func GatherMetrics(ctx context.Context, t *testing.T, server RunningServer, dire
}
}

func ScrapeMetricsForServer(t *testing.T, srv RunningServer) {
promUrl, set := os.LookupEnv("PROMETHEUS_URL")
if !set {
t.Logf("PROMETHEUS_URL environment variable unset, skipping Prometheus scrape config generation")
return
}
promCfgDir, set := os.LookupEnv("ARTIFACT_DIR")
if !set {
t.Logf("ARTIFACT_DIR environment variable unset, skipping Prometheus scrape config generation")
return
}
jobName := fmt.Sprintf("kcp-%s-%s-%d", srv.Name(), t.Name(), time.Now().Unix())
labels := map[string]string{
"server": srv.Name(),
"test": t.Name(),
}

ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
defer cancel()
require.NoError(t, ScrapeMetrics(ctx, srv.RootShardSystemMasterBaseConfig(t), promUrl, promCfgDir, jobName, labels))
}

func ScrapeMetrics(ctx context.Context, cfg *rest.Config, promUrl, promCfgDir, jobName string, labels map[string]string) error {
type staticConfigs struct {
Targets []string `yaml:"targets,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
}
type tlsConfig struct {
InsecureSkipVerify bool `yaml:"insecure_skip_verify,omitempty"`
}
type scrapeConfig struct {
JobName string `yaml:"job_name,omitempty"`
ScrapeInterval string `yaml:"scrape_interval,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
TlsConfig tlsConfig `yaml:"tls_config,omitempty"`
Scheme string `yaml:"scheme,omitempty"`
StaticConfigs []staticConfigs `yaml:"static_configs,omitempty"`
}
type config struct {
ScrapeConfigs []scrapeConfig `yaml:"scrape_configs,omitempty"`
}
promCfg := config{}

scrapeConfigFile := filepath.Join(promCfgDir, "prom.yaml")
f, err := os.OpenFile(scrapeConfigFile, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
return err
}

// lock config file exclusively, blocks all other producers until unlocked or process (test) exits
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
if err != nil {
return err
}
err = gopkgyaml.NewDecoder(f).Decode(&promCfg)
if err != nil && !errors.Is(err, io.EOF) {
return err
}

hostUrl, err := url.Parse(cfg.Host)
if err != nil {
return err
}
promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, scrapeConfig{
JobName: jobName,
ScrapeInterval: (5 * time.Second).String(),
BearerToken: cfg.BearerToken,
TlsConfig: tlsConfig{InsecureSkipVerify: true},
Scheme: hostUrl.Scheme,
StaticConfigs: []staticConfigs{{
Targets: []string{hostUrl.Host},
Labels: labels,
}},
})
err = f.Truncate(0)
if err != nil {
return err
}
_, err = f.Seek(0, 0)
if err != nil {
return err
}
err = gopkgyaml.NewEncoder(f).Encode(&promCfg)
if err != nil {
return err
}
err = syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
if err != nil {
return err
}
f.Close()

_, err = http.Post(promUrl+"/-/reload", "", nil)
if err != nil {
return err
}

type targets struct {
Data struct {
ActiveTargets []struct {
Health string `json:"health,omitempty"`
ScrapePool string `json:"scrapePool,omitempty"`
} `json:"activeTargets,omitempty"`
} `json:"data,omitempty"`
}

var wg sync.WaitGroup
wg.Add(1)
err = nil
go func() {
i := 0
defer wg.Done()
for {
err = func() error {
b, err := http.Get(promUrl + "/api/v1/targets")
if err != nil {
return err
}
defer b.Body.Close()
var promTargets targets
err = json.NewDecoder(b.Body).Decode(&promTargets)
if err != nil {
return err
}

for _, activeTarget := range promTargets.Data.ActiveTargets {
if activeTarget.ScrapePool != jobName {
continue
}
if activeTarget.Health == "up" {
return nil
}
break
}
return errors.New("target not yet active")
}()
// if no error occurred, the target is up, we can finish querying
if err == nil {
return
}

select {
case <-time.After(time.Second):
i++
if i > 10 {
err = errors.New("kcp prometheus scrape target never became healthy")
return
}
case <-ctx.Done():
err = ctx.Err()
return
}
}
}()
wg.Wait()
return err
}

func CreateClientCA(t *testing.T) (string, string) {
clientCADir := t.TempDir()
_, err := crypto.MakeSelfSignedCA(
Expand Down Expand Up @@ -272,6 +434,10 @@ func newKcpFixture(t *testing.T, cfgs ...kcpConfig) *kcpFixture {
}
wg.Wait()

for _, server := range servers {
ScrapeMetricsForServer(t, server)
}

if t.Failed() {
t.Fatal("Fixture setup failed: one or more servers did not become ready")
}
Expand Down Expand Up @@ -631,8 +797,8 @@ func (c *kcpServer) Run(opts ...RunOption) error {
if err != nil && ctx.Err() == nil {
// we care about errors in the process that did not result from the
// context expiring and us ending the process
data := c.filterKcpLogs(&log)
c.t.Errorf("`kcp` failed: %v logs:\n%v", err, data)
_ = c.filterKcpLogs(&log)
c.t.Errorf("`kcp` failed: %v", err)
}
}()

Expand Down

0 comments on commit d05581a

Please sign in to comment.