From 56de4b19e365d0ffd9b393ef1865a38f589f956b Mon Sep 17 00:00:00 2001
From: pmahindrakar-oss <prafulla.mahindrakar@gmail.com>
Date: Fri, 25 Mar 2022 20:07:41 +0530
Subject: [PATCH] Added execution config changes (#378)

* Added execution config changes

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* lint fixes

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* using executionConfig data during launch

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* resolve conflicts

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Removed defaults for labels and annotations

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* added more coverage

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* added more coverage

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Updating idl and lint fixes

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Adde missing go.sum

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* feedback changes to return immediately if any field is set while overriding

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* using released flyteidl

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* using released flyteidl

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>
---
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 pkg/manager/impl/execution_manager.go         | 116 +++++--
 pkg/manager/impl/execution_manager_test.go    | 300 +++++++++++++++++-
 pkg/manager/impl/util/shared.go               |  20 ++
 pkg/manager/impl/util/shared_test.go          |  68 +++-
 pkg/runtime/application_config_provider.go    |   1 +
 .../interfaces/application_configuration.go   |  42 +++
 8 files changed, 513 insertions(+), 40 deletions(-)

diff --git a/go.mod b/go.mod
index 0828db3958..d3e59c405f 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/benbjohnson/clock v1.1.0
 	github.com/coreos/go-oidc v2.2.1+incompatible
 	github.com/evanphx/json-patch v4.9.0+incompatible
-	github.com/flyteorg/flyteidl v0.24.2
+	github.com/flyteorg/flyteidl v0.24.6
 	github.com/flyteorg/flyteplugins v0.10.16
 	github.com/flyteorg/flytepropeller v0.16.36
 	github.com/flyteorg/flytestdlib v0.4.13
diff --git a/go.sum b/go.sum
index 967f3c492d..6d08b9a9a2 100644
--- a/go.sum
+++ b/go.sum
@@ -352,8 +352,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
 github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
-github.com/flyteorg/flyteidl v0.24.2 h1:RQzWmtVQR+NKAppjw7xTsIn6gosP0Q/j58tfF6Cr6h4=
-github.com/flyteorg/flyteidl v0.24.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
+github.com/flyteorg/flyteidl v0.24.6 h1:n2796X9Sw7mNDtXWwsJr84DLQpz8Cptvb7LptfJLxag=
+github.com/flyteorg/flyteidl v0.24.6/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
 github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
 github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos=
 github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU=
diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go
index 1d895f551e..ead7cf8adb 100644
--- a/pkg/manager/impl/execution_manager.go
+++ b/pkg/manager/impl/execution_manager.go
@@ -428,41 +428,91 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
 	return parentNodeExecutionID, sourceExecutionID, nil
 }
 
+// WorkflowExecutionConfigInterface is used as common interface for capturing the common behavior catering to the needs
+// of fetching the WorkflowExecutionConfig across LaunchPlanSpec, ExecutionCreateRequest
+// MatchableResource_WORKFLOW_EXECUTION_CONFIG and ApplicationConfig
+type WorkflowExecutionConfigInterface interface {
+	// GetMaxParallelism Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
+	GetMaxParallelism() int32
+	// GetRawOutputDataConfig Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.).
+	GetRawOutputDataConfig() *admin.RawOutputDataConfig
+	// GetSecurityContext Indicates security context permissions for executions triggered with this matchable attribute.
+	GetSecurityContext() *core.SecurityContext
+	// GetAnnotations Custom annotations to be applied to a triggered execution resource.
+	GetAnnotations() *admin.Annotations
+	// GetLabels Custom labels to be applied to a triggered execution resource.
+	GetLabels() *admin.Labels
+}
+
+// Merge into workflowExecConfig from spec and return true if any value has been changed
+func mergeIntoExecConfig(workflowExecConfig *admin.WorkflowExecutionConfig, spec WorkflowExecutionConfigInterface) bool {
+	isChanged := false
+	if workflowExecConfig.GetMaxParallelism() == 0 && spec.GetMaxParallelism() > 0 {
+		workflowExecConfig.MaxParallelism = spec.GetMaxParallelism()
+		isChanged = true
+	}
+	if workflowExecConfig.GetSecurityContext() == nil && spec.GetSecurityContext() != nil {
+		workflowExecConfig.SecurityContext = spec.GetSecurityContext()
+		isChanged = true
+	}
+	// Launchplan spec has label, annotation and rawOutputDataConfig initialized with empty values.
+	// Hence we do a deep check in the following conditions before assignment
+	if (workflowExecConfig.GetRawOutputDataConfig() == nil ||
+		len(workflowExecConfig.GetRawOutputDataConfig().GetOutputLocationPrefix()) == 0) &&
+		(spec.GetRawOutputDataConfig() != nil && len(spec.GetRawOutputDataConfig().OutputLocationPrefix) > 0) {
+		workflowExecConfig.RawOutputDataConfig = spec.GetRawOutputDataConfig()
+		isChanged = true
+	}
+	if (workflowExecConfig.GetLabels() == nil || len(workflowExecConfig.GetLabels().Values) == 0) &&
+		(spec.GetLabels() != nil && len(spec.GetLabels().Values) > 0) {
+		workflowExecConfig.Labels = spec.GetLabels()
+		isChanged = true
+	}
+	if (workflowExecConfig.GetAnnotations() == nil || len(workflowExecConfig.GetAnnotations().Values) == 0) &&
+		(spec.GetAnnotations() != nil && len(spec.GetAnnotations().Values) > 0) {
+		workflowExecConfig.Annotations = spec.GetAnnotations()
+		isChanged = true
+	}
+	return isChanged
+}
+
 // Produces execution-time attributes for workflow execution.
 // Defaults to overridable execution values set in the execution create request, then looks at the launch plan values
 // (if any) before defaulting to values set in the matchable resource db and further if matchable resources don't
 // exist then defaults to one set in application configuration
 func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest,
 	launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) {
-	if request.Spec.MaxParallelism > 0 {
-		return &admin.WorkflowExecutionConfig{
-			MaxParallelism: request.Spec.MaxParallelism,
-		}, nil
+
+	workflowExecConfig := &admin.WorkflowExecutionConfig{}
+	// merge the request spec into workflowExecConfig
+	if isChanged := mergeIntoExecConfig(workflowExecConfig, request.Spec); isChanged {
+		return workflowExecConfig, nil
 	}
-	if launchPlan != nil && launchPlan.Spec.MaxParallelism > 0 {
-		return &admin.WorkflowExecutionConfig{
-			MaxParallelism: launchPlan.Spec.MaxParallelism,
-		}, nil
+
+	if launchPlan != nil && launchPlan.Spec != nil {
+		// merge the launch plan spec into workflowExecConfig
+		if isChanged := mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec); isChanged {
+			return workflowExecConfig, nil
+		}
 	}
 
-	resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
-		Project:      request.Project,
-		Domain:       request.Domain,
-		ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
-	})
+	matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager,
+		admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain)
 	if err != nil {
-		if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
-			logger.Errorf(ctx, "Failed to get workflow execution config overrides with error: %v", err)
-			return nil, err
-		}
+		return nil, err
 	}
-	if resource != nil && resource.Attributes.GetWorkflowExecutionConfig() != nil {
-		return resource.Attributes.GetWorkflowExecutionConfig(), nil
+
+	if matchableResource != nil && matchableResource.Attributes.GetWorkflowExecutionConfig() != nil {
+		// merge the matchable resource workflow execution config into workflowExecConfig
+		if isChanged := mergeIntoExecConfig(workflowExecConfig,
+			matchableResource.Attributes.GetWorkflowExecutionConfig()); isChanged {
+			return workflowExecConfig, nil
+		}
 	}
+	//  merge the application config into workflowExecConfig
+	mergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig())
 	// Defaults to one from the application config
-	return &admin.WorkflowExecutionConfig{
-		MaxParallelism: m.config.ApplicationConfiguration().GetTopLevelConfig().GetMaxParallelism(),
-	}, nil
+	return workflowExecConfig, nil
 }
 
 func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *admin.ExecutionCreateRequest) (
@@ -579,21 +629,23 @@ func (m *ExecutionManager) launchSingleTaskExecution(
 	}
 
 	var labels map[string]string
-	if requestSpec.Labels != nil {
-		labels = requestSpec.Labels.Values
+	if executionConfig.Labels != nil {
+		labels = executionConfig.Labels.Values
 	}
+
 	labels, err = m.addProjectLabels(ctx, request.Project, labels)
 	if err != nil {
 		return nil, nil, err
 	}
+
 	var annotations map[string]string
-	if requestSpec.Annotations != nil {
-		annotations = requestSpec.Annotations.Values
+	if executionConfig.Annotations != nil {
+		annotations = executionConfig.Annotations.Values
 	}
 
-	rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
-	if requestSpec.RawOutputDataConfig != nil {
-		rawOutputDataConfig = requestSpec.RawOutputDataConfig
+	var rawOutputDataConfig *admin.RawOutputDataConfig
+	if executionConfig.RawOutputDataConfig != nil {
+		rawOutputDataConfig = executionConfig.RawOutputDataConfig
 	}
 
 	clusterAssignment, err := m.getClusterAssignment(ctx, &request)
@@ -817,7 +869,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
 	namespace := common.GetNamespaceName(
 		m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)
 
-	labels, err := resolveStringMap(requestSpec.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
+	labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
 	if err != nil {
 		return nil, nil, err
 	}
@@ -825,11 +877,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
 	if err != nil {
 		return nil, nil, err
 	}
-	annotations, err := resolveStringMap(requestSpec.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
+	annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
 	if err != nil {
 		return nil, nil, err
 	}
-	rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
+	var rawOutputDataConfig *admin.RawOutputDataConfig
 	if requestSpec.RawOutputDataConfig != nil {
 		rawOutputDataConfig = requestSpec.RawOutputDataConfig
 	}
diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go
index 895dc45c51..f5f18fd67a 100644
--- a/pkg/manager/impl/execution_manager_test.go
+++ b/pkg/manager/impl/execution_manager_test.go
@@ -3622,6 +3622,299 @@ func TestCreateSingleTaskExecution(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+func TestGetExecutionConfigOverrides(t *testing.T) {
+
+	requestLabels := map[string]string{"requestLabelKey": "requestLabelValue"}
+	requestAnnotations := map[string]string{"requestAnnotationKey": "requestAnnotationValue"}
+	requestOutputLocationPrefix := "requestOutputLocationPrefix"
+	requestK8sServiceAccount := "requestK8sServiceAccount"
+	requestMaxParallelism := int32(10)
+
+	launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"}
+	launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"}
+	launchPlanOutputLocationPrefix := "launchPlanOutputLocationPrefix"
+	launchPlanK8sServiceAccount := "launchPlanK8sServiceAccount"
+	launchPlanAssumableIamRole := "launchPlanAssumableIamRole"
+	launchPlanMaxParallelism := int32(50)
+
+	applicationConfig := runtime.NewConfigurationProvider()
+
+	defaultK8sServiceAccount := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().K8SServiceAccount
+	defaultMaxParallelism := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().MaxParallelism
+
+	rmLabels := map[string]string{"rmLabelKey": "rmLabelValue"}
+	rmAnnotations := map[string]string{"rmAnnotationKey": "rmAnnotationValue"}
+	rmOutputLocationPrefix := "rmOutputLocationPrefix"
+	rmK8sServiceAccount := "rmK8sServiceAccount"
+	rmMaxParallelism := int32(80)
+
+	resourceManager := managerMocks.MockResourceManager{}
+	executionManager := ExecutionManager{
+		resourceManager: &resourceManager,
+		config:          applicationConfig,
+	}
+	resourceManager.GetResourceFunc = func(ctx context.Context,
+		request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+		assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+			Project:      workflowIdentifier.Project,
+			Domain:       workflowIdentifier.Domain,
+			ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
+		})
+		return &managerInterfaces.ResourceResponse{
+			Attributes: &admin.MatchingAttributes{
+				Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
+					WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
+						MaxParallelism: rmMaxParallelism,
+						Labels:         &admin.Labels{Values: rmLabels},
+						Annotations:    &admin.Annotations{Values: rmAnnotations},
+						RawOutputDataConfig: &admin.RawOutputDataConfig{
+							OutputLocationPrefix: rmOutputLocationPrefix,
+						},
+						SecurityContext: &core.SecurityContext{
+							RunAs: &core.Identity{
+								K8SServiceAccount: rmK8sServiceAccount,
+							},
+						},
+					},
+				},
+			},
+		}, nil
+	}
+
+	t.Run("request with full config", func(t *testing.T) {
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec: &admin.ExecutionSpec{
+				Labels:      &admin.Labels{Values: requestLabels},
+				Annotations: &admin.Annotations{Values: requestAnnotations},
+				RawOutputDataConfig: &admin.RawOutputDataConfig{
+					OutputLocationPrefix: requestOutputLocationPrefix,
+				},
+				SecurityContext: &core.SecurityContext{
+					RunAs: &core.Identity{
+						K8SServiceAccount: requestK8sServiceAccount,
+					},
+				},
+				MaxParallelism: requestMaxParallelism,
+			},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
+		assert.NoError(t, err)
+		assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
+		assert.Equal(t, requestLabels, execConfig.GetLabels().Values)
+		assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values)
+	})
+	t.Run("request with partial config", func(t *testing.T) {
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec: &admin.ExecutionSpec{
+				Labels: &admin.Labels{Values: requestLabels},
+				RawOutputDataConfig: &admin.RawOutputDataConfig{
+					OutputLocationPrefix: requestOutputLocationPrefix,
+				},
+				MaxParallelism: requestMaxParallelism,
+			},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{
+				Annotations:         &admin.Annotations{Values: launchPlanAnnotations},
+				Labels:              &admin.Labels{Values: launchPlanLabels},
+				RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: launchPlanOutputLocationPrefix},
+				SecurityContext: &core.SecurityContext{
+					RunAs: &core.Identity{
+						K8SServiceAccount: launchPlanK8sServiceAccount,
+						IamRole:           launchPlanAssumableIamRole,
+					},
+				},
+				MaxParallelism: launchPlanMaxParallelism,
+			},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism)
+		assert.Nil(t, execConfig.SecurityContext)
+		assert.Nil(t, execConfig.Annotations)
+		assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
+		assert.Equal(t, requestLabels, execConfig.GetLabels().Values)
+	})
+	t.Run("request with no config", func(t *testing.T) {
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{
+				Labels:      &admin.Labels{Values: launchPlanLabels},
+				Annotations: &admin.Annotations{Values: launchPlanAnnotations},
+				RawOutputDataConfig: &admin.RawOutputDataConfig{
+					OutputLocationPrefix: launchPlanOutputLocationPrefix,
+				},
+				SecurityContext: &core.SecurityContext{
+					RunAs: &core.Identity{
+						K8SServiceAccount: launchPlanK8sServiceAccount,
+					},
+				},
+				MaxParallelism: launchPlanMaxParallelism,
+			},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
+		assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values)
+		assert.Equal(t, launchPlanAnnotations, execConfig.GetAnnotations().Values)
+	})
+	t.Run("launchplan with partial config", func(t *testing.T) {
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{
+				Labels:      &admin.Labels{Values: launchPlanLabels},
+				Annotations: &admin.Annotations{Values: launchPlanAnnotations},
+				RawOutputDataConfig: &admin.RawOutputDataConfig{
+					OutputLocationPrefix: launchPlanOutputLocationPrefix,
+				},
+				SecurityContext: &core.SecurityContext{
+					RunAs: &core.Identity{
+						K8SServiceAccount: launchPlanK8sServiceAccount,
+					},
+				},
+				MaxParallelism: launchPlanMaxParallelism,
+			},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
+		assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values)
+		assert.Equal(t, launchPlanAnnotations, execConfig.GetAnnotations().Values)
+	})
+	t.Run("launchplan with no config", func(t *testing.T) {
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
+		assert.Equal(t, rmLabels, execConfig.GetLabels().Values)
+		assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values)
+	})
+	t.Run("matchable resource partial config", func(t *testing.T) {
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      workflowIdentifier.Project,
+				Domain:       workflowIdentifier.Domain,
+				ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
+			})
+			return &managerInterfaces.ResourceResponse{
+				Attributes: &admin.MatchingAttributes{
+					Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
+						WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
+							MaxParallelism: rmMaxParallelism,
+							Annotations:    &admin.Annotations{Values: rmAnnotations},
+							SecurityContext: &core.SecurityContext{
+								RunAs: &core.Identity{
+									K8SServiceAccount: rmK8sServiceAccount,
+								},
+							},
+						},
+					},
+				},
+			}, nil
+		}
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Nil(t, execConfig.GetRawOutputDataConfig())
+		assert.Nil(t, execConfig.GetLabels())
+		assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values)
+	})
+	t.Run("matchable resource with no config", func(t *testing.T) {
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      workflowIdentifier.Project,
+				Domain:       workflowIdentifier.Domain,
+				ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
+			})
+			return &managerInterfaces.ResourceResponse{
+				Attributes: &admin.MatchingAttributes{
+					Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
+						WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{},
+					},
+				},
+			}, nil
+		}
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.NoError(t, err)
+		assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
+		assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
+		assert.Nil(t, execConfig.GetRawOutputDataConfig())
+		assert.Nil(t, execConfig.GetLabels())
+		assert.Nil(t, execConfig.GetAnnotations())
+	})
+	t.Run("matchable resource failure", func(t *testing.T) {
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      workflowIdentifier.Project,
+				Domain:       workflowIdentifier.Domain,
+				ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
+			})
+			return nil, fmt.Errorf("failed to fetch the resources")
+		}
+		request := &admin.ExecutionCreateRequest{
+			Project: workflowIdentifier.Project,
+			Domain:  workflowIdentifier.Domain,
+			Spec:    &admin.ExecutionSpec{},
+		}
+		launchPlan := &admin.LaunchPlan{
+			Spec: &admin.LaunchPlanSpec{},
+		}
+		execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
+		assert.Equal(t, fmt.Errorf("failed to fetch the resources"), err)
+		assert.Nil(t, execConfig.GetSecurityContext())
+		assert.Nil(t, execConfig.GetRawOutputDataConfig())
+		assert.Nil(t, execConfig.GetLabels())
+		assert.Nil(t, execConfig.GetAnnotations())
+	})
+}
+
 func TestGetExecutionConfig(t *testing.T) {
 	resourceManager := managerMocks.MockResourceManager{}
 	resourceManager.GetResourceFunc = func(ctx context.Context,
@@ -3642,8 +3935,10 @@ func TestGetExecutionConfig(t *testing.T) {
 		}, nil
 	}
 
+	applicationConfig := runtime.NewConfigurationProvider()
 	executionManager := ExecutionManager{
 		resourceManager: &resourceManager,
+		config:          applicationConfig,
 	}
 	execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
 		Project: workflowIdentifier.Project,
@@ -3658,7 +3953,6 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
 	resourceManager := managerMocks.MockResourceManager{}
 	resourceManager.GetResourceFunc = func(ctx context.Context,
 		request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
-		t.Errorf("When a user specifies max parallelism in a spec, the db should not be queried")
 		return nil, nil
 	}
 	applicationConfig := runtime.NewConfigurationProvider()
@@ -3678,7 +3972,7 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
 		},
 	})
 	assert.NoError(t, err)
-	assert.Equal(t, execConfig.MaxParallelism, int32(100))
+	assert.Equal(t, int32(100), execConfig.MaxParallelism)
 
 	execConfig, err = executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
 		Project: workflowIdentifier.Project,
@@ -3690,7 +3984,7 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
 		},
 	})
 	assert.NoError(t, err)
-	assert.Equal(t, execConfig.MaxParallelism, int32(50))
+	assert.Equal(t, int32(50), execConfig.MaxParallelism)
 
 	resourceManager = managerMocks.MockResourceManager{}
 	resourceManager.GetResourceFunc = func(ctx context.Context,
diff --git a/pkg/manager/impl/util/shared.go b/pkg/manager/impl/util/shared.go
index f79da35482..bed18fdd12 100644
--- a/pkg/manager/impl/util/shared.go
+++ b/pkg/manager/impl/util/shared.go
@@ -9,6 +9,7 @@ import (
 	"github.com/flyteorg/flyteadmin/pkg/errors"
 	"github.com/flyteorg/flyteadmin/pkg/manager/impl/shared"
 	"github.com/flyteorg/flyteadmin/pkg/manager/impl/validation"
+	"github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
 	repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
 	"github.com/flyteorg/flyteadmin/pkg/repositories/models"
 	"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
@@ -233,3 +234,22 @@ func GetTaskExecutionModel(
 	}
 	return &taskExecutionModel, nil
 }
+
+// GetMatchableResource gets matchable resource for resourceType and project - domain combination.
+// Returns nil with nothing is found or return an error
+func GetMatchableResource(ctx context.Context, resourceManager interfaces.ResourceInterface, resourceType admin.MatchableResource,
+	project, domain string) (*interfaces.ResourceResponse, error) {
+	matchableResource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{
+		Project:      project,
+		Domain:       domain,
+		ResourceType: resourceType,
+	})
+	if err != nil {
+		if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
+			logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain with error: %v", resourceType,
+				project, domain, err)
+			return nil, err
+		}
+	}
+	return matchableResource, nil
+}
diff --git a/pkg/manager/impl/util/shared_test.go b/pkg/manager/impl/util/shared_test.go
index 25ba0044aa..fce7b682b3 100644
--- a/pkg/manager/impl/util/shared_test.go
+++ b/pkg/manager/impl/util/shared_test.go
@@ -6,17 +6,19 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
-
 	"github.com/flyteorg/flyteadmin/pkg/common"
 	commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks"
 	flyteAdminErrors "github.com/flyteorg/flyteadmin/pkg/errors"
 	"github.com/flyteorg/flyteadmin/pkg/manager/impl/testutils"
+	managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
+	managerMocks "github.com/flyteorg/flyteadmin/pkg/manager/mocks"
 	"github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
 	repositoryMocks "github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
 	"github.com/flyteorg/flyteadmin/pkg/repositories/models"
 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flytestdlib/storage"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
@@ -474,3 +476,65 @@ func TestListActiveLaunchPlanVersionsFilters(t *testing.T) {
 	assert.Equal(t, activeExpr.Args, int32(admin.LaunchPlanState_ACTIVE))
 	assert.Equal(t, activeExpr.Query, testutils.StateQueryPattern)
 }
+
+func TestGetMatchableResource(t *testing.T) {
+	resourceType := admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG
+	project := "dummyProject"
+	domain := "dummyDomain"
+	t.Run("successful fetch", func(t *testing.T) {
+		resourceManager := &managerMocks.MockResourceManager{}
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      project,
+				Domain:       domain,
+				ResourceType: resourceType,
+			})
+			return &managerInterfaces.ResourceResponse{
+				Attributes: &admin.MatchingAttributes{
+					Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
+						WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
+							MaxParallelism: 12,
+						},
+					},
+				},
+			}, nil
+		}
+
+		mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
+		assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism)
+		assert.Nil(t, err)
+	})
+
+	t.Run("not found", func(t *testing.T) {
+		resourceManager := &managerMocks.MockResourceManager{}
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      project,
+				Domain:       domain,
+				ResourceType: resourceType,
+			})
+			return nil, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "resource not found")
+		}
+
+		_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
+		assert.Nil(t, err)
+	})
+
+	t.Run("internal error", func(t *testing.T) {
+		resourceManager := &managerMocks.MockResourceManager{}
+		resourceManager.GetResourceFunc = func(ctx context.Context,
+			request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
+			assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
+				Project:      project,
+				Domain:       domain,
+				ResourceType: resourceType,
+			})
+			return nil, flyteAdminErrors.NewFlyteAdminError(codes.Internal, "internal error")
+		}
+
+		_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
+		assert.NotNil(t, err)
+	})
+}
diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go
index 3731b7c726..6c00ae9173 100644
--- a/pkg/runtime/application_config_provider.go
+++ b/pkg/runtime/application_config_provider.go
@@ -38,6 +38,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
 	EventVersion:          2,
 	AsyncEventsBufferSize: 100,
 	MaxParallelism:        25,
+	K8SServiceAccount:     "default",
 })
 
 var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go
index 9f2a5d0ab1..3b3cbf4681 100644
--- a/pkg/runtime/interfaces/application_configuration.go
+++ b/pkg/runtime/interfaces/application_configuration.go
@@ -1,6 +1,8 @@
 package interfaces
 
 import (
+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flytestdlib/config"
 
 	"golang.org/x/time/rate"
@@ -65,6 +67,19 @@ type ApplicationConfig struct {
 	// This is useful to achieve fairness. Note: MapTasks are regarded as one unit,
 	// and parallelism/concurrency of MapTasks is independent from this.
 	MaxParallelism int32 `json:"maxParallelism"`
+	// Labels to apply to the execution resource.
+	Labels map[string]string `json:"labels,omitempty"`
+	// Annotations to apply to the execution resource.
+	Annotations map[string]string `json:"annotations,omitempty"`
+
+	// Optional: security context override to apply this execution.
+	// iam_role references the fully qualified name of Identity & Access Management role to impersonate.
+	AssumableIamRole string `json:"assumableIamRole"`
+	// k8s_service_account references a kubernetes service account to impersonate.
+	K8SServiceAccount string `json:"k8sServiceAccount"`
+
+	// Prefix for where offloaded data from user workflows will be written
+	OutputLocationPrefix string `json:"outputLocationPrefix"`
 }
 
 func (a *ApplicationConfig) GetRoleNameKey() string {
@@ -95,6 +110,33 @@ func (a *ApplicationConfig) GetMaxParallelism() int32 {
 	return a.MaxParallelism
 }
 
+func (a *ApplicationConfig) GetRawOutputDataConfig() *admin.RawOutputDataConfig {
+	return &admin.RawOutputDataConfig{
+		OutputLocationPrefix: a.OutputLocationPrefix,
+	}
+}
+
+func (a *ApplicationConfig) GetSecurityContext() *core.SecurityContext {
+	return &core.SecurityContext{
+		RunAs: &core.Identity{
+			IamRole:           a.AssumableIamRole,
+			K8SServiceAccount: a.K8SServiceAccount,
+		},
+	}
+}
+
+func (a *ApplicationConfig) GetAnnotations() *admin.Annotations {
+	return &admin.Annotations{
+		Values: a.Annotations,
+	}
+}
+
+func (a *ApplicationConfig) GetLabels() *admin.Labels {
+	return &admin.Labels{
+		Values: a.Labels,
+	}
+}
+
 // This section holds common config for AWS
 type AWSConfig struct {
 	Region string `json:"region"`