Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support self provisioned ES in streaming strategy #842

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions pkg/controller/jaeger/jaeger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/jaegertracing/jaeger-operator/pkg/storage"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was it placed here by make format?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

It either places it after core imports or after 3rd party imports in a separate block. However when I put it into operator imports it leaves it there.


"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -161,20 +163,7 @@ func (r *ReconcileJaeger) Reconcile(request reconcile.Request) (reconcile.Result

originalInstance := *instance

opts := client.MatchingLabels(map[string]string{
"app.kubernetes.io/instance": instance.Name,
"app.kubernetes.io/managed-by": "jaeger-operator",
})
list := &corev1.SecretList{}
if err := r.client.List(ctx, list, opts); err != nil {
instance.Status.Phase = v1.JaegerPhaseFailed
if err := r.client.Status().Update(ctx, instance); err != nil {
// we let it return the real error later
logFields.WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions")
}
return reconcile.Result{}, tracing.HandleError(err, span)
}
str := r.runStrategyChooser(ctx, instance, list.Items)
str := r.runStrategyChooser(ctx, instance)

updated, err := r.apply(ctx, *instance, str)
if err != nil {
Expand Down Expand Up @@ -226,16 +215,16 @@ func validate(jaeger *v1.Jaeger) error {
return nil
}

func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S {
func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S {
if nil == r.strategyChooser {
return defaultStrategyChooser(ctx, instance, secrets)
return defaultStrategyChooser(ctx, instance)
}

return r.strategyChooser(ctx, instance)
}

func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S {
return strategy.For(ctx, instance, secrets)
func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S {
return strategy.For(ctx, instance)
}

func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strategy.S) (v1.Jaeger, error) {
Expand All @@ -248,6 +237,31 @@ func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strat
return jaeger, tracing.HandleError(err, span)
}

// ES cert handling requires secretes from environment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/secretes/secrets

// therefore running this here and not in the strategy
if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) {
opts := client.MatchingLabels(map[string]string{
"app.kubernetes.io/instance": jaeger.Name,
"app.kubernetes.io/managed-by": "jaeger-operator",
})
secrets := &corev1.SecretList{}
if err := r.client.List(ctx, secrets, opts); err != nil {
jaeger.Status.Phase = v1.JaegerPhaseFailed
if err := r.client.Status().Update(ctx, &jaeger); err != nil {
// we let it return the real error later
jaeger.Logger().WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions")
}
return jaeger, tracing.HandleError(err, span)
}
es := &storage.ElasticsearchDeployment{Jaeger: &jaeger, CertScript: "./scripts/cert_generation.sh", Secrets: secrets.Items}
err = es.CreateCerts()
if err != nil {
es.Jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed")
return jaeger, err
}
str.WithSecrets(append(str.Secrets(), es.ExtractSecrets()...))
}

// secrets have to be created before ES - they are mounted to the ES pod
if err := r.applySecrets(ctx, jaeger, str.Secrets()); err != nil {
return jaeger, tracing.HandleError(err, span)
Expand Down
8 changes: 3 additions & 5 deletions pkg/strategy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
)

// For returns the appropriate Strategy for the given Jaeger instance
func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S {
func For(ctx context.Context, jaeger *v1.Jaeger) S {
tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer)
ctx, span := tracer.Start(ctx, "strategy.For")
defer span.End()
Expand All @@ -47,12 +47,10 @@ func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S {
}

if jaeger.Spec.Strategy == v1.DeploymentStrategyStreaming {
es := &storage.ElasticsearchDeployment{Jaeger: jaeger, CertScript: esCertGenerationScript, Secrets: secrets}
return newStreamingStrategy(ctx, jaeger, es)
return newStreamingStrategy(ctx, jaeger)
}

es := &storage.ElasticsearchDeployment{Jaeger: jaeger, CertScript: esCertGenerationScript, Secrets: secrets}
return newProductionStrategy(ctx, jaeger, es)
return newProductionStrategy(ctx, jaeger)
}

// normalize changes the incoming Jaeger object so that the defaults are applied when
Expand Down
12 changes: 6 additions & 6 deletions pkg/strategy/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
func TestNewControllerForAllInOneAsDefault(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsDefault"})

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne)
}

func TestNewControllerForAllInOneAsExplicitValue(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsExplicitValue"})
jaeger.Spec.Strategy = v1.DeploymentStrategyDeprecatedAllInOne // same as 'all-in-one'

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne)
}

Expand All @@ -32,7 +32,7 @@ func TestNewControllerForProduction(t *testing.T) {
jaeger.Spec.Strategy = v1.DeploymentStrategyProduction
jaeger.Spec.Storage.Type = "elasticsearch"

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyProduction)
}

Expand All @@ -51,7 +51,7 @@ func TestElasticsearchAsStorageOptions(t *testing.T) {
"es.server-urls": "http://elasticsearch-example-es-cluster:9200",
})

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
deps := ctrl.Deployments()
assert.Len(t, deps, 2) // query and collector, for a production setup
counter := 0
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestDeprecatedAllInOneStrategy(t *testing.T) {
Strategy: v1.DeploymentStrategyDeprecatedAllInOne,
},
}
For(context.TODO(), jaeger, []corev1.Secret{})
For(context.TODO(), jaeger)
assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy)
}

Expand All @@ -130,7 +130,7 @@ func TestStorageMemoryOnlyUsedWithAllInOneStrategy(t *testing.T) {
},
},
}
For(context.TODO(), jaeger, []corev1.Secret{})
For(context.TODO(), jaeger)
assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy)
}

Expand Down
13 changes: 4 additions & 9 deletions pkg/strategy/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/jaegertracing/jaeger-operator/pkg/storage"
)

func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.ElasticsearchDeployment) S {
func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger) S {
tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer)
ctx, span := tracer.Start(ctx, "newProductionStrategy")
defer span.End()
Expand Down Expand Up @@ -116,7 +116,7 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E
for i := range esRollover {
jobs = append(jobs, &esRollover[i].Spec.JobTemplate.Spec.Template.Spec)
}
autoProvisionElasticsearch(&c, es, jobs, []*appsv1.Deployment{queryDep, cDep})
autoProvisionElasticsearch(&c, jaeger, jobs, []*appsv1.Deployment{queryDep, cDep})
}

// the index cleaner ES job, which may have been changed by the ES self-provisioning routine
Expand All @@ -133,18 +133,13 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E
return c
}

func autoProvisionElasticsearch(manifest *S, es *storage.ElasticsearchDeployment, curatorPods []*corev1.PodSpec, deployments []*appsv1.Deployment) {
err := es.CreateCerts()
if err != nil {
es.Jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed")
return
}
func autoProvisionElasticsearch(manifest *S, jaeger *v1.Jaeger, curatorPods []*corev1.PodSpec, deployments []*appsv1.Deployment) {
es := &storage.ElasticsearchDeployment{Jaeger: jaeger}
for i := range deployments {
es.InjectStorageConfiguration(&deployments[i].Spec.Template.Spec)
}
for _, pod := range curatorPods {
es.InjectSecretsConfiguration(pod)
}
manifest.secrets = es.ExtractSecrets()
manifest.elasticsearches = append(manifest.elasticsearches, *es.Elasticsearch())
}
25 changes: 10 additions & 15 deletions pkg/strategy/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -23,7 +22,7 @@ func init() {

func TestCreateProductionDeployment(t *testing.T) {
name := "TestCreateProductionDeployment"
c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name}), &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name}))
assertDeploymentsAndServicesForProduction(t, name, c, false, false, false)
}

Expand All @@ -35,7 +34,7 @@ func TestCreateProductionDeploymentOnOpenShift(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: name})
normalize(context.Background(), jaeger)

c := newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), jaeger)
assertDeploymentsAndServicesForProduction(t, name, c, false, true, false)
}

Expand All @@ -45,7 +44,7 @@ func TestCreateProductionDeploymentWithDaemonSetAgent(t *testing.T) {
j := v1.NewJaeger(types.NamespacedName{Name: name})
j.Spec.Agent.Strategy = "DaemonSet"

c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assertDeploymentsAndServicesForProduction(t, name, c, true, false, false)
}

Expand All @@ -59,7 +58,7 @@ func TestCreateProductionDeploymentWithUIConfigMap(t *testing.T) {
},
})

c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assertDeploymentsAndServicesForProduction(t, name, c, false, false, true)
}

Expand All @@ -86,7 +85,7 @@ func TestOptionsArePassed(t *testing.T) {
},
}

ctrl := For(context.Background(), jaeger, []corev1.Secret{})
ctrl := For(context.Background(), jaeger)
deployments := ctrl.Deployments()
for _, dep := range deployments {
args := dep.Spec.Template.Spec.Containers[0].Args
Expand All @@ -110,7 +109,7 @@ func TestDelegateProductionDependencies(t *testing.T) {
// for now, we just have storage dependencies
j := v1.NewJaeger(types.NamespacedName{Name: "TestDelegateProductionDependencies"})
j.Spec.Storage.Type = "cassandra"
c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assert.Equal(t, c.Dependencies(), storage.Dependencies(j))
}

Expand Down Expand Up @@ -165,19 +164,19 @@ func assertDeploymentsAndServicesForProduction(t *testing.T, name string, s S, h

func TestSparkDependenciesProduction(t *testing.T) {
testSparkDependencies(t, func(jaeger *v1.Jaeger) S {
return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger})
return newProductionStrategy(context.Background(), jaeger)
})
}

func TestEsIndexCleanerProduction(t *testing.T) {
testEsIndexCleaner(t, func(jaeger *v1.Jaeger) S {
return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger})
return newProductionStrategy(context.Background(), jaeger)
})
}

func TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction(t *testing.T) {
j := v1.NewJaeger(types.NamespacedName{Name: "TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction"})
c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
for _, dep := range c.Deployments() {
if strings.HasSuffix(dep.Name, "-query") {
assert.Equal(t, 2, len(dep.Spec.Template.Spec.Containers))
Expand All @@ -194,11 +193,7 @@ func TestElasticsearchInject(t *testing.T) {
j.Spec.Storage.EsIndexCleaner.Enabled = &verdad
j.Spec.Storage.EsIndexCleaner.NumberOfDays = &one
j.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"es.use-aliases": true})
es := &storage.ElasticsearchDeployment{Jaeger: j, CertScript: "../../scripts/cert_generation.sh"}
err := es.CleanCerts()
require.NoError(t, err)
defer es.CleanCerts()
c := newProductionStrategy(context.Background(), j, es)
c := newProductionStrategy(context.Background(), j)
// there should be index-cleaner, rollover, lookback
assert.Equal(t, 3, len(c.cronJobs))
assertEsInjectSecrets(t, c.cronJobs[0].Spec.JobTemplate.Spec.Template.Spec)
Expand Down
4 changes: 2 additions & 2 deletions pkg/strategy/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/jaegertracing/jaeger-operator/pkg/util"
)

func newStreamingStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.ElasticsearchDeployment) S {
func newStreamingStrategy(ctx context.Context, jaeger *v1.Jaeger) S {
tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer)
ctx, span := tracer.Start(ctx, "newStreamingStrategy")
defer span.End()
Expand Down Expand Up @@ -139,7 +139,7 @@ func newStreamingStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.El
if ingesterDep != nil {
deps = append(deps, ingesterDep)
}
autoProvisionElasticsearch(&manifest, es, jobs, deps)
autoProvisionElasticsearch(&manifest, jaeger, jobs, deps)
}
manifest.deployments = []appsv1.Deployment{*cDep, *queryDep}
if ingesterDep != nil {
Expand Down
Loading