Skip to content

Commit

Permalink
Split device sync into readable functions (#879)
Browse files Browse the repository at this point in the history
Signed-off-by: Bala.FA <bala@minio.io>
  • Loading branch information
balamurugana authored Nov 3, 2023
1 parent 45ff6da commit bc00b70
Showing 1 changed file with 121 additions and 115 deletions.
236 changes: 121 additions & 115 deletions pkg/device/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,126 @@ func syncDrive(drive *types.Drive, device device) (updated bool) {
return
}

func verifyDrive(drive *types.Drive) (updated bool) {
switch drive.Status.Status {
case directpvtypes.DriveStatusReady, directpvtypes.DriveStatusLost, directpvtypes.DriveStatusError, directpvtypes.DriveStatusMoving:
default:
return false
}

source := utils.AddDevPrefix(string(drive.GetDriveName()))
target := types.GetDriveMountDir(drive.Status.FSUUID)
if err := xfs.Mount(source, target); err != nil {
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetMountErrorCondition(fmt.Sprintf("unable to mount; %v", err))
client.Eventf(drive, client.EventTypeWarning, client.EventReasonDriveMountError, "unable to mount the drive; %v", err)
klog.ErrorS(err, "unable to mount the drive", "Source", source, "Target", target)
return true
}

client.Eventf(
drive,
client.EventTypeNormal,
client.EventReasonDriveMounted,
"Drive mounted successfully to %s", target,
)

if drive.Status.Status == directpvtypes.DriveStatusLost {
updated = true
drive.Status.Status = directpvtypes.DriveStatusReady
}

latestErrorConditionType := drive.GetLatestErrorConditionType()
if drive.Status.Status == directpvtypes.DriveStatusError {
switch latestErrorConditionType {
case directpvtypes.DriveConditionTypeMountError, directpvtypes.DriveConditionTypeMultipleMatches, directpvtypes.DriveConditionTypeRelabelError:
updated = true
drive.Status.Status = directpvtypes.DriveStatusReady
}
}

if !drive.Spec.Relabel {
return updated
}

err := os.Symlink(".", types.GetVolumeRootDir(drive.Status.FSUUID))
if err != nil {
switch {
case errors.Is(err, os.ErrExist):
err = nil
default:
if latestErrorConditionType != directpvtypes.DriveConditionTypeIOError {
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetRelabelErrorCondition(fmt.Sprintf("unable to relabel; %v", err))
}

client.Eventf(
drive,
client.EventTypeWarning,
client.EventReasonDriveRelabelError,
"unable to relabel; %v", err,
)

klog.ErrorS(
err,
"unable to create symlink",
"symlink", types.GetVolumeRootDir(drive.Status.FSUUID),
"drive", drive.Name,
)
}
}

if err == nil {
drive.Spec.Relabel = false
}

return true
}

func updateDrive(ctx context.Context, driveName string, deviceMap map[string][]device) error {
drive, err := client.DriveClient().Get(ctx, driveName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
var updated bool
devices := deviceMap[drive.Status.FSUUID]
switch len(devices) {
case 0:
// no match
if drive.Status.Status != directpvtypes.DriveStatusLost {
updated = true
drive.Status.Status = directpvtypes.DriveStatusLost
}
case 1:
// device found
updated = syncDrive(drive, devices[0])
if verifyDrive(drive) {
updated = true
}
default:
// more than one matching devices
updated = true
var deviceNames string
for i := range devices {
if deviceNames != "" {
deviceNames += ", "
}
deviceNames += devices[i].Name
}
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetMultipleMatchesErrorCondition(fmt.Sprintf("multiple devices found by FSUUID; %v", deviceNames))
client.Eventf(drive, client.EventTypeWarning, client.EventReasonDriveHasMultipleMatches, "unable to mount the drive due to %v", err)
klog.ErrorS(err, "multiple devices found by FSUUID", "drive", drive.GetDriveName(), "FSUUID", drive.Status.FSUUID, "devices", deviceNames)
}
if updated {
_, err = client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()})
}
return err
}

// Sync - matches and syncs the drive with locally probed device
func Sync(ctx context.Context, nodeID directpvtypes.NodeID) error {
deviceMap, err := probeDeviceMap()
Expand All @@ -105,121 +225,7 @@ func Sync(ctx context.Context, nodeID directpvtypes.NodeID) error {
return err
}
for i := range drives {
updateFunc := func() error {
drive, err := client.DriveClient().Get(ctx, drives[i].Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
var updated bool
devices := deviceMap[drive.Status.FSUUID]
switch len(devices) {
case 0:
// no match
if drive.Status.Status != directpvtypes.DriveStatusLost {
updated = true
drive.Status.Status = directpvtypes.DriveStatusLost
}
case 1:
// device found
if syncDrive(drive, devices[0]) {
updated = true
}

// verify mount
switch drive.Status.Status {
case directpvtypes.DriveStatusReady, directpvtypes.DriveStatusLost, directpvtypes.DriveStatusError, directpvtypes.DriveStatusMoving:
source := utils.AddDevPrefix(string(drive.GetDriveName()))
target := types.GetDriveMountDir(drive.Status.FSUUID)
if err = xfs.Mount(source, target); err != nil {
updated = true
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetMountErrorCondition(fmt.Sprintf("unable to mount; %v", err))
// Events and logs
client.Eventf(drive, client.EventTypeWarning, client.EventReasonDriveMountError, "unable to mount the drive; %v", err)
klog.ErrorS(err, "unable to mount the drive", "Source", source, "Target", target)
} else {
client.Eventf(
drive,
client.EventTypeNormal,
client.EventReasonDriveMounted,
"Drive mounted successfully to %s", target,
)

if drive.Status.Status == directpvtypes.DriveStatusLost {
updated = true
drive.Status.Status = directpvtypes.DriveStatusReady
}

latestErrorConditionType := drive.GetLatestErrorConditionType()
if drive.Status.Status == directpvtypes.DriveStatusError {
switch latestErrorConditionType {
case directpvtypes.DriveConditionTypeMountError, directpvtypes.DriveConditionTypeMultipleMatches, directpvtypes.DriveConditionTypeRelabelError:
updated = true
drive.Status.Status = directpvtypes.DriveStatusReady
}
}

if drive.Spec.Relabel {
updated = true

if err = os.Symlink(".", types.GetVolumeRootDir(drive.Status.FSUUID)); err != nil {
switch {
case errors.Is(err, os.ErrExist):
err = nil
default:
if latestErrorConditionType != directpvtypes.DriveConditionTypeIOError {
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetRelabelErrorCondition(fmt.Sprintf("unable to relabel; %v", err))
}

client.Eventf(
drive,
client.EventTypeWarning,
client.EventReasonDriveRelabelError,
"unable to relabel; %v", err,
)

klog.ErrorS(
err,
"unable to create symlink",
"symlink", types.GetVolumeRootDir(drive.Status.FSUUID),
"drive", drive.Name,
)
}
}

if err == nil {
drive.Spec.Relabel = false
}
}
}
}
default:
// more than one matching devices
updated = true
var deviceNames string
for i := range devices {
if deviceNames != "" {
deviceNames += ", "
}
deviceNames += devices[i].Name
}
drive.Status.Status = directpvtypes.DriveStatusError
drive.SetMultipleMatchesErrorCondition(fmt.Sprintf("multiple devices found by FSUUID; %v", deviceNames))
client.Eventf(drive, client.EventTypeWarning, client.EventReasonDriveHasMultipleMatches, "unable to mount the drive due to %v", err)
klog.ErrorS(err, "multiple devices found by FSUUID", "drive", drive.GetDriveName(), "FSUUID", drive.Status.FSUUID, "devices", deviceNames)
}
if updated {
if _, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()}); err != nil {
return err
}
}
return nil
}
if err = retry.RetryOnConflict(retry.DefaultRetry, updateFunc); err != nil {
if err = retry.RetryOnConflict(retry.DefaultRetry, func() error { return updateDrive(ctx, drives[i].Name, deviceMap) }); err != nil {
return err
}
}
Expand Down

0 comments on commit bc00b70

Please sign in to comment.