Skip to content

Commit

Permalink
fix: the component controller is not notified when a backup for h-sca…
Browse files Browse the repository at this point in the history
…le becomes ready (#8473)

(cherry picked from commit 25ad8b1)
  • Loading branch information
leon-inf committed Nov 18, 2024
1 parent bbd7bd0 commit 4bc243b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
21 changes: 14 additions & 7 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type componentWorkloadOps struct {
reqCtx intctrlutil.RequestCtx
cluster *appsv1.Cluster
synthesizeComp *component.SynthesizedComponent
comp *appsv1.Component
dag *graph.DAG

// runningITS is a snapshot of the InstanceSet that is already running
Expand All @@ -82,6 +83,7 @@ func (t *componentWorkloadTransformer) Transform(ctx graph.TransformContext, dag

cluster := transCtx.Cluster
compDef := transCtx.CompDef
comp := transCtx.Component
synthesizeComp := transCtx.SynthesizeComponent
reqCtx := intctrlutil.RequestCtx{
Ctx: transCtx.Context,
Expand All @@ -103,14 +105,14 @@ func (t *componentWorkloadTransformer) Transform(ctx graph.TransformContext, dag
}
transCtx.ProtoWorkload = protoITS

if err = t.reconcileWorkload(synthesizeComp, transCtx.Component, runningITS, protoITS); err != nil {
if err = t.reconcileWorkload(synthesizeComp, comp, runningITS, protoITS); err != nil {
return err
}

graphCli, _ := transCtx.Client.(model.GraphClient)
if runningITS == nil {
if protoITS != nil {
if err := setCompOwnershipNFinalizer(transCtx.Component, protoITS); err != nil {
if err := setCompOwnershipNFinalizer(comp, protoITS); err != nil {
return err
}
graphCli.Create(dag, protoITS)
Expand All @@ -120,7 +122,7 @@ func (t *componentWorkloadTransformer) Transform(ctx graph.TransformContext, dag
if protoITS == nil {
graphCli.Delete(dag, runningITS)
} else {
err = t.handleUpdate(reqCtx, graphCli, dag, cluster, synthesizeComp, runningITS, protoITS)
err = t.handleUpdate(reqCtx, graphCli, dag, cluster, synthesizeComp, comp, runningITS, protoITS)
}
}
return err
Expand Down Expand Up @@ -173,10 +175,10 @@ func (t *componentWorkloadTransformer) stopWorkload(protoITS *workloads.Instance
}

func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCtx, cli model.GraphClient, dag *graph.DAG,
cluster *appsv1.Cluster, synthesizeComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) error {
cluster *appsv1.Cluster, synthesizeComp *component.SynthesizedComponent, comp *appsv1.Component, runningITS, protoITS *workloads.InstanceSet) error {
if !isCompStopped(synthesizeComp) {
// postpone the update of the workload until the component is back to running.
if err := t.handleWorkloadUpdate(reqCtx, dag, cluster, synthesizeComp, runningITS, protoITS); err != nil {
if err := t.handleWorkloadUpdate(reqCtx, dag, cluster, synthesizeComp, comp, runningITS, protoITS); err != nil {
return err
}
}
Expand All @@ -190,8 +192,8 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt
}

func (t *componentWorkloadTransformer) handleWorkloadUpdate(reqCtx intctrlutil.RequestCtx, dag *graph.DAG,
cluster *appsv1.Cluster, synthesizeComp *component.SynthesizedComponent, obj, its *workloads.InstanceSet) error {
cwo, err := newComponentWorkloadOps(reqCtx, t.Client, cluster, synthesizeComp, obj, its, dag)
cluster *appsv1.Cluster, synthesizeComp *component.SynthesizedComponent, comp *appsv1.Component, obj, its *workloads.InstanceSet) error {
cwo, err := newComponentWorkloadOps(reqCtx, t.Client, cluster, synthesizeComp, comp, obj, its, dag)
if err != nil {
return err
}
Expand Down Expand Up @@ -596,6 +598,9 @@ func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
return err
}
for _, obj := range objs1 {
if err := setCompOwnershipNFinalizer(r.comp, obj); err != nil {
return err
}
graphCli.Do(r.dag, nil, obj, model.ActionCreatePtr(), nil)
}
for _, obj := range objs2 {
Expand Down Expand Up @@ -1079,6 +1084,7 @@ func newComponentWorkloadOps(reqCtx intctrlutil.RequestCtx,
cli client.Client,
cluster *appsv1.Cluster,
synthesizeComp *component.SynthesizedComponent,
comp *appsv1.Component,
runningITS *workloads.InstanceSet,
protoITS *workloads.InstanceSet,
dag *graph.DAG) (*componentWorkloadOps, error) {
Expand All @@ -1094,6 +1100,7 @@ func newComponentWorkloadOps(reqCtx intctrlutil.RequestCtx,
cli: cli,
reqCtx: reqCtx,
cluster: cluster,
comp: comp,
synthesizeComp: synthesizeComp,
runningITS: runningITS,
protoITS: protoITS,
Expand Down
36 changes: 18 additions & 18 deletions controllers/apps/transformer_component_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ package apps
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"

kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
testk8s "github.com/apecloud/kubeblocks/pkg/testutil/k8s"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
testk8s "github.com/apecloud/kubeblocks/pkg/testutil/k8s"
)

var _ = Describe("Component Workload Operations Test", func() {
Expand All @@ -46,18 +44,19 @@ var _ = Describe("Component Workload Operations Test", func() {
var (
reader *mockReader
dag *graph.DAG
comp *appsv1.Component
synthesizeComp *component.SynthesizedComponent
)

newDAG := func(graphCli model.GraphClient, comp *appsv1alpha1.Component) *graph.DAG {
newDAG := func(graphCli model.GraphClient, comp *appsv1.Component) *graph.DAG {
d := graph.NewDAG()
graphCli.Root(d, comp, comp, model.ActionStatusPtr())
return d
}

BeforeEach(func() {
reader = &mockReader{}
comp := &appsv1alpha1.Component{
comp = &appsv1.Component{
ObjectMeta: metav1.ObjectMeta{
Namespace: testCtx.DefaultNamespace,
Name: constant.GenerateClusterComponentName(clusterName, compName),
Expand All @@ -67,30 +66,30 @@ var _ = Describe("Component Workload Operations Test", func() {
constant.KBAppComponentLabelKey: compName,
},
},
Spec: appsv1alpha1.ComponentSpec{},
Spec: appsv1.ComponentSpec{},
}

synthesizeComp = &component.SynthesizedComponent{
Namespace: testCtx.DefaultNamespace,
ClusterName: clusterName,
Name: compName,
Roles: []kbappsv1.ReplicaRole{
Roles: []appsv1.ReplicaRole{
{Name: "leader", Serviceable: true, Writable: true, Votable: true},
{Name: "follower", Serviceable: false, Writable: false, Votable: false},
},
LifecycleActions: &kbappsv1.ComponentLifecycleActions{
MemberJoin: &kbappsv1.Action{
Exec: &kbappsv1.ExecAction{
LifecycleActions: &appsv1.ComponentLifecycleActions{
MemberJoin: &appsv1.Action{
Exec: &appsv1.ExecAction{
Image: "test-image",
},
},
MemberLeave: &kbappsv1.Action{
Exec: &kbappsv1.ExecAction{
MemberLeave: &appsv1.Action{
Exec: &appsv1.ExecAction{
Image: "test-image",
},
},
Switchover: &kbappsv1.Action{
Exec: &kbappsv1.ExecAction{
Switchover: &appsv1.Action{
Exec: &appsv1.ExecAction{
Image: "test-image",
},
},
Expand Down Expand Up @@ -167,8 +166,9 @@ var _ = Describe("Component Workload Operations Test", func() {
ops = &componentWorkloadOps{
cli: k8sClient,
reqCtx: intctrlutil.RequestCtx{Ctx: ctx, Log: logger},
synthesizeComp: synthesizeComp,
cluster: mockCluster,
synthesizeComp: synthesizeComp,
comp: comp,
runningITS: mockITS,
protoITS: mockITS.DeepCopy(),
dag: dag,
Expand Down

0 comments on commit 4bc243b

Please sign in to comment.