Skip to content

Commit

Permalink
Add replication reconciler for VerticaReplicator (#773)
Browse files Browse the repository at this point in the history
This pull request modifies the Vertica replicator controller to invoke
the replication command synchronously. Additionally, it introduces new
status conditions within VerticaReplicator to monitor the controller's
various states.

---------

Co-authored-by: spilchen <mspilchen@opentext.com>
  • Loading branch information
2 people authored and cchen-vertica committed Jul 24, 2024
1 parent c4d7e16 commit 3d1661a
Show file tree
Hide file tree
Showing 31 changed files with 1,173 additions and 129 deletions.
2 changes: 0 additions & 2 deletions api/v1/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ const (
RestoreSupportedMinVersion = "v24.2.0"
// Starting in v24.3.0, database replication via vclusterops is supported.
ReplicationViaVclusteropsSupportedMinVersion = "v24.3.0"
// Starting in v23.3.0, in-database replication is supported.
ReplicationSupportedMinVersion = "v23.3.0"
// The NMA TLS secret can be stored in an external secret store. These are
// the minimum versions of the NMA that we support them.
NMATLSSecretInGSMMinVersion = "v24.1.0"
Expand Down
6 changes: 5 additions & 1 deletion api/v1beta1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (vrep *VerticaReplicator) IsStatusConditionFalse(statusCondition string) bo
return meta.IsStatusConditionFalse(vrep.Status.Conditions, statusCondition)
}

func (vrep *VerticaReplicator) IsStatusConditionPresent(statusCondition string) bool {
return meta.FindStatusCondition(vrep.Status.Conditions, statusCondition) != nil
}

func MakeSampleVrpqName() types.NamespacedName {
return types.NamespacedName{Name: "vrpq-sample", Namespace: "default"}
}
Expand Down Expand Up @@ -199,7 +203,7 @@ func MakeVrep() *VerticaReplicator {
ObjectMeta: metav1.ObjectMeta{
Name: nm.Name,
Namespace: nm.Namespace,
UID: "zxcvbn-ghi-lkm",
UID: "zxcvbn-ghi-lkm-xyz",
},
Spec: VerticaReplicatorSpec{
Source: VerticaReplicatorDatabaseInfo{
Expand Down
1 change: 1 addition & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func addReconcilersToManager(mgr manager.Manager, restCfg *rest.Config) {
if err := (&vrep.VerticaReplicatorReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Cfg: restCfg,
EVRec: mgr.GetEventRecorderFor(vmeta.OperatorName),
Log: ctrl.Log.WithName("controllers").WithName("VerticaReplicator"),
}).SetupWithManager(mgr); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/dbaddnode_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *DBAddNodeReconciler) reconcileSubcluster(ctx context.Context, sc *vapi.

// runAddNode will add nodes to the given subcluster
func (d *DBAddNodeReconciler) runAddNode(ctx context.Context, podsToAdd []*PodFact) (ctrl.Result, error) {
initiatorPod, ok := d.PFacts.findPodToRunVsql(false, "")
initiatorPod, ok := d.PFacts.findFirstUpPod(false, "")
if !ok {
d.Log.Info("No pod found to run vsql and admintools from. Requeue reconciliation.")
return ctrl.Result{Requeue: true}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/metrics_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (p *MetricReconciler) captureRawMetrics() map[string]*subclusterGaugeDetail
// so its up to the caller to check that the annotaiton was set and act
// accordingly.
func (p *MetricReconciler) setReviveInstanceIDAnnotation(ctx context.Context) error {
pf, ok := p.PFacts.findPodToRunVsql(true, "")
pf, ok := p.PFacts.findFirstUpPod(true, "")
if !ok {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/onlineupgrade_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (o *OnlineUpgradeReconciler) skipTransientSetup() bool {
// We skip creating the transient if the cluster is down. We cannot add the
// transient if everything is down. And there is nothing "online" with this
// upgrade if we start with everything down.
_, found := o.PFacts.findPodToRunVsql(false, "")
_, found := o.PFacts.findFirstUpPod(false, "")
return !found
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/controllers/vdb/podfacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,19 +827,20 @@ func (p *PodFacts) doesDBExist() bool {
return false
}

// findPodToRunVsql returns the name of the pod we will exec into in
// order to run vsql
// findFirstUpPod returns the first (sorted) pod that has an up vertica node
// Will return false for second parameter if no pod could be found.
func (p *PodFacts) findPodToRunVsql(allowReadOnly bool, scName string) (*PodFact, bool) {
for _, v := range p.Detail {
if scName != "" && v.subclusterName != scName {
continue
}
if v.upNode && (allowReadOnly || !v.readOnly) {
return v, true
}
func (p *PodFacts) findFirstUpPod(allowReadOnly bool, scName string) (*PodFact, bool) {
return p.findFirstPodSorted(func(v *PodFact) bool {
return (scName == "" || v.subclusterName == scName) &&
v.upNode && (allowReadOnly || !v.readOnly)
})
}

func (p *PodFacts) FindFirstUpPodIP(allowReadOnly bool, scName string) (string, bool) {
if pod, ok := p.findFirstUpPod(allowReadOnly, scName); ok {
return pod.podIP, true
}
return &PodFact{}, false
return "", false
}

// findPodToRunAdminCmdAny returns the name of the pod we will exec into into
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/rebalanceshards_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *RebalanceShardsReconciler) Reconcile(ctx context.Context, _ *ctrl.Reque
return ctrl.Result{}, nil
}

atPod, ok := s.PFacts.findPodToRunVsql(false, "")
atPod, ok := s.PFacts.findFirstUpPod(false, "")
if !ok {
s.Log.Info("No pod found to run vsql from. Requeue reconciliation.")
return ctrl.Result{Requeue: true}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (i *UpgradeManager) traceActorReconcile(actor controllers.ReconcileActor) {
// that are active for a given subcluster. It returns a requeue error if there
// are still active connections.
func (i *UpgradeManager) isSubclusterIdle(ctx context.Context, pfacts *PodFacts, scName string) (ctrl.Result, error) {
pf, ok := pfacts.findPodToRunVsql(true, scName)
pf, ok := pfacts.findFirstUpPod(true, scName)
if !ok {
i.Log.Info("No pod found to run vsql. Skipping active connection check")
return ctrl.Result{}, nil
Expand Down
43 changes: 43 additions & 0 deletions pkg/controllers/vrep/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
(c) Copyright [2021-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vrep

import (
"context"

vapi "github.com/vertica/vertica-kubernetes/api/v1"
"github.com/vertica/vertica-kubernetes/api/v1beta1"
verrors "github.com/vertica/vertica-kubernetes/pkg/errors"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
ctrl "sigs.k8s.io/controller-runtime"
)

func fetchSourceAndTargetVDBs(ctx context.Context,
vRec *VerticaReplicatorReconciler,
vrep *v1beta1.VerticaReplicator) (vdbSource, vdbTarget *vapi.VerticaDB, res ctrl.Result, err error) {
vdbSource = &vapi.VerticaDB{}
vdbTarget = &vapi.VerticaDB{}
nmSource := names.GenNamespacedName(vrep, vrep.Spec.Source.VerticaDB)
nmTarget := names.GenNamespacedName(vrep, vrep.Spec.Target.VerticaDB)
if res, err = vk8s.FetchVDB(ctx, vRec, vrep, nmSource, vdbSource); verrors.IsReconcileAborted(res, err) {
return nil, nil, res, err
}
if res, err = vk8s.FetchVDB(ctx, vRec, vrep, nmTarget, vdbTarget); verrors.IsReconcileAborted(res, err) {
return nil, nil, res, err
}
return vdbSource, vdbTarget, ctrl.Result{}, nil
}
Loading

0 comments on commit 3d1661a

Please sign in to comment.