Add LastSyncTime to volume replication status #232

Merged
merged 4 commits into from
Sep 13, 2022
@@ -89,6 +89,7 @@ type VolumeReplicationStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
LastStartTime *metav1.Time `json:"lastStartTime,omitempty"`
LastCompletionTime *metav1.Time `json:"lastCompletionTime,omitempty"`
LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"`
}

// +kubebuilder:object:root=true
4 changes: 4 additions & 0 deletions apis/replication.storage/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -179,6 +179,9 @@ spec:
lastStartTime:
format: date-time
type: string
lastSyncTime:
format: date-time
type: string
message:
type: string
observedGeneration:
11 changes: 11 additions & 0 deletions controllers/replication.storage/replication/replication.go
@@ -107,6 +107,17 @@ func (r *Replication) Resync() *Response {
return &Response{Response: resp, Error: err}
}

func (r *Replication) GetInfo() *Response {
resp, err := r.Params.Replication.GetVolumeReplicationInfo(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.SecretName,
r.Params.SecretNamespace,
)

return &Response{Response: resp, Error: err}
}

func (r *Response) HasKnownGRPCError(knownErrors []codes.Code) bool {
if r.Error == nil {
return false
73 changes: 73 additions & 0 deletions controllers/replication.storage/volumereplication_controller.go
@@ -51,6 +51,7 @@ const (
pvcDataSource = "PersistentVolumeClaim"
volumeReplicationClass = "VolumeReplicationClass"
volumeReplication = "VolumeReplication"
defaultScheduleTime = time.Hour
)

var (
@@ -363,16 +364,64 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

instance.Status.LastCompletionTime = getCurrentTime()

var requeueForInfo bool

if instance.Spec.ReplicationState == replicationv1alpha1.Primary {
Member

Is it required only for primary state?

Contributor Author

Yes, we need to resync only when it is in primary state.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary end has the status that it reads from the secondary, so we use that to update the lastSyncTime.

In cases where we demoted a Primary and are waiting for both Secondaries to be in sync, the timestamp is still useful. It tells us how far back the sync is, before either image can be gracefully promoted.

The demoted snapshot sync time is not critical at present. It would also just show that the lastSyncTime is in the past compared to the demoted snapshot, and ideally, once the demoted snapshot is synced, the timestamps would be the same. So I am not sure it is useful as such.

This needs more thought on how to use that information, hence sticking to the case where the image is primary at one end is useful.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Madhu-1 If an image is in the split-brain state, is there a TS that is usable?

Member

AFAIK no; during split-brain no sync will happen, so this is not useful.

Collaborator

In split-brain, I do not assume one of the images is marked Primary? If that is the case, the lastSyncTime should be the time of the last successful sync, maybe.

Contributor Author

Since we update it at every interval, I guess that if we reach the split-brain case, it would already have that. cc @ShyamsundarR

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question really is what does RBD report, and then what should the proto report. In the case of split-brain we should report not-synced time (which IMHO is the "0" timestamp), or if available the last time it was synced...

Collaborator

I agree, on a sync failure (like split-brain), no LastSyncTime should be reported, but ideally an error. Either the automatic rescheduling should stop, or be done with an exponential backoff.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the image is in the replaying state, RBD gives us the additional description; if it is stopped or in split-brain, the additional data is not available, and we will hence default to the "0" timestamp. We should be fine with that.

nixpanic marked this conversation as resolved.
info, err := r.getVolumeReplicationInfo(vr)
if err != nil {
logger.Error(err, "Failed to get volume replication info")
return ctrl.Result{}, err
}
ts := info.GetLastSyncTime()

Member

unnecessary newline

lastSyncTime := metav1.NewTime(ts.AsTime())
instance.Status.LastSyncTime = &lastSyncTime
requeueForInfo = true
}
if instance.Spec.ReplicationState == replicationv1alpha1.Secondary {
instance.Status.LastSyncTime = nil
}
err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg)
if err != nil {
return ctrl.Result{}, err
}

logger.Info(msg)

if requeueForInfo {
scheduleTime := getScheduleTime(parameters, logger)
return ctrl.Result{
Requeue: true,
RequeueAfter: scheduleTime,
}, nil
}

return ctrl.Result{}, nil
}

// getScheduleTime reads schedulingInterval from parameters and returns it
// as a time.Duration. If schedulingInterval is empty or cannot be parsed,
// defaultScheduleTime is returned.
func getScheduleTime(parameters map[string]string, logger logr.Logger) time.Duration {
// schedulingInterval is an optional parameter of the VolumeReplicationClass
// and looks like this:
// ```parameters:
// replication.storage.openshift.io/replication-secret-name: rook-csi-rbd-provisioner
// replication.storage.openshift.io/replication-secret-namespace: rook-ceph
// schedulingInterval: 1m```
rawScheduleTime := parameters["schedulingInterval"]
if rawScheduleTime == "" {
return defaultScheduleTime
}
scheduleTime, err := time.ParseDuration(rawScheduleTime)
if err != nil {
logger.Error(err, "failed to parse schedulingInterval", "schedulingInterval", rawScheduleTime)
return defaultScheduleTime
}
return scheduleTime
}

func (r *VolumeReplicationReconciler) getReplicationClient(driverName string) (grpcClient.VolumeReplication, error) {
conns := r.Connpool.GetByNodeID(driverName, "")

@@ -614,6 +663,30 @@ func (r *VolumeReplicationReconciler) enableReplication(vr *volumeReplicationIns
return nil
}

// getVolumeReplicationInfo gets volume replication info.
func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplicationInstance) (*proto.GetVolumeReplicationInfoResponse, error) {
volumeReplication := replication.Replication{
Params: vr.commonRequestParameters,
}

resp := volumeReplication.GetInfo()
if resp.Error != nil {
vr.logger.Error(resp.Error, "failed to get volume replication info")

return nil, resp.Error
}

infoResponse, ok := resp.Response.(*proto.GetVolumeReplicationInfoResponse)
if !ok {
err := fmt.Errorf("received response of unexpected type")
vr.logger.Error(err, "unable to parse response")

return nil, err
}

return infoResponse, nil
}
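
The comma-ok type assertion used in `getVolumeReplicationInfo` can be sketched in isolation. The types `infoResponse` and `otherResponse` and the function `asInfo` below are hypothetical stand-ins for the generated protobuf types; including `%T` in the error message is a suggestion, not what the PR does.

```go
package main

import (
	"fmt"
)

// Hypothetical response types standing in for generated protobuf messages.
type infoResponse struct{ LastSyncUnix int64 }
type otherResponse struct{}

// asInfo narrows an untyped response to *infoResponse, returning an error
// that names the actual dynamic type when the assertion fails.
func asInfo(resp interface{}) (*infoResponse, error) {
	info, ok := resp.(*infoResponse)
	if !ok {
		return nil, fmt.Errorf("received response of unexpected type %T", resp)
	}
	return info, nil
}

func main() {
	if info, err := asInfo(&infoResponse{LastSyncUnix: 42}); err == nil {
		fmt.Println("ok:", info.LastSyncUnix)
	}
	if _, err := asInfo(&otherResponse{}); err != nil {
		fmt.Println("error:", err)
	}
}
```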

func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State {
switch instance.Spec.ReplicationState {
case replicationv1alpha1.Primary:
74 changes: 74 additions & 0 deletions controllers/replication.storage/volumereplication_test.go
@@ -0,0 +1,74 @@
/*
Copyright 2022 The Kubernetes-CSI-Addons Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"testing"
"time"

"github.com/go-logr/logr/testr"
)

func TestGetScheduledTime(t *testing.T) {
t.Parallel()
td, _ := time.ParseDuration("1m")
const defaultScheduleTime = time.Hour
logger := testr.New(t)
testcases := []struct {
parameters map[string]string
time time.Duration
}{
{
parameters: map[string]string{
"replication.storage.openshift.io/replication-secret-name": "rook-csi-rbd-provisioner",
"schedulingInterval": "1m",
},
time: td,
},
{
parameters: map[string]string{
"replication.storage.openshift.io/replication-secret-name": "rook-csi-rbd-provisioner",
},
time: defaultScheduleTime,
},
{
parameters: map[string]string{},
time: defaultScheduleTime,
},
{
parameters: map[string]string{
"schedulingInterval": "",
},
time: defaultScheduleTime,
},
{
parameters: map[string]string{
"schedulingInterval": "2mm",
},
time: defaultScheduleTime,
},
}
for _, tt := range testcases {
newtt := tt
t.Run("", func(t *testing.T) {
t.Parallel()
if got := getScheduleTime(newtt.parameters, logger); got != newtt.time {
t.Errorf("getScheduleTime() = %v, want %v", got, newtt.time)
}
})
}
}
3 changes: 3 additions & 0 deletions deploy/controller/crds.yaml
@@ -917,6 +917,9 @@ spec:
lastStartTime:
format: date-time
type: string
lastSyncTime:
format: date-time
type: string
message:
type: string
observedGeneration:
1 change: 1 addition & 0 deletions docs/volumereplicationclass.md
@@ -21,4 +21,5 @@ spec:
parameters:
replication.storage.openshift.io/replication-secret-name: secret-name
replication.storage.openshift.io/replication-secret-namespace: secret-namespace
schedulingInterval: 1m
```
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,7 @@ go 1.18

require (
github.com/container-storage-interface/spec v1.6.0
- github.com/csi-addons/spec v0.1.2-0.20220829042231-b27a0d84b50b
+ github.com/csi-addons/spec v0.1.2-0.20220906123848-52ce69f90900
github.com/go-logr/logr v1.2.3
github.com/kubernetes-csi/csi-lib-utils v0.11.0
github.com/onsi/ginkgo v1.16.5
4 changes: 2 additions & 2 deletions go.sum
@@ -231,8 +231,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
- github.com/csi-addons/spec v0.1.2-0.20220829042231-b27a0d84b50b h1:C5KgryC4RwQVSF8L/pgcKftgn7Z1zHFZlACJukPlCxs=
- github.com/csi-addons/spec v0.1.2-0.20220829042231-b27a0d84b50b/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
+ github.com/csi-addons/spec v0.1.2-0.20220906123848-52ce69f90900 h1:zX0138DipZsZqxK1UwAmaRZmL89OuQMkwh7FtvTDgFw=
+ github.com/csi-addons/spec v0.1.2-0.20220906123848-52ce69f90900/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
12 changes: 12 additions & 0 deletions internal/client/fake/replication-client.go
@@ -30,6 +30,8 @@ type ReplicationClient struct {
DemoteVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.DemoteVolumeResponse, error)
// ResyncVolumeMock mocks ResyncVolume RPC call.
ResyncVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.ResyncVolumeResponse, error)
// GetVolumeReplicationInfoMock mocks GetVolumeReplicationInfo RPC call.
GetVolumeReplicationInfoMock func(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// EnableVolumeReplication calls EnableVolumeReplicationMock mock function.
@@ -87,3 +89,13 @@ func (rc *ReplicationClient) ResyncVolume(
error) {
return rc.ResyncVolumeMock(volumeID, replicationID, secretName, secretNamespace, parameters)
}

// GetVolumeReplicationInfo calls GetVolumeReplicationInfoMock function.
func (rc *ReplicationClient) GetVolumeReplicationInfo(
volumeID,
replicationID string,
secretName, secretNamespace string) (
*proto.GetVolumeReplicationInfoResponse,
error) {
return rc.GetVolumeReplicationInfoMock(volumeID, replicationID, secretName, secretNamespace)
}
19 changes: 19 additions & 0 deletions internal/client/replication-client.go
@@ -45,6 +45,8 @@ type VolumeReplication interface {
// ResyncVolume RPC call to resync the volume.
ResyncVolume(volumeID, replicationID string, force bool, secretName, secretNamespace string, parameters map[string]string) (*proto.
ResyncVolumeResponse, error)
// GetVolumeReplicationInfo RPC call to get volume replication info.
GetVolumeReplicationInfo(volumeID, replicationID, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// NewReplicationClient returns VolumeReplication interface which has the RPC
@@ -143,3 +145,20 @@ func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, force

return resp, err
}

// GetVolumeReplicationInfo RPC call to get volume replication info.
func (rc *replicationClient) GetVolumeReplicationInfo(volumeID, replicationID,
secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) {
req := &proto.GetVolumeReplicationInfoRequest{
VolumeId: volumeID,
ReplicationId: replicationID,
SecretName: secretName,
SecretNamespace: secretNamespace,
}

createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout)
defer cancel()
resp, err := rc.client.GetVolumeReplicationInfo(createCtx, req)

return resp, err
}