diff --git a/charts/thin/templates/config/runtime.yaml b/charts/thin/templates/config/runtime.yaml
deleted file mode 100644
index 33d7674c9d0..00000000000
--- a/charts/thin/templates/config/runtime.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: {{ template "thin.fullname" . }}-runtimeset
-  labels:
-    app: {{ template "thin.name" . }}
-    chart: {{ template "thin.chart" . }}
-    release: {{ .Release.Name }}
-    heritage: {{ .Release.Service }}
-    {{- include "library.fluid.labels" . | nindent 4 }}
-  ownerReferences:
-  {{- if .Values.owner.enabled }}
-  - apiVersion: {{ .Values.owner.apiVersion }}
-    blockOwnerDeletion: {{ .Values.owner.blockOwnerDeletion }}
-    controller: {{ .Values.owner.controller }}
-    kind: {{ .Values.owner.kind }}
-    name: {{ .Values.owner.name }}
-    uid: {{ .Values.owner.uid }}
-  {{- end }}
-data:
-  runtime.json: |
-    {{ .Values.runtimeValue }}
diff --git a/charts/thin/templates/fuse/daemonset.yaml b/charts/thin/templates/fuse/daemonset.yaml
index 625db9ebc15..97098126ad9 100644
--- a/charts/thin/templates/fuse/daemonset.yaml
+++ b/charts/thin/templates/fuse/daemonset.yaml
@@ -119,9 +119,6 @@ spec:
            - mountPath: /etc/fluid/config
              name: thin-conf
              readOnly: true
-            - mountPath: /etc/fluid/runtime
-              name: runtime
-              readOnly: true
            {{- if .Values.fuse.cacheDir }}
            - name: cache-dir
              mountPath: {{ .Values.fuse.cacheDir }}
@@ -155,13 +152,6 @@ spec:
              path: config.json
            defaultMode: 0444
        {{- end }}
-        - name: runtime
-          configMap:
-            name: {{ template "thin.fullname" . }}-runtimeset
-            items:
-            - key: runtime.json
-              path: runtime.json
-            defaultMode: 0444
        {{- if .Values.fuse.volumes }}
{{ toYaml .Values.fuse.volumes | indent 8 }}
        {{- end }}
diff --git a/charts/thin/templates/worker/statefuleset.yaml b/charts/thin/templates/worker/statefuleset.yaml
index 45f0897e97d..27937ab373c 100644
--- a/charts/thin/templates/worker/statefuleset.yaml
+++ b/charts/thin/templates/worker/statefuleset.yaml
@@ -1,3 +1,4 @@
+{{ if .Values.worker.enabled -}}
 apiVersion: apps/v1
 kind: StatefulSet
 metadata:
@@ -107,3 +108,4 @@ spec:
        {{- if .Values.worker.volumes }}
{{ toYaml .Values.worker.volumes | indent 8 }}
        {{- end }}
+{{- end -}}
diff --git a/charts/thin/values.yaml b/charts/thin/values.yaml
index 99c00494be4..325308f7b2e 100644
--- a/charts/thin/values.yaml
+++ b/charts/thin/values.yaml
@@ -33,6 +33,7 @@ runtimeValue: ""
 
 ## WORKER ##
 worker:
+  enabled: false
   image: ""
   imageTag: ""
   imagePullPolicy: ""
diff --git a/pkg/application/inject/fuse/mutator/mutator_default.go b/pkg/application/inject/fuse/mutator/mutator_default.go
index b8a918909de..45498a8ce26 100644
--- a/pkg/application/inject/fuse/mutator/mutator_default.go
+++ b/pkg/application/inject/fuse/mutator/mutator_default.go
@@ -24,7 +24,6 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -331,24 +330,16 @@ func (helper *defaultMutatorHelper) prepareFuseContainerPostStartScript() error
 		return err
 	}
 
-	ownerReference := metav1.OwnerReference{
-		APIVersion: dataset.APIVersion,
-		Kind:       dataset.Kind,
-		Name:       dataset.Name,
-		UID:        dataset.UID,
-	}
-
 	// Fluid assumes pvc name is the same with runtime's name
 	gen := poststart.NewDefaultPostStartScriptGenerator()
-	cmKey := gen.GetConfigMapKeyByOwner(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
-	cm := gen.BuildConfigMap(ownerReference, cmKey)
-
+	cmKey := gen.GetNamespacedConfigMapKey(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
 	found, err := kubeclient.IsConfigMapExist(helper.client, cmKey.Name, cmKey.Namespace)
 	if err != nil {
 		return err
 	}
 
 	if !found {
+		cm := gen.BuildConfigMap(dataset, cmKey)
 		err = helper.client.Create(context.TODO(), cm)
 		if err != nil {
 			// If ConfigMap creation succeeds concurrently, continue to mutate
diff --git a/pkg/application/inject/fuse/mutator/mutator_unprivileged.go b/pkg/application/inject/fuse/mutator/mutator_unprivileged.go
index 84e5b7258d1..350ced422a0 100644
--- a/pkg/application/inject/fuse/mutator/mutator_unprivileged.go
+++ b/pkg/application/inject/fuse/mutator/mutator_unprivileged.go
@@ -25,7 +25,6 @@ import (
 	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 )
 
@@ -130,17 +129,10 @@ func (mutator *unprivilegedMutatorHelper) prepareFuseContainerPostStartScript()
 		return err
 	}
 
-	ownerReference := metav1.OwnerReference{
-		APIVersion: dataset.APIVersion,
-		Kind:       dataset.Kind,
-		Name:       dataset.Name,
-		UID:        dataset.UID,
-	}
-
 	// Fluid assumes pvc name is the same with runtime's name
 	gen := poststart.NewDefaultPostStartScriptGenerator()
-	cmKey := gen.GetConfigMapKeyByOwner(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
-	cm := gen.BuildConfigMap(ownerReference, cmKey)
+	cmKey := gen.GetNamespacedConfigMapKey(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
+	cm := gen.BuildConfigMap(dataset, cmKey)
 
 	found, err := kubeclient.IsConfigMapExist(mutator.client, cmKey.Name, cmKey.Namespace)
 	if err != nil {
diff --git a/pkg/application/inject/fuse/poststart/check_fuse_default.go b/pkg/application/inject/fuse/poststart/check_fuse_default.go
index 83a041f9e17..6ea98fb59a6 100644
--- a/pkg/application/inject/fuse/poststart/check_fuse_default.go
+++ b/pkg/application/inject/fuse/poststart/check_fuse_default.go
@@ -81,7 +81,7 @@ type defaultPostStartScriptGenerator struct {
 func NewDefaultPostStartScriptGenerator() *defaultPostStartScriptGenerator {
 	return &defaultPostStartScriptGenerator{
 		scriptGeneratorHelper: scriptGeneratorHelper{
-			configMapName:   "check-mount",
+			configMapName:   "default-check-mount",
 			scriptFileName:  "check-mount.sh",
 			scriptMountPath: "/check-mount.sh",
 			scriptContent:   replacer.Replace(contentPrivilegedSidecar),
diff --git a/pkg/application/inject/fuse/poststart/script_gen_helper.go b/pkg/application/inject/fuse/poststart/script_gen_helper.go
index 6bdd976f05c..ff868ff7431 100644
--- a/pkg/application/inject/fuse/poststart/script_gen_helper.go
+++ b/pkg/application/inject/fuse/poststart/script_gen_helper.go
@@ -17,6 +17,7 @@ limitations under the License.
 package poststart
 
 import (
+	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"strings"
 
 	corev1 "k8s.io/api/core/v1"
@@ -35,27 +36,26 @@ type scriptGeneratorHelper struct {
 	scriptMountPath string
 }
 
-func (helper *scriptGeneratorHelper) BuildConfigMap(ownerReference metav1.OwnerReference, configMapKey types.NamespacedName) *corev1.ConfigMap {
+func (helper *scriptGeneratorHelper) BuildConfigMap(dataset *datav1alpha1.Dataset, configMapKey types.NamespacedName) *corev1.ConfigMap {
 	data := map[string]string{}
 	data[helper.scriptFileName] = helper.scriptContent
 	// data[helper.scriptFileName] = replacer.Replace(helper.scriptContent)
 
 	return &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:            configMapKey.Name,
-			Namespace:       configMapKey.Namespace,
-			OwnerReferences: []metav1.OwnerReference{ownerReference},
+			Name:      configMapKey.Name,
+			Namespace: configMapKey.Namespace,
 			Labels: map[string]string{
-				common.LabelAnnotationDatasetId: utils.GetDatasetId(configMapKey.Namespace, ownerReference.Name, string(ownerReference.UID)),
+				common.LabelAnnotationDatasetId: utils.GetDatasetId(configMapKey.Namespace, dataset.Name, string(dataset.UID)),
 			},
 		},
 		Data: data,
 	}
 }
 
-func (helper *scriptGeneratorHelper) GetConfigMapKeyByOwner(datasetKey types.NamespacedName, runtimeType string) types.NamespacedName {
+func (helper *scriptGeneratorHelper) GetNamespacedConfigMapKey(datasetKey types.NamespacedName, runtimeType string) types.NamespacedName {
 	return types.NamespacedName{
 		Namespace: datasetKey.Namespace,
-		Name:      datasetKey.Name + "-" + strings.ToLower(runtimeType) + "-" + helper.configMapName,
+		Name:      strings.ToLower(runtimeType) + "-" + helper.configMapName,
 	}
 }
diff --git a/pkg/common/label.go b/pkg/common/label.go
index 3c9d3d22c4d..62643c01159 100644
--- a/pkg/common/label.go
+++ b/pkg/common/label.go
@@ -77,6 +77,12 @@ const (
 	// "Sidecar": for only sidecar to skip check mount ready,
 	AnnotationSkipCheckMountReadyTarget = LabelAnnotationPrefix + "skip-check-mount-ready-target"
 
+	// AnnotationEnableRuntimeSetConfig
+	AnnotationEnableRuntimeSetConfig = "runtime." + LabelAnnotationPrefix + "enable-set-config"
+
+	// AnnotationEnableRuntimeHelmValueConfig
+	AnnotationEnableRuntimeHelmValueConfig = "runtime." + LabelAnnotationPrefix + "enable-helm-value-config"
+
 	// LabelAnnotationMountingDatasets is a label/annotation key indicating which datasets are currently being used by a pod.
 	// i.e. fluid.io/datasets-in-use
 	LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"
diff --git a/pkg/ddc/thin/dataset.go b/pkg/ddc/thin/dataset.go
index 28a6649755d..e06aa360fd8 100644
--- a/pkg/ddc/thin/dataset.go
+++ b/pkg/ddc/thin/dataset.go
@@ -31,11 +31,6 @@ import (
 )
 
 func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err error) {
-	runtime, err := t.getRuntime()
-	if err != nil {
-		return err
-	}
-
 	// 2. update the dataset status
 	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		dataset, err := utils.GetDataset(t.Client, t.name, t.namespace)
@@ -85,7 +80,6 @@ func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err e
 				cond)
 		}
 
-		datasetToUpdate.Status.CacheStates = runtime.Status.CacheStates
 		if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
 			t.Log.V(1).Info("Update DatasetStatus", "dataset", fmt.Sprintf("%s/%s", datasetToUpdate.GetNamespace(), datasetToUpdate.GetName()))
 			err = t.Client.Status().Update(context.TODO(), datasetToUpdate)
@@ -108,39 +102,6 @@ func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err e
 }
 
 func (t *ThinEngine) UpdateCacheOfDataset() (err error) {
-	runtime, err := t.getRuntime()
-	if err != nil {
-		return err
-	}
-
-	// 2.update the dataset status
-	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
-		dataset, err := utils.GetDataset(t.Client, t.name, t.namespace)
-		if err != nil {
-			return err
-		}
-		datasetToUpdate := dataset.DeepCopy()
-
-		datasetToUpdate.Status.CacheStates = runtime.Status.CacheStates
-
-		t.Log.Info("the dataset status", "status", datasetToUpdate.Status)
-
-		if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
-			t.Log.V(1).Info("Update RuntimeStatus", "runtime", fmt.Sprintf("%s/%s", runtime.GetNamespace(), runtime.GetName()))
-			err = t.Client.Status().Update(context.TODO(), datasetToUpdate)
-			return err
-		} else {
-			t.Log.Info("No need to update the cache of the data")
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return utils.LoggingErrorExceptConflict(t.Log, err, "Failed to Update dataset",
-			types.NamespacedName{Namespace: t.namespace, Name: t.name})
-	}
-
 	return
 }
diff --git a/pkg/ddc/thin/health_check.go b/pkg/ddc/thin/health_check.go
index 32b67e46eea..4d4172ce9d0 100644
--- a/pkg/ddc/thin/health_check.go
+++ b/pkg/ddc/thin/health_check.go
@@ -17,128 +17,20 @@ package thin
 
 import (
-	"context"
-	"fmt"
-	"reflect"
-
-	data "github.com/fluid-cloudnative/fluid/api/v1alpha1"
-	"github.com/fluid-cloudnative/fluid/pkg/utils"
-	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/util/retry"
 )
 
 func (t ThinEngine) CheckRuntimeHealthy() (err error) {
-	// 1. Check the healthy of the workers
-	err = t.checkWorkersHealthy()
-	if err != nil {
-		t.Log.Error(err, "The workers are not healthy")
-		updateErr := t.UpdateDatasetStatus(data.FailedDatasetPhase)
-		if updateErr != nil {
-			t.Log.Error(updateErr, "Failed to update dataset")
-		}
-		return
-	}
-
-	// 2. Check the healthy of the fuse
+	// Check the health of the fuse
 	err = t.checkFuseHealthy()
 	if err != nil {
-		t.Log.Error(err, "The fuse is not healthy")
-		updateErr := t.UpdateDatasetStatus(data.FailedDatasetPhase)
-		if updateErr != nil {
-			t.Log.Error(updateErr, "Failed to update dataset")
-		}
+		t.Log.Error(err, "checkFuseHealthy failed")
 		return
 	}
 
-	updateErr := t.UpdateDatasetStatus(data.BoundDatasetPhase)
-	if updateErr != nil {
-		t.Log.Error(updateErr, "Failed to update dataset")
-	}
-
 	return
 }
 
-// checkWorkersHealthy check workers number changed
-func (t *ThinEngine) checkWorkersHealthy() (err error) {
-	workerName := t.getWorkerName()
-
-	// Check the status of workers
-	workers, err := kubeclient.GetStatefulSet(t.Client, workerName, t.namespace)
-	if err != nil {
-		return err
-	}
-
-	healthy := false
-	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
-
-		runtime, err := t.getRuntime()
-		if err != nil {
-			return err
-		}
-
-		runtimeToUpdate := runtime.DeepCopy()
-		if workers.Status.ReadyReplicas == 0 && *workers.Spec.Replicas > 0 {
-			if len(runtimeToUpdate.Status.Conditions) == 0 {
-				runtimeToUpdate.Status.Conditions = []data.RuntimeCondition{}
-			}
-			cond := utils.NewRuntimeCondition(data.RuntimeWorkersReady, "The workers are not ready.",
-				fmt.Sprintf("The statefulset %s in %s are not ready, the Unavailable number is %d, please fix it.",
-					workers.Name,
-					workers.Namespace,
-					*workers.Spec.Replicas-workers.Status.ReadyReplicas), v1.ConditionFalse)
-
-			_, oldCond := utils.GetRuntimeCondition(runtimeToUpdate.Status.Conditions, cond.Type)
-
-			if oldCond == nil || oldCond.Type != cond.Type {
-				runtimeToUpdate.Status.Conditions =
-					utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions,
-						cond)
-			}
-
-			runtimeToUpdate.Status.WorkerPhase = data.RuntimePhaseNotReady
-
-			t.Log.Error(err, "the workers are not ready")
-		} else {
-			healthy = true
-			cond := utils.NewRuntimeCondition(data.RuntimeWorkersReady, "The workers are ready.",
-				"The workers are ready", v1.ConditionTrue)
-
-			_, oldCond := utils.GetRuntimeCondition(runtimeToUpdate.Status.Conditions, cond.Type)
-
-			if oldCond == nil || oldCond.Type != cond.Type {
-				runtimeToUpdate.Status.Conditions =
-					utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions,
-						cond)
-			}
-		}
-		runtimeToUpdate.Status.WorkerNumberReady = workers.Status.ReadyReplicas
-		runtimeToUpdate.Status.WorkerNumberAvailable = workers.Status.CurrentReplicas
-		if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
-			updateErr := t.Client.Status().Update(context.TODO(), runtimeToUpdate)
-			if updateErr != nil {
-				return updateErr
-			}
-		}
-
-		return err
-	})
-
-	if err != nil {
-		t.Log.Error(err, "Failed update runtime")
-		return err
-	}
-
-	if !healthy {
-		err = fmt.Errorf("the workers %s in %s are not ready, the unhealthy number %d",
-			workers.Name,
-			workers.Namespace,
-			*workers.Spec.Replicas-workers.Status.ReadyReplicas)
-	}
-
-	return err
-}
-
 // checkFuseHealthy check fuses number changed
 func (t *ThinEngine) checkFuseHealthy() error {
 	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
diff --git a/pkg/ddc/thin/master.go b/pkg/ddc/thin/master.go
index 24e068a1d0a..56bc2f552e5 100644
--- a/pkg/ddc/thin/master.go
+++ b/pkg/ddc/thin/master.go
@@ -17,19 +17,16 @@ package thin
 
 import (
-	"context"
-	"reflect"
-
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
-	"github.com/fluid-cloudnative/fluid/pkg/utils"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
-	corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/util/retry" ) -func (t ThinEngine) CheckMasterReady() (ready bool, err error) { - // thinRuntime has no master role +func (t ThinEngine) CheckMasterReady() (bool, error) { + if _, err := kubeclient.GetDaemonset(t.Client, t.getFuseName(), t.namespace); err != nil { + return false, err + } + return true, nil } @@ -39,7 +36,7 @@ func (t ThinEngine) ShouldSetupMaster() (should bool, err error) { return } - switch runtime.Status.WorkerPhase { + switch runtime.Status.FusePhase { case datav1alpha1.RuntimePhaseNone: should = true default: @@ -49,61 +46,19 @@ func (t ThinEngine) ShouldSetupMaster() (should bool, err error) { } func (t ThinEngine) SetupMaster() (err error) { - workerName := t.getWorkerName() - + fuseName := t.getFuseName() // 1. Setup - _, err = kubeclient.GetStatefulSet(t.Client, workerName, t.namespace) + _, err = kubeclient.GetDaemonset(t.Client, fuseName, t.namespace) if err != nil && apierrs.IsNotFound(err) { - //1. Is not found error - t.Log.V(1).Info("SetupMaster", "worker", workerName) + //3.1. Is not found error + t.Log.Info("SetupMaster", "fuse", fuseName) return t.setupMasterInternal() } else if err != nil { - //2. Other errors + //3.2. Other errors return - } else { - //3.The fuse has been set up - t.Log.V(1).Info("The worker has been set.") - } - - // 2. Update the status of the runtime - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - runtime, err := t.getRuntime() - if err != nil { - return err - } - runtimeToUpdate := runtime.DeepCopy() - - runtimeToUpdate.Status.WorkerPhase = datav1alpha1.RuntimePhaseNotReady - replicas := runtimeToUpdate.Spec.Worker.Replicas - if replicas == 0 { - replicas = 1 - } - - // Init selector for worker - runtimeToUpdate.Status.Selector = t.getWorkerSelectors() - runtimeToUpdate.Status.DesiredWorkerNumberScheduled = replicas - runtimeToUpdate.Status.ValueFileConfigmap = t.getHelmValuesConfigMapName() - - if len(runtimeToUpdate.Status.Conditions) == 0 { - runtimeToUpdate.Status.Conditions = []datav1alpha1.RuntimeCondition{} - } - cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeWorkersInitialized, datav1alpha1.RuntimeWorkersInitializedReason, - "The worker is initialized.", corev1.ConditionTrue) - runtimeToUpdate.Status.Conditions = - utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions, - cond) - - if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { - return t.Client.Status().Update(context.TODO(), runtimeToUpdate) - } - - return nil - }) - - if err != nil { - t.Log.Error(err, "Update runtime status") - return err } + // 2.The fuse has been set up + t.Log.V(1).Info("The fuse has been set.") return } diff --git a/pkg/ddc/thin/master_internal.go b/pkg/ddc/thin/master_internal.go index 014c43d7b9a..acd88eaba8d 100644 --- a/pkg/ddc/thin/master_internal.go +++ b/pkg/ddc/thin/master_internal.go @@ -19,6 +19,7 @@ package thin import ( "fmt" "os" + "strconv" "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/yaml" @@ -64,11 +65,15 @@ func (t *ThinEngine) setupMasterInternal() (err error) { } func (t *ThinEngine) generateThinValueFile(runtime *datav1alpha1.ThinRuntime, profile *datav1alpha1.ThinRuntimeProfile) (valueFileName string, err error) { - //0. 
Check if the configmap exists - err = kubeclient.DeleteConfigMap(t.Client, t.getHelmValuesConfigMapName(), t.namespace) - if err != nil { - t.Log.Error(err, "Failed to clean value files") - return + enableRuntimeHelmValueConfig := t.ifRuntimeHelmValueEnable() + + if enableRuntimeHelmValueConfig { + //0. Check if the configmap exists + err = kubeclient.DeleteConfigMap(t.Client, t.getHelmValuesConfigMapName(), t.namespace) + if err != nil { + t.Log.Error(err, "Failed to clean value files") + return + } } // labelName := common.LabelAnnotationStorageCapacityPrefix + e.runtimeType + "-" + e.name @@ -101,17 +106,35 @@ func (t *ThinEngine) generateThinValueFile(runtime *datav1alpha1.ThinRuntime, pr return } - //3. Save the configfile into configmap - runtimeInfo := t.runtimeInfo - ownerDatasetId := utils.GetDatasetId(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetOwnerDatasetUID()) - err = kubeclient.CreateConfigMap(t.Client, t.getHelmValuesConfigMapName(), t.namespace, "data", data, ownerDatasetId) - if err != nil { - return + if enableRuntimeHelmValueConfig { + //3. Save the configfile into configmap + runtimeInfo := t.runtimeInfo + ownerDatasetId := utils.GetDatasetId(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetOwnerDatasetUID()) + err = kubeclient.CreateConfigMap(t.Client, t.getHelmValuesConfigMapName(), t.namespace, "data", data, ownerDatasetId) + if err != nil { + return + } } return valueFileName, err } +func (t *ThinEngine) ifRuntimeHelmValueEnable() bool { + runtime := t.runtime + if runtime == nil { + return false + } + value, exist := runtime.Annotations[common.AnnotationEnableRuntimeHelmValueConfig] + if !exist { + return false + } + enableRuntimeHelmValueConfig, err := strconv.ParseBool(value) + if err != nil { + return false + } + return enableRuntimeHelmValueConfig +} + func (t *ThinEngine) getHelmValuesConfigMapName() string { return fmt.Sprintf("%s-%s-values", t.name, t.engineImpl) } diff --git a/pkg/ddc/thin/node.go b/pkg/ddc/thin/node.go index cc02c9a9ff0..8940f2f8a5a 100644 --- a/pkg/ddc/thin/node.go +++ b/pkg/ddc/thin/node.go @@ -16,17 +16,6 @@ package thin -import ( - "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/lifecycle" -) - func (t ThinEngine) SyncScheduleInfoToCacheNodes() (err error) { - err = lifecycle.SyncScheduleInfoToCacheNodes(t.runtimeInfo, t.Client) - if err != nil { - return - } - - updated, err := t.UpdateRuntimeSetConfigIfNeeded() - t.Log.V(1).Info("UpdateRuntimeSetConfigIfNeeded", "updated", updated) return } diff --git a/pkg/ddc/thin/replicas.go b/pkg/ddc/thin/replicas.go index 9a66423ede0..828cb04f23a 100644 --- a/pkg/ddc/thin/replicas.go +++ b/pkg/ddc/thin/replicas.go @@ -18,36 +18,8 @@ package thin import ( cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (t ThinEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err error) { - var ( - workerName string = t.getWorkerName() - namespace string = t.namespace - ) - - workers, err := kubeclient.GetStatefulSet(t.Client, workerName, namespace) - if err != nil { - return err - } - - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - runtime, err := t.getRuntime() - if err != nil { - return err - } - runtimeToUpdate := runtime.DeepCopy() - err = t.Helper.SyncReplicas(ctx, runtimeToUpdate, runtimeToUpdate.Status, workers) - return err - }) - if err != 
nil { - return utils.LoggingErrorExceptConflict(t.Log, err, "Failed to sync the replicas", - types.NamespacedName{Namespace: t.namespace, Name: t.name}) - } - - return + return nil } diff --git a/pkg/ddc/thin/status.go b/pkg/ddc/thin/status.go index d63488c82c1..c99902922c2 100644 --- a/pkg/ddc/thin/status.go +++ b/pkg/ddc/thin/status.go @@ -18,91 +18,56 @@ package thin import ( "context" - "fmt" - "reflect" - "time" - - data "github.com/fluid-cloudnative/fluid/api/v1alpha1" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/util/retry" + "reflect" + "time" ) -func (t *ThinEngine) CheckAndUpdateRuntimeStatus() (ready bool, err error) { - var ( - workerReady bool - workerName string = t.getWorkerName() - namespace string = t.namespace - ) - +func (t *ThinEngine) CheckAndUpdateRuntimeStatus() (bool, error) { dataset, err := utils.GetDataset(t.Client, t.name, t.namespace) if err != nil { - return ready, err + return false, err } - // 1. Worker should be ready - workers, err := kubeclient.GetStatefulSet(t.Client, workerName, namespace) - if err != nil { - return ready, err - } - - var workerNodeAffinity = kubeclient.MergeNodeSelectorAndNodeAffinity(workers.Spec.Template.Spec.NodeSelector, workers.Spec.Template.Spec.Affinity) - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { runtime, err := t.getRuntime() if err != nil { return err } - runtimeToUpdate := runtime.DeepCopy() - if reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { - t.Log.V(1).Info("The runtime is equal after deepcopy") - } - // todo: maybe set query shell in runtime - // 0. 
Update the cache status - if len(runtime.Status.CacheStates) == 0 { - runtimeToUpdate.Status.CacheStates = map[common.CacheStateName]string{} + if runtimeToUpdate.Status.FusePhase == datav1alpha1.RuntimePhaseReady { + return nil } - // set node affinity - runtimeToUpdate.Status.CacheAffinity = workerNodeAffinity - - runtimeToUpdate.Status.CacheStates[common.CacheCapacity] = "N/A" - runtimeToUpdate.Status.CacheStates[common.CachedPercentage] = "N/A" - runtimeToUpdate.Status.CacheStates[common.Cached] = "N/A" - runtimeToUpdate.Status.CacheStates[common.CacheHitRatio] = "N/A" - runtimeToUpdate.Status.CacheStates[common.CacheThroughputRatio] = "N/A" - - runtimeToUpdate.Status.WorkerNumberReady = int32(workers.Status.ReadyReplicas) - runtimeToUpdate.Status.WorkerNumberUnavailable = int32(*workers.Spec.Replicas - workers.Status.ReadyReplicas) - runtimeToUpdate.Status.WorkerNumberAvailable = int32(workers.Status.CurrentReplicas) - if runtime.Replicas() == 0 { - runtimeToUpdate.Status.WorkerPhase = data.RuntimePhaseReady - workerReady = true - } else if workers.Status.ReadyReplicas > 0 { - if runtime.Replicas() == workers.Status.ReadyReplicas { - runtimeToUpdate.Status.WorkerPhase = data.RuntimePhaseReady - workerReady = true - } else if workers.Status.ReadyReplicas >= 1 { - runtimeToUpdate.Status.WorkerPhase = data.RuntimePhasePartialReady - workerReady = true + runtimeToUpdate.Status.FusePhase = datav1alpha1.RuntimePhaseReady + + if len(runtime.Status.CacheStates) == 0 { + runtimeToUpdate.Status.CacheStates = map[common.CacheStateName]string{ + common.CacheCapacity: "N/A", + common.CachedPercentage: "N/A", + common.Cached: "N/A", + common.CacheHitRatio: "N/A", + common.CacheThroughputRatio: "N/A", } - } else { - runtimeToUpdate.Status.WorkerPhase = data.RuntimePhaseNotReady } - if workerReady { - ready = true + runtimeToUpdate.Status.ValueFileConfigmap = "N/A" + if t.ifRuntimeHelmValueEnable() { + runtimeToUpdate.Status.ValueFileConfigmap = t.getHelmValuesConfigMapName() } // Update the setup time of thinFS runtime - if ready && runtimeToUpdate.Status.SetupDuration == "" { + if runtimeToUpdate.Status.SetupDuration == "" { runtimeToUpdate.Status.SetupDuration = utils.CalculateDuration(runtimeToUpdate.CreationTimestamp.Time, time.Now()) } - var statusMountsToUpdate []data.Mount + // update mount status + var statusMountsToUpdate []datav1alpha1.Mount for _, mount := range dataset.Status.Mounts { optionExcludedMount := mount.DeepCopy() optionExcludedMount.EncryptOptions = nil @@ -110,68 +75,27 @@ func (t *ThinEngine) CheckAndUpdateRuntimeStatus() (ready bool, err error) { statusMountsToUpdate = append(statusMountsToUpdate, *optionExcludedMount) } runtimeToUpdate.Status.Mounts = statusMountsToUpdate - runtimeToUpdate.Status.ValueFileConfigmap = t.getHelmValuesConfigMapName() - if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { - t.Log.V(1).Info("Update RuntimeStatus", "runtime", fmt.Sprintf("%s/%s", runtime.GetNamespace(), runtime.GetName())) - err = t.Client.Status().Update(context.TODO(), runtimeToUpdate) - if err != nil { - t.Log.Error(err, "Failed to update the runtime") - } - } else { - t.Log.Info("Do nothing because the runtime status is not changed.") + // update condition + if len(runtimeToUpdate.Status.Conditions) == 0 { + runtimeToUpdate.Status.Conditions = []datav1alpha1.RuntimeCondition{} } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeWorkersInitialized, datav1alpha1.RuntimeWorkersInitializedReason, + "The fuse is initialized.", corev1.ConditionTrue) + 
runtimeToUpdate.Status.Conditions = + utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions, + cond) - return err - }) - - return -} - -func (t *ThinEngine) UpdateRuntimeSetConfigIfNeeded() (updated bool, err error) { - fuseAddresses, err := t.Helper.GetIpAddressesOfFuse() - if err != nil { - return - } - - workerAddresses, err := t.Helper.GetIpAddressesOfWorker() - if err != nil { - return - } - - configMapName := t.runtimeInfo.GetName() + "-runtimeset" - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - cm, err := kubeclient.GetConfigmapByName(t.Client, configMapName, t.namespace) - if err != nil { - return err - } - - if cm == nil { - t.Log.Info("configmap is not found", "key", configMapName) - return nil - } - - cmToUpdate := cm.DeepCopy() - result, err := t.toRuntimeSetConfig(workerAddresses, - fuseAddresses) - if err != nil { - return err - } - cmToUpdate.Data["runtime.json"] = result - - if !reflect.DeepEqual(cm, cmToUpdate) { - err = t.Client.Update(context.TODO(), cmToUpdate) - if err != nil { - t.Log.Error(err, "Failed to update the ip addresses of runtime") - } - updated = true - } else { - t.Log.Info("Do nothing because the ip addresses of runtime are not changed.") + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + return t.Client.Status().Update(context.TODO(), runtimeToUpdate) } return nil - }) - return + if err != nil { + t.Log.Error(err, "Update runtime status") + return false, err + } + return true, nil } diff --git a/pkg/ddc/thin/status_test.go b/pkg/ddc/thin/status_test.go index 50bb5d9cb97..cf5f9efe54a 100644 --- a/pkg/ddc/thin/status_test.go +++ b/pkg/ddc/thin/status_test.go @@ -17,14 +17,9 @@ package thin import ( - "context" - "reflect" "testing" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/common" - ctrlhelper "github.com/fluid-cloudnative/fluid/pkg/ctrl" - "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" . 
"github.com/smartystreets/goconvey/convey" @@ -33,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" ) @@ -278,321 +272,3 @@ func TestThinEngine_CheckAndUpdateRuntimeStatus(t *testing.T) { }) } - -func TestThinEngine_UpdateRuntimeSetConfigIfNeeded(t *testing.T) { - type fields struct { - worker *appsv1.StatefulSet - pods []*corev1.Pod - ds *appsv1.DaemonSet - nodes []*corev1.Node - name string - namespace string - } - testcases := []struct { - name string - fields fields - configMap *corev1.ConfigMap - want string - wantUpdated bool - }{ - { - name: "create", - fields: fields{ - name: "spark", - namespace: "big-data", - worker: &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "spark-worker", - Namespace: "big-data", - UID: "uid1", - }, - Spec: appsv1.StatefulSetSpec{}, - }, - pods: []*corev1.Pod{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "spark-worker-0", - Namespace: "big-data", - OwnerReferences: []metav1.OwnerReference{{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - Name: "spark-worker", - UID: "uid1", - Controller: ptr.To(true), - }}, - Labels: map[string]string{ - "app": "thin", - "role": "thin-worker", - "fluid.io/dataset": "big-data-spark", - }, - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }}, - nodes: []*corev1.Node{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - Labels: map[string]string{ - "fluid.io/f-big-data-spark": "true", - }, - }, Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "192.168.0.1", - }, - }, - }, - }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "node2", - Labels: map[string]string{ - "fluid.io/f-big-data-spark": "true", - "fluid.io/s-big-data-spark": "true", - "fluid.io/s-thin-big-data-spark": "true", - }, - }, Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "192.168.0.2", - }, - }, - }, - }}, - }, - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "spark-runtimeset", - Namespace: "big-data", - }, Data: map[string]string{ - "runtime.json": "", - }, - }, want: "{\"workers\":[\"192.168.0.2\"],\"fuses\":[\"192.168.0.1\",\"192.168.0.2\"]}", - wantUpdated: true, - }, - { - name: "nochange_configmap", - fields: fields{ - name: "hbase", - namespace: "big-data", - worker: &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-worker", - Namespace: "big-data", - UID: "uid2", - }, - Spec: appsv1.StatefulSetSpec{}, - }, - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-worker-0", - Namespace: "big-data", - OwnerReferences: []metav1.OwnerReference{{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - Name: "hbase-worker", - UID: "uid2", - Controller: ptr.To(true), - }}, - Labels: map[string]string{ - "app": "thin", - "role": "thin-worker", - "fluid.io/dataset": "big-data-hbase", - }, - }, - Spec: corev1.PodSpec{NodeName: "node3"}, - }, - }, - nodes: []*corev1.Node{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node3", - Labels: map[string]string{ - "fluid.io/f-big-data-hbase": "true", - "fluid.io/s-big-data-hbase": "true", - "fluid.io/s-thin-big-data-hbase": "true", - }, - }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "10.0.0.2", - }, - }, - }, - }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "node4", - Labels: map[string]string{"fluid.io/s-default-hbase": "true", - "fluid.io/s-thin-big-data-hbase": 
"true"}, - }, Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "172.17.0.9", - }, - }, - }, - }}, - }, - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-runtimeset", - Namespace: "big-data", - }, Data: map[string]string{ - "runtime.json": "{\"workers\":[\"10.0.0.2\",\"172.17.0.9\"],\"fuses\":[\"10.0.0.2\"]}", - }, - }, want: "{\"workers\":[\"10.0.0.2\",\"172.17.0.9\"],\"fuses\":[\"10.0.0.2\"]}", - wantUpdated: false, - }, - { - name: "nomatch", - fields: fields{ - name: "hbase-a", - namespace: "big-data", - worker: &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-a-worker", - Namespace: "big-data", - UID: "uid3", - }, - Spec: appsv1.StatefulSetSpec{}, - }, - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-a-worker-0", - Namespace: "big-data", - Labels: map[string]string{ - "app": "thin", - "role": "thin-worker", - "fluid.io/dataset": "big-data-hbase-a", - }, - }, - Spec: corev1.PodSpec{NodeName: "node5"}, - }, - }, - nodes: []*corev1.Node{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node5", - }, Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "10.0.0.1", - }, - }, - }, - }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "node6", - Labels: map[string]string{ - "fluid.io/s-default-hbase-a": "true", - }, - }, Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: "10.0.0.2", - }, - }, - }, - }}, - }, - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-a-runtimeset", - Namespace: "big-data", - }, Data: map[string]string{ - "runtime.json": "{\"workers\":[],\"fuses\":[]}", - }, - }, want: "{\"workers\":[],\"fuses\":[]}", - wantUpdated: false, - }, - } - - runtimeObjs := []runtime.Object{} - - for _, testcase := range testcases { - runtimeObjs = append(runtimeObjs, testcase.fields.worker) - - if testcase.fields.ds != nil { - runtimeObjs = append(runtimeObjs, testcase.fields.ds) - } - for _, pod := range testcase.fields.pods { - runtimeObjs = append(runtimeObjs, pod) - } - - for _, node := range testcase.fields.nodes { - runtimeObjs = append(runtimeObjs, node) - } - - runtimeObjs = append(runtimeObjs, testcase.configMap) - - // runtimeObjs = append(runtimeObjs, testcase.fields.pods) - } - c := fake.NewFakeClientWithScheme(testScheme, runtimeObjs...) 
-
-	for _, testcase := range testcases {
-		engine := getTestThinEngineNode(c, testcase.fields.name, testcase.fields.namespace, true)
-		runtimeInfo, err := base.BuildRuntimeInfo(testcase.fields.name,
-			testcase.fields.namespace,
-			common.ThinRuntime)
-		if err != nil {
-			t.Errorf("BuildRuntimeInfo() error = %v", err)
-		}
-
-		engine.Helper = ctrlhelper.BuildHelper(runtimeInfo, c, engine.Log)
-		updated, err := engine.UpdateRuntimeSetConfigIfNeeded()
-		if err != nil {
-			t.Errorf("Got error %t.", err)
-		}
-
-		cm := corev1.ConfigMap{}
-		err = c.Get(context.TODO(), types.NamespacedName{
-			Namespace: testcase.configMap.Namespace,
-			Name:      testcase.configMap.Name,
-		}, &cm)
-		if err != nil {
-			t.Errorf("Got error %t.", err)
-		}
-		got := cm.Data["runtime.json"]
-		if !reflect.DeepEqual(testcase.want, got) {
-			t.Errorf("testcase %v UpdateRuntimeSetConfigIfNeeded()'s wanted %v, actual %v",
-				testcase.name, testcase.want, got)
-		}
-
-		if testcase.wantUpdated != updated {
-			t.Errorf("testcase %v UpdateRuntimeSetConfigIfNeeded()'s wantUpdated %v, actual %v",
-				testcase.name, testcase.wantUpdated, updated)
-		}
-
-		// 2.Try the second time to make sure it idempotent and no update
-
-		updated, err = engine.UpdateRuntimeSetConfigIfNeeded()
-		if err != nil {
-			t.Errorf("Got error %t.", err)
-		}
-		cm = corev1.ConfigMap{}
-		err = c.Get(context.TODO(), types.NamespacedName{
-			Namespace: testcase.configMap.Namespace,
-			Name:      testcase.configMap.Name,
-		}, &cm)
-		if err != nil {
-			t.Errorf("Got error %t.", err)
-		}
-		got = cm.Data["runtime.json"]
-		if !reflect.DeepEqual(testcase.want, got) {
-			t.Errorf("testcase %v UpdateRuntimeSetConfigIfNeeded()'s wanted %v, actual %v",
-				testcase.name, testcase.want, got)
-		}
-		if updated {
-			t.Errorf("testcase %v UpdateRuntimeSetConfigIfNeeded()'s wantUpdated false, actual %v",
-				testcase.name, updated)
-		}
-		// if reflect.DeepEqual()
-
-	}
-}
diff --git a/pkg/ddc/thin/transform.go b/pkg/ddc/thin/transform.go
index 0fbcde64d73..1cb1b5108f2 100644
--- a/pkg/ddc/thin/transform.go
+++ b/pkg/ddc/thin/transform.go
@@ -53,19 +53,16 @@ func (t *ThinEngine) transform(runtime *datav1alpha1.ThinRuntime, profile *datav
 	value.FullnameOverride = t.name
 	value.OwnerDatasetId = utils.GetDatasetId(t.namespace, t.name, t.runtimeInfo.GetOwnerDatasetUID())
 	value.Owner = transformer.GenerateOwnerReferenceFromObject(runtime)
-	toRuntimeSetConfig, err := t.toRuntimeSetConfig(nil, nil)
-	if err != nil {
-		return
-	}
-	value.RuntimeValue = toRuntimeSetConfig
 
 	// transform toleration
 	t.transformTolerations(dataset, value)
 
 	// transform the workers
-	err = t.transformWorkers(runtime, profile, value)
-	if err != nil {
-		return
+	if runtime.Spec.Worker.Enabled {
+		err = t.transformWorkers(runtime, profile, value)
+		if err != nil {
+			return
+		}
 	}
 
 	// transform the fuse
diff --git a/pkg/ddc/thin/transform_config.go b/pkg/ddc/thin/transform_config.go
index 7750f272771..6c899058f22 100644
--- a/pkg/ddc/thin/transform_config.go
+++ b/pkg/ddc/thin/transform_config.go
@@ -192,24 +192,3 @@ func (t *ThinEngine) extractVolumeMountOptions(pv *corev1.PersistentVolume) (mou
 
 	return
 }
-
-func (t *ThinEngine) toRuntimeSetConfig(workers []string, fuses []string) (result string, err error) {
-	if workers == nil {
-		workers = []string{}
-	}
-
-	if fuses == nil {
-		fuses = []string{}
-	}
-
-	status := RuntimeSetConfig{
-		Workers: workers,
-		Fuses:   fuses,
-	}
-	var runtimeStr []byte
-	runtimeStr, err = json.Marshal(status)
-	if err != nil {
-		return
-	}
-	return string(runtimeStr), err
-}
diff --git a/pkg/ddc/thin/transform_test.go b/pkg/ddc/thin/transform_test.go
index 11e5964aee4..b0c79c18d7d 100644
--- a/pkg/ddc/thin/transform_test.go
+++ b/pkg/ddc/thin/transform_test.go
@@ -23,9 +23,7 @@ import (
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
-	"github.com/fluid-cloudnative/fluid/pkg/utils/testutil"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -235,199 +233,3 @@ func TestThinEngine_parseWorkerImage(t1 *testing.T) {
 		})
 	}
 }
-
-func TestThinEngine_transformWorkers(t1 *testing.T) {
-	profile := &datav1alpha1.ThinRuntimeProfile{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "test",
-		},
-		Spec: datav1alpha1.ThinRuntimeProfileSpec{
-			FileSystemType: "test",
-			Worker: datav1alpha1.ThinCompTemplateSpec{
-				Image:           "test",
-				ImageTag:        "v1",
-				ImagePullPolicy: "Always",
-				Resources: corev1.ResourceRequirements{
-					Requests: corev1.ResourceList{
-						// should be inherited
-						corev1.ResourceCPU: resource.MustParse("100m"),
-						// should be overridden
-						corev1.ResourceMemory: resource.MustParse("2Gi"),
-					},
-				},
-				Env: []corev1.EnvVar{{
-					Name:  "a",
-					Value: "b",
-				}},
-				NodeSelector: map[string]string{"a": "b"},
-				Ports: []corev1.ContainerPort{{
-					Name:          "port",
-					ContainerPort: 8080,
-				}},
-				NetworkMode: datav1alpha1.HostNetworkMode,
-				VolumeMounts: []corev1.VolumeMount{{
-					Name:      "a",
-					MountPath: "/test",
-				}},
-			},
-			Volumes: []corev1.Volume{{
-				Name: "a",
-				VolumeSource: corev1.VolumeSource{
-					HostPath: &corev1.HostPathVolumeSource{Path: "/test"},
-				},
-			}},
-		},
-	}
-	runtime := &datav1alpha1.ThinRuntime{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      "test",
-			Namespace: "fluid",
-		},
-		Spec: datav1alpha1.ThinRuntimeSpec{
-			ThinRuntimeProfileName: "test",
-			Worker: datav1alpha1.ThinCompTemplateSpec{
-				Replicas: 1,
-				Resources: corev1.ResourceRequirements{
-					Requests: corev1.ResourceList{
-						corev1.ResourceMemory: resource.MustParse("1Gi"),
-					},
-					Limits: corev1.ResourceList{
-						corev1.ResourceCPU:    resource.MustParse("200m"),
-						corev1.ResourceMemory: resource.MustParse("4Gi"),
-					},
-				},
-				Env: []corev1.EnvVar{{
-					Name: "b",
-					ValueFrom: &corev1.EnvVarSource{
-						ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
-							LocalObjectReference: corev1.LocalObjectReference{Name: "test-cm"},
-						},
-					},
-				}},
-				NodeSelector: map[string]string{"b": "c"},
-				VolumeMounts: []corev1.VolumeMount{{
-					Name:      "b",
-					MountPath: "/b",
-				}},
-				LivenessProbe: &corev1.Probe{
-					ProbeHandler: corev1.ProbeHandler{
-						HTTPGet: &corev1.HTTPGetAction{
-							Path: "/healthz",
-						},
-					},
-					InitialDelaySeconds: 1,
-					TimeoutSeconds:      1,
-					PeriodSeconds:       1,
-					SuccessThreshold:    1,
-					FailureThreshold:    1,
-				},
-				ReadinessProbe: &corev1.Probe{
-					ProbeHandler: corev1.ProbeHandler{
-						HTTPGet: &corev1.HTTPGetAction{
-							Path: "/healthz",
-						},
-					},
-					InitialDelaySeconds: 1,
-					TimeoutSeconds:      1,
-					PeriodSeconds:       1,
-					SuccessThreshold:    1,
-					FailureThreshold:    1,
-				},
-			},
-			Volumes: []corev1.Volume{{
-				Name: "b",
-				VolumeSource: corev1.VolumeSource{
-					HostPath: &corev1.HostPathVolumeSource{Path: "/b"},
-				},
-			}},
-		},
-	}
-	wantValue := &ThinValue{
-		Worker: Worker{
-			Image:           "test",
-			ImageTag:        "v1",
-			ImagePullPolicy: "Always",
-			Resources: common.Resources{
-				Requests: map[corev1.ResourceName]string{
-					corev1.ResourceCPU:    "100m",
-					corev1.ResourceMemory: "1Gi",
-				},
-				Limits: map[corev1.ResourceName]string{
-					corev1.ResourceCPU:    "200m",
-					corev1.ResourceMemory: "4Gi",
-				},
-			},
-			HostNetwork: true,
-			Envs: []corev1.EnvVar{{
-				Name:  "a",
-				Value: "b",
-			}, {
-				Name: "b",
-				ValueFrom: &corev1.EnvVarSource{
-					ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
-						LocalObjectReference: corev1.LocalObjectReference{
-							Name: "test-cm",
-						},
-					},
-				},
-			}},
-			NodeSelector: map[string]string{"b": "c"},
-			Ports: []corev1.ContainerPort{{
-				Name:          "port",
-				ContainerPort: 8080,
-			}},
-			LivenessProbe: &corev1.Probe{
-				ProbeHandler: corev1.ProbeHandler{
-					HTTPGet: &corev1.HTTPGetAction{
-						Path: "/healthz",
-					},
-				},
-				InitialDelaySeconds: 1,
-				TimeoutSeconds:      1,
-				PeriodSeconds:       1,
-				SuccessThreshold:    1,
-				FailureThreshold:    1,
-			},
-			ReadinessProbe: &corev1.Probe{
-				ProbeHandler: corev1.ProbeHandler{
-					HTTPGet: &corev1.HTTPGetAction{
-						Path: "/healthz",
-					},
-				},
-				InitialDelaySeconds: 1,
-				TimeoutSeconds:      1,
-				PeriodSeconds:       1,
-				SuccessThreshold:    1,
-				FailureThreshold:    1,
-			},
-			Volumes: []corev1.Volume{{
-				Name: "a",
-				VolumeSource: corev1.VolumeSource{
-					HostPath: &corev1.HostPathVolumeSource{Path: "/test"},
-				},
-			}, {
-				Name: "b",
-				VolumeSource: corev1.VolumeSource{
-					HostPath: &corev1.HostPathVolumeSource{Path: "/b"},
-				},
-			}},
-			VolumeMounts: []corev1.VolumeMount{{
-				Name:      "a",
-				MountPath: "/test",
-			}, {
-				Name:      "b",
-				MountPath: "/b",
-			}},
-		},
-	}
-	value := &ThinValue{}
-	t1.Run("test", func(t1 *testing.T) {
-		t := &ThinEngine{Log: fake.NullLogger()}
-		if err := t.transformWorkers(runtime, profile, value); err != nil {
-			t1.Errorf("transformWorkers() error = %v", err)
-		}
-		if !testutil.DeepEqualIgnoringSliceOrder(t1, value, wantValue) {
-			t1.Errorf("parseFromProfile() got = %v, want = %v", value, wantValue)
-		}
-	})
-}
diff --git a/pkg/ddc/thin/type.go b/pkg/ddc/thin/type.go
index 30efff3b1f2..38d216f4227 100644
--- a/pkg/ddc/thin/type.go
+++ b/pkg/ddc/thin/type.go
@@ -41,6 +41,7 @@ type ThinValue struct {
 }
 
 type Worker struct {
+	Enabled         bool   `json:"enabled,omitempty"`
 	Image           string `json:"image,omitempty"`
 	ImageTag        string `json:"imageTag,omitempty"`
 	ImagePullPolicy string `json:"imagePullPolicy,omitempty"`
diff --git a/pkg/ddc/thin/worker.go b/pkg/ddc/thin/worker.go
index c3b6d127a78..d076505f0e4 100644
--- a/pkg/ddc/thin/worker.go
+++ b/pkg/ddc/thin/worker.go
@@ -27,6 +27,9 @@ import (
 )
 
 func (t ThinEngine) CheckWorkersReady() (ready bool, err error) {
+	if !t.runtime.Spec.Worker.Enabled {
+		return true, nil
+	}
 	var (
 		workerName string = t.getWorkerName()
 		namespace  string = t.namespace
@@ -55,6 +58,9 @@ func (t ThinEngine) CheckWorkersReady() (ready bool, err error) {
 }
 
 func (t ThinEngine) ShouldSetupWorkers() (should bool, err error) {
+	if !t.runtime.Spec.Worker.Enabled {
+		return false, nil
+	}
 	runtime, err := t.getRuntime()
 	if err != nil {
 		return
@@ -71,6 +77,9 @@ func (t ThinEngine) ShouldSetupWorkers() (should bool, err error) {
 }
 
 func (t ThinEngine) SetupWorkers() (err error) {
+	if !t.runtime.Spec.Worker.Enabled {
+		return nil
+	}
 	var (
 		workerName string = t.getWorkerName()
 		namespace  string = t.namespace