From 61cc2a05e72559b3925bb19b9082400599c247f0 Mon Sep 17 00:00:00 2001 From: "smile.tan" Date: Wed, 10 Jul 2024 14:42:10 +0800 Subject: [PATCH] compute group finished deploy --- .../v1/doris_disaggregated_cluster_types.go | 34 ++- api/disaggregated/cluster/v1/unique_id.go | 21 +- .../cluster/v1/zz_generated.deepcopy.go | 48 +++-- .../v1/disaggregatedmetaservice_types.go | 2 +- api/disaggregated/metaservice/v1/type_util.go | 3 +- cmd/operator/main.go | 1 + .../utils/disaggregated_ms/ms_http/http.go | 6 +- .../disaggregated_ms/ms_http/metaservice.go | 1 + .../utils/disaggregated_ms/ms_meta/struct.go | 18 +- pkg/common/utils/resource/service.go | 2 +- .../disaggregated_cluster_controller.go | 200 ++++++++++++------ .../computegroups/controller.go | 62 +++++- .../computegroups/util.go | 7 + .../computegroups/util_test.go | 17 ++ .../ms/controller.go | 2 +- pkg/controller/sub_controller/events.go | 2 + 16 files changed, 315 insertions(+), 111 deletions(-) create mode 100644 pkg/controller/sub_controller/disaggregated_cluster/computegroups/util.go create mode 100644 pkg/controller/sub_controller/disaggregated_cluster/computegroups/util_test.go diff --git a/api/disaggregated/cluster/v1/doris_disaggregated_cluster_types.go b/api/disaggregated/cluster/v1/doris_disaggregated_cluster_types.go index a40d3881..541aa3a1 100644 --- a/api/disaggregated/cluster/v1/doris_disaggregated_cluster_types.go +++ b/api/disaggregated/cluster/v1/doris_disaggregated_cluster_types.go @@ -6,13 +6,12 @@ import ( ) type DorisDisaggregatedClusterSpec struct { - //TODO: give the example config. //VaultConfigmap specify the configmap that have configuration of file object information. example S3. //configmap have to config, please reference the doc. - VaultConfigmap string `json:"vaultConfigmap,omitempty"` + InstanceConfigMap string `json:"instanceConfigMap,omitempty"` //MetaService describe the metaservice that cluster want to storage metadata. - MetaService MetaService `json:"metaService,omitempty"` + DisMS DisMS `json:"disMS,omitempty"` //FeSpec describe the fe specification of doris disaggregated cluster. FeSpec FeSpec `json:"feSpec,omitempty"` @@ -21,7 +20,7 @@ type DorisDisaggregatedClusterSpec struct { ComputeGroups []ComputeGroup `json:"computeGroups,omitempty"` } -type MetaService struct { +type DisMS struct { //Namespace specify the namespace of metaservice deployed. Namespace string `json:"namespace,omitempty"` //Name specify the name of metaservice resource. @@ -199,9 +198,31 @@ type DorisDisaggregatedClusterStatus struct { //FEStatus describe the fe status. FEStatus FEStatus `json:"feStatus,omitempty"` + ClusterHealth ClusterHealth `json:"clusterHealth,omitempty"` + //ComputeGroupStatuses reflect a list of computecgroup status. ComputeGroupStatuses []ComputeGroupStatus `json:"computeGroupStatuses,omitempty"` } +type Health string + +var ( + Green Health = "green" + Yellow Health = "yellow" + Red Health = "red" +) + +type ClusterHealth struct { + //represents the cluster overall status. + Health Health `json:"health,omitempty"` + //represents the fe available or not. + FeAvailable bool `json:"feAvailable,omitempty"` + //the number of compute group. + CGCount int32 `json:"cgCount,omitempty"` + //the available numbers of compute group. + CGAvailableCount int32 `json:"cgAvailableCount,omitempty"` + //the full available numbers of compute group, represents all pod in compute group are ready. + CGFullAvailableCount int32 `json:"cgFullAvailableCount,omitempty"` +} type Phase string @@ -265,6 +286,11 @@ type FEStatus struct { // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:resource:shortName=ddc +// +kubebuilder:printcolumn:name="ClusterHealth",type=string,JSONPath=`.status.clusterHealth.health` +// +kubebuilder:printcolumn:name="FEPhase",type=string,JSONPath=`.status.feStatus.phase` +// +kubebuilder:printcolumn:name="CGCount",type=integer,JSONPath=`.status.clusterHealth.cgCount` +// +kubebuilder:printcolumn:name="CGAvailableCount",type=integer,JSONPath=`.status.clusterHealth.cgAvailableCount` +// +kubebuilder:printcolumn:name="CGFullAvailableCount",type=integer,JSONPath=`.status.clusterHealth.cgFullAvailableCount` // +kubebuilder:storageversion // DorisDisaggregatedCluster defined as CRD format, have type, metadata, spec, status, fields. type DorisDisaggregatedCluster struct { diff --git a/api/disaggregated/cluster/v1/unique_id.go b/api/disaggregated/cluster/v1/unique_id.go index 8a84ea22..095135c0 100644 --- a/api/disaggregated/cluster/v1/unique_id.go +++ b/api/disaggregated/cluster/v1/unique_id.go @@ -1,6 +1,7 @@ package v1 import ( + "fmt" "strings" ) @@ -9,7 +10,10 @@ please use get function to replace new function. */ func newCGStatefulsetName(ddcName /*dorisDisaggregatedCluster Name*/, cgName /*computegroup's name*/ string) string { - return ddcName + "-" + cgName + //cgName use "_", but name in kubernetes object use "-" + stName := ddcName + "-" + cgName + stName = strings.ReplaceAll(stName, "_", "-") + return stName } // RE:[a-zA-Z][0-9a-zA-Z_]+ @@ -17,9 +21,9 @@ func newCGClusterId(namespace, stsName string) string { return strings.ReplaceAll(namespace+"_"+stsName, "-", "_") } -// RE:[a-zA-Z][0-9a-zA-Z_]+ -func newCGCloudUniqueId(namespace, instanceName, statefulsetName string) string { - return strings.ReplaceAll("1:"+namespace+"_"+instanceName+":"+statefulsetName, "-", "_") +// cloudUniqueID match "1:$instanceId:[a-zA-Z]" +func newCGCloudUniqueId(instanceId, statefulsetName string) string { + return fmt.Sprintf("1:%s:%s", instanceId, statefulsetName) } func (ddc *DorisDisaggregatedCluster) GetCGStatefulsetName(cg *ComputeGroup) string { @@ -41,7 +45,7 @@ func (ddc *DorisDisaggregatedCluster) GetInstanceId() string { return ddc.Status.InstanceId } - // need config in CR. + // need config in vaultConfigMap. return "" } func (ddc *DorisDisaggregatedCluster) GetCGClusterId(cg *ComputeGroup) string { @@ -76,7 +80,8 @@ func (ddc *DorisDisaggregatedCluster) GetCGCloudUniqueId(cg *ComputeGroup) strin statefulsetName := ddc.GetCGStatefulsetName(cg) //update cg' clusterId for auto assemble, if not config. if cg.CloudUniqueId == "" { - cg.CloudUniqueId = newCGCloudUniqueId(ddc.Namespace, ddc.Name, statefulsetName) + instanceId := ddc.GetInstanceId() + cg.CloudUniqueId = newCGCloudUniqueId(instanceId, statefulsetName) } return cg.CloudUniqueId @@ -98,7 +103,9 @@ func (ddc *DorisDisaggregatedCluster) GetCGServiceName(cg *ComputeGroup) string return svcName } - return ddc.Name + "-" + cg.Name + svcName = ddc.Name + "-" + cg.Name + svcName = strings.ReplaceAll(svcName, "_", "-") + return svcName } func (ddc *DorisDisaggregatedCluster) GetFEServiceName() string { diff --git a/api/disaggregated/cluster/v1/zz_generated.deepcopy.go b/api/disaggregated/cluster/v1/zz_generated.deepcopy.go index 9c26e066..8c4bb91c 100644 --- a/api/disaggregated/cluster/v1/zz_generated.deepcopy.go +++ b/api/disaggregated/cluster/v1/zz_generated.deepcopy.go @@ -26,6 +26,21 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterHealth) DeepCopyInto(out *ClusterHealth) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterHealth. +func (in *ClusterHealth) DeepCopy() *ClusterHealth { + if in == nil { + return nil + } + out := new(ClusterHealth) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CommonSpec) DeepCopyInto(out *CommonSpec) { *out = *in @@ -175,6 +190,21 @@ func (in *ConfigMap) DeepCopy() *ConfigMap { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DisMS) DeepCopyInto(out *DisMS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DisMS. +func (in *DisMS) DeepCopy() *DisMS { + if in == nil { + return nil + } + out := new(DisMS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DorisDisaggregatedCluster) DeepCopyInto(out *DorisDisaggregatedCluster) { *out = *in @@ -237,7 +267,7 @@ func (in *DorisDisaggregatedClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DorisDisaggregatedClusterSpec) DeepCopyInto(out *DorisDisaggregatedClusterSpec) { *out = *in - out.MetaService = in.MetaService + out.DisMS = in.DisMS in.FeSpec.DeepCopyInto(&out.FeSpec) if in.ComputeGroups != nil { in, out := &in.ComputeGroups, &out.ComputeGroups @@ -262,6 +292,7 @@ func (in *DorisDisaggregatedClusterSpec) DeepCopy() *DorisDisaggregatedClusterSp func (in *DorisDisaggregatedClusterStatus) DeepCopyInto(out *DorisDisaggregatedClusterStatus) { *out = *in out.FEStatus = in.FEStatus + out.ClusterHealth = in.ClusterHealth if in.ComputeGroupStatuses != nil { in, out := &in.ComputeGroupStatuses, &out.ComputeGroupStatuses *out = make([]ComputeGroupStatus, len(*in)) @@ -335,21 +366,6 @@ func (in *FeSpec) DeepCopy() *FeSpec { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *MetaService) DeepCopyInto(out *MetaService) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaService. -func (in *MetaService) DeepCopy() *MetaService { - if in == nil { - return nil - } - out := new(MetaService) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PersistentVolume) DeepCopyInto(out *PersistentVolume) { *out = *in diff --git a/api/disaggregated/metaservice/v1/disaggregatedmetaservice_types.go b/api/disaggregated/metaservice/v1/disaggregatedmetaservice_types.go index 30d7888c..875a9fe7 100644 --- a/api/disaggregated/metaservice/v1/disaggregatedmetaservice_types.go +++ b/api/disaggregated/metaservice/v1/disaggregatedmetaservice_types.go @@ -12,7 +12,7 @@ import ( // +kubebuilder:printcolumn:name="MSStatus",type=string,JSONPath=`.status.msStatus.phase` // +kubebuilder:printcolumn:name="RecyclerStatus",type=string,JSONPath=`.status.recyclerStatus.phase` // +kubebuilder:storageversion -// +kubebuilder:resource:shortName=ddms +// +kubebuilder:resource:shortName=ddm // DorisDisaggregatedMetaService is the Schema for the DorisDisaggregatedMetaServices API type DorisDisaggregatedMetaService struct { metav1.TypeMeta `json:",inline"` diff --git a/api/disaggregated/metaservice/v1/type_util.go b/api/disaggregated/metaservice/v1/type_util.go index f9c24ef2..fdf0fd11 100644 --- a/api/disaggregated/metaservice/v1/type_util.go +++ b/api/disaggregated/metaservice/v1/type_util.go @@ -126,6 +126,5 @@ func IsReconcilingStatusPhase(c MetaServicePhase) bool { } func (ddm *DorisDisaggregatedMetaService) GetMSServiceName() string { - //TODO: cancel after merge code - return "" + return GenerateCommunicateServiceName(ddm, Component_MS) } diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 89df17ca..e317b276 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -77,6 +77,7 @@ func init() { //add foundationdb scheme utilruntime.Must(v1beta2.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme + controller.Controllers = append(controller.Controllers, &controller.DorisClusterReconciler{}, &unnamedwatches.WResource{}, &controller.DisaggregatedClusterReconciler{}, &controller.DisaggregatedMetaServiceReconciler{}) } diff --git a/pkg/common/utils/disaggregated_ms/ms_http/http.go b/pkg/common/utils/disaggregated_ms/ms_http/http.go index 357dc63a..6244eb23 100644 --- a/pkg/common/utils/disaggregated_ms/ms_http/http.go +++ b/pkg/common/utils/disaggregated_ms/ms_http/http.go @@ -10,9 +10,9 @@ import ( ) const ( - CREATE_INSTANCE_PREFIX_TEMPLATE = `%s/MetaService/http/create_instance?token=%s` - DELETE_INSTANCE_PREFIX_TEMPLATE = `%s/MetaService/http/drop_instance?token=%s` - GET_INSTANCE_PREFIX_TEMPLATE = `%s/MetaService/http/get_instance?token=%s&instance_id=%s` + CREATE_INSTANCE_PREFIX_TEMPLATE = `http://%s/MetaService/http/create_instance?token=%s` + DELETE_INSTANCE_PREFIX_TEMPLATE = `http://%s/MetaService/http/drop_instance?token=%s` + GET_INSTANCE_PREFIX_TEMPLATE = `http://%s/MetaService/http/get_instance?token=%s&instance_id=%s` ) //realize the metaservice interface diff --git a/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go b/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go index f24716f9..4e09e7de 100644 --- a/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go +++ b/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go @@ -8,6 +8,7 @@ type MSResponse struct { const ( SuccessCode string = "OK" + ALREADY_EXIST string = "ALREADY_EXISTED" NotFound string = "NOT_FOUND" INTERNAL_ERROR string = "INTERNAL_ERROR" ) diff --git a/pkg/common/utils/disaggregated_ms/ms_meta/struct.go b/pkg/common/utils/disaggregated_ms/ms_meta/struct.go index 804984b3..2b44a5f7 100644 --- a/pkg/common/utils/disaggregated_ms/ms_meta/struct.go +++ b/pkg/common/utils/disaggregated_ms/ms_meta/struct.go @@ -1,10 +1,15 @@ package ms_meta +// vault +const ( + Instance_id string = "instance_id" + Name string = "name" + User_id string = "user_id" + Vault string = "vault" +) + // S3 const ( - Instance_id string = "instance_id" - Instance_name string = "name" - User_id string = "user_id" Obj_info string = "obj_info" Obj_info_ak string = "ak" Obj_info_sk string = "sk" @@ -14,10 +19,9 @@ const ( Obj_info_region string = "region" Obj_info_external_endpoint string = "external_endpoint" Obj_info_provider string = "provider" - Obj_info_user_id string = "user_id" - Ram_user string = "ram_user" - Ram_user_ak string = "ak" - Ram_user_sk string = "sk" + //Ram_user string = "ram_user" + //Ram_user_ak string = "ak" + //Ram_user_sk string = "sk" ) // HDFS diff --git a/pkg/common/utils/resource/service.go b/pkg/common/utils/resource/service.go index ad321fd5..ae00f3ce 100644 --- a/pkg/common/utils/resource/service.go +++ b/pkg/common/utils/resource/service.go @@ -278,7 +278,7 @@ func GetPortKey(configKey string) string { case BROKER_IPC_PORT: return strings.ReplaceAll(BROKER_IPC_PORT, "_", "-") case BRPC_LISTEN_PORT: - return "ms-brpc-port" + return "brpc-port" default: return "" } diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go index bced4a82..7c3d9dfe 100644 --- a/pkg/controller/disaggregated_cluster_controller.go +++ b/pkg/controller/disaggregated_cluster_controller.go @@ -10,6 +10,7 @@ import ( "github.com/selectdb/doris-operator/pkg/common/utils/disaggregated_ms/ms_http" "github.com/selectdb/doris-operator/pkg/common/utils/disaggregated_ms/ms_meta" "github.com/selectdb/doris-operator/pkg/common/utils/hash" + "github.com/selectdb/doris-operator/pkg/common/utils/resource" sc "github.com/selectdb/doris-operator/pkg/controller/sub_controller" dcgs "github.com/selectdb/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/computegroups" dfe "github.com/selectdb/doris-operator/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe" @@ -43,7 +44,7 @@ var ( const ( ms_http_token_key = "http_token" - object_info_key = "vault" + instance_conf_key = "instance.conf" ms_conf_name = "selectdb_cloud.conf" ) @@ -63,7 +64,7 @@ type DisaggregatedClusterReconciler struct { } func (dc *DisaggregatedClusterReconciler) Init(mgr ctrl.Manager, options *Options) { - dc.instanceMeta = make(map[string]interface{}) + im := make(map[string]interface{}) scs := make(map[string]sc.DisaggregatedSubController) dfec := dfe.New(mgr) scs[dfec.GetControllerName()] = dfec @@ -71,9 +72,10 @@ func (dc *DisaggregatedClusterReconciler) Init(mgr ctrl.Manager, options *Option scs[dcgsc.GetControllerName()] = dcgsc if err := (&DisaggregatedClusterReconciler{ - Client: mgr.GetClient(), - Recorder: mgr.GetEventRecorderFor(disaggregatedClusterController), - Scs: scs, + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor(disaggregatedClusterController), + Scs: scs, + instanceMeta: im, }).SetupWithManager(mgr); err != nil { klog.Error(err, "unable to create controller ", "disaggregatedClusterReconciler") os.Exit(1) @@ -182,8 +184,7 @@ func (dc *DisaggregatedClusterReconciler) resourceBuilder(builder *ctrl.Builder) // 1. check and register instance info. info register in memory. periodical sync. // 2. sync resource. // 3. clear need delete resource. -// 4. reorganize status. -// 5. update cr or status. +// 4. display new status(eorganize status, update cr or status) func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { var ddc dv1.DorisDisaggregatedCluster err := dc.Get(ctx, req.NamespacedName, &ddc) @@ -191,70 +192,109 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec klog.Warningf("disaggreatedClusterReconciler not find resource DorisDisaggregatedCluster namespaceName %s", req.NamespacedName) return ctrl.Result{}, nil } + hv := hash.HashObject(ddc.Spec) - if err = dc.getMSInfo(ctx, &ddc); err != nil { + if err = dc.setStatusMSInfo(ctx, &ddc); err != nil { return ctrl.Result{}, err } - //TODO: wait interface fixed. - event, err := dc.ApplyInstanceInfo(ctx, &ddc) - if event != nil { - dc.Recorder.Event(&ddc, string(event.Type), string(event.Reason), string(event.Message)) - } - if err != nil { + var res ctrl.Result + //get instance config, validating config, display some instance info in DorisDisaggregatedCluster, apply instance info into ms. + if res, err = func() (ctrl.Result, error) { + instanceConf, err := dc.getInstanceConfig(ctx, &ddc) + if err != nil { + return ctrl.Result{}, err + } + + if err := dc.validateInstanceInfo(instanceConf); err != nil { + return ctrl.Result{}, err + } + + //display InstanceInfo in DorisDisaggregatedCluster + dc.displayInstanceInfo(instanceConf, &ddc) + + //TODO: wait interface fixed. realize update ak,sk. + event, err := dc.ApplyInstanceMeta(ddc.Status.MsEndpoint, ddc.Status.MsToken, instanceConf) + if event != nil { + dc.Recorder.Event(&ddc, string(event.Type), string(event.Reason), event.Message) + } return ctrl.Result{}, err + }(); err != nil { + return res, err } + //sync resource. - if res, err := dc.reconcileSub(ctx, &ddc); err != nil { + if res, err = dc.reconcileSub(ctx, &ddc); err != nil { return res, err } // clear unused resources. - if res, err := dc.clearUnusedResources(ctx, &ddc); err != nil { + if res, err = dc.clearUnusedResources(ctx, &ddc); err != nil { return res, err } - //reorganize status. - if res, err := dc.reorganizeStatus(&ddc); err != nil { - return res, err - } + //display new status. + res, err = func() (ctrl.Result, error) { + //reorganize status. + if res, err = dc.reorganizeStatus(&ddc); err != nil { + return res, err + } - //update cr or status - if res, err := dc.updateObjectORStatus(ctx, &ddc); err != nil { - return res, err - } + //update cr or status + if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil { + return res, err + } - return ctrl.Result{}, nil + return ctrl.Result{}, nil + }() + + return res, err +} + +func (dc *DisaggregatedClusterReconciler) displayInstanceInfo(instanceConf map[string]interface{}, ddc *dv1.DorisDisaggregatedCluster) { + instanceId := (instanceConf[ms_meta.Instance_id]).(string) + ddc.Status.InstanceId = instanceId } -func (dc *DisaggregatedClusterReconciler) ApplyInstanceInfo(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (*sc.Event, error) { - if ddc.Spec.VaultConfigmap == "" { - return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectInfoInvalid, Message: "vaultConfigmap should config a configMap that have object info."}, errors.New("vaultConfigmap for object info should be specified") +func (dc *DisaggregatedClusterReconciler) getInstanceConfig(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (map[string]interface{}, error) { + if ddc.Spec.InstanceConfigMap == "" { + dc.Recorder.Event(ddc, string(sc.EventWarning), string(sc.ObjectInfoInvalid), "vaultConfigmap should config a configMap that have object info.") + return nil, errors.New("vaultConfigmap for object info should be specified") } - cmName := ddc.Spec.VaultConfigmap + cmName := ddc.Spec.InstanceConfigMap var cm corev1.ConfigMap if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: cmName}, &cm); err != nil { - return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectInfoInvalid, Message: fmt.Sprintf("name %s configmap get failed, err=%s", cmName, err.Error())}, err + dc.Recorder.Event(ddc, string(sc.EventWarning), string(sc.ObjectInfoInvalid), fmt.Sprintf("name %s configmap get failed, err=%s", cmName, err.Error())) + return nil, err } - if _, ok := cm.Data[object_info_key]; !ok { - return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectInfoInvalid, Message: fmt.Sprintf("%s configmap data have not config key %s for object info.", cmName, object_info_key)}, errors.New(fmt.Sprintf("%s configmap data have not config key %s for object info.", cmName, object_info_key)) + if _, ok := cm.Data[instance_conf_key]; !ok { + dc.Recorder.Event(ddc, string(sc.EventWarning), string(sc.ObjectInfoInvalid), fmt.Sprintf("%s configmap data have not config key %s for object info.", cmName, instance_conf_key)) + return nil, errors.New(fmt.Sprintf("%s configmap data have not config key %s for object info.", cmName, instance_conf_key)) } - v := cm.Data[object_info_key] + v := cm.Data[instance_conf_key] instance := map[string]interface{}{} err := json.Unmarshal([]byte(v), &instance) if err != nil { - return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectInfoInvalid, Message: fmt.Sprintf("json unmarshal error=%s", err.Error())}, err + dc.Recorder.Event(ddc, string(sc.EventWarning), string(sc.ObjectInfoInvalid), fmt.Sprintf("json unmarshal error=%s", err.Error())) + return nil, err } - if err := dc.validateVaultInfo(instance); err != nil { - return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectInfoInvalid, Message: "validate failed, " + err.Error()}, err + + return instance, nil +} + +func (dc *DisaggregatedClusterReconciler) ApplyInstanceMeta(endpoint, token string, instanceConf map[string]interface{}) (*sc.Event, error) { + instanceId := (instanceConf[ms_meta.Instance_id]).(string) + event, err := dc.CreateOrUpdateObjectMeta(endpoint, token, instanceConf) + if err != nil { + return event, err } - endpoint := ddc.Status.MsEndpoint - token := ddc.Status.MsToken - return dc.CreateOrUpdateObjectInfo(endpoint, token, instance) + // store instance info for next update ak, sk etc... + dc.instanceMeta[instanceId] = instanceConf + return nil, nil } func (dc *DisaggregatedClusterReconciler) isModified(instance map[string]interface{}) bool { @@ -264,16 +304,18 @@ func (dc *DisaggregatedClusterReconciler) isModified(instance map[string]interfa func (dc *DisaggregatedClusterReconciler) haveCreated(instanceId string) bool { _, ok := dc.instanceMeta[instanceId] + //TODO: get from ms check return ok } -func (dc *DisaggregatedClusterReconciler) CreateOrUpdateObjectInfo(endpoint, token string, instance map[string]interface{}) (*sc.Event, error) { +func (dc *DisaggregatedClusterReconciler) CreateOrUpdateObjectMeta(endpoint, token string, instance map[string]interface{}) (*sc.Event, error) { idv := instance[ms_meta.Instance_id] instanceId := idv.(string) if !dc.haveCreated(instanceId) { return dc.createObjectInfo(endpoint, token, instance) } + // if not match in memory, should compare with ms. if !dc.isModified(instance) { return nil, nil } @@ -287,14 +329,36 @@ func (dc *DisaggregatedClusterReconciler) createObjectInfo(endpoint, token strin if err != nil { return &sc.Event{Type: sc.EventWarning, Reason: sc.MSInteractError, Message: err.Error()}, errors.New("createObjectInfo failed, err " + err.Error()) } - if mr.Code != ms_http.SuccessCode { + if mr.Code != ms_http.SuccessCode && mr.Code != ms_http.ALREADY_EXIST { return &sc.Event{Type: sc.EventWarning, Reason: sc.ObjectConfigError, Message: mr.Msg}, errors.New("createObjectInfo " + mr.Code + mr.Msg) } return &sc.Event{Type: sc.EventNormal, Reason: sc.InstanceMetaCreated}, nil } -func (dc *DisaggregatedClusterReconciler) validateVaultInfo(instance map[string]interface{}) error { - if obj, ok := instance[ms_meta.Obj_info]; ok { +func (dc *DisaggregatedClusterReconciler) validateInstanceInfo(instanceConf map[string]interface{}) error { + idv := instanceConf[ms_meta.Instance_id] + if idv == nil { + return errors.New("not config instance id") + } + id, ok := idv.(string) + if !ok || id == "" { + return errors.New("not config instance id") + } + return dc.validateVaultInfo(instanceConf) +} + +func (dc *DisaggregatedClusterReconciler) validateVaultInfo(instanceConf map[string]interface{}) error { + vi := instanceConf[ms_meta.Vault] + if vi == nil { + return errors.New("have not vault config") + } + + vault, ok := vi.(map[string]interface{}) + if !ok { + return errors.New("vault not json format") + } + + if obj, ok := vault[ms_meta.Obj_info]; ok { objMap, ok := obj.(map[string]interface{}) if !ok { return errors.New("obj_info not json format") @@ -303,7 +367,7 @@ func (dc *DisaggregatedClusterReconciler) validateVaultInfo(instance map[string] return dc.validateS3(objMap) } - if i, ok := instance[ms_meta.Key_hdfs_info]; ok { + if i, ok := vault[ms_meta.Key_hdfs_info]; ok { hdfsMap, ok := i.(map[string]interface{}) if !ok { return errors.New("hdfs not json format") @@ -365,21 +429,20 @@ func (dc *DisaggregatedClusterReconciler) validateString(m map[string]interface{ return nil } -func (dc *DisaggregatedClusterReconciler) getMSInfo(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) error { +func (dc *DisaggregatedClusterReconciler) setStatusMSInfo(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) error { if ddc.Status.MsEndpoint != "" && ddc.Status.MsToken != "" { return nil } var ddms dmsv1.DorisDisaggregatedMetaService - msNamespace := ddc.Spec.MetaService.Namespace - msName := ddc.Spec.MetaService.Name + msNamespace := ddc.Spec.DisMS.Namespace + msName := ddc.Spec.DisMS.Name if err := dc.Get(ctx, types.NamespacedName{Namespace: msNamespace, Name: msName}, &ddms); err != nil { klog.Errorf("disaggregatedClusterReconciler getMSInfo namespace %s name %s faild, err=%s", msNamespace, msName, err.Error()) dc.Recorder.Event(ddc, string(sc.EventWarning), string(sc.DisaggregatedMetaServiceGetFailed), fmt.Sprintf("namespace %s name %s get failed,err%s", msNamespace, msName, err.Error())) return err } - //TODO: get metaservice serviceName msSvcName := ddms.GetMSServiceName() msPort := dmsv1.MsPort msEndpoint := msSvcName + "." + msNamespace + ":" + msPort @@ -398,8 +461,7 @@ func (dc *DisaggregatedClusterReconciler) getMSInfo(ctx context.Context, ddc *dv continue } - if _, ok := kcm.Data[ms_conf_name]; ok { - v := kcm.Data[ms_conf_name] + if v, ok := kcm.Data[ms_conf_name]; ok { viper.ReadConfig(bytes.NewBuffer([]byte(v))) mscvs = viper.AllSettings() break @@ -409,13 +471,16 @@ func (dc *DisaggregatedClusterReconciler) getMSInfo(ctx context.Context, ddc *dv token := v.(string) ddc.Status.MsToken = token } + if v, ok := mscvs[resource.BRPC_LISTEN_PORT]; ok { + msPort = v.(string) + } return nil } func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { - for _, sc := range dc.Scs { - sc.ClearResources(ctx, ddc) + for _, subC := range dc.Scs { + subC.ClearResources(ctx, ddc) } return ctrl.Result{}, nil @@ -429,13 +494,20 @@ func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggr return requeueIfError(err) } } + + ddc.Status.ClusterHealth.Health = dv1.Green + if ddc.Status.FEStatus.AvailableStatus != dv1.Available || ddc.Status.ClusterHealth.CGAvailableCount <= (ddc.Status.ClusterHealth.CGCount/2) { + ddc.Status.ClusterHealth.Health = dv1.Red + } else if ddc.Status.ClusterHealth.CGAvailableCount < ddc.Status.ClusterHealth.CGCount { + ddc.Status.ClusterHealth.Health = dv1.Yellow + } return ctrl.Result{}, nil } func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { - for _, sc := range dc.Scs { - if err := sc.Sync(ctx, ddc); err != nil { - klog.Errorf("disaggreatedClusterReconciler sub reconciler %s sync err=%s.", sc.GetControllerName(), err.Error()) + for _, subC := range dc.Scs { + if err := subC.Sync(ctx, ddc); err != nil { + klog.Errorf("disaggreatedClusterReconciler sub reconciler %s sync err=%s.", subC.GetControllerName(), err.Error()) return ctrl.Result{}, err } } @@ -444,15 +516,15 @@ func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc } // when spec revert by operator should update cr or directly update status. -func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { - old_hv := ddc.Annotations[dv1.DisaggregatedSpecHashValueAnnotation] - hv := hash.HashObject(ddc.Spec) - if ddc.Annotations == nil { - ddc.Annotations = map[string]string{dv1.DisaggregatedSpecHashValueAnnotation: hv} - } - if old_hv != hv { - ddc.Annotations[dv1.DisaggregatedSpecHashValueAnnotation] = hv - //TODO: test status updated or not. +func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, preHv string) (ctrl.Result, error) { + postHv := hash.HashObject(ddc.Spec) + if preHv != postHv { + var eddc dv1.DorisDisaggregatedCluster + if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err == nil || !apierrors.IsNotFound(err) { + if eddc.ResourceVersion != "" { + ddc.ResourceVersion = eddc.ResourceVersion + } + } if err := dc.Update(ctx, ddc); err != nil { klog.Errorf("disaggreatedClusterReconciler update DorisDisaggregatedCluster namespace %s name %s failed, err=%s", ddc.Namespace, ddc.Name, err.Error()) return ctrl.Result{}, err diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index dc94da11..3e53d09e 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "regexp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sync" @@ -49,11 +50,11 @@ func (dccs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj return nil } - //check compute group config duplicated or not. - if dupl, duplicate := dccs.validateDuplicated(ddc.Spec.ComputeGroups); duplicate { - klog.Errorf("disaggregatedComputeGroupsController computeGroups have duplicate unique identifier %s.", dupl) - dccs.k8sRecorder.Eventf(ddc, string(sc.EventWarning), string(sc.CGUniqueIdentifierDuplicate), "unique identifer "+dupl+" duplicate in compute groups.") - return errors.New(dupl + " is duplicated in computegroups") + // validating compute group information. + if event, res := dccs.validateComputeGroup(ddc.Spec.ComputeGroups); !res { + klog.Errorf("disaggregatedComputeGroupsController namespace=%s name=%s validateComputeGroup have not match specifications %s.", ddc.Namespace, ddc.Name, sc.EventString(event)) + dccs.k8sRecorder.Eventf(ddc, string(event.Type), string(event.Reason), event.Message) + return errors.New("validating cg failed") } cgs := ddc.Spec.ComputeGroups @@ -71,6 +72,21 @@ func (dccs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj return nil } +// validate compute group config information. +func (dccs *DisaggregatedComputeGroupsController) validateComputeGroup(cgs []dv1.ComputeGroup) (*sc.Event, bool) { + if dupl, duplicate := dccs.validateDuplicated(cgs); duplicate { + klog.Errorf("disaggregatedComputeGroupsController validateComputeGroup validate Duplicated have duplicate unique identifier %s.", dupl) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGUniqueIdentifierDuplicate, Message: "unique identifier " + dupl + " duplicate in compute groups."}, false + } + + if reg, res := dccs.validateRegex(cgs); !res { + klog.Errorf("disaggregatedComputeGroupsController validateComputeGroup validateRegex %s have not match regular expression", reg) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGUniqueIdentifierNotMatchRegex, Message: reg}, false + } + + return nil, true +} + func (dccs *DisaggregatedComputeGroupsController) feAvailable(ddc *dv1.DorisDisaggregatedCluster) bool { //if fe deploy in k8s, should wait fe available //1. wait for fe ok. @@ -238,7 +254,28 @@ func (dccs *DisaggregatedComputeGroupsController) validateDuplicated(cgs []dv1.C return ds, true } +// checking the cg name compliant with regular expression or not. +func (dccs *DisaggregatedComputeGroupsController) validateRegex(cgs []dv1.ComputeGroup) (string, bool) { + var regStr = "" + for _, cg := range cgs { + res, err := regexp.Match(compute_group_name_regex, []byte(cg.Name)) + if !res { + regStr = regStr + cg.Name + " not match " + compute_group_name_regex + } + //for debugging, output the error in log + if err != nil { + klog.Errorf("disaggregatedComputeGroupsController validateRegex cg name %s failed, err=%s", cg.Name, err.Error()) + } + } + if regStr != "" { + return regStr, false + } + + return "", true +} + // validate the name of compute group is duplicated or not in computegroups. +// the cg name must be configured. func validateCGNameDuplicated(cgs []dv1.ComputeGroup) (string, bool) { ss := set.NewSetString() for _, cg := range cgs { @@ -251,6 +288,7 @@ func validateCGNameDuplicated(cgs []dv1.ComputeGroup) (string, bool) { return "", false } +// if cluster id have already configured, checking repeating or not. if not configured ignoring check. func validateCGClusterIdDuplicated(cgs []dv1.ComputeGroup) (string, bool) { scids := set.NewSetString() for _, cg := range cgs { @@ -262,6 +300,7 @@ func validateCGClusterIdDuplicated(cgs []dv1.ComputeGroup) (string, bool) { return "", false } +// if cloudUniqueId have configured, checking repeating or not. if not configured ignoring check. func validateCGCloudUniqueIdDuplicated(cgs []dv1.ComputeGroup) (string, bool) { scuids := set.NewSetString() for _, cg := range cgs { @@ -345,6 +384,19 @@ func (dccs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie } } + var fullAvailableCount int32 + var availableCount int32 + for _, cgs := range ddc.Status.ComputeGroupStatuses { + if cgs.Phase == dv1.Ready { + fullAvailableCount++ + } + if cgs.AvailableReplicas > 0 { + availableCount++ + } + } + ddc.Status.ClusterHealth.CGCount = int32(len(ddc.Status.ComputeGroupStatuses)) + ddc.Status.ClusterHealth.CGFullAvailableCount = fullAvailableCount + ddc.Status.ClusterHealth.CGAvailableCount = availableCount if errMs == "" { return nil } diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util.go new file mode 100644 index 00000000..0add66e0 --- /dev/null +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util.go @@ -0,0 +1,7 @@ +package computegroups + +// regex +var ( + compute_group_name_regex = "[a-zA-Z](_?[0-9a-zA-Z])*" + compute_group_id_regex = "[a-zA-Z](_?[0-9a-zA-Z])*" +) diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util_test.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util_test.go new file mode 100644 index 00000000..204a8bc2 --- /dev/null +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/util_test.go @@ -0,0 +1,17 @@ +package computegroups + +import ( + "regexp" + "testing" +) + +func Test_Regex(t *testing.T) { + tns := []string{"test", "test_name", "test_", "test1", "testNa", "1test"} + rns := []bool{true, true, false, true, true, false} + for i, n := range tns { + res, err := regexp.Match(compute_group_name_regex, []byte(n)) + if err != nil && res != rns[i] { + t.Errorf("name %s not match regex %s, err=%s, match result %t", n, compute_group_name_regex, err.Error(), res) + } + } +} diff --git a/pkg/controller/sub_controller/disaggregated_metaservice/ms/controller.go b/pkg/controller/sub_controller/disaggregated_metaservice/ms/controller.go index e235072b..a098ecba 100644 --- a/pkg/controller/sub_controller/disaggregated_metaservice/ms/controller.go +++ b/pkg/controller/sub_controller/disaggregated_metaservice/ms/controller.go @@ -56,7 +56,7 @@ func (msc *Controller) Sync(ctx context.Context, obj client.Object) error { return err } - // TODO prepareStatefulsetApply + // TODO prepareStatefulsetApply for scaling st := msc.buildMSStatefulSet(dms) if err = k8s.ApplyStatefulSet(ctx, msc.K8sclient, &st, func(new *appv1.StatefulSet, old *appv1.StatefulSet) bool { diff --git a/pkg/controller/sub_controller/events.go b/pkg/controller/sub_controller/events.go index 4624c8d7..9e955c75 100644 --- a/pkg/controller/sub_controller/events.go +++ b/pkg/controller/sub_controller/events.go @@ -31,6 +31,7 @@ var ( FDBSpecEmpty EventReason = "SpecEmpty" ComputeGroupsEmpty EventReason = "CGsEmpty" CGUniqueIdentifierDuplicate EventReason = "CGUniqueIdentifierDuplicate" + CGUniqueIdentifierNotMatchRegex EventReason = "CGUniqueIdentifierNotMatchRegex" CGCreateResourceFailed EventReason = "CGCreateResourceFailed" CGApplyResourceFailed EventReason = "CGApplyResourceFailed" CGStatusUpdateFailed EventReason = "CGStatusUpdatedFailed" @@ -41,6 +42,7 @@ var ( ObjectConfigError EventReason = "ObjectConfigError" MSInteractError EventReason = "MSInteractError" InstanceMetaCreated EventReason = "InstanceMetaCreated" + InstanceIdModified EventReason = "InstanceIdModified" ) type Event struct {