Skip to content

Commit

Permalink
Fix data path concurrent
Browse files Browse the repository at this point in the history
Signed-off-by: Ming <mqiu@vmware.com>
  • Loading branch information
qiuming-best committed Jun 30, 2023
1 parent 1bfcee7 commit 02bf31b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
12 changes: 6 additions & 6 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Restore PVC is ready")

if r.dataPathMgr.IsConcurrentExceed() {
log.Info("Data path instance is concurrent limited, requeue after one minute")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
}

// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
Expand Down Expand Up @@ -214,12 +219,7 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *

fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("runCancelableDataDownload is concurrent limited")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
return r.errorOut(ctx, dd, err, "error to create data path", log)
}

path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

log.Info("Exposed snapshot is ready")

if r.dataPathMgr.IsConcurrentExceed() {
log.Info("Data path instance is concurrent limited, requeue after one minute")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
}
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
Expand Down Expand Up @@ -209,12 +213,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *

fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("runCancelableDataUpload is concurrent limited")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
} else {
return r.errorOut(ctx, du, err, "error to create data path", log)
}
return r.errorOut(ctx, du, err, "error to create data path", log)
}

path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log)
Expand Down
7 changes: 7 additions & 0 deletions pkg/datapath/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ func NewManager(cocurrentNum int) *Manager {
}
}

// IsConcurrentExceed returns whether the current data path instance number is exceed limit
func (m *Manager) IsConcurrentExceed() bool {
m.trackerLock.Lock()
defer m.trackerLock.Unlock()
return len(m.tracker) == m.cocurrentNum
}

// CreateFileSystemBR creates a new file system backup/restore data path instance
func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx context.Context, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) (AsyncBR, error) {
m.trackerLock.Lock()
Expand Down

0 comments on commit 02bf31b

Please sign in to comment.