Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Skip format when doing a restore
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnGarbutt committed Aug 31, 2019
1 parent f0c92e6 commit d472037
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 69 deletions.
1 change: 1 addition & 0 deletions internal/pkg/dacd/brick_manager_impl/brick_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (bm *brickManager) completePendingActions() {
log.Fatalf("unable to get outstanding session action requests due to: %s", err.Error())
}

// We wait for these to finish before starting keepalive
for _, action := range actions {
// TODO: what about the extra response if no one is listening any more?
bm.sessionActionHandler.ProcessSessionAction(action)
Expand Down
128 changes: 65 additions & 63 deletions internal/pkg/dacd/brick_manager_impl/session_action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,6 @@ func (s *sessionActionHandler) ProcessSessionAction(action datamodel.SessionActi
}
}

// RestoreSession re-runs the create path for the given session while holding
// that session's mutex.
//
// If the mutex cannot be obtained or locked, the problem is logged and the
// function returns without attempting the restore. If the create step fails,
// the error text is stored in session.Status.Error and pushed to the session
// registry; failing to push that update panics.
func (s *sessionActionHandler) RestoreSession(session datamodel.Session) {
	name := session.Name

	// The restore must not start until we own the session lock.
	lock, lockErr := s.sessionRegistry.GetSessionMutex(name)
	if lockErr != nil {
		log.Printf("unable to get session mutex: %s due to: %s\n", name, lockErr)
		return
	}
	if lockErr = lock.Lock(context.TODO()); lockErr != nil {
		log.Printf("unable to lock session mutex: %s due to: %s\n", name, lockErr)
		return
	}
	// Release the lock however we leave this function.
	defer func() {
		unlockErr := lock.Unlock(context.TODO())
		if unlockErr != nil {
			log.Printf("failed to drop mutex for: %s due to: %s\n", name, unlockErr.Error())
		}
	}()

	// TODO: need a way that doesn't try to do format!!
	_, createErr := s.doCreate(session)
	if createErr == nil {
		// TODO: do we just assume any pending mounts will resume in their own time? or should we retry mounts too?
		return
	}

	// Log the session as it was before the error is written into its status.
	log.Printf("unable to restore session: %+v\n", session)
	session.Status.Error = createErr.Error()
	_, updateErr := s.sessionRegistry.UpdateSession(session)
	if updateErr != nil {
		log.Panicf("unable to report that session restore failed for session: %s", session.Name)
	}

	// TODO: do we just assume any pending mounts will resume in their own time? or should we retry mounts too?
}

func (s *sessionActionHandler) processWithMutex(action datamodel.SessionAction, process func() (datamodel.Session, error)) {

sessionName := action.Session.Name
Expand Down Expand Up @@ -122,41 +90,38 @@ func (s *sessionActionHandler) processWithMutex(action datamodel.SessionAction,

func (s *sessionActionHandler) handleCreate(action datamodel.SessionAction) {
s.processWithMutex(action, func() (datamodel.Session, error) {
return s.doCreate(action.Session)
})
}

func (s *sessionActionHandler) doCreate(session datamodel.Session) (datamodel.Session, error) {
// Nothing to create, just complete the action
// TODO: why do we send the action?
if session.ActualSizeBytes == 0 {
return session, nil
}
session := action.Session
// Nothing to create, just complete the action
// TODO: why do we send the action?
if session.ActualSizeBytes == 0 {
return session, nil
}

// Get latest session now we have the mutex
session, err := s.sessionRegistry.GetSession(session.Name)
if err != nil {
return session, fmt.Errorf("error getting session: %s", err)
}
if session.Status.DeleteRequested {
return session, fmt.Errorf("can't do action once delete has been requested for")
}
// Get latest session now we have the mutex
session, err := s.sessionRegistry.GetSession(session.Name)
if err != nil {
return session, fmt.Errorf("error getting session: %s", err)
}
if session.Status.DeleteRequested {
return session, fmt.Errorf("can't do action once delete has been requested for")
}

fsStatus, err := s.fsProvider.Create(session)
session.FilesystemStatus = fsStatus
session.Status.FileSystemCreated = err == nil
if err != nil {
session.Status.Error = err.Error()
}
fsStatus, err := s.fsProvider.Create(session)
session.FilesystemStatus = fsStatus
session.Status.FileSystemCreated = err == nil
if err != nil {
session.Status.Error = err.Error()
}

session, updateErr := s.sessionRegistry.UpdateSession(session)
if updateErr != nil {
log.Println("Failed to update session:", updateErr)
if err == nil {
err = updateErr
session, updateErr := s.sessionRegistry.UpdateSession(session)
if updateErr != nil {
log.Println("Failed to update session:", updateErr)
if err == nil {
err = updateErr
}
}
}
return session, err
return session, err
})
}

func (s *sessionActionHandler) handleDelete(action datamodel.SessionAction) {
Expand Down Expand Up @@ -416,3 +381,40 @@ func (s *sessionActionHandler) handleUnmount(action datamodel.SessionAction) {
return s.sessionRegistry.UpdateSession(session)
})
}

// RestoreSession brings the filesystem of an existing session back up by
// calling fsProvider.Restore, rather than going through the create path.
//
// Sessions whose ActualSizeBytes is zero are skipped without taking the
// session lock. Otherwise the session mutex is held for the duration of the
// restore and released on return. If the mutex cannot be obtained or locked,
// the problem is logged and no restore is attempted.
//
// When the restore fails, the error is logged, written to
// session.Status.Error and pushed to the session registry. If that registry
// update fails as well, the function panics, reporting the update error.
func (s *sessionActionHandler) RestoreSession(session datamodel.Session) {
	if session.ActualSizeBytes == 0 {
		// Nothing to do
		return
	}

	// Get session lock before attempting the restore
	sessionMutex, err := s.sessionRegistry.GetSessionMutex(session.Name)
	if err != nil {
		log.Printf("unable to get session mutex: %s due to: %s\n", session.Name, err)
		return
	}
	err = sessionMutex.Lock(context.TODO())
	if err != nil {
		log.Printf("unable to lock session mutex: %s due to: %s\n", session.Name, err)
		return
	}
	// Always drop mutex on function exit
	defer func() {
		if err := sessionMutex.Unlock(context.TODO()); err != nil {
			log.Printf("failed to drop mutex for: %s due to: %s\n", session.Name, err.Error())
		}
	}()

	// NOTE(review): the session passed in is used as-is; it is not re-read
	// from the registry after the lock is taken, unlike the create handler.
	// Confirm this is intended for the restore path.
	restoreErr := s.fsProvider.Restore(session)

	if restoreErr != nil {
		// Include the cause in the log line; previously only the session was
		// printed and the actual restore error never reached the logs.
		log.Printf("unable to restore session: %+v due to: %s\n", session, restoreErr)
		session.Status.Error = restoreErr.Error()
		if _, updateErr := s.sessionRegistry.UpdateSession(session); updateErr != nil {
			// Report why the update failed before giving up.
			log.Panicf("unable to report that session restore failed for session: %s due to: %s",
				session.Name, updateErr)
		}
	}

	// TODO: do we just assume any pending mounts will resume in their own time? or should we retry mounts too?
}
1 change: 1 addition & 0 deletions internal/pkg/filesystem/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "github.com/RSE-Cambridge/data-acc/internal/pkg/datamodel"

type Provider interface {
Create(session datamodel.Session) (datamodel.FilesystemStatus, error)
Restore(session datamodel.Session) error
Delete(session datamodel.Session) error

DataCopyIn(session datamodel.Session) error
Expand Down
13 changes: 8 additions & 5 deletions internal/pkg/filesystem_impl/ansible.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,21 @@ func setupAnsible(fsType FSType, internalName string, bricks []datamodel.Brick)
return dir, err
}

func executeAnsibleSetup(internalName string, bricks []datamodel.Brick) error {
func executeAnsibleSetup(internalName string, bricks []datamodel.Brick, doFormat bool) error {
// TODO: restore beegfs support
dir, err := setupAnsible(Lustre, internalName, bricks)
if err != nil {
return err
}
defer os.RemoveAll(dir)

formatArgs := "dac.yml -i inventory --tag format"
err = executeAnsiblePlaybook(dir, formatArgs)
if err != nil {
return fmt.Errorf("error during server format: %s", err.Error())
// allow skip format when trying to rebuild
if doFormat {
formatArgs := "dac.yml -i inventory --tag format"
err = executeAnsiblePlaybook(dir, formatArgs)
if err != nil {
return fmt.Errorf("error during server format: %s", err.Error())
}
}

startupArgs := "dac.yml -i inventory --tag mount,create_mdt,create_mgs,create_osts,client_mount"
Expand Down
6 changes: 5 additions & 1 deletion internal/pkg/filesystem_impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ func (f *fileSystemProvider) Create(session datamodel.Session) (datamodel.Filesy
InternalName: GetNewUUID(),
InternalData: "",
}
err := executeAnsibleSetup(session.FilesystemStatus.InternalName, session.AllocatedBricks)
err := executeAnsibleSetup(session.FilesystemStatus.InternalName, session.AllocatedBricks, true)
return session.FilesystemStatus, err
}

// Restore re-runs the ansible setup for the session's existing filesystem,
// passing doFormat=false so the format step is skipped.
func (f *fileSystemProvider) Restore(session datamodel.Session) error {
	const doFormat = false
	fsName := session.FilesystemStatus.InternalName
	bricks := session.AllocatedBricks
	return executeAnsibleSetup(fsName, bricks, doFormat)
}

// Delete hands the session's internal filesystem name and allocated bricks to
// executeAnsibleTeardown and returns whatever error that reports.
func (f *fileSystemProvider) Delete(session datamodel.Session) error {
	fsName := session.FilesystemStatus.InternalName
	bricks := session.AllocatedBricks
	return executeAnsibleTeardown(fsName, bricks)
}
Expand Down
14 changes: 14 additions & 0 deletions internal/pkg/mock_filesystem/provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d472037

Please sign in to comment.