Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support component-level stop and start capabilities (cherry-pick 934e0ed) #8480

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ type SpecificOpsRequest struct {
// +listMapKey=componentName
VolumeExpansionList []VolumeExpansion `json:"volumeExpansion,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be started. If empty, all components will be started.
//
// +optional
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.start"
// +kubebuilder:validation:MaxItems=1024
// +patchMergeKey=componentName
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=componentName
StartList []ComponentOps `json:"start,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be stopped. If empty, all components will be stopped.
//
// +optional
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.stop"
// +kubebuilder:validation:MaxItems=1024
// +patchMergeKey=componentName
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=componentName
StopList []ComponentOps `json:"stop,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be restarted.
//
// +optional
Expand Down
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4723,6 +4723,48 @@ spec:
required:
- componentName
type: object
start:
description: Lists Components to be started. If empty, all components
will be started.
items:
description: ComponentOps specifies the Component to be operated
on.
properties:
componentName:
description: Specifies the name of the Component.
type: string
required:
- componentName
type: object
maxItems: 1024
type: array
x-kubernetes-list-map-keys:
- componentName
x-kubernetes-list-type: map
x-kubernetes-validations:
- message: forbidden to update spec.start
rule: self == oldSelf
stop:
description: Lists Components to be stopped. If empty, all components
will be stopped.
items:
description: ComponentOps specifies the Component to be operated
on.
properties:
componentName:
description: Specifies the name of the Component.
type: string
required:
- componentName
type: object
maxItems: 1024
type: array
x-kubernetes-list-map-keys:
- componentName
x-kubernetes-list-type: map
x-kubernetes-validations:
- message: forbidden to update spec.stop
rule: self == oldSelf
switchover:
description: Lists Switchover objects, each specifying a Component
to perform the switchover operation.
Expand Down
9 changes: 9 additions & 0 deletions controllers/apps/operations/ops_comp_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,12 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
}
return appsv1alpha1.OpsSucceedPhase, 0, nil
}

// hasIntersectionCompOpsList reports whether any element of list names a
// component that is also present as a key in currCompOpsMap.
//
// It returns false when list is empty or when no name matches; a nil map is
// treated like an empty one because lookups on it simply miss.
func hasIntersectionCompOpsList[T ComponentOpsInterface, S ComponentOpsInterface](currCompOpsMap map[string]T, list []S) bool {
	for idx := range list {
		name := list[idx].GetComponentName()
		if _, known := currCompOpsMap[name]; !known {
			continue
		}
		// One shared component name is enough; no need to scan the rest.
		return true
	}
	return false
}
5 changes: 3 additions & 2 deletions controllers/apps/operations/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ func (r restartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clie
return fmt.Errorf("status.startTimestamp can not be null")
}
// abort earlier running vertical scaling opsRequest.
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
// abort earlier running 'Restart' opsRequest.
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.RestartType},
func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) {
return true, nil
return hasIntersectionCompOpsList(r.compOpsHelper.componentOpsSet, earlierOps.Spec.RestartList), nil
}); err != nil {
return err
}
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
componentKindList := []client.ObjectList{
&appv1.StatefulSetList{},
&workloads.InstanceSetList{},
Expand Down
16 changes: 12 additions & 4 deletions controllers/apps/operations/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,20 @@ var _ = Describe("Restart OpsRequest", func() {
})
})

func createRestartOpsObj(clusterName, restartOpsName string) *appsv1alpha1.OpsRequest {
func createRestartOpsObj(clusterName, restartOpsName string, componentNames ...string) *appsv1alpha1.OpsRequest {
ops := testapps.NewOpsRequestObj(restartOpsName, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.RestartType)
ops.Spec.RestartList = []appsv1alpha1.ComponentOps{
{ComponentName: consensusComp},
{ComponentName: statelessComp},
if len(componentNames) == 0 {
ops.Spec.RestartList = []appsv1alpha1.ComponentOps{
{ComponentName: consensusComp},
{ComponentName: statelessComp},
}
} else {
for _, compName := range componentNames {
ops.Spec.RestartList = append(ops.Spec.RestartList, appsv1alpha1.ComponentOps{
ComponentName: compName,
})
}
}
opsRequest := testapps.CreateOpsRequest(ctx, testCtx, ops)
opsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
Expand Down
47 changes: 33 additions & 14 deletions controllers/apps/operations/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ type StartOpsHandler struct{}
var _ OpsHandler = StartOpsHandler{}

func init() {
stopBehaviour := OpsBehaviour{
FromClusterPhases: []appsv1alpha1.ClusterPhase{appsv1alpha1.StoppedClusterPhase},
ToClusterPhase: appsv1alpha1.UpdatingClusterPhase,
QueueByCluster: true,
OpsHandler: StartOpsHandler{},
startBehaviour := OpsBehaviour{
FromClusterPhases: append(appsv1alpha1.GetClusterUpRunningPhases(), appsv1alpha1.UpdatingClusterPhase,
appsv1alpha1.StoppedClusterPhase, appsv1alpha1.StoppingClusterPhase),
ToClusterPhase: appsv1alpha1.UpdatingClusterPhase,
QueueByCluster: true,
OpsHandler: StartOpsHandler{},
}

opsMgr := GetOpsManager()
opsMgr.RegisterOps(appsv1alpha1.StartType, stopBehaviour)
opsMgr.RegisterOps(appsv1alpha1.StartType, startBehaviour)
}

// ActionStartedCondition the started condition when handling the start request.
Expand All @@ -55,15 +56,33 @@ func (start StartOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCt
func (start StartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
var (
cluster = opsRes.Cluster
startComp = func(compSpec *appsv1alpha1.ClusterComponentSpec) {
compSpec.Stop = nil
}
startList = opsRes.OpsRequest.Spec.StartList
)
for i := range cluster.Spec.ComponentSpecs {
startComp(&cluster.Spec.ComponentSpecs[i])
compOpsHelper := newComponentOpsHelper(startList)
// abort earlier running opsRequests.
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.StopType},
func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) {
if len(startList) == 0 {
// start all components
return true, nil
}
return len(earlierOps.Spec.StopList) == 0 || hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.StopList), nil
}); err != nil {
return err
}
startComp := func(compSpec *appsv1alpha1.ClusterComponentSpec, clusterCompName string) {
if len(startList) > 0 {
if _, ok := compOpsHelper.componentOpsSet[clusterCompName]; !ok {
return
}
}
compSpec.Stop = nil
}
for i, v := range cluster.Spec.ComponentSpecs {
startComp(&cluster.Spec.ComponentSpecs[i], v.Name)
}
for i := range cluster.Spec.ShardingSpecs {
startComp(&cluster.Spec.ShardingSpecs[i].Template)
for i, v := range cluster.Spec.ShardingSpecs {
startComp(&cluster.Spec.ShardingSpecs[i].Template, v.Name)
}
return cli.Update(reqCtx.Ctx, cluster)
}
Expand All @@ -84,7 +103,7 @@ func (start StartOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli
}
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus)
}
compOpsHelper := newComponentOpsHelper([]appsv1alpha1.ComponentOps{})
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.StartList)
return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "start", handleComponentProgress)
}

Expand Down
98 changes: 89 additions & 9 deletions controllers/apps/operations/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package operations
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -38,6 +39,7 @@ var _ = Describe("Start OpsRequest", func() {
clusterDefinitionName = "cluster-definition-for-ops-" + randomStr
clusterVersionName = "clusterversion-for-ops-" + randomStr
clusterName = "cluster-for-ops-" + randomStr
clusterDefName = "test-clusterdef-" + randomStr
)

cleanEnv := func() {
Expand All @@ -48,7 +50,7 @@ var _ = Describe("Start OpsRequest", func() {
By("clean resources")

// delete cluster(and all dependent sub-resources), clusterversion and clusterdef
testapps.ClearClusterResources(&testCtx)
testapps.ClearClusterResourcesWithRemoveFinalizerOption(&testCtx)

// delete rest resources
inNS := client.InNamespace(testCtx.DefaultNamespace)
Expand All @@ -63,17 +65,32 @@ var _ = Describe("Start OpsRequest", func() {
AfterEach(cleanEnv)

Context("Test OpsRequest", func() {
// createStartOpsRequest builds a Start-type OpsRequest for clusterName whose
// StartList contains one entry per name in startCompNames, creates it through
// testapps.CreateOpsRequest, and stores the created object in opsRes.OpsRequest
// with its in-memory phase set to Pending.
//
// With no names the StartList stays nil. The returned value is the object that
// was handed to CreateOpsRequest, not opsRes.OpsRequest.
createStartOpsRequest := func(opsRes *OpsResource, startCompNames ...string) *appsv1alpha1.OpsRequest {
	By("create Start opsRequest")
	opsName := "start-ops-" + testCtx.GetRandomStr()
	ops := testapps.NewOpsRequestObj(opsName, testCtx.DefaultNamespace, clusterName, appsv1alpha1.StartType)

	// Deliberately left nil (not an empty slice) when no component is named.
	var comps []appsv1alpha1.ComponentOps
	for idx := range startCompNames {
		comps = append(comps, appsv1alpha1.ComponentOps{ComponentName: startCompNames[idx]})
	}
	ops.Spec.StartList = comps

	opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
	// Only the local object's phase is changed here; nothing in this helper
	// writes the status back.
	opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
	return ops
}

It("Test start OpsRequest", func() {
By("init operations resources ")
reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)
testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statelessComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statefulComp)
By("create Start opsRequest")
ops := testapps.NewOpsRequestObj("start-ops-"+randomStr, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.StartType)
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
By("create 'Start' opsRequest")
createStartOpsRequest(opsRes)

By("test start action and reconcile function")
Expect(opsutil.UpdateClusterOpsAnnotations(ctx, k8sClient, opsRes.Cluster, nil)).Should(Succeed())
Expand All @@ -83,18 +100,81 @@ var _ = Describe("Start OpsRequest", func() {
})).ShouldNot(HaveOccurred())

// set ops phase to Pending
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsCreatingPhase))
// do start action
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
for _, v := range opsRes.Cluster.Spec.ComponentSpecs {
Expect(v.Stop).Should(BeNil())
}
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err == nil).Should(BeTrue())
})
// Checks that a Start OpsRequest naming one component clears Stop on that
// component only, leaves the other components stopped, and that the request
// reaches the Succeed phase after the component's workload is mocked as ready.
It("Test start specific components OpsRequest", func() {
	By("init operations resources with topology")
	opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName)
	// mark every component as stopped so the start action has something to undo
	Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(pobj *appsv1alpha1.Cluster) {
		for i := range pobj.Spec.ComponentSpecs {
			pobj.Spec.ComponentSpecs[i].Stop = pointer.Bool(true)
		}
	})).Should(Succeed())

	By("create 'Start' opsRequest for specific components")
	createStartOpsRequest(opsRes, defaultCompName)

	By("mock 'Start' OpsRequest to Creating phase")
	reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
	runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)

	By("test start action")
	startHandler := StartOpsHandler{}
	err := startHandler.Action(reqCtx, k8sClient, opsRes)
	Expect(err).ShouldNot(HaveOccurred())

	By("verify components are being started")
	Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, pobj *appsv1alpha1.Cluster) {
		// Assert through g, the Gomega handed to this callback, rather than the
		// global Expect: a mismatch is then reported to the surrounding
		// Eventually and retried instead of failing the spec on the first poll.
		for _, v := range pobj.Spec.ComponentSpecs {
			if v.Name == defaultCompName {
				g.Expect(v.Stop).Should(BeNil())
			} else {
				g.Expect(v.Stop).ShouldNot(BeNil())
				g.Expect(*v.Stop).Should(BeTrue())
			}
		}
	})).Should(Succeed())

	By("mock components start successfully")
	testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, defaultCompName)
	testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName)

	By("test reconcile")
	_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
	Expect(err).ShouldNot(HaveOccurred())

	By("verify ops request completed")
	Eventually(testapps.GetOpsRequestPhase(&testCtx,
		client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsSucceedPhase))
})

// Checks that running the Start handler's Action aborts an earlier Stop
// OpsRequest that targets the same component.
It("Test abort running 'Stop' opsRequest", func() {
	By("init operations resources with topology")
	opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName)
	reqCtx := intctrlutil.RequestCtx{Ctx: ctx}

	// The Stop request names defaultCompName explicitly, so the step label
	// describes a single-component stop rather than a whole-cluster stop.
	By("create 'Stop' opsRequest for the default component")
	stopOps := createStopOpsRequest(opsRes, defaultCompName)
	// NOTE(review): presumably moves the Stop request into the Creating phase
	// so it counts as "running" — confirm against runAction.
	runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)

	By("create a start opsRequest")
	createStartOpsRequest(opsRes, defaultCompName)
	startHandler := StartOpsHandler{}
	err := startHandler.Action(reqCtx, k8sClient, opsRes)
	Expect(err).ShouldNot(HaveOccurred())

	By("expect the 'Stop' OpsRequest to be Aborted")
	Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(stopOps))).Should(Equal(appsv1alpha1.OpsAbortedPhase))
})
})
})
Loading
Loading