Skip to content

Commit

Permalink
fix: query Prometheus through a dedicated service
Browse files Browse the repository at this point in the history
The prometheus-operated service is a statefulset governing service. It
is intended to be used by the statefulset controller for assigning a
network identity to individual pods, and is not supposed to be used for
http communication.

This commit creates a dedicated service for each Prometheus statefulset
and reconfigures the GrafanaDataSource to query it instead.
  • Loading branch information
fpetkovski authored Nov 19, 2021
1 parent e607afe commit 58586e8
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ linters:

issues:
exclude-rules:
- path: zz_generated.deepcopy.go
linters:
- goimports
- path: _test.go
linters:
- errcheck
22 changes: 10 additions & 12 deletions pkg/controllers/monitoring-stack/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package monitoringstack
import (
stack "rhobs/monitoring-stack-operator/pkg/apis/v1alpha1"

monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func newAlertmanager(
Expand All @@ -29,11 +30,11 @@ func newAlertmanager(
ObjectMeta: metav1.ObjectMeta{
Name: ms.Name,
Namespace: ms.Namespace,
Labels: commonLabels(ms.Name, instanceSelectorKey, instanceSelectorValue),
Labels: objectLabels(ms.Name, ms.Name, instanceSelectorKey, instanceSelectorValue),
},
Spec: monv1.AlertmanagerSpec{
PodMetadata: &monv1.EmbeddedObjectMetadata{
Labels: commonLabels(ms.Name, instanceSelectorKey, instanceSelectorValue),
Labels: podLabels("alertmanager", ms.Name),
},
Replicas: &replicas,
ServiceAccountName: rbacResourceName,
Expand All @@ -43,23 +44,20 @@ func newAlertmanager(
}
}

func newAlertmanagerService(ms *stack.MonitoringStack) *corev1.Service {
func newAlertmanagerService(ms *stack.MonitoringStack, instanceSelectorKey string, instanceSelectorValue string) *corev1.Service {
name := ms.Name + "-alertmanager"
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: ms.Name + "-alertmanager",
Name: name,
Namespace: ms.Namespace,
Labels: map[string]string{
"app.kubernetes.io/part-of": ms.Name,
},
Labels: objectLabels(name, ms.Name, instanceSelectorKey, instanceSelectorValue),
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app.kubernetes.io/part-of": ms.Name,
},
Selector: podLabels("alertmanager", ms.Name),
Ports: []corev1.ServicePort{
{
Name: "web",
Expand Down
78 changes: 71 additions & 7 deletions pkg/controllers/monitoring-stack/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ func stackComponentPatchers(ms *stack.MonitoringStack, instanceSelectorKey strin
},
{
empty: func() client.Object {
service := newAlertmanagerService(ms)
service := newAlertmanagerService(ms, instanceSelectorKey, instanceSelectorValue)
return &corev1.Service{
TypeMeta: service.TypeMeta,
ObjectMeta: service.ObjectMeta,
}
},
patch: func(existing client.Object) (client.Object, error) {
service := newAlertmanagerService(ms)
service := newAlertmanagerService(ms, instanceSelectorKey, instanceSelectorValue)

if existing == nil {
return service, nil
Expand Down Expand Up @@ -233,6 +233,33 @@ func stackComponentPatchers(ms *stack.MonitoringStack, instanceSelectorKey strin
return desired, nil
},
},
{
empty: func() client.Object {
service := newPrometheusService(ms, instanceSelectorKey, instanceSelectorValue)
return &corev1.Service{
TypeMeta: service.TypeMeta,
ObjectMeta: service.ObjectMeta,
}
},
patch: func(existing client.Object) (client.Object, error) {
service := newPrometheusService(ms, instanceSelectorKey, instanceSelectorValue)

if existing == nil {
return service, nil
}

desired, ok := existing.(*corev1.Service)
if !ok {
return nil, NewObjectTypeError(service, existing)
}

// The ClusterIP field is immutable and we have to take it from the observed object.
service.Spec.ClusterIP = desired.Spec.ClusterIP
desired.Spec = service.Spec
desired.Labels = service.Labels
return desired, nil
},
},
{
empty: func() client.Object {
dataSource := newGrafanaDataSource(ms)
Expand Down Expand Up @@ -261,8 +288,8 @@ func stackComponentPatchers(ms *stack.MonitoringStack, instanceSelectorKey strin
}

func newGrafanaDataSource(ms *stack.MonitoringStack) *grafanav1alpha1.GrafanaDataSource {
datasourceName := fmt.Sprintf("ms-%s-%s", ms.GetNamespace(), ms.GetName())
prometheusURL := fmt.Sprintf("prometheus-operated.%s:9090", ms.GetNamespace())
datasourceName := fmt.Sprintf("ms-%s-%s", ms.Namespace, ms.Name)
prometheusURL := fmt.Sprintf("%s-prometheus.%s:9090", ms.Name, ms.Namespace)
return &grafanav1alpha1.GrafanaDataSource{
TypeMeta: metav1.TypeMeta{
APIVersion: "integreatly.org/v1alpha1",
Expand Down Expand Up @@ -344,10 +371,14 @@ func newPrometheus(
ObjectMeta: metav1.ObjectMeta{
Name: ms.Name,
Namespace: ms.Namespace,
Labels: commonLabels(ms.Name, instanceSelectorKey, instanceSelectorValue),
Labels: objectLabels(ms.Name, ms.Name, instanceSelectorKey, instanceSelectorValue),
},

Spec: monv1.PrometheusSpec{
PodMetadata: &monv1.EmbeddedObjectMetadata{
Labels: podLabels("prometheus", ms.Name),
},

// Prometheus does not use an Enum for LogLevel, so need to convert to string
LogLevel: string(ms.Spec.LogLevel),

Expand All @@ -367,7 +398,7 @@ func newPrometheus(
Alertmanagers: []monv1.AlertmanagerEndpoints{
{
APIVersion: "v2",
Name: newAlertmanagerService(ms).Name,
Name: ms.Name + "-alertmanager",
Namespace: ms.Namespace,
Scheme: "http",
Port: intstr.FromString("web"),
Expand Down Expand Up @@ -416,6 +447,31 @@ func newRoleBinding(ms *stack.MonitoringStack, rbacResourceName string) *rbacv1.
return roleBinding
}

func newPrometheusService(ms *stack.MonitoringStack, instanceSelectorKey string, instanceSelectorValue string) *corev1.Service {
name := ms.Name + "-prometheus"
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ms.Namespace,
Labels: objectLabels(name, ms.Name, instanceSelectorKey, instanceSelectorValue),
},
Spec: corev1.ServiceSpec{
Selector: podLabels("prometheus", ms.Name),
Ports: []corev1.ServicePort{
{
Name: "web",
Port: 9090,
TargetPort: intstr.FromInt(9090),
},
},
},
}
}

func newAdditionalScrapeConfigsSecret(ms *stack.MonitoringStack, name string) *corev1.Secret {
return &corev1.Secret{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -581,9 +637,17 @@ func newAdditionalScrapeConfigsSecret(ms *stack.MonitoringStack, name string) *c
}
}

func commonLabels(msName string, instanceSelectorKey string, instanceSelectorValue string) map[string]string {
func objectLabels(name string, msName string, instanceSelectorKey string, instanceSelectorValue string) map[string]string {
return map[string]string{
instanceSelectorKey: instanceSelectorValue,
"app.kubernetes.io/name": name,
"app.kubernetes.io/part-of": msName,
}
}

func podLabels(component string, msName string) map[string]string {
return map[string]string{
"app.kubernetes.io/component": component,
"app.kubernetes.io/part-of": msName,
}
}
10 changes: 5 additions & 5 deletions test/e2e/framework/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
"time"

corev1 "k8s.io/api/core/v1"
appsv1 "k8s.io/api/apps/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -52,14 +52,14 @@ func (f *Framework) AssertResourceEventuallyExists(name string, namespace string
}
}

// AssertPodEventuallyRuns asserts that a pod eventually gets into a Running phase
func (f *Framework) AssertPodEventuallyRuns(name string, namespace string) func(t *testing.T) {
// AssertStatefulsetReady asserts that a statefulset has the desired number of pods running
func (f *Framework) AssertStatefulsetReady(name string, namespace string) func(t *testing.T) {
return func(t *testing.T) {
key := types.NamespacedName{Name: name, Namespace: namespace}
if err := wait.Poll(5*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
pod := &corev1.Pod{}
pod := &appsv1.StatefulSet{}
err := f.K8sClient.Get(context.Background(), key, pod)
return err == nil && pod.Status.Phase == corev1.PodRunning, nil
return err == nil && pod.Status.ReadyReplicas == *pod.Spec.Replicas, nil
}); err != nil {
t.Fatal(err)
}
Expand Down
37 changes: 37 additions & 0 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package framework

import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
Expand Down Expand Up @@ -48,3 +52,36 @@ func (f *Framework) StartPortForward(podName string, ns string, port string, sto
<-readyChan
return nil
}

// StartServicePortForward initiates a port forwarding connection to a service on the localhost interface.
//
// The function call blocks until the port forwarding proxy server is ready to receive connections.
func (f *Framework) StartServicePortForward(serviceName string, ns string, port string, stopChan chan struct{}) error {
pods, err := f.getPodsForService(serviceName, ns)
if err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("no pods found for service %s/%s", serviceName, ns)
}
return f.StartPortForward(pods[0].Name, ns, port, stopChan)
}

func (f *Framework) getPodsForService(name string, namespace string) ([]corev1.Pod, error) {
var svc corev1.Service
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
if err := f.K8sClient.Get(context.Background(), key, &svc); err != nil {
return nil, err
}

selector := svc.Spec.Selector
var pods corev1.PodList
if err := f.K8sClient.List(context.Background(), &pods, client.MatchingLabels(selector)); err != nil {
return nil, err
}

return pods.Items, nil
}
8 changes: 4 additions & 4 deletions test/e2e/monitoring_stack_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ func assertPrometheusScrapesItself(t *testing.T) {
ms := newMonitoringStack(t, "self-scrape")
err := f.K8sClient.Create(context.Background(), ms)
assert.NilError(t, err)
f.AssertPodEventuallyRuns("prometheus-self-scrape-0", e2eTestNamespace)(t)
f.AssertStatefulsetReady("prometheus-self-scrape", e2eTestNamespace)(t)

stopChan := make(chan struct{})
defer close(stopChan)
if err := wait.Poll(5*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
err = f.StartPortForward("prometheus-self-scrape-0", e2eTestNamespace, "9090", stopChan)
err = f.StartServicePortForward("self-scrape-prometheus", e2eTestNamespace, "9090", stopChan)
return err == nil, nil
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -233,12 +233,12 @@ func assertAlertmanagerReceivesAlerts(t *testing.T) {
if err := f.K8sClient.Create(context.Background(), rule); err != nil {
t.Fatal(err)
}
f.AssertPodEventuallyRuns("alertmanager-alerting-0", e2eTestNamespace)(t)
f.AssertStatefulsetReady("alertmanager-alerting", e2eTestNamespace)(t)

stopChan := make(chan struct{})
defer close(stopChan)
if err := wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
err := f.StartPortForward("alertmanager-alerting-0", e2eTestNamespace, "9093", stopChan)
err := f.StartServicePortForward("alerting-alertmanager", e2eTestNamespace, "9093", stopChan)
return err == nil, nil
}); err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 58586e8

Please sign in to comment.