Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call _nodes/shutdown from pre-stop hook #6544

Merged
merged 18 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/operating-eck/upgrading-eck.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ This will update the ECK installation to the latest binary and update the CRDs a

Upgrading the operator results in a one-time update to existing managed resources in the cluster. This potentially triggers a rolling restart of pods by Kubernetes to apply those changes. The following list contains the ECK operator versions that would cause a rolling restart after they have been installed.

1.6, 1.9, 2.0, 2.1, 2.2, 2.4, 2.5
1.6, 1.9, 2.0, 2.1, 2.2, 2.4, 2.5, 2.8

If you have a very large Elasticsearch cluster or multiple Elastic Stack deployments, this rolling restart might be disruptive or inconvenient. To have more control over when the pods belonging to a particular deployment should be restarted, you can <<{p}-exclude-resource,add an annotation>> to the corresponding resources to temporarily exclude them from being managed by the operator. When the time is convenient, you can remove the annotation and let the rolling restart go through.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,10 @@ spec:
- name: PRE_STOP_ADDITIONAL_WAIT_SECONDS
value: "5"
----

The pre-stop lifecycle hook also tries to gracefully shut down the Elasticsearch node in case of a termination that is not caused by the ECK operator. Examples of such terminations could be Kubernetes node maintenance or a Kubernetes upgrade. In these cases the script will try to interact with the Elasticsearch API to notify Elasticsearch of the impending termination of the node. The intent is to avoid relocation and recovery of shards while the Elasticsearch node is only temporarily unavailable.

This is done on a best effort basis. In particular requests to an Elasticsearch cluster already in the process of shutting down might fail if the Kubernetes service has already been removed.
The script allows for `PRE_STOP_MAX_DNS_ERRORS` which default to 2 before giving up.

When using local persistent volumes a different behaviour might be desirable because the Elasticsearch node's associated storage will not be available anymore on the new Kubernetes node. `PRE_STOP_SHUTDOWN_TYPE` allows to override the default shutdown type to one of the link:https://www.elastic.co/guide/en/elasticsearch/reference/current/put-shutdown.html[possible values]. Please be aware that setting it to anything other than `restart` might mean that the pre-stop hook will run longer than `terminationGracePeriodSeconds` of the Pod while moving data out of the terminating Pod and will not be able to complete unless you also adjust that value in the `podTemplate`.
8 changes: 7 additions & 1 deletion pkg/controller/elasticsearch/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/initcontainer"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/services"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
)

Expand All @@ -43,11 +44,16 @@ func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasti
return err
}

preStopScript, err := nodespec.RenderPreStopHookScript(services.InternalServiceURL(es))
if err != nil {
return err
}

scriptsConfigMap := NewConfigMapWithData(
types.NamespacedName{Namespace: es.Namespace, Name: esv1.ScriptsConfigMap(es.Name)},
map[string]string{
nodespec.ReadinessProbeScriptConfigKey: nodespec.ReadinessProbeScript,
nodespec.PreStopHookScriptConfigKey: nodespec.PreStopHookScript,
nodespec.PreStopHookScriptConfigKey: preStopScript,
initcontainer.PrepareFsScriptConfigKey: fsScript,
initcontainer.SuspendScriptConfigKey: initcontainer.SuspendScript,
initcontainer.SuspendedHostsFile: initcontainer.RenderSuspendConfiguration(es),
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/elasticsearch/driver/downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func HandleDownscale(
// initiate shutdown of nodes that should be removed
// if leaving nodes is empty this should cancel any ongoing shutdowns
leavingNodes := leavingNodeNames(downscales)
if err := downscaleCtx.nodeShutdown.ReconcileShutdowns(downscaleCtx.parentCtx, leavingNodes); err != nil {
terminatingNodes := k8s.PodNames(k8s.TerminatingPods(actualPods))
if err := downscaleCtx.nodeShutdown.ReconcileShutdowns(downscaleCtx.parentCtx, leavingNodes, terminatingNodes); err != nil {
return results.WithError(err)
}

Expand Down
41 changes: 33 additions & 8 deletions pkg/controller/elasticsearch/driver/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ func podsToUpgrade(
return toUpgrade, nil
}

func terminatingPodNames(client k8s.Client, statefulSets sset.StatefulSetList) ([]string, error) {
pods, err := statefulSets.GetActualPods(client)
if err != nil {
return nil, err
}
return k8s.PodNames(k8s.TerminatingPods(pods)), nil
}

func doFlush(ctx context.Context, es esv1.Elasticsearch, esClient esclient.Client) error {
log := ulog.FromContext(ctx)
targetEsVersion, err := version.Parse(es.Spec.Version)
Expand Down Expand Up @@ -319,17 +327,31 @@ func (d *defaultDriver) maybeCompleteNodeUpgrades(
reason := fmt.Sprintf("Completing node upgrade: %s", reason)
return results.WithReconciliationState(defaultRequeue.WithReason(reason))
}

statefulSets, err := sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
if err != nil {
return results.WithError(err)
}
// Also make sure that when cleaning up node shutdowns we don't remove shutdown records for terminating Pods.
// The expectation mechanism covers planned spec changes for Pods. However, Pods might also be deleted due to external factors
// like Kubernetes node upgrades or manual admin intervention. We orchestrate node shutdown in these cases via a pre-stop hook
// and don't want to interrupt that process until it completes. This a best-effort attempt as observation of shutdown records in
// Elasticsearch and Pod deletion might not be in sync due to cache lag.
terminating, err := terminatingPodNames(d.Client, statefulSets)
if err != nil {
return results.WithError(err)
}

// once expectations are satisfied we can already delete shutdowns that are complete and where the node
// is back in the cluster to avoid completed shutdowns from accumulating and affecting node availability calculations
// in Elasticsearch for example for indices with `auto_expand_replicas` setting.
if supportsNodeShutdown(esClient.Version()) {
// clear all shutdowns of type restart that have completed
results = results.WithError(nodeShutdown.Clear(ctx, esclient.ShutdownComplete.Applies, nodeShutdown.OnlyNodesInCluster))
}

statefulSets, err := sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
if err != nil {
return results.WithError(err)
results = results.WithError(nodeShutdown.Clear(ctx,
esclient.ShutdownComplete.Applies,
nodeShutdown.OnlyNodesInCluster,
nodeShutdown.OnlyNonTerminatingNodes(terminating),
))
}

// Make sure all nodes scheduled for upgrade are back into the cluster.
Expand All @@ -352,7 +374,10 @@ func (d *defaultDriver) maybeCompleteNodeUpgrades(
if supportsNodeShutdown(esClient.Version()) {
// clear all shutdowns of type restart that have completed including those where the node is no longer in the cluster
// or node state was lost due to an external event
results = results.WithError(nodeShutdown.Clear(ctx, esclient.ShutdownComplete.Applies))
results = results.WithError(nodeShutdown.Clear(ctx,
esclient.ShutdownComplete.Applies,
nodeShutdown.OnlyNonTerminatingNodes(terminating),
))
}
return results
}
Expand Down Expand Up @@ -414,7 +439,7 @@ func (ctx *upgradeCtx) requestNodeRestarts(podsToRestart []corev1.Pod) error {
}
// Note that ReconcileShutdowns would cancel ongoing shutdowns when called with no podNames
// this is however not the case in the rolling upgrade logic where we exit early if no pod needs to be rotated.
return ctx.nodeShutdown.ReconcileShutdowns(ctx.parentCtx, podNames)
return ctx.nodeShutdown.ReconcileShutdowns(ctx.parentCtx, podNames, k8s.PodNames(k8s.TerminatingPods(ctx.currentPods)))
}

func (ctx *upgradeCtx) prepareClusterForNodeRestart(podsToUpgrade []corev1.Pod) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/migration/migrate_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewShardMigration(es esv1.Elasticsearch, c esclient.Client, s esclient.Shar
}

// ReconcileShutdowns migrates data away from the leaving nodes or removes any allocation filtering if no nodes are leaving.
func (sm *ShardMigration) ReconcileShutdowns(ctx context.Context, leavingNodes []string) error {
func (sm *ShardMigration) ReconcileShutdowns(ctx context.Context, leavingNodes, _ []string) error {
return migrateData(ctx, sm.es, sm.c, leavingNodes)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/nodespec/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
func DefaultEnvVars(httpCfg commonv1.HTTPConfig, headlessServiceName string) []corev1.EnvVar {
return defaults.ExtendPodDownwardEnvVars(
[]corev1.EnvVar{
{Name: settings.EnvProbePasswordPath, Value: path.Join(esvolume.ProbeUserSecretMountPath, user.ProbeUserName)},
{Name: settings.EnvProbePasswordPath, Value: path.Join(esvolume.PodMountedUsersSecretMountPath, user.ProbeUserName)},
{Name: settings.EnvProbeUsername, Value: user.ProbeUserName},
{Name: settings.EnvReadinessProbeProtocol, Value: httpCfg.Protocol()},
{Name: settings.HeadlessServiceName, Value: headlessServiceName},
Expand Down
187 changes: 183 additions & 4 deletions pkg/controller/elasticsearch/nodespec/lifecycle_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
package nodespec

import (
"bytes"
"path"
"path/filepath"
"text/template"

v1 "k8s.io/api/core/v1"

"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/user"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/volume"
)

Expand All @@ -20,9 +25,10 @@ func NewPreStopHook() *v1.LifecycleHandler {
}

const PreStopHookScriptConfigKey = "pre-stop-hook-script.sh"
const PreStopHookScript = `#!/usr/bin/env bash

set -euo pipefail
var preStopHookScriptTemplate = template.Must(template.New("pre-stop").Parse(`#!/usr/bin/env bash

set -uo pipefail

# This script will wait for up to $PRE_STOP_ADDITIONAL_WAIT_SECONDS before allowing termination of the Pod
# This slows down the process shutdown and allows to make changes to the pool gracefully, without blackholing traffic when DNS
Expand All @@ -36,5 +42,178 @@ set -euo pipefail
# target the Pod IP before Elasticsearch stops.
PRE_STOP_ADDITIONAL_WAIT_SECONDS=${PRE_STOP_ADDITIONAL_WAIT_SECONDS:=50}

sleep $PRE_STOP_ADDITIONAL_WAIT_SECONDS
`
# PRE_STOP_SHUTDOWN_TYPE controls the type of shutdown that will be communicated to Elasticsearch. This should not be
# changed to anything but restart. Specifically setting remove can lead to extensive data migration that might exceed the
# terminationGracePeriodSeconds and lead to an incomplete shutdown.
shutdown_type=${PRE_STOP_SHUTDOWN_TYPE:=restart}

# capture response bodies in a temp file for better error messages and to extract necessary information for subsequent requests
resp_body=$(mktemp)
# shellcheck disable=SC2064
trap "rm -f $resp_body" EXIT

script_start=$(date +%s)

# compute time in seconds since the given start time
function duration() {
local start=$1
end=$(date +%s)
echo $((end-start))
}

# use DNS errors as a proxy to abort this script early if there is no chance of successful completion
# DNS errors are for example expected when the whole cluster including its service is being deleted
# and the service URL can no longer be resolved even though we still have running Pods.
max_dns_errors=${PRE_STOP_MAX_DNS_ERRORS:=2}
global_dns_error_cnt=0

function request() {
local status exit
status=$(curl -k -sS -o "$resp_body" -w "%{http_code}" "$@")
exit=$?
if [ "$exit" -ne 0 ] || [ "$status" -lt 200 ] || [ "$status" -gt 299 ]; then
# track curl DNS errors separately
if [ "$exit" -eq 6 ]; then ((global_dns_error_cnt++)); fi
# make sure we have a non-zero exit code in the presence of errors
if [ "$exit" -eq 0 ]; then exit=1; fi
log "$status" "$3" #by convention the third arg contains the URL
return $exit
fi
global_dns_error_cnt=0
return 0
}

function retry() {
local retries=$1
shift

local count=0
until "$@"; do
exit=$?
wait=$((2 ** count))
count=$((count + 1))
if [ $global_dns_error_cnt -gt "$max_dns_errors" ]; then
error_exit "too many DNS errors, giving up"
fi
if [ $count -lt "$retries" ]; then
log "retry $count/$retries exited $exit, retrying in $wait seconds"
sleep $wait
else
log "retry $count/$retries exited $exit, no more retries left"
return $exit
fi
done
return 0
}

function log() {
local timestamp
timestamp=$(date --iso-8601=seconds)
echo "{\"@timestamp\": \"${timestamp}\", \"message\": \"$*\", \"ecs.version\": \"1.2.0\", \"event.dataset\": \"elasticsearch.pre-stop-hook\"}" | tee /proc/1/fd/2 2> /dev/null
}

function error_exit() {
log "$@"
delayed_exit 1
}

function delayed_exit() {
local elapsed
elapsed=$(duration "$script_start")
local remaining=$((PRE_STOP_ADDITIONAL_WAIT_SECONDS - elapsed))
log "delaying termination for $remaining seconds"
sleep $remaining
exit ${1-0}
}

function supports_node_shutdown() {
local version="$1"
version=${version#[vV]}
major="${version%%\.*}"
minor="${version#*.}"
minor="${minor%.*}"
patch="${version##*.}"
# node shutdown is supported as of 7.15.2
if [ "$major" -lt 7 ] || { [ "$major" -eq 7 ] && [ "$minor" -eq 15 ] && [ "$patch" -lt 2 ]; }; then
return 1
fi
return 0
}

version=""
if [[ -f "{{.LabelsFile}}" ]]; then
# get Elasticsearch version from the downward API
version=$(grep "{{.VersionLabelName}}" {{.LabelsFile}} | cut -d '=' -f 2)
# remove quotes
version=$(echo "${version}" | tr -d '"')
fi

# if ES version does not support node shutdown exit early
if ! supports_node_shutdown "$version"; then
delayed_exit
fi

# setup basic auth if credentials are available
if [ -f "{{.PreStopUserPasswordPath}}" ]; then
PROBE_PASSWORD=$(<{{.PreStopUserPasswordPath}})
BASIC_AUTH="-u {{.PreStopUserName}}:${PROBE_PASSWORD}"
else
BASIC_AUTH=''
fi

ES_URL={{.ServiceURL}}

log "retrieving node ID"
retry 10 request -X GET "$ES_URL/_cat/nodes?full_id=true&h=id,name" $BASIC_AUTH
if [ "$?" -ne 0 ]; then
error_exit "failed to retrieve node ID"
fi

NODE_ID=$(grep "$POD_NAME" "$resp_body" | cut -f 1 -d ' ')

# check if there is an ongoing shutdown request
request -X GET $ES_URL/_nodes/"$NODE_ID"/shutdown $BASIC_AUTH
if grep -q -v '"nodes":\[\]' "$resp_body"; then
log "shutdown managed by ECK operator"
delayed_exit
fi

log "initiating node shutdown"
retry 10 request -X PUT $ES_URL/_nodes/"$NODE_ID"/shutdown $BASIC_AUTH -H 'Content-Type: application/json' -d"
pebrc marked this conversation as resolved.
Show resolved Hide resolved
{
\"type\": \"$shutdown_type\",
\"reason\": \"pre-stop hook\"
}
"
if [ "$?" -ne 0 ]; then
error_exit "failed to call node shutdown API"
fi

while :
do
log "waiting for node shutdown to complete"
request -X GET $ES_URL/_nodes/"$NODE_ID"/shutdown $BASIC_AUTH
if [ "$?" -eq 0 ] && grep -q -v 'IN_PROGRESS\|STALLED' "$resp_body"; then
break
fi
sleep 10
done

delayed_exit
`))

func RenderPreStopHookScript(svcURL string) (string, error) {
vars := map[string]string{
"PreStopUserName": user.PreStopUserName,
"PreStopUserPasswordPath": filepath.Join(volume.PodMountedUsersSecretMountPath, user.PreStopUserName),
// edge case: protocol change (http/https) combined with external node shutdown might not work out well due to
// script propagation delays. But it is not a legitimate production use case as users are not expected to change
// protocol on production systems
"ServiceURL": svcURL,
"LabelsFile": filepath.Join(volume.DownwardAPIMountPath, volume.LabelsFile),
"VersionLabelName": label.VersionLabelName,
}
var script bytes.Buffer
err := preStopHookScriptTemplate.Execute(&script, vars)
return script.String(), err
}
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/nodespec/podspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestBuildPodTemplateSpec(t *testing.T) {
initContainerEnv := defaults.ExtendPodDownwardEnvVars(
[]corev1.EnvVar{
{Name: "my-env", Value: "my-value"},
{Name: settings.EnvProbePasswordPath, Value: path.Join(esvolume.ProbeUserSecretMountPath, user.ProbeUserName)},
{Name: settings.EnvProbePasswordPath, Value: path.Join(esvolume.PodMountedUsersSecretMountPath, user.ProbeUserName)},
{Name: settings.EnvProbeUsername, Value: user.ProbeUserName},
{Name: settings.EnvReadinessProbeProtocol, Value: sampleES.Spec.HTTP.Protocol()},
{Name: settings.HeadlessServiceName, Value: HeadlessServiceName(esv1.StatefulSet(sampleES.Name, nodeSet.Name))},
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/nodespec/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func buildVolumes(
configVolume := settings.ConfigSecretVolume(esv1.StatefulSet(esName, nodeSpec.Name))
probeSecret := volume.NewSelectiveSecretVolumeWithMountPath(
esv1.InternalUsersSecret(esName), esvolume.ProbeUserVolumeName,
esvolume.ProbeUserSecretMountPath, []string{user.ProbeUserName},
esvolume.PodMountedUsersSecretMountPath, []string{user.ProbeUserName, user.PreStopUserName},
)
httpCertificatesVolume := volume.NewSecretVolumeWithMountPath(
certificates.InternalCertsSecretName(esv1.ESNamer, esName),
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/elasticsearch/shutdown/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type NodeShutdownStatus struct {
type Interface interface {
// ReconcileShutdowns retrieves ongoing shutdowns and based on the given node names either cancels or creates new
// shutdowns.
ReconcileShutdowns(ctx context.Context, leavingNodes []string) error
ReconcileShutdowns(ctx context.Context, leavingNodes []string, terminatingNodes []string) error
// ShutdownStatus returns the current shutdown status for the given node. It returns an error if no shutdown is in
// progress.
ShutdownStatus(ctx context.Context, podName string) (NodeShutdownStatus, error)
Expand All @@ -48,11 +48,11 @@ type observed struct {
observer Observer
}

func (o *observed) ReconcileShutdowns(ctx context.Context, leavingNodes []string) error {
func (o *observed) ReconcileShutdowns(ctx context.Context, leavingNodes []string, terminatingNodes []string) error {
if o.observer != nil {
o.observer.OnReconcileShutdowns(leavingNodes)
}
return o.Interface.ReconcileShutdowns(ctx, leavingNodes)
return o.Interface.ReconcileShutdowns(ctx, leavingNodes, terminatingNodes)
}

func (o *observed) ShutdownStatus(ctx context.Context, podName string) (NodeShutdownStatus, error) {
Expand Down
Loading