This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Merge pull request #96 from RSE-Cambridge/fix-copy-in-out
Fix copy in out
JohnGarbutt authored Sep 9, 2019
2 parents 6d7f487 + 4044bae commit 389dfd3
Showing 13 changed files with 136 additions and 151 deletions.
3 changes: 1 addition & 2 deletions .idea/data-acc.iml

Some generated files are not rendered by default.

6 changes: 1 addition & 5 deletions fs-ansible/create.yml
@@ -7,8 +7,4 @@
- role: lustre
vars:
lustre_state: "present"
lustre_format_disks: true

- role: lustre_client_mount
vars:
lustre_client_mount_present: true
lustre_format_disks: true
4 changes: 0 additions & 4 deletions fs-ansible/delete.yml
@@ -4,10 +4,6 @@
any_errors_fatal: true
become: yes
roles:
- role: lustre_client_mount
vars:
lustre_client_mount_present: false

- role: lustre
vars:
lustre_state: "absent"
6 changes: 1 addition & 5 deletions fs-ansible/restore.yml
@@ -7,8 +7,4 @@
- role: lustre
vars:
lustre_state: "present"
lustre_format_disks: false

- role: lustre_client_mount
vars:
lustre_client_mount_present: true
lustre_format_disks: false
2 changes: 2 additions & 0 deletions internal/pkg/dacctl/workflow_impl/session.go
@@ -186,6 +186,8 @@ func (s sessionFacade) getBricks(poolName datamodel.PoolName, bytes int) (int, [
"unable to get number of requested bricks (%d) for given pool (%s)",
bricksRequired, pool.Pool.Name)
}
// TODO: we should add allocation map into the pool status
// so we spot conflicts if the locks failed for some reason
return actualSize, bricks, nil
}
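The TODO above hints at recording allocations in the pool status so that double-allocations can still be spotted if the locks fail. A minimal sketch of that idea, using hypothetical type and field names that do not exist in the current datamodel package:

package main

import "fmt"

// PoolStatus here is a hypothetical stand-in; the real pool status lives in
// internal/pkg/datamodel and has no allocation map yet.
type PoolStatus struct {
    // brick device -> owning session name
    Allocations map[string]string
}

// claimBrick records an allocation and reports a conflict if the brick is
// already owned by a different session.
func (p *PoolStatus) claimBrick(device, session string) error {
    if p.Allocations == nil {
        p.Allocations = map[string]string{}
    }
    if owner, ok := p.Allocations[device]; ok && owner != session {
        return fmt.Errorf("brick %s already allocated to session %s", device, owner)
    }
    p.Allocations[device] = session
    return nil
}

func main() {
    var pool PoolStatus
    fmt.Println(pool.claimBrick("nvme0n1", "jobA")) // <nil>
    fmt.Println(pool.claimBrick("nvme0n1", "jobB")) // conflict error
}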

154 changes: 100 additions & 54 deletions internal/pkg/dacd/brick_manager_impl/session_action_handler.go
@@ -93,7 +93,7 @@ func (s *sessionActionHandler) handleCreate(action datamodel.SessionAction) {
session := action.Session
// Nothing to create, just complete the action
// TODO: why do we send the action?
if session.ActualSizeBytes == 0 {
if session.ActualSizeBytes == 0 && len(session.MultiJobAttachments) == 0 {
return session, nil
}

@@ -106,13 +106,34 @@ func (s *sessionActionHandler) handleCreate(action datamodel.SessionAction) {
return session, fmt.Errorf("can't do action once delete has been requested for")
}

fsStatus, err := s.fsProvider.Create(session)
session.FilesystemStatus = fsStatus
// Only call create if we have a per job buffer to create
// Note: we always need to do the mount to enable copy-in/out
if session.ActualSizeBytes != 0 {
fsStatus, err := s.fsProvider.Create(session)
session.FilesystemStatus = fsStatus
if err != nil {
session.Status.Error = err.Error()
}

var updateErr error
session, updateErr = s.sessionRegistry.UpdateSession(session)
if updateErr != nil {
log.Println("Failed to update session:", updateErr)
if err == nil {
err = updateErr
}
}
if err != nil {
return session, err
}
log.Println("Filesystem created, now mount on primary brick host")
}

session, err = s.doAllMounts(session, true)
session.Status.FileSystemCreated = err == nil
if err != nil {
session.Status.Error = err.Error()
}

session, updateErr := s.sessionRegistry.UpdateSession(session)
if updateErr != nil {
log.Println("Failed to update session:", updateErr)
@@ -132,15 +153,22 @@ func (s *sessionActionHandler) handleDelete(action datamodel.SessionAction) {
return action.Session, fmt.Errorf("error getting session: %s", err)
}

if err := s.doAllUnmounts(session, getAttachmentKey(session.Name, true)); err != nil {
return session, fmt.Errorf("failed primary brick host unmount, due to: %s", err.Error())
}
log.Println("did umount primary brick host during delete")

if !session.Status.UnmountComplete {
if err := s.doAllUnmounts(session); err != nil {
log.Println("failed unmount during delete", session.Name)
if err := s.doAllUnmounts(session, getAttachmentKey(session.Name, false)); err != nil {
return session, fmt.Errorf("failed retry unmount during delete, due to: %s", err.Error())
}
log.Println("did unmount during delete")
}
if !session.Status.CopyDataOutComplete && !session.Status.DeleteSkipCopyDataOut {
if err := s.fsProvider.DataCopyOut(action.Session); err != nil {
log.Println("failed DataCopyOut during delete", action.Session.Name)
return session, fmt.Errorf("failed DataCopyOut during delete, due to: %s", err.Error())
}
log.Println("did data copy out during delete")
}

// Only try delete if we have bricks to delete
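Delete now tears things down in a fixed order: unmount the primary brick host attachment, retry the compute-node unmounts if they never completed, then run the data copy-out unless it already finished or was explicitly skipped. A condensed sketch of that ordering, with illustrative stand-ins for the provider calls:

package main

import "fmt"

type sessionStatus struct {
    unmountComplete       bool
    copyDataOutComplete   bool
    deleteSkipCopyDataOut bool
}

// handleDeleteFlow mirrors the ordering in handleDelete above, in print form only.
func handleDeleteFlow(st sessionStatus) {
    fmt.Println("unmount primary brick host attachment (key Primary_<name>)")
    if !st.unmountComplete {
        fmt.Println("retry compute-node unmounts (key <name>)")
    }
    if !st.copyDataOutComplete && !st.deleteSkipCopyDataOut {
        fmt.Println("fsProvider.DataCopyOut")
    }
    fmt.Println("delete bricks if any were allocated")
}

func main() {
    handleDeleteFlow(sessionStatus{unmountComplete: true})
}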
@@ -194,45 +222,74 @@ func (s *sessionActionHandler) handleCopyOut(action datamodel.SessionAction) {
})
}

func (s *sessionActionHandler) doAllMounts(actionSession datamodel.Session) (datamodel.Session, error) {
attachmentSession := datamodel.AttachmentSession{
Hosts: actionSession.RequestedAttachHosts,
SessionName: actionSession.Name,
func addHostsFromSession(attachment *datamodel.AttachmentSession, actionSession datamodel.Session, forPrimaryBrickHost bool) {
if forPrimaryBrickHost {
attachment.Hosts = []string{string(actionSession.PrimaryBrickHost)}
} else {
attachment.Hosts = actionSession.RequestedAttachHosts
}
}

func (s *sessionActionHandler) doAllMounts(actionSession datamodel.Session, forPrimaryBrickHost bool) (datamodel.Session, error) {
if actionSession.ActualSizeBytes > 0 {
jobAttachmentStatus := datamodel.AttachmentSessionStatus{
AttachmentSession: attachmentSession,
GlobalMount: actionSession.VolumeRequest.Access == datamodel.Striped || actionSession.VolumeRequest.Access == datamodel.PrivateAndStriped,
PrivateMount: actionSession.VolumeRequest.Access == datamodel.Private || actionSession.VolumeRequest.Access == datamodel.PrivateAndStriped,
SwapBytes: actionSession.VolumeRequest.SwapBytes,
}
if actionSession.CurrentAttachments == nil {
actionSession.CurrentAttachments = map[datamodel.SessionName]datamodel.AttachmentSessionStatus{
actionSession.Name: jobAttachmentStatus,
}
} else {
actionSession.CurrentAttachments[actionSession.Name] = jobAttachmentStatus
jobAttachment := datamodel.AttachmentSession{
SessionName: actionSession.Name,
GlobalMount: actionSession.VolumeRequest.Access == datamodel.Striped || actionSession.VolumeRequest.Access == datamodel.PrivateAndStriped,
PrivateMount: actionSession.VolumeRequest.Access == datamodel.Private || actionSession.VolumeRequest.Access == datamodel.PrivateAndStriped,
SwapBytes: actionSession.VolumeRequest.SwapBytes,
}
if forPrimaryBrickHost {
// Never deal with private mount, as it makes no sense to copy in to a private dir
jobAttachment.PrivateMount = false
}
addHostsFromSession(&jobAttachment, actionSession, forPrimaryBrickHost)
if err := updateAttachments(&actionSession, jobAttachment, forPrimaryBrickHost); err != nil {
return actionSession, err
}
session, err := s.sessionRegistry.UpdateSession(actionSession)
if err != nil {
return actionSession, err
}
actionSession = session

if err := s.fsProvider.Mount(actionSession, jobAttachmentStatus); err != nil {
if err := s.fsProvider.Mount(actionSession, jobAttachment); err != nil {
return actionSession, err
}
// TODO: should we update the session? and delete attachments later?
// TODO: should we track success of each attachment session?
}
for _, sessionName := range actionSession.MultiJobAttachments {
if err := s.doMultiJobMount(actionSession, sessionName); err != nil {
if err := s.doMultiJobMount(actionSession, sessionName, forPrimaryBrickHost); err != nil {
return actionSession, err
}
}
return actionSession, nil
}
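addHostsFromSession is what switches a mount between the two cases: for the primary brick host the attachment targets only that host (and the private mount is suppressed, since copy-in to a private directory makes no sense), otherwise it targets the job's requested attach hosts. A small self-contained illustration, assuming simplified types rather than the real datamodel structs:

package main

import "fmt"

type attachment struct {
    hosts        []string
    privateMount bool
}

// addHosts mimics addHostsFromSession above, plus the private-mount suppression
// done in doAllMounts for the primary brick host case.
func addHosts(a *attachment, primaryBrickHost string, requestedHosts []string, forPrimary bool) {
    if forPrimary {
        a.hosts = []string{primaryBrickHost}
        a.privateMount = false // never mount private dirs on the brick host
    } else {
        a.hosts = requestedHosts
    }
}

func main() {
    a := attachment{privateMount: true}
    addHosts(&a, "dac-node-1", []string{"cn01", "cn02"}, true)
    fmt.Printf("%+v\n", a) // {hosts:[dac-node-1] privateMount:false}

    b := attachment{privateMount: true}
    addHosts(&b, "dac-node-1", []string{"cn01", "cn02"}, false)
    fmt.Printf("%+v\n", b) // {hosts:[cn01 cn02] privateMount:true}
}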

func (s *sessionActionHandler) doMultiJobMount(actionSession datamodel.Session, sessionName datamodel.SessionName) error {
func getAttachmentKey(sessionName datamodel.SessionName, forPrimaryBrickHost bool) datamodel.SessionName {
if forPrimaryBrickHost {
return datamodel.SessionName(fmt.Sprintf("Primary_%s", sessionName))
} else {
return sessionName
}
}

func updateAttachments(session *datamodel.Session, attachment datamodel.AttachmentSession, forPrimaryBrickHost bool) error {
attachmentKey := getAttachmentKey(attachment.SessionName, forPrimaryBrickHost)
if session.CurrentAttachments == nil {
session.CurrentAttachments = map[datamodel.SessionName]datamodel.AttachmentSession{
attachmentKey: attachment,
}
} else {
if _, ok := session.CurrentAttachments[attachmentKey]; ok {
return fmt.Errorf("already attached for session %s and target-volume %s",
attachment.SessionName, session.Name)
}
session.CurrentAttachments[attachmentKey] = attachment
}
return nil
}
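The primary brick host mount and the compute-node mounts are now tracked as separate entries in CurrentAttachments, distinguished only by their key: the primary attachment is stored under Primary_<session name>, the normal one under the session name itself, and a second attach under the same key is rejected. A runnable sketch of the keying, reusing getAttachmentKey from the diff above with a simplified map in place of the session struct:

package main

import "fmt"

type SessionName string

func getAttachmentKey(name SessionName, forPrimaryBrickHost bool) SessionName {
    if forPrimaryBrickHost {
        return SessionName(fmt.Sprintf("Primary_%s", name))
    }
    return name
}

func main() {
    attachments := map[SessionName]string{}
    attachments[getAttachmentKey("job42", true)] = "mount on primary brick host"
    attachments[getAttachmentKey("job42", false)] = "mount on compute nodes"
    // two independent entries: Primary_job42 and job42
    for key, what := range attachments {
        fmt.Println(key, "->", what)
    }
}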

func (s *sessionActionHandler) doMultiJobMount(actionSession datamodel.Session, sessionName datamodel.SessionName, forPrimaryBrickHost bool) error {
sessionMutex, err := s.sessionRegistry.GetSessionMutex(sessionName)
if err != nil {
log.Printf("unable to get session mutex: %s due to: %s\n", sessionName, err)
@@ -256,34 +313,23 @@ func (s *sessionActionHandler) doMultiJobMount(actionSession datamodel.Session,
log.Panicf("trying multi-job attach to non-multi job session %s", multiJobSession.Name)
}

attachmentSession := datamodel.AttachmentSession{
Hosts: actionSession.RequestedAttachHosts,
multiJobAttachment := datamodel.AttachmentSession{
SessionName: actionSession.Name,
GlobalMount: true,
}
multiJobAttachmentStatus := datamodel.AttachmentSessionStatus{
AttachmentSession: attachmentSession,
GlobalMount: true,
}
if multiJobSession.CurrentAttachments == nil {
multiJobSession.CurrentAttachments = map[datamodel.SessionName]datamodel.AttachmentSessionStatus{
attachmentSession.SessionName: multiJobAttachmentStatus,
}
} else {
if _, ok := multiJobSession.CurrentAttachments[attachmentSession.SessionName]; ok {
return fmt.Errorf("already attached for session %s and multi-job %s",
attachmentSession.SessionName, sessionName)
}
multiJobSession.CurrentAttachments[attachmentSession.SessionName] = multiJobAttachmentStatus
addHostsFromSession(&multiJobAttachment, actionSession, forPrimaryBrickHost)
if err := updateAttachments(&multiJobSession, multiJobAttachment, forPrimaryBrickHost); err != nil {
return err
}

multiJobSession, err = s.sessionRegistry.UpdateSession(multiJobSession)
if err != nil {
return err
}
return s.fsProvider.Mount(multiJobSession, multiJobAttachmentStatus)
return s.fsProvider.Mount(multiJobSession, multiJobAttachment)
}

func (s *sessionActionHandler) doMultiJobUnmount(actionSession datamodel.Session, sessionName datamodel.SessionName) error {
func (s *sessionActionHandler) doMultiJobUnmount(actionSession datamodel.Session, sessionName datamodel.SessionName, attachmentKey datamodel.SessionName) error {
sessionMutex, err := s.sessionRegistry.GetSessionMutex(sessionName)
if err != nil {
log.Printf("unable to get session mutex: %s due to: %s\n", sessionName, err)
@@ -307,29 +353,29 @@ func (s *sessionActionHandler) doMultiJobUnmount(actionSession datamodel.Session
log.Panicf("trying multi-job attach to non-multi job session %s", multiJobSession.Name)
}

attachments, ok := multiJobSession.CurrentAttachments[actionSession.Name]
attachments, ok := multiJobSession.CurrentAttachments[attachmentKey]
if !ok {
log.Println("skip detach, already seems to be detached")
log.Println("skip multi-job detach, already seems to be detached")
return nil
}
if err := s.fsProvider.Unmount(multiJobSession, attachments); err != nil {
return err
}

// update multi job session to note our attachments have now gone
delete(multiJobSession.CurrentAttachments, actionSession.Name)
delete(multiJobSession.CurrentAttachments, attachmentKey)
_, err = s.sessionRegistry.UpdateSession(multiJobSession)
return err
}

func (s *sessionActionHandler) doAllUnmounts(actionSession datamodel.Session) error {
func (s *sessionActionHandler) doAllUnmounts(actionSession datamodel.Session, attachmentKey datamodel.SessionName) error {
if actionSession.ActualSizeBytes > 0 {
if err := s.fsProvider.Unmount(actionSession, actionSession.CurrentAttachments[actionSession.Name]); err != nil {
if err := s.fsProvider.Unmount(actionSession, actionSession.CurrentAttachments[attachmentKey]); err != nil {
return err
}
}
for _, sessionName := range actionSession.MultiJobAttachments {
if err := s.doMultiJobUnmount(actionSession, sessionName); err != nil {
if err := s.doMultiJobUnmount(actionSession, sessionName, attachmentKey); err != nil {
return err
}
}
@@ -349,9 +395,9 @@ func (s *sessionActionHandler) handleMount(action datamodel.SessionAction) {
return session, errors.New("already mounted, can't mount again")
}

session, err = s.doAllMounts(session)
session, err = s.doAllMounts(session, false)
if err != nil {
if err := s.doAllUnmounts(session); err != nil {
if err := s.doAllUnmounts(session, getAttachmentKey(session.Name, false)); err != nil {
log.Println("error while rolling back possible partial mount", action.Session.Name, err)
}
return action.Session, err
@@ -375,7 +421,7 @@ func (s *sessionActionHandler) handleUnmount(action datamodel.SessionAction) {
return session, errors.New("already unmounted, can't umount again")
}

if err := s.doAllUnmounts(session); err != nil {
if err := s.doAllUnmounts(session, getAttachmentKey(session.Name, false)); err != nil {
return action.Session, err
}

@@ -1,13 +1,8 @@
package brick_manager_impl

import (
"context"
"fmt"
"github.com/RSE-Cambridge/data-acc/internal/pkg/datamodel"
"github.com/RSE-Cambridge/data-acc/internal/pkg/mock_filesystem"
"github.com/RSE-Cambridge/data-acc/internal/pkg/mock_registry"
"github.com/RSE-Cambridge/data-acc/internal/pkg/mock_store"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -20,37 +15,3 @@ func TestSessionActionHandler_ProcessSessionAction_Unknown(t *testing.T) {
fmt.Sprintf("not yet implemented action for %+v", action),
func() { handler.ProcessSessionAction(action) })
}

func TestSessionActionHandler_handleCreate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
registry := mock_registry.NewMockSessionRegistry(mockCtrl)
actions := mock_registry.NewMockSessionActions(mockCtrl)
fsProvider := mock_filesystem.NewMockProvider(mockCtrl)
handler := sessionActionHandler{
sessionRegistry: registry, actions: actions, fsProvider: fsProvider,
}
action := datamodel.SessionAction{
ActionType: datamodel.SessionCreateFilesystem,
Session: datamodel.Session{Name: "test", ActualSizeBytes: 42},
}
sessionMutex := mock_store.NewMockMutex(mockCtrl)
registry.EXPECT().GetSessionMutex(action.Session.Name).Return(sessionMutex, nil)
sessionMutex.EXPECT().Lock(context.TODO())
sessionMutex.EXPECT().Unlock(context.TODO())
registry.EXPECT().GetSession(action.Session.Name).Return(action.Session, nil)
fsProvider.EXPECT().Create(action.Session)
updatedSession := datamodel.Session{
Name: action.Session.Name,
Status: datamodel.SessionStatus{FileSystemCreated: true},
ActualSizeBytes: 42,
}
registry.EXPECT().UpdateSession(updatedSession).Return(updatedSession, nil)
updatedAction := datamodel.SessionAction{
ActionType: datamodel.SessionCreateFilesystem,
Session: updatedSession,
}
actions.EXPECT().CompleteSessionAction(updatedAction)

handler.handleCreate(action)
}
