Skip to content

Support configurable pre-stop command for containers #2403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 2, 2021
6 changes: 6 additions & 0 deletions docs/workloads/async/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
period_seconds: <int> # how often (in seconds) to perform the probe (default: 10)
success_threshold: <int> # minimum consecutive successes for the probe to be considered successful after having failed (default: 1)
failure_threshold: <int> # minimum consecutive failures for the probe to be considered failed after having succeeded (default: 3)
pre_stop: # a pre-stop lifecycle hook for the container; will be executed before container termination (optional)
http_get: # specifies an http endpoint to send a request to (only one of http_get, tcp_socket, and exec may be specified)
port: <int|string> # the port to access on the container (required)
path: <string> # the path to access on the HTTP server (default: /)
exec: # specifies a command to run (only one of http_get, tcp_socket, and exec may be specified)
command: <list[string]> # the command to execute inside the container, which is exec'd (not run inside a shell); the working directory is root ('/') in the container's filesystem (required)
autoscaling: # autoscaling configuration (default: see below)
min_replicas: <int> # minimum number of replicas (default: 1; min value: 0)
max_replicas: <int> # maximum number of replicas (default: 100)
Expand Down
6 changes: 6 additions & 0 deletions docs/workloads/realtime/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
period_seconds: <int> # how often (in seconds) to perform the probe (default: 10)
success_threshold: <int> # minimum consecutive successes for the probe to be considered successful after having failed (default: 1)
failure_threshold: <int> # minimum consecutive failures for the probe to be considered failed after having succeeded (default: 3)
pre_stop: # a pre-stop lifecycle hook for the container; will be executed before container termination (optional)
http_get: # specifies an http endpoint to send a request to (only one of http_get, tcp_socket, and exec may be specified)
port: <int|string> # the port to access on the container (required)
path: <string> # the path to access on the HTTP server (default: /)
exec: # specifies a command to run (only one of http_get, tcp_socket, and exec may be specified)
command: <list[string]> # the command to execute inside the container, which is exec'd (not run inside a shell); the working directory is root ('/') in the container's filesystem (required)
autoscaling: # autoscaling configuration (default: see below)
min_replicas: <int> # minimum number of replicas (default: 1)
max_replicas: <int> # maximum number of replicas (default: 100)
Expand Down
69 changes: 55 additions & 14 deletions pkg/types/spec/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func containersValidation(kind userconfig.Kind) *cr.StructFieldValidation {
validations = append(validations, probeValidation("ReadinessProbe", false))
}

if kind == userconfig.RealtimeAPIKind || kind == userconfig.AsyncAPIKind {
validations = append(validations, preStopValidation())
}

return &cr.StructFieldValidation{
StructField: "Containers",
StructListValidation: &cr.StructListValidation{
Expand Down Expand Up @@ -306,8 +310,8 @@ func networkingValidation() *cr.StructFieldValidation {

func probeValidation(structFieldName string, hasExecProbe bool) *cr.StructFieldValidation {
validations := []*cr.StructFieldValidation{
httpGetProbeValidation(),
tcpSocketProbeValidation(),
httpGetHandlerValidation(),
tcpSocketHandlerValidation(),
{
StructField: "InitialDelaySeconds",
Int32Validation: &cr.Int32Validation{
Expand Down Expand Up @@ -346,7 +350,7 @@ func probeValidation(structFieldName string, hasExecProbe bool) *cr.StructFieldV
}

if hasExecProbe {
validations = append(validations, execProbeValidation())
validations = append(validations, execHandlerValidation())
}

return &cr.StructFieldValidation{
Expand All @@ -360,7 +364,22 @@ func probeValidation(structFieldName string, hasExecProbe bool) *cr.StructFieldV
}
}

func httpGetProbeValidation() *cr.StructFieldValidation {
func preStopValidation() *cr.StructFieldValidation {
return &cr.StructFieldValidation{
StructField: "PreStop",
StructValidation: &cr.StructValidation{
Required: false,
AllowExplicitNull: true,
DefaultNil: true,
StructFieldValidations: []*cr.StructFieldValidation{
httpGetHandlerValidation(),
execHandlerValidation(),
},
},
}
}

func httpGetHandlerValidation() *cr.StructFieldValidation {
return &cr.StructFieldValidation{
StructField: "HTTPGet",
StructValidation: &cr.StructValidation{
Expand Down Expand Up @@ -390,7 +409,7 @@ func httpGetProbeValidation() *cr.StructFieldValidation {
}
}

func tcpSocketProbeValidation() *cr.StructFieldValidation {
func tcpSocketHandlerValidation() *cr.StructFieldValidation {
return &cr.StructFieldValidation{
StructField: "TCPSocket",
StructValidation: &cr.StructValidation{
Expand All @@ -412,7 +431,7 @@ func tcpSocketProbeValidation() *cr.StructFieldValidation {
}
}

func execProbeValidation() *cr.StructFieldValidation {
func execHandlerValidation() *cr.StructFieldValidation {
return &cr.StructFieldValidation{
StructField: "Exec",
StructValidation: &cr.StructValidation{
Expand Down Expand Up @@ -783,6 +802,12 @@ func validateContainers(
}
}

if container.PreStop != nil {
if err := validatePreStop(*container.PreStop); err != nil {
return errors.Wrap(err, s.Index(i), userconfig.PreStopKey)
}
}

compute := container.Compute
if compute.Shm != nil && compute.Mem != nil && compute.Shm.Cmp(compute.Mem.Quantity) > 0 {
return errors.Wrap(ErrorShmCannotExceedMem(*compute.Shm, *compute.Mem), s.Index(i), userconfig.ComputeKey)
Expand All @@ -794,23 +819,39 @@ func validateContainers(
}

func validateProbe(probe userconfig.Probe, supportsExecProbe bool) error {
numSpecifiedProbes := 0
numSpecifiedHandlers := 0
if probe.HTTPGet != nil {
numSpecifiedProbes++
numSpecifiedHandlers++
}
if probe.TCPSocket != nil {
numSpecifiedProbes++
numSpecifiedHandlers++
}
if probe.Exec != nil {
numSpecifiedProbes++
numSpecifiedHandlers++
}

if numSpecifiedProbes != 1 {
validProbes := []string{userconfig.HTTPGetKey, userconfig.TCPSocketKey}
if numSpecifiedHandlers != 1 {
validHandlers := []string{userconfig.HTTPGetKey, userconfig.TCPSocketKey}
if supportsExecProbe {
validProbes = append(validProbes, userconfig.ExecKey)
validHandlers = append(validHandlers, userconfig.ExecKey)
}
return ErrorSpecifyExactlyOneField(numSpecifiedProbes, validProbes...)
return ErrorSpecifyExactlyOneField(numSpecifiedHandlers, validHandlers...)
}

return nil
}

func validatePreStop(preStop userconfig.PreStop) error {
numSpecifiedHandlers := 0
if preStop.HTTPGet != nil {
numSpecifiedHandlers++
}
if preStop.Exec != nil {
numSpecifiedHandlers++
}

if numSpecifiedHandlers != 1 {
return ErrorSpecifyExactlyOneField(numSpecifiedHandlers, userconfig.HTTPGetKey, userconfig.ExecKey)
}

return nil
Expand Down
71 changes: 51 additions & 20 deletions pkg/types/userconfig/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type Container struct {
Command []string `json:"command" yaml:"command"`
Args []string `json:"args" yaml:"args"`

ReadinessProbe *Probe `json:"readiness_probe" yaml:"readiness_probe"`
LivenessProbe *Probe `json:"liveness_probe" yaml:"liveness_probe"`
ReadinessProbe *Probe `json:"readiness_probe" yaml:"readiness_probe"`
LivenessProbe *Probe `json:"liveness_probe" yaml:"liveness_probe"`
PreStop *PreStop `json:"pre_stop" yaml:"pre_stop"`

Compute *Compute `json:"compute" yaml:"compute"`
}
Expand All @@ -78,26 +79,31 @@ type Networking struct {
}

type Probe struct {
HTTPGet *HTTPGetProbe `json:"http_get" yaml:"http_get"`
TCPSocket *TCPSocketProbe `json:"tcp_socket" yaml:"tcp_socket"`
Exec *ExecProbe `json:"exec" yaml:"exec"`
InitialDelaySeconds int32 `json:"initial_delay_seconds" yaml:"initial_delay_seconds"`
TimeoutSeconds int32 `json:"timeout_seconds" yaml:"timeout_seconds"`
PeriodSeconds int32 `json:"period_seconds" yaml:"period_seconds"`
SuccessThreshold int32 `json:"success_threshold" yaml:"success_threshold"`
FailureThreshold int32 `json:"failure_threshold" yaml:"failure_threshold"`
HTTPGet *HTTPGetHandler `json:"http_get" yaml:"http_get"`
TCPSocket *TCPSocketHandler `json:"tcp_socket" yaml:"tcp_socket"`
Exec *ExecHandler `json:"exec" yaml:"exec"`
InitialDelaySeconds int32 `json:"initial_delay_seconds" yaml:"initial_delay_seconds"`
TimeoutSeconds int32 `json:"timeout_seconds" yaml:"timeout_seconds"`
PeriodSeconds int32 `json:"period_seconds" yaml:"period_seconds"`
SuccessThreshold int32 `json:"success_threshold" yaml:"success_threshold"`
FailureThreshold int32 `json:"failure_threshold" yaml:"failure_threshold"`
}

type HTTPGetProbe struct {
type PreStop struct {
HTTPGet *HTTPGetHandler `json:"http_get" yaml:"http_get"`
Exec *ExecHandler `json:"exec" yaml:"exec"`
}

type HTTPGetHandler struct {
Path string `json:"path" yaml:"path"`
Port int32 `json:"port" yaml:"port"`
}

type TCPSocketProbe struct {
type TCPSocketHandler struct {
Port int32 `json:"port" yaml:"port"`
}

type ExecProbe struct {
type ExecHandler struct {
Command []string `json:"command" yaml:"command"`
}

Expand Down Expand Up @@ -387,6 +393,11 @@ func (container *Container) UserStr() string {
sb.WriteString(s.Indent(container.LivenessProbe.UserStr(), " "))
}

if container.PreStop != nil {
sb.WriteString(fmt.Sprintf("%s:\n", PreStopKey))
sb.WriteString(s.Indent(container.PreStop.UserStr(), " "))
}

if container.Compute != nil {
sb.WriteString(fmt.Sprintf("%s:\n", ComputeKey))
sb.WriteString(s.Indent(container.Compute.UserStr(), " "))
Expand Down Expand Up @@ -428,24 +439,39 @@ func (probe *Probe) UserStr() string {
return sb.String()
}

func (httpProbe *HTTPGetProbe) UserStr() string {
func (preStop *PreStop) UserStr() string {
var sb strings.Builder

sb.WriteString(fmt.Sprintf("%s: %s\n", PathKey, httpProbe.Path))
sb.WriteString(fmt.Sprintf("%s: %d\n", PortKey, httpProbe.Port))
if preStop.HTTPGet != nil {
sb.WriteString(fmt.Sprintf("%s:\n", HTTPGetKey))
sb.WriteString(s.Indent(preStop.HTTPGet.UserStr(), " "))
}
if preStop.Exec != nil {
sb.WriteString(fmt.Sprintf("%s:\n", ExecKey))
sb.WriteString(s.Indent(preStop.Exec.UserStr(), " "))
}

return sb.String()
}

func (tcpSocketProbe *TCPSocketProbe) UserStr() string {
func (httpHandler *HTTPGetHandler) UserStr() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s: %d\n", PortKey, tcpSocketProbe.Port))

sb.WriteString(fmt.Sprintf("%s: %s\n", PathKey, httpHandler.Path))
sb.WriteString(fmt.Sprintf("%s: %d\n", PortKey, httpHandler.Port))

return sb.String()
}

func (execProbe *ExecProbe) UserStr() string {
func (tcpSocketHandler *TCPSocketHandler) UserStr() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s: %s\n", CommandKey, s.ObjFlatNoQuotes(execProbe.Command)))
sb.WriteString(fmt.Sprintf("%s: %d\n", PortKey, tcpSocketHandler.Port))
return sb.String()
}

func (execHandler *ExecHandler) UserStr() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s: %s\n", CommandKey, s.ObjFlatNoQuotes(execHandler.Command)))
return sb.String()
}

Expand Down Expand Up @@ -589,17 +615,22 @@ func (api *API) TelemetryEvent() map[string]interface{} {

var numReadinessProbes int
var numLivenessProbes int
var numPreStops int
for _, container := range api.Pod.Containers {
if container.ReadinessProbe != nil {
numReadinessProbes++
}
if container.LivenessProbe != nil {
numLivenessProbes++
}
if container.PreStop != nil {
numPreStops++
}
}

event["pod.containers._num_readiness_probes"] = numReadinessProbes
event["pod.containers._num_liveness_probes"] = numLivenessProbes
event["pod.containers._num_pre_stops"] = numPreStops

totalCompute := GetPodComputeRequest(api)
if totalCompute.CPU != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/types/userconfig/config_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
ArgsKey = "args"
ReadinessProbeKey = "readiness_probe"
LivenessProbeKey = "liveness_probe"
PreStopKey = "pre_stop"

// Probe
HTTPGetKey = "http_get"
Expand Down
30 changes: 30 additions & 0 deletions pkg/workloads/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,36 @@ func GetProbeSpec(probe *userconfig.Probe) *kcore.Probe {
}
}

func GetLifecycleSpec(preStop *userconfig.PreStop) *kcore.Lifecycle {
if preStop == nil {
return nil
}

var httpGetAction *kcore.HTTPGetAction
var execAction *kcore.ExecAction

if preStop.HTTPGet != nil {
httpGetAction = &kcore.HTTPGetAction{
Path: strings.TrimPrefix(preStop.HTTPGet.Path, "/"), // the leading / is automatically added by k8s
Port: intstr.IntOrString{
IntVal: preStop.HTTPGet.Port,
},
}
}
if preStop.Exec != nil {
execAction = &kcore.ExecAction{
Command: preStop.Exec.Command,
}
}

return &kcore.Lifecycle{
PreStop: &kcore.Handler{
HTTPGet: httpGetAction,
Exec: execAction,
},
}
}

func GetReadinessProbesFromContainers(containers []*userconfig.Container) map[string]kcore.Probe {
probes := map[string]kcore.Probe{}

Expand Down
1 change: 1 addition & 0 deletions pkg/workloads/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) {
VolumeMounts: containerMounts,
LivenessProbe: GetProbeSpec(container.LivenessProbe),
ReadinessProbe: readinessProbe,
Lifecycle: GetLifecycleSpec(container.PreStop),
Resources: kcore.ResourceRequirements{
Requests: containerResourceList,
Limits: containerResourceLimitsList,
Expand Down