Skip to content

Commit

Permalink
Merge pull request #1283 from s-urbaniak/metrics-controller-cherry-pick
Browse files Browse the repository at this point in the history
Bug 2037274: metrics controller
  • Loading branch information
openshift-merge-robot authored Jan 12, 2022
2 parents 1ad5cdd + a48400b commit 800657a
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
github.com/robfig/cron v1.2.0
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.1.3
Expand Down
58 changes: 58 additions & 0 deletions pkg/operator/metricscontroller/legacy_cn_certs_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package metricscontroller

import (
"context"
"fmt"
"time"

prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"

operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)

// NewLegacyCNCertsMetricsSyncFunc creates a metrics sync function that executes the given query
// and interprets the result value as the count of invalid certificates containing CN fields used as host names.
// The outcome is written to the operator status as the `InvalidCertsUpgradeable` condition:
// if the value is >=1 the condition is set to False with reason `InvalidCertsDetected`,
// otherwise it is set to True.
//
// The supported prometheus query result types are:
// - vector: only the first sample of the vector will be used for evaluation; an empty vector is reported as an error.
// - scalar: the returned value will be used.
//
// A failing query, an empty vector or any other result type makes the returned function
// return an error without updating the operator status.
func NewLegacyCNCertsMetricsSyncFunc(query string, operatorClient v1helpers.OperatorClient) MetricsSyncFunc {
	return func(ctx context.Context, controllerContext factory.SyncContext, client prometheusv1.API) error {
		result, _, err := client.Query(ctx, query, time.Now())
		if err != nil {
			return fmt.Errorf("error executing prometheus query: %w", err)
		}

		var value model.SampleValue
		// A type switch binds the concrete value directly, so no unchecked
		// type assertion is needed after inspecting the result type.
		switch typed := result.(type) {
		case model.Vector:
			// A query that matches no series yields an empty vector; indexing it
			// would panic, so surface it as a regular sync error instead.
			if len(typed) == 0 {
				return fmt.Errorf("prometheus query returned an empty vector")
			}
			value = typed[0].Value
		case *model.Scalar:
			value = typed.Value
		default:
			return fmt.Errorf("unexpected prometheus query return type: %T", result)
		}

		_, _, err = v1helpers.UpdateStatus(operatorClient, v1helpers.UpdateConditionFn(newInvalidCertsCondition(float64(value))))
		return err
	}
}

// newInvalidCertsCondition builds the `InvalidCertsUpgradeable` operator condition
// for the given number of invalid certificates.
//
// A count of at least one yields status False with reason `InvalidCertsDetected`
// and a message carrying the count truncated to an integer. Any other count
// yields status True with no reason or message.
func newInvalidCertsCondition(invalidCertsCount float64) operatorv1.OperatorCondition {
	const conditionType = "InvalidCertsUpgradeable"

	if invalidCertsCount >= 1.0 {
		return operatorv1.OperatorCondition{
			Type:    conditionType,
			Status:  operatorv1.ConditionFalse,
			Reason:  "InvalidCertsDetected",
			Message: fmt.Sprintf("%d server certificates without SAN detected", int(invalidCertsCount)),
		}
	}

	return operatorv1.OperatorCondition{
		Type:   conditionType,
		Status: operatorv1.ConditionTrue,
	}
}
160 changes: 160 additions & 0 deletions pkg/operator/metricscontroller/legacy_cn_certs_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package metricscontroller

import (
"context"
"errors"
"reflect"
"testing"
"time"

prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
prometheusmodel "github.com/prometheus/common/model"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)

// nopCloser is a test stand-in for an idle connection closer that holds no connections.
type nopCloser struct{}

// CloseIdleConnections does nothing.
func (*nopCloser) CloseIdleConnections() {}

// mockPrometheusClient is a prometheus API test double that only implements Query.
// The prometheusv1.API interface is embedded so the type satisfies the full interface
// without spelling out the methods this test does not need.
type mockPrometheusClient struct {
	prometheusv1.API

	// queryResult and queryError are returned verbatim by Query.
	queryResult prometheusmodel.Value
	queryError  error
}

// Query ignores its arguments and returns the canned result and error with no warnings.
func (mock *mockPrometheusClient) Query(_ context.Context, _ string, _ time.Time) (prometheusmodel.Value, prometheusv1.Warnings, error) {
	var noWarnings prometheusv1.Warnings
	return mock.queryResult, noWarnings, mock.queryError
}

// TestLegacyCNCertsController runs the sync function created by NewLegacyCNCertsMetricsSyncFunc
// through metricsController.sync, using a mocked prometheus client and a fake operator client.
// For every case it checks the error returned by sync and the conditions stored in the
// operator status afterwards.
func TestLegacyCNCertsController(t *testing.T) {
	type testCase struct {
		name           string
		queryResult    prometheusmodel.Value
		queryError     error
		wantSyncError  string
		wantConditions []operatorv1.OperatorCondition
	}

	// certsValid returns the conditions expected when no invalid certificates are reported.
	certsValid := func() []operatorv1.OperatorCondition {
		return []operatorv1.OperatorCondition{{
			Type:   "InvalidCertsUpgradeable",
			Status: "True",
		}}
	}

	// certsInvalid returns the conditions expected when invalid certificates are reported.
	certsInvalid := func(message string) []operatorv1.OperatorCondition {
		return []operatorv1.OperatorCondition{{
			Type:    "InvalidCertsUpgradeable",
			Status:  "False",
			Message: message,
			Reason:  "InvalidCertsDetected",
		}}
	}

	cases := []testCase{
		{
			name: "vector - valid certs",
			// The second sample exposes invalid certs, but only the first sample is evaluated.
			queryResult: prometheusmodel.Vector{
				{Value: 0.0},
				{Value: 1.0},
			},
			wantConditions: certsValid(),
		},
		{
			name: "vector - invalid certs",
			// The second sample exposes no invalid certs, but only the first sample is evaluated.
			queryResult: prometheusmodel.Vector{
				{Value: 1.0},
				{Value: 0.0},
			},
			wantConditions: certsInvalid("1 server certificates without SAN detected"),
		},
		{
			name:           "scalar - valid certs",
			queryResult:    &prometheusmodel.Scalar{Value: 0.0},
			wantConditions: certsValid(),
		},
		{
			name:           "scalar - invalid certs",
			queryResult:    &prometheusmodel.Scalar{Value: 10.0},
			wantConditions: certsInvalid("10 server certificates without SAN detected"),
		},
		{
			name:          "scalar - invalid type",
			queryResult:   &prometheusmodel.String{Value: "foo"},
			wantSyncError: "unexpected prometheus query return type: *model.String",
		},
		{
			name:          "prometheus failure",
			queryError:    errors.New("prometheus exploded"),
			wantSyncError: "error executing prometheus query: prometheus exploded",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			promMock := &mockPrometheusClient{
				queryResult: tc.queryResult,
				queryError:  tc.queryError,
			}
			operatorClient := v1helpers.NewFakeOperatorClient(&operatorv1.OperatorSpec{}, &operatorv1.OperatorStatus{}, nil)

			controller := &metricsController{
				operatorClient: operatorClient,
				newPrometheusClientFunc: func() (prometheusv1.API, idleConnectionCloser, error) {
					return promMock, &nopCloser{}, nil
				},
				metricsSyncFunc: NewLegacyCNCertsMetricsSyncFunc("", operatorClient),
			}

			syncCtx := factory.NewSyncContext(tc.name, events.NewInMemoryRecorder(tc.name))

			// A nil error is compared as the empty string.
			var gotSyncErr string
			if err := controller.sync(context.Background(), syncCtx); err != nil {
				gotSyncErr = err.Error()
			}
			if gotSyncErr != tc.wantSyncError {
				t.Fatalf("got sync error %q, want %q", gotSyncErr, tc.wantSyncError)
			}

			_, status, _, err := operatorClient.GetOperatorState()
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			// Transition times are not part of the expectation; blank them before comparing.
			gotConditions := status.Conditions
			for i := range gotConditions {
				gotConditions[i].LastTransitionTime = metav1.Time{}
			}

			if !reflect.DeepEqual(gotConditions, tc.wantConditions) {
				t.Errorf("got conditions %+v, want %+v", gotConditions, tc.wantConditions)
			}
		})
	}
}
115 changes: 115 additions & 0 deletions pkg/operator/metricscontroller/metrics_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package metricscontroller

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"

prometheusapi "github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"

"k8s.io/client-go/transport"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)

// MetricsSyncFunc is used to set the controller synchronization function for metrics controller.
// It includes the prometheus client.
//
// The metrics controller calls it from its sync method, passing the context and sync context
// of that sync together with the prometheus API client created for it. The error it returns
// becomes the result of the sync.
type MetricsSyncFunc func(ctx context.Context, controllerContext factory.SyncContext, promClient prometheusv1.API) error

// metricsController holds the dependencies of the metrics controller's sync method,
// which creates a prometheus client and hands it to the configured MetricsSyncFunc.
type metricsController struct {
// operatorClient is the operator client supplied to NewMetricsController.
operatorClient v1helpers.OperatorClient
// NOTE(review): prometheusQuery is neither set by NewMetricsController nor read by sync
// in this file; confirm whether it is still needed.
prometheusQuery string
// newPrometheusClientFunc creates the prometheus API client for a sync, together with
// the handle used to close that client's idle connections afterwards.
newPrometheusClientFunc func() (prometheusv1.API, idleConnectionCloser, error)
// metricsSyncFunc is invoked by sync with the freshly created prometheus client.
metricsSyncFunc MetricsSyncFunc
// recorder receives a warning event when the prometheus client cannot be created.
recorder events.Recorder
}

// NewMetricsController builds the metrics controller registered under the given name.
// On every sync the controller creates an in-cluster prometheus client that trusts the
// CA bundle read from serviceCAPath and passes it to metricsSyncFunc.
// The controller is resynced every minute; recorder is used for the controller's events.
func NewMetricsController(name string, operatorClient v1helpers.OperatorClient, recorder events.Recorder, serviceCAPath string, metricsSyncFunc MetricsSyncFunc) factory.Controller {
	// Only the CA path is captured here; the client itself is created per sync.
	inClusterClient := func() (prometheusv1.API, idleConnectionCloser, error) {
		return newInClusterPrometheusClient(serviceCAPath)
	}

	controller := &metricsController{
		operatorClient:          operatorClient,
		newPrometheusClientFunc: inClusterClient,
		metricsSyncFunc:         metricsSyncFunc,
		recorder:                recorder,
	}

	builder := factory.New().WithSync(controller.sync).ResyncEvery(time.Minute)
	return builder.ToController(name, recorder)
}

// sync creates a prometheus client and runs the configured metrics sync function with it.
// If the client cannot be created, a "PrometheusClientFailed" warning event is recorded
// and the wrapped error is returned. Otherwise the result of the metrics sync function
// is returned.
func (c *metricsController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	promClient, idleCloser, err := c.newPrometheusClientFunc()
	if err != nil {
		wrapped := fmt.Errorf("error instantiating prometheus client: %w", err)
		c.recorder.Warning("PrometheusClientFailed", wrapped.Error())
		return wrapped
	}

	// A client is created on every sync, so its idle connections must be closed
	// once the sync is done; otherwise memory leaks will occur.
	defer idleCloser.CloseIdleConnections()

	return c.metricsSyncFunc(ctx, syncCtx, promClient)
}

// idleConnectionCloser is the handle the metrics controller's sync method uses to close
// the idle connections of the prometheus client it created.
// newInClusterPrometheusClient returns its *http.Transport as this interface.
type idleConnectionCloser interface {
// CloseIdleConnections is called by sync (deferred) after the prometheus client was created.
CloseIdleConnections()
}

// newInClusterPrometheusClient creates a prometheus API client for the thanos-querier
// service in the openshift-monitoring namespace.
//
// The client trusts the CA bundle read from serviceCAPath and authenticates with the
// service account token read from the pod's mounted token file. The HTTP transport is
// returned as well, so the caller can close its idle connections.
// An error is returned if either file cannot be read or the transport or client
// cannot be created.
func newInClusterPrometheusClient(serviceCAPath string) (prometheusv1.API, idleConnectionCloser, error) {
	caBundle, err := ioutil.ReadFile(serviceCAPath)
	if err != nil {
		return nil, nil, fmt.Errorf("error reading service CA: %w", err)
	}

	caTransport, err := newTransport(caBundle)
	if err != nil {
		return nil, nil, fmt.Errorf("error instantiating prometheus client transport: %w", err)
	}

	tokenBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return nil, nil, fmt.Errorf("error reading service account token: %w", err)
	}

	// Every request carries the service account token as bearer token.
	authRoundTripper := transport.NewBearerAuthRoundTripper(string(tokenBytes), caTransport)
	querierAddress := "https://" + net.JoinHostPort("thanos-querier.openshift-monitoring.svc", "9091")

	apiClient, err := prometheusapi.NewClient(prometheusapi.Config{
		Address:      querierAddress,
		RoundTripper: authRoundTripper,
	})
	if err != nil {
		return nil, nil, fmt.Errorf("error creating prometheus client: %w", err)
	}

	return prometheusv1.NewAPI(apiClient), caTransport, nil
}

// newTransport creates the HTTP transport used to talk to prometheus.
//
// serviceCABytes must be a PEM encoded CA bundle; the certificates in it become the only
// roots trusted for TLS server verification. An error is returned if no certificate can
// be parsed from the bundle, because the resulting transport would have an empty root
// pool and could not verify any server.
//
// The transport honors the proxy settings from the environment and uses a 30 second dial
// timeout, 30 second keep-alive and 10 second TLS handshake timeout.
func newTransport(serviceCABytes []byte) (*http.Transport, error) {
	roots := x509.NewCertPool()
	// AppendCertsFromPEM reports whether at least one certificate was added.
	if ok := roots.AppendCertsFromPEM(serviceCABytes); !ok {
		return nil, fmt.Errorf("no valid PEM certificates found in service CA bundle")
	}

	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout: 10 * time.Second,
		TLSClientConfig: &tls.Config{
			RootCAs: roots,
		},
	}, nil
}
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ github.com/prometheus/client_golang/prometheus/testutil/promlint
# github.com/prometheus/client_model v0.2.0
github.com/prometheus/client_model/go
# github.com/prometheus/common v0.26.0
## explicit
github.com/prometheus/common/expfmt
github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
github.com/prometheus/common/model
Expand Down

0 comments on commit 800657a

Please sign in to comment.