From 13f635069f49019b6c48025abba427456b1e2872 Mon Sep 17 00:00:00 2001 From: yati1998 Date: Wed, 7 Sep 2022 13:09:54 +0530 Subject: [PATCH] replication: add reconcile logic to update last_sync This commit adds reconcile logic to update the last sync time. Signed-off-by: yati1998 --- .../replication/replication.go | 11 ++++ .../volumereplication_controller.go | 53 +++++++++++++++++++ internal/client/fake/replication-client.go | 12 +++++ internal/client/replication-client.go | 19 +++++++ internal/sidecar/service/volumereplication.go | 31 +++++++++++ 5 files changed, 126 insertions(+) diff --git a/controllers/replication.storage/replication/replication.go b/controllers/replication.storage/replication/replication.go index 82634ca05..772257e2b 100644 --- a/controllers/replication.storage/replication/replication.go +++ b/controllers/replication.storage/replication/replication.go @@ -107,6 +107,17 @@ func (r *Replication) Resync() *Response { return &Response{Response: resp, Error: err} } +func (r *Replication) GetInfo() *Response { + resp, err := r.Params.Replication.GetVolumeReplicationInfo( + r.Params.VolumeID, + r.Params.ReplicationID, + r.Params.SecretName, + r.Params.SecretNamespace, + ) + + return &Response{Response: resp, Error: err} +} + func (r *Response) HasKnownGRPCError(knownErrors []codes.Code) bool { if r.Error == nil { return false diff --git a/controllers/replication.storage/volumereplication_controller.go b/controllers/replication.storage/volumereplication_controller.go index 14f129c8f..34060041e 100644 --- a/controllers/replication.storage/volumereplication_controller.go +++ b/controllers/replication.storage/volumereplication_controller.go @@ -125,6 +125,8 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } // remove the prefix keys in volume replication class parameters parameters := filterPrefixedParameters(replicationParameterPrefix, vrcObj.Spec.Parameters) + schedulingTime := parameters["schedulingInterval"] + scheduleTime, _ := time.ParseDuration(schedulingTime) // get secret secretName := vrcObj.Spec.Parameters[prefixedReplicationSecretNameKey] @@ -363,6 +365,15 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } instance.Status.LastCompletionTime = getCurrentTime() + var last_sync_time int64 + var requeueForInfo bool + if instance.Spec.ReplicationState == replicationv1alpha1.Primary { + last_sync_time, err = r.getVolumeReplicationInfo(vr) + last_sync_time1 := time.Unix(last_sync_time, 0).UTC() + instance.Status.LastSyncTime = getTime(last_sync_time1) + requeueForInfo = true + + } err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg) if err != nil { return ctrl.Result{}, err @@ -370,6 +381,14 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info(msg) + if requeueForInfo { + logger.Info("volume is primary, requeuing to get volume replication info") + return ctrl.Result{ + Requeue: true, + RequeueAfter: scheduleTime, + }, nil + } + return ctrl.Result{}, nil } @@ -614,6 +633,34 @@ func (r *VolumeReplicationReconciler) enableReplication(vr *volumeReplicationIns return nil } +// getVolumeReplicationInfo gets volume replication info. +func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplicationInstance) (int64, error) { + volumeReplication := replication.Replication{ + Params: vr.commonRequestParameters, + } + + resp := volumeReplication.GetInfo() + if resp.Error != nil { + vr.logger.Error(resp.Error, "failed to get volume replication info") + + return 0, resp.Error + } + + infoResponse, ok := resp.Response.(*proto.GetVolumeReplicationInfoResponse) + if !ok { + err := fmt.Errorf("received response of unexpected type") + vr.logger.Error(err, "unable to parse response") + + return 0, err + } + + var last_sync_time int64 + + last_sync_time = infoResponse.LastSyncTime + + return last_sync_time, nil +} + func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { switch instance.Spec.ReplicationState { case replicationv1alpha1.Primary: @@ -651,3 +698,9 @@ func getCurrentTime() *metav1.Time { return &metav1NowTime } + +func getTime(time time.Time) *metav1.Time { + metav1Time := metav1.NewTime(time) + + return &metav1Time +} diff --git a/internal/client/fake/replication-client.go b/internal/client/fake/replication-client.go index a7dba7c34..d642854dd 100644 --- a/internal/client/fake/replication-client.go +++ b/internal/client/fake/replication-client.go @@ -30,6 +30,8 @@ type ReplicationClient struct { DemoteVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.DemoteVolumeResponse, error) // ResyncVolumeMock mocks ResyncVolume RPC call. ResyncVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.ResyncVolumeResponse, error) + // GetVolumeReplicationInfo mocks GetVolumeReplicationInfo RPC call. + GetVolumeReplicationInfoMock func(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) } // EnableVolumeReplication calls EnableVolumeReplicationMock mock function. @@ -87,3 +89,13 @@ func (rc *ReplicationClient) ResyncVolume( error) { return rc.ResyncVolumeMock(volumeID, replicationID, secretName, secretNamespace, parameters) } + +// GetVolumeReplicationInfo calls GetVolumeReplicationInfoMock function. +func (rc *ReplicationClient) GetVolumeReplicationInfo( + volumeID, + replicationID string, + secretName, secretNamespace string) ( + *proto.GetVolumeReplicationInfoResponse, + error) { + return rc.GetVolumeReplicationInfoMock(volumeID, replicationID, secretName, secretNamespace) +} diff --git a/internal/client/replication-client.go b/internal/client/replication-client.go index de3cc4f7e..c19beeaf3 100644 --- a/internal/client/replication-client.go +++ b/internal/client/replication-client.go @@ -45,6 +45,8 @@ type VolumeReplication interface { // ResyncVolume RPC call to resync the volume. ResyncVolume(volumeID, replicationID string, force bool, secretName, secretNamespace string, parameters map[string]string) (*proto. ResyncVolumeResponse, error) + // GetVolumeReplicationInfo RPC call to get volume replication info. + GetVolumeReplicationInfo(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) } // NewReplicationClient returns VolumeReplication interface which has the RPC @@ -143,3 +145,20 @@ func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, force return resp, err } + +// GetVolumeReplicationInfo RPC call to get volume replication info. +func (rc *replicationClient) GetVolumeReplicationInfo(volumeID, replicationID string, + secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) { + req := &proto.GetVolumeReplicationInfoRequest{ + VolumeId: volumeID, + ReplicationId: replicationID, + SecretName: secretName, + SecretNamespace: secretNamespace, + } + + createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) + defer cancel() + resp, err := rc.client.GetVolumeReplicationInfo(createCtx, req) + + return resp, err +} diff --git a/internal/sidecar/service/volumereplication.go b/internal/sidecar/service/volumereplication.go index 6b7d1146b..dc4d2f054 100644 --- a/internal/sidecar/service/volumereplication.go +++ b/internal/sidecar/service/volumereplication.go @@ -191,3 +191,34 @@ func (rs *ReplicationServer) ResyncVolume( Ready: resp.Ready, }, nil } + +// GetVolumeReplicationInfo fetches required information from kubernetes cluster and calls +// CSI-Addons GetVolumeReplicationInfo service. +func (rs *ReplicationServer) GetVolumeReplicationInfo( + ctx context.Context, + req *proto.GetVolumeReplicationInfoRequest) (*proto.GetVolumeReplicationInfoResponse, error) { + // Get the secrets from the k8s cluster + data, err := kube.GetSecret(ctx, rs.kubeClient, req.GetSecretName(), req.GetSecretNamespace()) + if err != nil { + klog.Errorf("Failed to get secret %s in namespace %s: %v", req.GetSecretName(), req.GetSecretNamespace(), err) + return nil, status.Error(codes.Internal, err.Error()) + } + + resp, err := rs.controllerClient.GetVolumeReplicationInfo(ctx, + &csiReplication.GetVolumeReplicationInfoRequest{ + VolumeId: req.VolumeId, + Secrets: data, + ReplicationId: req.ReplicationId, + }) + if err != nil { + klog.Errorf("Failed to get volume replication info: %v", err) + return nil, err + } + + lastsynctime := resp.LastSyncTime + synctime := lastsynctime.GetSeconds() + + return &proto.GetVolumeReplicationInfoResponse{ + LastSyncTime: synctime, + }, nil +}