Use subcluster status to update podfacts (#814)
After unsandboxing, the catalog dir is deleted. The issue is that we check its
existence to set dbExists in the pod facts. With this change, dbExists
is true if the catalog dir exists or if the addedToDB field in the
subcluster status detail for that specific pod is true.
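To illustrate the new rule in isolation, here is a minimal, self-contained sketch; dbExistsForPod, its arguments, and the trimmed-down status type are hypothetical names, and the real logic lives in checkIsDBCreated in pkg/controllers/vdb/podfacts.go (changed below):

```go
package main

import "fmt"

// VerticaDBPodStatus is a simplified stand-in for the status detail entry in
// the VerticaDB API; only the field relevant to this change is shown.
type VerticaDBPodStatus struct {
	AddedToDB bool
}

// dbExistsForPod sketches the new rule: the database is considered to exist
// for a pod if the catalog dir is still on disk OR the subcluster status
// already recorded the pod as added to the DB.
func dbExistsForPod(catalogDirExists bool, podStatus *VerticaDBPodStatus) bool {
	if podStatus != nil && podStatus.AddedToDB {
		return true
	}
	return catalogDirExists
}

func main() {
	// After unsandboxing: catalog dir gone, but status still says added to DB.
	fmt.Println(dbExistsForPod(false, &VerticaDBPodStatus{AddedToDB: true})) // true
	// Pod never added to the DB and no catalog dir on disk.
	fmt.Println(dbExistsForPod(false, &VerticaDBPodStatus{AddedToDB: false})) // false
}
```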
roypaulin authored and cchen-vertica committed Jul 24, 2024
1 parent 90643bd commit d386d12
Showing 6 changed files with 83 additions and 6 deletions.
4 changes: 2 additions & 2 deletions api/v1/helpers.go
@@ -874,8 +874,8 @@ func (v *VerticaDB) IsHTTPSTLSConfGenerationEnabled() (bool, error) {
 func (v *VerticaDB) GenSubclusterStatusMap() map[string]*SubclusterStatus {
 	scMap := map[string]*SubclusterStatus{}
 	for i := range v.Status.Subclusters {
-		sb := &v.Status.Subclusters[i]
-		scMap[sb.Name] = sb
+		sc := &v.Status.Subclusters[i]
+		scMap[sc.Name] = sc
 	}
 	return scMap
 }
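For context on how the reconciler changes below consume this helper, here is a minimal usage sketch; the api/v1 import path is assumed, and this is not the actual controller code:

```go
package main

import (
	"fmt"

	// Assumed import path for the api/v1 package that defines VerticaDB.
	vapi "github.com/vertica/vertica-kubernetes/api/v1"
)

func main() {
	vdb := &vapi.VerticaDB{}
	vdb.Status.Subclusters = []vapi.SubclusterStatus{
		{Name: "sc1", Detail: []vapi.VerticaDBPodStatus{{AddedToDB: true}, {AddedToDB: true}}},
	}

	// The map is keyed by subcluster name and holds pointers into the status
	// slice, so mutating an entry updates vdb.Status directly.
	scMap := vdb.GenSubclusterStatusMap()
	if scs := scMap["sc1"]; scs != nil && len(scs.Detail) > 1 {
		scs.Detail[1].AddedToDB = false
	}
	fmt.Println(vdb.Status.Subclusters[0].Detail[1].AddedToDB) // false
}
```

Because the map stores pointers into vdb.Status.Subclusters, writing through it is how the new updateSubclusterStatus functions below clear AddedToDB in place.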
25 changes: 25 additions & 0 deletions pkg/controllers/vdb/dbremovenode_reconciler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/vertica/vertica-kubernetes/pkg/names"
 	"github.com/vertica/vertica-kubernetes/pkg/vadmin"
 	"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/removenode"
+	"github.com/vertica/vertica-kubernetes/pkg/vdbstatus"
 	corev1 "k8s.io/api/core/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -140,6 +141,10 @@ func (d *DBRemoveNodeReconciler) removeNodesInSubcluster(ctx context.Context, sc
 		return ctrl.Result{}, fmt.Errorf("failed to call remove node: %w", err)
 	}
 
+	if err := d.updateSubclusterStatus(ctx, podsToRemove); err != nil {
+		return ctrl.Result{}, fmt.Errorf("failed to update subcluster status: %w", err)
+	}
+
 	// We successfully called db_remove_node, invalidate the pod facts cache
 	// so that it is refreshed the next time we need it.
 	d.PFacts.Invalidate()
@@ -202,3 +207,23 @@ func (d *DBRemoveNodeReconciler) findPodsSuitableForScaleDown(sc *vapi.Subcluste
 	}
 	return pods, requeueNeeded
 }
+
+// updateSubclusterStatus updates the removed nodes detail in their subcluster status
+func (d *DBRemoveNodeReconciler) updateSubclusterStatus(ctx context.Context, removedPods []*PodFact) error {
+	refreshInPlace := func(vdb *vapi.VerticaDB) error {
+		scMap := vdb.GenSubclusterStatusMap()
+		// The removed nodes belong to the same subcluster
+		// so we return if we don't find its status
+		scs := scMap[removedPods[0].subclusterName]
+		if scs == nil {
+			return nil
+		}
+		for _, p := range removedPods {
+			if int(p.podIndex) < len(scs.Detail) {
+				scs.Detail[p.podIndex].AddedToDB = false
+			}
+		}
+		return nil
+	}
+	return vdbstatus.Update(ctx, d.VRec.Client, d.Vdb, refreshInPlace)
+}
17 changes: 17 additions & 0 deletions pkg/controllers/vdb/dbremovenode_reconciler_test.go
@@ -50,6 +50,8 @@ var _ = Describe("dbremovenode_reconcile", func() {
 		vdb := vapi.MakeVDB()
 		sc := &vdb.Spec.Subclusters[0]
 		sc.Size = 2
+		test.CreateVDB(ctx, k8sClient, vdb)
+		defer test.DeleteVDB(ctx, k8sClient, vdb)
 		test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning)
 		defer test.DeletePods(ctx, k8sClient, vdb)
 
@@ -82,6 +84,15 @@ var _ = Describe("dbremovenode_reconcile", func() {
 		test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning)
 		defer test.DeletePods(ctx, k8sClient, vdbCopy)
 
+		vdb.Status.Subclusters = []vapi.SubclusterStatus{
+			{Name: sc.Name, Detail: []vapi.VerticaDBPodStatus{
+				{AddedToDB: true},
+				{AddedToDB: true},
+				{AddedToDB: true},
+			}},
+		}
+		Expect(k8sClient.Status().Update(ctx, vdb)).Should(Succeed())
+
 		// Resize the subcluster to remove two nodes
 		nm := vdb.ExtractNamespacedName()
 		fetchedVdb := &vapi.VerticaDB{}
@@ -109,6 +120,12 @@ var _ = Describe("dbremovenode_reconcile", func() {
 			"--hosts",
 			fmt.Sprintf("%s,%s", pfacts.Detail[uninstallPods[0]].dnsName, pfacts.Detail[uninstallPods[1]].dnsName),
 		))
+
+		Expect(k8sClient.Get(ctx, nm, fetchedVdb)).Should(Succeed())
+		Expect(len(fetchedVdb.Status.Subclusters[0].Detail)).Should(Equal(int(sc.Size)))
+		Expect(fetchedVdb.Status.Subclusters[0].Detail[0].AddedToDB).Should(BeTrue())
+		Expect(fetchedVdb.Status.Subclusters[0].Detail[1].AddedToDB).Should(BeFalse())
+		Expect(fetchedVdb.Status.Subclusters[0].Detail[2].AddedToDB).Should(BeFalse())
 	})
 
 	It("should skip remove node and requeue because pod we need to remove node from isn't running", func() {
30 changes: 30 additions & 0 deletions pkg/controllers/vdb/dbremovesubcluster_reconciler.go
@@ -31,6 +31,7 @@ import (
 	"github.com/vertica/vertica-kubernetes/pkg/names"
 	"github.com/vertica/vertica-kubernetes/pkg/vadmin"
 	"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/removesc"
+	"github.com/vertica/vertica-kubernetes/pkg/vdbstatus"
 	corev1 "k8s.io/api/core/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
 )
@@ -124,6 +125,14 @@ func (d *DBRemoveSubclusterReconciler) removeExtraSubclusters(ctx context.Contex
 		if err := d.removeSubcluster(ctx, subclusters[i].Name); err != nil {
 			return ctrl.Result{}, err
 		}
+
+		if err := d.updateSubclusterStatus(ctx, subclusters[i].Name); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to update subcluster status: %w", err)
+		}
+
+		// We successfully called remove subcluster and updated the status, invalidate
+		// the pod facts cache so that it is refreshed the next time we need it.
+		d.PFacts.Invalidate()
 	}
 	return ctrl.Result{}, nil
 }
@@ -142,6 +151,27 @@ func (d *DBRemoveSubclusterReconciler) removeSubcluster(ctx context.Context, scN
 	return nil
 }
 
+// updateSubclusterStatus updates all of the given subcluster's nodes detail
+// in the status
+func (d *DBRemoveSubclusterReconciler) updateSubclusterStatus(ctx context.Context, scName string) error {
+	refreshInPlace := func(vdb *vapi.VerticaDB) error {
+		scMap := vdb.GenSubclusterStatusMap()
+		scs := scMap[scName]
+		if scs == nil {
+			return nil
+		}
+		for _, p := range d.PFacts.Detail {
+			if p.subclusterName == scName {
+				if int(p.podIndex) < len(scs.Detail) {
+					scs.Detail[p.podIndex].AddedToDB = false
+				}
+			}
+		}
+		return nil
+	}
+	return vdbstatus.Update(ctx, d.VRec.Client, d.Vdb, refreshInPlace)
+}
+
 // resetDefaultSubcluster will set the default subcluster to the first
 // subcluster that exists in the vdb. This step is necessary before removing
 // any subclusters because you cannot remove the default subcluster.
9 changes: 7 additions & 2 deletions pkg/controllers/vdb/podfacts.go
@@ -732,6 +732,9 @@ func (p *PodFacts) getEnvValueFromPod(cnt *corev1.Container, envName string) (st
 func (p *PodFacts) checkIsDBCreated(_ context.Context, vdb *vapi.VerticaDB, pf *PodFact, gs *GatherState) error {
 	pf.dbExists = false
 
+	// Set dbExists and vnodeName from state found in the vdb.status. We
+	// cannot always trust the state on disk. When a pod is unsandboxed, the catalog is
+	// removed for instance.
 	scs, ok := vdb.FindSubclusterStatus(pf.subclusterName)
 	if ok {
 		// Set the db exists indicator first based on the count in the status
@@ -741,13 +744,15 @@ func (p *PodFacts) checkIsDBCreated(_ context.Context, vdb *vapi.VerticaDB, pf *
 		// Inherit the vnode name if present
 		if int(pf.podIndex) < len(scs.Detail) {
 			pf.vnodeName = scs.Detail[pf.podIndex].VNodeName
+			pf.dbExists = scs.Detail[pf.podIndex].AddedToDB
 		}
 	}
-	// Nothing else can be gathered if the pod isn't running.
+	// The gather state is empty if the pod isn't running.
 	if !pf.isPodRunning {
 		return nil
 	}
-	pf.dbExists = gs.DBExists
+
+	pf.dbExists = gs.DBExists || pf.dbExists
 	pf.vnodeName = gs.VNodeName
 	return nil
 }
4 changes: 2 additions & 2 deletions pkg/controllers/vdb/podfacts_test.go
@@ -56,7 +56,7 @@ var _ = Describe("podfacts", func() {
 		fpr := &cmds.FakePodRunner{}
 		pfacts := MakePodFacts(vdbRec, fpr, logger, TestPassword)
 		vdb.Status.Subclusters = []vapi.SubclusterStatus{
-			{Name: sc.Name, AddedToDBCount: sc.Size, Detail: []vapi.VerticaDBPodStatus{{Installed: true}}},
+			{Name: sc.Name, AddedToDBCount: sc.Size, Detail: []vapi.VerticaDBPodStatus{{Installed: true, AddedToDB: true}}},
 		}
 		Expect(pfacts.Collect(ctx, vdb)).Should(Succeed())
 		pf, ok := pfacts.Detail[nm]
@@ -65,7 +65,7 @@ var _ = Describe("podfacts", func() {
 		Expect(pf.dbExists).Should(BeTrue())
 
 		vdb.Status.Subclusters = []vapi.SubclusterStatus{
-			{Name: sc.Name, AddedToDBCount: 0, Detail: []vapi.VerticaDBPodStatus{{Installed: false}}},
+			{Name: sc.Name, AddedToDBCount: 0, Detail: []vapi.VerticaDBPodStatus{{Installed: false, AddedToDB: false}}},
 		}
 		pfacts.Invalidate()
 		Expect(pfacts.Collect(ctx, vdb)).Should(Succeed())
