Skip to content

Commit

Permalink
Merge pull request #209 from catpineapple/fix_disaggregated_0726
Browse files Browse the repository at this point in the history
fix_disaggregated
  • Loading branch information
intelligentfu authored Jul 26, 2024
2 parents 6cf8b2f + 6a71f1e commit a60f4a0
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 150 deletions.
2 changes: 2 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ header:
- 'go.mod'
- 'go.sum'
- 'licenses'
- 'api/**/zz_generated.deepcopy.go'
- 'config/crd/bases/*'
- '**/*.md'
- "**/*.json"
- "**/*.txt"
Expand Down
17 changes: 0 additions & 17 deletions api/disaggregated/cluster/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 0 additions & 17 deletions api/disaggregated/metaservice/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 0 additions & 17 deletions api/doris/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down
17 changes: 0 additions & 17 deletions config/crd/bases/doris.selectdb.com_dorisclusters.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down
7 changes: 7 additions & 0 deletions config/operator/disaggregated-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,13 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/utils/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewPodTemplateSpec(dcr *v1.DorisCluster, componentType v1.ComponentType) co
NodeSelector: spec.NodeSelector,
Volumes: volumes,
ServiceAccountName: spec.ServiceAccount,
Affinity: spec.Affinity,
Affinity: spec.Affinity.DeepCopy(),
Tolerations: spec.Tolerations,
HostAliases: spec.HostAliases,
InitContainers: defaultInitContainers,
Expand Down Expand Up @@ -167,7 +167,7 @@ func NewPodTemplateSpecWithCommonSpec(cs *dv1.CommonSpec, componentType dv1.Disa
ImagePullSecrets: cs.ImagePullSecrets,
NodeSelector: cs.NodeSelector,
ServiceAccountName: cs.ServiceAccount,
Affinity: cs.Affinity,
Affinity: cs.Affinity.DeepCopy(),
Tolerations: cs.Tolerations,
HostAliases: cs.HostAliases,
SecurityContext: cs.SecurityContext,
Expand Down
11 changes: 9 additions & 2 deletions pkg/common/utils/resource/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,17 @@ func NewStatefulSetWithComputeGroup(cg *dv1.ComputeGroup) *appv1.StatefulSet {

// StatefulSetDeepEqual judge two statefulset equal or not.
func StatefulSetDeepEqual(new *appv1.StatefulSet, old *appv1.StatefulSet, excludeReplicas bool) bool {
return StatefulsetDeepEqualWithAnnoKey(new, old, v1.ComponentResourceHash, excludeReplicas)
return StatefulsetDeepEqualWithOmitKey(new, old, v1.ComponentResourceHash, false, excludeReplicas)
}

func StatefulsetDeepEqualWithAnnoKey(new, old *appv1.StatefulSet, annoKey string, excludeReplicas bool) bool {
func StatefulsetDeepEqualWithOmitKey(new, old *appv1.StatefulSet, annoKey string, omit bool, excludeReplicas bool) bool {
if omit {
newHso := statefulSetHashObject(new, excludeReplicas)
newHashv := hash.HashObject(newHso)
oldHso := statefulSetHashObject(old, excludeReplicas)
oldHashv := hash.HashObject(oldHso)
return new.Namespace == old.Namespace && newHashv == oldHashv
}
var newHashv, oldHashv string
if annoKey == "" {
annoKey = v1.ComponentResourceHash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (dccs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
}

if err := k8s.ApplyStatefulSet(ctx, dccs.k8sClient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithAnnoKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
}); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,26 @@ func (dccs *DisaggregatedComputeGroupsController) NewPodTemplateSpec(ddc *dv1.Do
pts.Spec.Volumes = append(pts.Spec.Volumes, vs...)

cgClusterId := selector[dv1.DorisDisaggregatedComputeGroupClusterId]
defAffinity := dccs.newCGDefaultAffinity(dv1.DorisDisaggregatedComputeGroupClusterId, cgClusterId)
if pts.Spec.Affinity == nil {
pts.Spec.Affinity = defAffinity
return pts
}
pts.Spec.Affinity = dccs.constructAffinity(dv1.DorisDisaggregatedComputeGroupClusterId, cgClusterId, pts.Spec.Affinity)

if pts.Spec.Affinity.PodAntiAffinity == nil {
pts.Spec.Affinity.PodAntiAffinity = defAffinity.PodAntiAffinity
} else {
pts.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(pts.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
defAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
return pts
}

func (dccs *DisaggregatedComputeGroupsController) constructAffinity(matchKey, value string, ddcAffinity *corev1.Affinity) *corev1.Affinity {
affinity := dccs.newCGDefaultAffinity(matchKey, value)

if ddcAffinity == nil {
return affinity
}

return pts
ddcPodAntiAffinity := ddcAffinity.PodAntiAffinity
if ddcPodAntiAffinity != nil {
affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = ddcPodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, ddcPodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
}
affinity.NodeAffinity = ddcAffinity.NodeAffinity
affinity.PodAffinity = ddcAffinity.PodAffinity
return affinity
}

func (dccs *DisaggregatedComputeGroupsController) newCGDefaultAffinity(matchKey, value string) *corev1.Affinity {
Expand Down Expand Up @@ -261,9 +267,8 @@ func (dccs *DisaggregatedComputeGroupsController) buildVolumesVolumeMountsAndPVC
Name: LogStoreName,
Annotations: cg.CommonSpec.PersistentVolume.Annotations,
},
Spec: cg.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec,
Spec: *cg.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec.DeepCopy(),
}
logPvc.Spec.Resources.Requests[corev1.ResourceStorage] = kr.MustParse("200Gi")
pvcs = append(pvcs, logPvc)
}
}()
Expand Down Expand Up @@ -293,7 +298,7 @@ func (dccs *DisaggregatedComputeGroupsController) buildVolumesVolumeMountsAndPVC
Name: StorageStorePreName + strconv.Itoa(i),
Annotations: cg.CommonSpec.PersistentVolume.Annotations,
},
Spec: cg.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec,
Spec: *cg.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec.DeepCopy(),
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (dfc *DisaggregatedFEController) reconcileService(ctx context.Context, svc
return resource.ServiceDeepEqualWithAnnoKey(nsvc, osvc, dv1.DisaggregatedSpecHashValueAnnotation)
}); err != nil {
klog.Errorf("disaggregatedFEController reconcileService apply service namespace=%s name=%s failed, err=%s", svc.Namespace, svc.Name, err.Error())
return &sub_controller.Event{Type: sub_controller.EventWarning, Reason: sub_controller.FEApplyResourceFailed, Message: err.Error()}, err
return &sub_controller.Event{Type: sub_controller.EventWarning, Reason: sub_controller.FECreateResourceFailed, Message: err.Error()}, err
}

return nil, nil
Expand All @@ -256,7 +256,7 @@ func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context,
}

if err := k8s.ApplyStatefulSet(ctx, dfc.k8sClient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithAnnoKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
}); err != nil {
klog.Errorf("disaggregatedFEController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sub_controller.Event{Type: sub_controller.EventWarning, Reason: sub_controller.FEApplyResourceFailed, Message: err.Error()}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
sub "github.com/selectdb/doris-operator/pkg/controller/sub_controller"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kr "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strconv"
Expand All @@ -39,14 +40,15 @@ const (
)

const (
DefaultMetaPath = "/opt/apache-doris/fe/doris-meta"
MetaPathKey = "meta_dir"
DefaultLogPath = "/opt/apache-doris/fe/log"
LogPathKey = "LOG_DIR"
LogStoreName = "fe-log"
MetaStoreName = "fe-meta"
FeClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
FeClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"
DefaultMetaPath = "/opt/apache-doris/fe/doris-meta"
MetaPathKey = "meta_dir"
DefaultLogPath = "/opt/apache-doris/fe/log"
LogPathKey = "LOG_DIR"
LogStoreName = "fe-log"
MetaStoreName = "fe-meta"
FeClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
FeClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"
DefaultStorageSize int64 = 107374182400
)

var (
Expand Down Expand Up @@ -226,6 +228,22 @@ func (dfc *DisaggregatedFEController) buildVolumesVolumeMountsAndPVCs(confMap ma
var vms []corev1.VolumeMount
var pvcs []corev1.PersistentVolumeClaim

func() {
defQuantity := kr.NewQuantity(DefaultStorageSize, kr.BinarySI)
if fe.PersistentVolume.PersistentVolumeClaimSpec.Resources.Requests == nil {
fe.PersistentVolume.PersistentVolumeClaimSpec.Resources.Requests = map[corev1.ResourceName]kr.Quantity{}
}
pvcSize := fe.PersistentVolume.PersistentVolumeClaimSpec.Resources.Requests[corev1.ResourceStorage]
cmp := defQuantity.Cmp(pvcSize)
if cmp > 0 {
fe.PersistentVolume.PersistentVolumeClaimSpec.Resources.Requests[corev1.ResourceStorage] = *defQuantity
}

if len(fe.PersistentVolume.PersistentVolumeClaimSpec.AccessModes) == 0 {
fe.PersistentVolume.PersistentVolumeClaimSpec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
}
}()

vs = append(vs, corev1.Volume{Name: LogStoreName, VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: LogStoreName,
Expand All @@ -236,7 +254,7 @@ func (dfc *DisaggregatedFEController) buildVolumesVolumeMountsAndPVCs(confMap ma
Name: LogStoreName,
Annotations: fe.CommonSpec.PersistentVolume.Annotations,
},
Spec: fe.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec,
Spec: *fe.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec.DeepCopy(),
})

vs = append(vs, corev1.Volume{Name: MetaStoreName, VolumeSource: corev1.VolumeSource{
Expand All @@ -249,7 +267,7 @@ func (dfc *DisaggregatedFEController) buildVolumesVolumeMountsAndPVCs(confMap ma
Name: MetaStoreName,
Annotations: fe.CommonSpec.PersistentVolume.Annotations,
},
Spec: fe.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec,
Spec: *fe.CommonSpec.PersistentVolume.PersistentVolumeClaimSpec.DeepCopy(),
})

return vs, vms, pvcs
Expand Down

0 comments on commit a60f4a0

Please sign in to comment.