Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isolate operator from http service misconfiguration - Use internal service #5211

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/elasticsearch/v1/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
configSecretSuffix = "config"
secureSettingsSecretSuffix = "secure-settings"
httpServiceSuffix = "http"
internalHTTPServiceSuffix = "internal-http"
transportServiceSuffix = "transport"
elasticUserSecretSuffix = "elastic-user"
internalUsersSecretSuffix = "internal-users"
Expand Down Expand Up @@ -128,6 +129,10 @@ func TransportService(esName string) string {
return ESNamer.Suffix(esName, transportServiceSuffix)
}

func InternalHTTPService(esName string) string {
return ESNamer.Suffix(esName, internalHTTPServiceSuffix)
}

func HTTPService(esName string) string {
return ESNamer.Suffix(esName, httpServiceSuffix)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/autoscaling/elasticsearch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ var (
fakeService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testns",
Name: services.ExternalServiceName("testes"),
Name: services.InternalServiceName("testes"),
},
}
fakeEndpoints = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testns",
Name: services.ExternalServiceName("testes"),
Name: services.InternalServiceName("testes"),
},
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/autoscaling/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newStatusBuilder(log logr.Logger, autoscalingSpec esv1.AutoscalingSpec) *st
// Check if the Service is available.
func (r *ReconcileElasticsearch) isElasticsearchReachable(ctx context.Context, es esv1.Elasticsearch) (bool, error) {
defer tracing.Span(&ctx)()
externalService, err := services.GetExternalService(r.Client, es)
externalService, err := services.GetInternalService(r.Client, es)
naemono marked this conversation as resolved.
Show resolved Hide resolved
if apierrors.IsNotFound(err) {
return false, nil
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,17 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
return results.WithError(err)
}

var internalService *corev1.Service
internalService, err = common.ReconcileService(ctx, d.Client, services.NewInternalService(d.ES), &d.ES)
if err != nil {
return results.WithError(err)
}

certificateResources, res := certificates.Reconcile(
ctx,
d,
d.ES,
[]corev1.Service{*externalService},
[]corev1.Service{*externalService, *internalService},
d.OperatorParameters.CACertRotation,
d.OperatorParameters.CertRotation,
)
Expand Down Expand Up @@ -195,7 +201,7 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
)
defer esClient.Close()

esReachable, err := services.IsServiceReady(d.Client, *externalService)
esReachable, err := services.IsServiceReady(d.Client, *internalService)
if err != nil {
return results.WithError(err)
}
Expand Down
61 changes: 56 additions & 5 deletions pkg/controller/elasticsearch/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
Expand Down Expand Up @@ -67,19 +68,60 @@ func ExternalServiceName(esName string) string {
return esv1.HTTPService(esName)
}

// InternalServiceName returns the name for the internal service
// associated to this cluster, managed by the operator exclusively.
func InternalServiceName(esName string) string {
return esv1.InternalHTTPService(esName)
}

// ExternalTransportServiceHost returns the hostname and the port used to reach Elasticsearch's transport endpoint.
func ExternalTransportServiceHost(es types.NamespacedName) string {
return stringsutil.Concat(TransportServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.TransportPort))
}

// ExternalServiceURL returns the URL used to reach Elasticsearch's external endpoint
// ExternalServiceURL returns the URL used to reach Elasticsearch's external endpoint.
func ExternalServiceURL(es esv1.Elasticsearch) string {
return stringsutil.Concat(es.Spec.HTTP.Protocol(), "://", ExternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort))
}

// NewExternalService returns the external service associated to the given cluster
// InternalServiceURL returns the URL used to reach Elasticsearch's internally managed service
naemono marked this conversation as resolved.
Show resolved Hide resolved
func InternalServiceURL(es esv1.Elasticsearch) string {
return stringsutil.Concat(es.Spec.HTTP.Protocol(), "://", InternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort))
}

// NewExternalService returns the external service associated to the given cluster.
// It is used by users to perform requests against one of the cluster nodes.
func NewExternalService(es esv1.Elasticsearch) *corev1.Service {
return newServiceWithName(es, ExternalServiceName(es.Name))
}

// NewInternalService returns the internal service associated to the given cluster.
// It is used by the operator to perform requests against the Elasticsearch cluster nodes,
// and does not inherit the spec defined within the Elasticsearch custom resource,
// to remove the possibility of the user misconfiguring access to the ES cluster.
func NewInternalService(es esv1.Elasticsearch) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: InternalServiceName(es.Name),
Namespace: es.Namespace,
Labels: label.NewLabels(k8s.ExtractNamespacedName(&es)),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: es.Spec.HTTP.Protocol(),
Protocol: corev1.ProtocolTCP,
Port: network.HTTPPort,
},
},
Selector: label.NewLabels(k8s.ExtractNamespacedName(&es)),
PublishNotReadyAddresses: false,
},
}
}

func newServiceWithName(es esv1.Elasticsearch, serviceName string) *corev1.Service {
naemono marked this conversation as resolved.
Show resolved Hide resolved
nsn := k8s.ExtractNamespacedName(&es)

svc := corev1.Service{
Expand All @@ -88,7 +130,7 @@ func NewExternalService(es esv1.Elasticsearch) *corev1.Service {
}

svc.ObjectMeta.Namespace = es.Namespace
svc.ObjectMeta.Name = ExternalServiceName(es.Name)
svc.ObjectMeta.Name = serviceName

labels := label.NewLabels(nsn)
ports := []corev1.ServicePort{
Expand Down Expand Up @@ -120,11 +162,20 @@ func IsServiceReady(c k8s.Client, service corev1.Service) (bool, error) {

// GetExternalService returns the external service associated to the given Elasticsearch cluster.
func GetExternalService(c k8s.Client, es esv1.Elasticsearch) (corev1.Service, error) {
return getServiceByName(c, es, ExternalServiceName(es.Name))
}

// GetInternalService returns the internally managed service associated to the given Elasticsearch cluster.
func GetInternalService(c k8s.Client, es esv1.Elasticsearch) (corev1.Service, error) {
return getServiceByName(c, es, InternalServiceName(es.Name))
}

func getServiceByName(c k8s.Client, es esv1.Elasticsearch, serviceName string) (corev1.Service, error) {
var svc corev1.Service

namespacedName := types.NamespacedName{
Namespace: es.Namespace,
Name: ExternalServiceName(es.Name),
Name: serviceName,
}

if err := c.Get(context.Background(), namespacedName, &svc); err != nil {
Expand Down Expand Up @@ -153,7 +204,7 @@ func ElasticsearchURL(es esv1.Elasticsearch, pods []corev1.Pod) string {
return podURL
}
}
return ExternalServiceURL(es)
return InternalServiceURL(es)
}

// ElasticsearchPodURL calculates the URL for the given Pod based on the Pods metadata.
Expand Down
42 changes: 39 additions & 3 deletions pkg/controller/elasticsearch/services/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestElasticsearchURL(t *testing.T) {
},
},
},
want: "https://my-cluster-es-http.my-ns.svc:9200",
want: "https://my-cluster-es-internal-http.my-ns.svc:9200",
},
{
name: "scheme change in progress: random pod address",
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestElasticsearchURL(t *testing.T) {
{},
},
},
want: "https://my-cluster-es-http.my-ns.svc:9200",
want: "https://my-cluster-es-internal-http.my-ns.svc:9200",
},
{
name: "unexpected: partially missing pod labels: fallback to service",
Expand All @@ -148,7 +148,7 @@ func TestElasticsearchURL(t *testing.T) {
},
},
},
want: "https://my-cluster-es-http.my-ns.svc:9200",
want: "https://my-cluster-es-internal-http.my-ns.svc:9200",
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -222,6 +222,42 @@ func TestNewExternalService(t *testing.T) {
}
}

func TestNewInternalService(t *testing.T) {
testCases := []struct {
name string
httpConf commonv1.HTTPConfig
wantSvc func() corev1.Service
}{
{
name: "user supplied selector is not applied to internal service",
httpConf: commonv1.HTTPConfig{
Service: commonv1.ServiceTemplate{
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "coordinating-node",
},
},
},
},
wantSvc: func() corev1.Service {
svc := mkHTTPService()
svc.Spec.Type = corev1.ServiceTypeClusterIP
svc.Spec.Ports[0].Name = "https"
svc.Name = "elasticsearch-test-es-internal-http"
return svc
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
es := mkElasticsearch(tc.httpConf)
haveSvc := NewInternalService(es)
compare.JSONEqual(t, tc.wantSvc(), haveSvc)
})
}
}

func mkHTTPService() corev1.Service {
return corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down