diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go
index 78055dad..88948939 100644
--- a/pkg/controller/config/config.go
+++ b/pkg/controller/config/config.go
@@ -24,6 +24,7 @@ type Config struct {
 	BaseBackoffDuration config.Duration `json:"baseBackoffDuration" pflag:"\"100ms\",Determines the base backoff for exponential retries."`
 	MaxBackoffDuration  config.Duration `json:"maxBackoffDuration" pflag:"\"30s\",Determines the max backoff for exponential retries."`
 	MaxErrDuration      config.Duration `json:"maxErrDuration" pflag:"\"5m\",Determines the max time to wait on errors."`
+	FlinkJobVertexTimeout config.Duration `json:"flinkJobVertexTimeout" pflag:"\"3m\",Determines the max time to wait for all job vertices to reach RUNNING state."`
 }
 
 func GetConfig() *Config {
diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go
index b2dc7c65..7e631bcb 100755
--- a/pkg/controller/config/config_flags.go
+++ b/pkg/controller/config/config_flags.go
@@ -53,5 +53,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet {
 	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "baseBackoffDuration"), "100ms", "Determines the base backoff for exponential retries.")
 	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "maxBackoffDuration"), "30s", "Determines the max backoff for exponential retries.")
 	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "maxErrDuration"), "5m", "Determines the max time to wait on errors.")
+	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "flinkJobVertexTimeout"), "3m", "Determines the max time to wait for all job vertices to reach RUNNING state.")
 	return cmdFlags
 }
diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go
index 05899cb1..e1526690 100644
--- a/pkg/controller/flinkapplication/flink_state_machine.go
+++ b/pkg/controller/flinkapplication/flink_state_machine.go
@@ -756,14 +756,58 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
 		return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app))
 	}
 
-	// wait until all vertices have been scheduled and started
-	allVerticesStarted := true
-	for _, v := range job.Vertices {
-		allVerticesStarted = allVerticesStarted && (v.StartTime > 0)
+	jobStartTime := getJobStartTimeInUTC(job.StartTime)
+	now := time.Now().UTC()
+	cfg := config.GetConfig()
+	logger.Info(ctx, "Job vertex timeout config is ", cfg.FlinkJobVertexTimeout)
+	flinkJobVertexTimeout := cfg.FlinkJobVertexTimeout
+	if now.Before(jobStartTime.Add(flinkJobVertexTimeout.Duration)) {
+		allVerticesRunning, hasFailure, failedVertexIndex := monitorAllVerticesState(job)
+
+		// fail fast
+		if hasFailure {
+			s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
+				fmt.Sprintf(
+					"Vertex %d with name [%s] is in Failed state", failedVertexIndex, job.Vertices[failedVertexIndex].Name))
+			return s.deployFailed(app)
+		}
+		return updateJobAndReturn(ctx, job, s, allVerticesRunning, app, hash)
+	}
+
+	s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
+		fmt.Sprintf(
+			"Not all vertices of the Flink job reached RUNNING state within the timeout of %.1f minutes", cfg.FlinkJobVertexTimeout.Minutes()))
+	return s.deployFailed(app)
+
+}
+
+func monitorAllVerticesState(job *client.FlinkJobOverview) (bool, bool, int) {
+	allVerticesRunning := true
+	// wait until all vertices have been scheduled and running
+	hasFailure := false
+	failedVertexIndex := -1
+	for index, v := range job.Vertices {
+		if v.Status == client.Failed {
+			failedVertexIndex = index
+			hasFailure = true
+			break
+		}
+		allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running
 	}
+	return allVerticesRunning, hasFailure, failedVertexIndex
+}
 
-	if job.State == client.Running && allVerticesStarted {
+func getJobStartTimeInUTC(startTime int64) time.Time {
+	// Flink reports the job start time in epoch milliseconds
+	jobStartTimeSec := startTime / 1000
+	jobStartTimeNSec := (startTime % 1000) * int64(time.Millisecond)
+	return time.Unix(jobStartTimeSec, jobStartTimeNSec).UTC()
+}
+
+func updateJobAndReturn(ctx context.Context, job *client.FlinkJobOverview, s *FlinkStateMachine, allVerticesRunning bool,
+	app *v1beta1.FlinkApplication, hash string) (bool, error) {
+	if job.State == client.Running && allVerticesRunning {
 		// Update job status
+		logger.Info(ctx, "Job and all vertices are in RUNNING state.")
 		jobStatus := s.flinkController.GetLatestJobStatus(ctx, app)
 		jobStatus.JarName = app.Spec.JarName
 		jobStatus.Parallelism = app.Spec.Parallelism
@@ -782,7 +826,6 @@
 		s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning)
 		return statusChanged, nil
 	}
-
 	return statusUnchanged, nil
 }
 
diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go
index 2752c3dc..4e962ea7 100644
--- a/pkg/controller/flinkapplication/flink_state_machine_test.go
+++ b/pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -3,6 +3,7 @@ package flinkapplication
 import (
 	"context"
 	"errors"
+
 	"testing"
 	"time"
 
@@ -14,8 +15,10 @@ import (
 
 	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
 	"github.com/lyft/flinkk8soperator/pkg/controller/common"
+	config2 "github.com/lyft/flinkk8soperator/pkg/controller/config"
 	"github.com/lyft/flinkk8soperator/pkg/controller/flink/mock"
 	k8mock "github.com/lyft/flinkk8soperator/pkg/controller/k8/mock"
+	"github.com/lyft/flytestdlib/config"
 	mockScope "github.com/lyft/flytestdlib/promutils"
 	"github.com/lyft/flytestdlib/promutils/labeled"
 	"github.com/stretchr/testify/assert"
@@ -424,17 +427,33 @@ func TestSubmittingToRunning(t *testing.T) {
 	}
 	appHash := flink.HashForApplication(&app)
 
+	_ = config2.ConfigSection.SetConfig(&config2.Config{
+		FlinkJobVertexTimeout: config.Duration{Duration: 3 * time.Minute},
+	})
 	stateMachineForTest := getTestStateMachine()
 	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
 	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
 		return true, nil
 	}
-
+	mockStartTime := time.Now().Add(-1 * time.Minute).UTC().UnixMilli()
 	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
 		assert.Equal(t, appHash, hash)
 		return &client.FlinkJobOverview{
-			JobID: jobID,
-			State: client.Running,
+			JobID:     jobID,
+			State:     client.Running,
+			StartTime: mockStartTime,
+			Vertices: []client.FlinkJobVertex{
+				{
+					Name:      "Vertex 1",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+				{
+					Name:      "Vertex 2",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+			},
 		}, nil
 	}
 
@@ -558,6 +577,333 @@
 	assert.Equal(t, 2, statusUpdateCount)
 }
 
+func TestSubmittingVertexFailsToStart(t *testing.T) {
+	jobID := "j1"
+
+	app := v1beta1.FlinkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-app",
+			Namespace: "flink",
+		},
+		Spec: v1beta1.FlinkApplicationSpec{
+			JarName:     "job.jar",
+			Parallelism: 5,
+			EntryClass:  "com.my.Class",
+			ProgramArgs: "--test",
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:      v1beta1.FlinkApplicationSubmittingJob,
+			DeployHash: "old-hash",
+		},
+	}
+	appHash := flink.HashForApplication(&app)
+
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
+		return true, nil
+	}
+	_ = config2.ConfigSection.SetConfig(&config2.Config{
+		FlinkJobVertexTimeout: config.Duration{Duration: 3 * time.Minute},
+	})
+
+	mockStartTime := time.Now().Add(-1 * time.Minute).UTC().UnixMilli()
+	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
+		assert.Equal(t, appHash, hash)
+		return &client.FlinkJobOverview{
+			JobID:     jobID,
+			State:     client.Running,
+			StartTime: mockStartTime,
+			Vertices: []client.FlinkJobVertex{
+				{
+					Name:      "Vertex 1",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+				{
+					Name:      "Vertex 2",
+					Status:    client.Failed,
+					StartTime: mockStartTime,
+				},
+			},
+		}, nil
+	}
+
+	startCount := 0
+	mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
+		jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) {
+
+		assert.Equal(t, appHash, hash)
+		assert.Equal(t, app.Spec.JarName, jarName)
+		assert.Equal(t, app.Spec.Parallelism, parallelism)
+		assert.Equal(t, app.Spec.EntryClass, entryClass)
+		assert.Equal(t, app.Spec.ProgramArgs, programArgs)
+		assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState)
+		assert.Equal(t, app.Status.SavepointPath, savepointPath)
+
+		startCount++
+		return jobID, nil
+	}
+
+	mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) {
+		assert.Equal(t, appHash, hash)
+		if startCount > 0 {
+			return []client.FlinkJob{
+				{
+					JobID:  jobID,
+					Status: client.Running,
+				},
+			}, nil
+		}
+		return nil, nil
+	}
+
+	podSelector := "wc7ydhum"
+
+	mockFlinkController.GetDeploymentsForHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (deployment *common.FlinkDeployment, err error) {
+		jm := appsv1.Deployment{
+			Spec: appsv1.DeploymentSpec{
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{"pod-deployment-selector": podSelector},
+				},
+			},
+		}
+
+		tm := appsv1.Deployment{
+			Spec: appsv1.DeploymentSpec{
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{"pod-deployment-selector": podSelector},
+				},
+			},
+		}
+
+		return &common.FlinkDeployment{
+			Jobmanager:  &jm,
+			Taskmanager: &tm,
+			Hash:        hash,
+		}, nil
+	}
+
+	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+
+	getServiceCount := 0
+	mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) {
+		assert.Equal(t, "flink", namespace)
+		assert.Equal(t, "test-app", name)
+
+		getServiceCount++
+		return &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-app",
+				Namespace: "flink",
+			},
+			Spec: v1.ServiceSpec{
+				Selector: map[string]string{
+					"pod-deployment-selector": "blah",
+				},
+			},
+		}, nil
+	}
+
+	updateCount := 0
+	mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
+		if updateCount == 0 {
+			// update to the service
+			service := object.(*v1.Service)
+			assert.Equal(t, podSelector, service.Spec.Selector["pod-deployment-selector"])
+		} else if updateCount == 1 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, jobFinalizer, application.Finalizers[0])
+		}
+
+		updateCount++
+		return nil
+	}
+
+	statusUpdateCount := 0
+	mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+		if statusUpdateCount == 0 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application))
+		} else if statusUpdateCount == 1 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase)
+		}
+		statusUpdateCount++
+		return nil
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+	err = stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+
+	assert.Equal(t, 1, startCount)
+	assert.Equal(t, 3, updateCount)
+	assert.Equal(t, 2, statusUpdateCount)
+}
+
+func TestSubmittingVertexStartTimeout(t *testing.T) {
+	jobID := "j1"
+
+	app := v1beta1.FlinkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-app",
+			Namespace: "flink",
+		},
+		Spec: v1beta1.FlinkApplicationSpec{
+			JarName:     "job.jar",
+			Parallelism: 5,
+			EntryClass:  "com.my.Class",
+			ProgramArgs: "--test",
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:      v1beta1.FlinkApplicationSubmittingJob,
+			DeployHash: "old-hash",
+		},
+	}
+	appHash := flink.HashForApplication(&app)
+
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
+		return true, nil
+	}
+	_ = config2.ConfigSection.SetConfig(&config2.Config{
+		FlinkJobVertexTimeout: config.Duration{Duration: 3 * time.Minute},
+	})
+
+	mockStartTime := time.Now().Add(-4 * time.Minute).UTC().UnixMilli()
+	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
+		assert.Equal(t, appHash, hash)
+		return &client.FlinkJobOverview{
+			JobID:     jobID,
+			State:     client.Running,
+			StartTime: mockStartTime,
+			Vertices: []client.FlinkJobVertex{
+				{
+					Name:      "Vertex 1",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+			},
+		}, nil
+	}
+
+	startCount := 0
+	mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
+		jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) {
+
+		assert.Equal(t, appHash, hash)
+		assert.Equal(t, app.Spec.JarName, jarName)
+		assert.Equal(t, app.Spec.Parallelism, parallelism)
+		assert.Equal(t, app.Spec.EntryClass, entryClass)
+		assert.Equal(t, app.Spec.ProgramArgs, programArgs)
+		assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState)
+		assert.Equal(t, app.Status.SavepointPath, savepointPath)
+
+		startCount++
+		return jobID, nil
+	}
+
+	mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) {
+		assert.Equal(t, appHash, hash)
+		if startCount > 0 {
+			return []client.FlinkJob{
+				{
+					JobID:  jobID,
+					Status: client.Running,
+				},
+			}, nil
+		}
+		return nil, nil
+	}
+
+	podSelector := "wc7ydhun"
+
+	mockFlinkController.GetDeploymentsForHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (deployment *common.FlinkDeployment, err error) {
+		jm := appsv1.Deployment{
+			Spec: appsv1.DeploymentSpec{
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{"pod-deployment-selector": podSelector},
+				},
+			},
+		}
+
+		tm := appsv1.Deployment{
+			Spec: appsv1.DeploymentSpec{
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{"pod-deployment-selector": podSelector},
+				},
+			},
+		}
+
+		return &common.FlinkDeployment{
+			Jobmanager:  &jm,
+			Taskmanager: &tm,
+			Hash:        hash,
+		}, nil
+	}
+
+	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+
+	getServiceCount := 0
+	mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) {
+		assert.Equal(t, "flink", namespace)
+		assert.Equal(t, "test-app", name)
+
+		getServiceCount++
+		return &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-app",
+				Namespace: "flink",
+			},
+			Spec: v1.ServiceSpec{
+				Selector: map[string]string{
+					"pod-deployment-selector": "blah",
+				},
+			},
+		}, nil
+	}
+
+	updateCount := 0
+	mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
+		if updateCount == 0 {
+			// update to the service
+			service := object.(*v1.Service)
+			assert.Equal(t, podSelector, service.Spec.Selector["pod-deployment-selector"])
+		} else if updateCount == 1 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, jobFinalizer, application.Finalizers[0])
+		}
+
+		updateCount++
+		return nil
+	}
+
+	statusUpdateCount := 0
+	mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+		if statusUpdateCount == 0 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application))
+		} else if statusUpdateCount == 1 {
+			application := object.(*v1beta1.FlinkApplication)
+			assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase)
+		}
+		statusUpdateCount++
+		return nil
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+	err = stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+
+	assert.Equal(t, 1, startCount)
+	assert.Equal(t, 3, updateCount)
+	assert.Equal(t, 2, statusUpdateCount)
+}
+
 func TestHandleNilDeployments(t *testing.T) {
 	jobID := "j1"
 
@@ -1776,10 +2122,24 @@ func TestRunningToDualRunning(t *testing.T) {
 
 	stateMachineForTest := getTestStateMachine()
 	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockStartTime := time.Now().Add(-1 * time.Minute).UTC().UnixMilli()
 	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
 		return &client.FlinkJobOverview{
-			JobID: "jobID2",
-			State: client.Running,
+			JobID:     "jobID2",
+			State:     client.Running,
+			StartTime: mockStartTime,
+			Vertices: []client.FlinkJobVertex{
+				{
+					Name:      "Vertex 1",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+				{
+					Name:      "Vertex 2",
+					Status:    client.Running,
+					StartTime: mockStartTime,
+				},
+			},
 		}, nil
 	}
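
For reference, a minimal standalone sketch of the timeout decision introduced in handleSubmittingJob (the helper name and the values below are illustrative only, not part of this patch): Flink's REST overview reports the job start time in epoch milliseconds, and the deploy is failed once the configured flinkJobVertexTimeout has elapsed without every vertex reaching RUNNING.

package main

import (
	"fmt"
	"time"
)

// vertexDeadlineExceeded mirrors the check above: jobStartMillis is the
// epoch-millisecond job start time reported by Flink, timeout is the
// configured flinkJobVertexTimeout. It returns true once the window in
// which all vertices must reach RUNNING has elapsed.
func vertexDeadlineExceeded(jobStartMillis int64, timeout time.Duration, now time.Time) bool {
	jobStart := time.Unix(jobStartMillis/1000, (jobStartMillis%1000)*int64(time.Millisecond)).UTC()
	return !now.Before(jobStart.Add(timeout))
}

func main() {
	// A job that started 4 minutes ago with a 3-minute vertex timeout has exceeded the deadline.
	start := time.Now().Add(-4 * time.Minute).UnixMilli()
	fmt.Println(vertexDeadlineExceeded(start, 3*time.Minute, time.Now().UTC())) // true
}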