diff --git a/cmd/upload.go b/cmd/upload.go index 6433ada..2168e07 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -75,6 +75,8 @@ func init() { } func performUpload(u *uploader.Uploader) error { + u.Log.Info("Running...") + /* Cleans */ if u.Config.Hidden.Enabled { gp := gorpool.NewPool(config.Config.Core.Workers, 0). @@ -87,16 +89,30 @@ func performUpload(u *uploader.Uploader) error { } } + /* Generate Additional Rclone Params */ + additionalRcloneParams := u.CheckRcloneParams() + /* Copies */ if len(u.Config.Remotes.Copy) > 0 { - if err := u.Copy(); err != nil { - return errors.WithMessage(err, "failed performing copies") + if err := u.Copy(additionalRcloneParams); err != nil { + return errors.WithMessage(err, "failed performing all copies") } } /* Move */ + if len(u.Config.Remotes.Move) > 0 { + if err := u.Move(false, additionalRcloneParams); err != nil { + return errors.WithMessage(err, "failed performing move") + } + } /* Move Server Side */ + if len(u.Config.Remotes.MoveServerSide) > 0 { + if err := u.Move(true, nil); err != nil { + return errors.WithMessage(err, "failed performing server-side moves") + } + } + u.Log.Info("Finished!") return nil } diff --git a/config/uploader.go b/config/uploader.go index dd7f24e..091e49c 100644 --- a/config/uploader.go +++ b/config/uploader.go @@ -14,11 +14,16 @@ type UploaderHidden struct { Cleanup bool } +type UploaderRemotesMoveServerSide struct { + From string + To string +} + type UploaderRemotes struct { Clean []string Copy []string Move string - MoveServerSide []map[string]string `mapstructure:"move_server_side"` + MoveServerSide []UploaderRemotesMoveServerSide `mapstructure:"move_server_side"` Dedupe []string } diff --git a/rclone/copy.go b/rclone/copy.go index 11e6b67..6901643 100644 --- a/rclone/copy.go +++ b/rclone/copy.go @@ -10,32 +10,38 @@ import ( /* Public */ -func Copy(u *config.UploaderConfig, localPath string, remotePath string, serviceAccountFile *pathutils.Path) (bool, int, error) { +func Copy(u *config.UploaderConfig, from string, to string, serviceAccountFile *pathutils.Path, + additionalRcloneParams []string) (bool, int, error) { // set variables rLog := log.WithFields(logrus.Fields{ - "action": CMD_COPY, - "local_path": localPath, - "remote_path": remotePath, + "action": CMD_COPY, + "from": from, + "to": to, }) result := false // generate required rclone parameters params := []string{ CMD_COPY, - localPath, - remotePath, + from, + to, } if baseParams, err := getBaseParams(); err != nil { - return false, 1, errors.Wrapf(err, "failed generating baseParams to %q: %q -> %q", - CMD_COPY, localPath, remotePath) + return false, 1, errors.WithMessagef(err, "failed generating baseParams to %q: %q -> %q", + CMD_COPY, from, to) } else { params = append(params, baseParams...) } - if additionalParams, err := getAdditionalParams(CMD_COPY, u.RcloneParams.Copy); err != nil { - return false, 1, errors.Wrapf(err, "failed generating additionalParams to %q: %q -> %q", - CMD_COPY, localPath, remotePath) + extraParams := u.RcloneParams.Copy + if additionalRcloneParams != nil { + extraParams = append(extraParams, additionalRcloneParams...) + } + + if additionalParams, err := getAdditionalParams(CMD_COPY, extraParams); err != nil { + return false, 1, errors.WithMessagef(err, "failed generating additionalParams to %q: %q -> %q", + CMD_COPY, from, to) } else { params = append(params, additionalParams...) } diff --git a/rclone/deletefile.go b/rclone/deletefile.go index 027bba7..a90804c 100644 --- a/rclone/deletefile.go +++ b/rclone/deletefile.go @@ -23,7 +23,7 @@ func DeleteFile(remoteFilePath string) (bool, int, error) { } if baseParams, err := getBaseParams(); err != nil { - return false, 1, errors.Wrapf(err, "failed generating baseParams to %q: %q", CMD_DELETE_FILE, + return false, 1, errors.WithMessagef(err, "failed generating baseParams to %q: %q", CMD_DELETE_FILE, remoteFilePath) } else { params = append(params, baseParams...) diff --git a/rclone/move.go b/rclone/move.go index 0bbb819..5373854 100644 --- a/rclone/move.go +++ b/rclone/move.go @@ -10,45 +10,91 @@ import ( /* Public */ -func Move(u *config.UploaderConfig, localPath string, remotePath string, serviceAccountFile *pathutils.Path) (bool, int, error) { +func Move(u *config.UploaderConfig, from string, to string, serviceAccountFile *pathutils.Path, serverSide bool, + additionalRcloneParams []string) (bool, int, error) { // set variables rLog := log.WithFields(logrus.Fields{ - "action": CMD_MOVE, - "local_path": localPath, - "remote_path": remotePath, + "action": CMD_MOVE, + "from": from, + "to": to, }) result := false // generate required rclone parameters params := []string{ CMD_MOVE, - localPath, - remotePath, + from, + to, } if baseParams, err := getBaseParams(); err != nil { - return false, 1, errors.Wrapf(err, "failed generating baseParams to %q: %q -> %q", - CMD_MOVE, localPath, remotePath) + return false, 1, errors.WithMessagef(err, "failed generating baseParams to %q: %q -> %q", + CMD_MOVE, from, to) } else { params = append(params, baseParams...) } - if additionalParams, err := getAdditionalParams(CMD_MOVE, u.RcloneParams.Move); err != nil { - return false, 1, errors.Wrapf(err, "failed generating additionalParams to %q: %q -> %q", - CMD_MOVE, localPath, remotePath) + extraParams := u.RcloneParams.Move + if serverSide { + // this is a server side move, so add any additional configured params + extraParams = append(extraParams, u.RcloneParams.MoveServerSide...) + // add server side parameter + extraParams = append(extraParams, "--drive-server-side-across-configs") + } else if additionalRcloneParams != nil { + // add additional params from parameters + extraParams = append(extraParams, additionalRcloneParams...) + } + + if additionalParams, err := getAdditionalParams(CMD_MOVE, extraParams); err != nil { + return false, 1, errors.WithMessagef(err, "failed generating additionalParams to %q: %q -> %q", + CMD_MOVE, from, to) } else { params = append(params, additionalParams...) } - if serviceAccountFile != nil { - params = append(params, getServiceAccountParams(serviceAccountFile)...) + if serviceAccountFile != nil && !serverSide { + // service account file provided but this is a server side move, so ignore it + saParams := getServiceAccountParams(serviceAccountFile) + params = append(params, saParams...) } - rLog.Tracef("Generated params: %v", params) + rLog.Debugf("Generated params: %v", params) + + // setup cmd + cmdOptions := cmd.Options{ + Buffered: false, + Streaming: true, + } + rcloneCmd := cmd.NewCmdOptions(cmdOptions, cfg.Rclone.Path, params...) + + // live stream logs + doneChan := make(chan struct{}) + go func() { + defer close(doneChan) + + for rcloneCmd.Stdout != nil || rcloneCmd.Stderr != nil { + select { + case line, open := <-rcloneCmd.Stdout: + if !open { + rcloneCmd.Stdout = nil + continue + } + log.Info(line) + case line, open := <-rcloneCmd.Stderr: + if !open { + rcloneCmd.Stderr = nil + continue + } + log.Info(line) + } + } + }() + + // run command + rLog.Debug("Starting...") - // remove file - rcloneCmd := cmd.NewCmd(cfg.Rclone.Path, params...) status := <-rcloneCmd.Start() + <-doneChan // check status switch status.Exit { diff --git a/rclone/rmdir.go b/rclone/rmdir.go index cf635e1..3facf11 100644 --- a/rclone/rmdir.go +++ b/rclone/rmdir.go @@ -24,7 +24,7 @@ func RmDir(remoteFilePath string) (bool, int, error) { } if baseParams, err := getBaseParams(); err != nil { - return false, 1, errors.Wrapf(err, "failed generating baseParams to %q: %q", CMD_DELETE_DIR, + return false, 1, errors.WithMessagef(err, "failed generating baseParams to %q: %q", CMD_DELETE_DIR, remoteFilePath) } else { params = append(params, baseParams...) diff --git a/uploader/check.go b/uploader/check.go index becd15a..0f181a2 100644 --- a/uploader/check.go +++ b/uploader/check.go @@ -15,3 +15,8 @@ func (u *Uploader) Check() (bool, error) { // Perform the check return u.Checker.Check(&u.Config.Check, u.Log, u.LocalFiles, u.LocalFilesSize) } + +func (u *Uploader) CheckRcloneParams() []string { + // Return rclone parameters for a passed check + return u.Checker.RcloneParams(&u.Config.Check, u.Log) +} diff --git a/uploader/checker/age.go b/uploader/checker/age.go index 64fd15b..f7c43fb 100644 --- a/uploader/checker/age.go +++ b/uploader/checker/age.go @@ -1,6 +1,7 @@ package checker import ( + "fmt" "github.com/dustin/go-humanize" "github.com/l3uddz/crop/config" "github.com/l3uddz/crop/pathutils" @@ -45,3 +46,23 @@ func (_ Age) CheckFile(cfg *config.UploaderCheck, log *logrus.Entry, path pathut return false, nil } + +func (_ Age) RcloneParams(cfg *config.UploaderCheck, log *logrus.Entry) []string { + params := []string{ + "--min-age", + fmt.Sprintf("%dm", cfg.Limit), + } + + // add filters + for _, include := range cfg.Include { + params = append(params, + "--include", include) + } + + for _, exclude := range cfg.Exclude { + params = append(params, + "--exclude", exclude) + } + + return params +} diff --git a/uploader/checker/interface.go b/uploader/checker/interface.go index 5e71b78..e8dd854 100644 --- a/uploader/checker/interface.go +++ b/uploader/checker/interface.go @@ -9,4 +9,5 @@ import ( type Interface interface { Check(*config.UploaderCheck, *logrus.Entry, []pathutils.Path, uint64) (bool, error) CheckFile(*config.UploaderCheck, *logrus.Entry, pathutils.Path, uint64) (bool, error) + RcloneParams(check *config.UploaderCheck, entry *logrus.Entry) []string } diff --git a/uploader/checker/size.go b/uploader/checker/size.go index 32a62e6..73dd213 100644 --- a/uploader/checker/size.go +++ b/uploader/checker/size.go @@ -33,3 +33,20 @@ func (_ Size) CheckFile(cfg *config.UploaderCheck, log *logrus.Entry, path pathu return false, nil } + +func (_ Size) RcloneParams(cfg *config.UploaderCheck, log *logrus.Entry) []string { + var params []string + + // add filters + for _, include := range cfg.Include { + params = append(params, + "--include", include) + } + + for _, exclude := range cfg.Exclude { + params = append(params, + "--exclude", exclude) + } + + return params +} diff --git a/uploader/copy.go b/uploader/copy.go index 4f3dca6..bd57b84 100644 --- a/uploader/copy.go +++ b/uploader/copy.go @@ -10,13 +10,11 @@ import ( "time" ) -func (u *Uploader) Copy() error { +func (u *Uploader) Copy(additionalRcloneParams []string) error { // iterate all remotes and run copy for _, remotePath := range u.Config.Remotes.Copy { - // set logic variables + // set variables attempts := 1 - - // set log rLog := u.Log.WithFields(logrus.Fields{ "copy_remote": remotePath, "copy_local_path": u.Config.LocalFolder, @@ -48,7 +46,8 @@ func (u *Uploader) Copy() error { // copy rLog.Info("Copying...") - success, exitCode, err := rclone.Copy(u.Config, u.Config.LocalFolder, remotePath, serviceAccount) + success, exitCode, err := rclone.Copy(u.Config, u.Config.LocalFolder, remotePath, serviceAccount, + additionalRcloneParams) // check result if err != nil { @@ -69,12 +68,12 @@ func (u *Uploader) Copy() error { case rclone.EXIT_FATAL_ERROR: // ban this service account if err := cache.Set(serviceAccount.RealPath, time.Now().UTC().Add(25*time.Hour)); err != nil { - rLog.WithError(err).Error("Failed banning service account, cannot proceed...") + rLog.WithError(err).Error("Failed banning service account, cannot try again...") return fmt.Errorf("failed banning service account: %v", serviceAccount.RealPath) } // attempt copy again - rLog.Warnf("Copy failed with retryable exit code %v, attempting again...", exitCode) + rLog.Warnf("Copy failed with retryable exit code %v, trying again...", exitCode) attempts++ continue default: diff --git a/uploader/move.go b/uploader/move.go new file mode 100644 index 0000000..c5ca9a1 --- /dev/null +++ b/uploader/move.go @@ -0,0 +1,117 @@ +package uploader + +import ( + "fmt" + "github.com/l3uddz/crop/cache" + "github.com/l3uddz/crop/pathutils" + "github.com/l3uddz/crop/rclone" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "time" +) + +type MoveInstruction struct { + From string + To string + ServerSide bool +} + +func (u *Uploader) Move(serverSide bool, additionalRcloneParams []string) error { + var moveRemotes []MoveInstruction + + // create move instructions + if serverSide { + // this is a server side move + for _, remote := range u.Config.Remotes.MoveServerSide { + moveRemotes = append(moveRemotes, MoveInstruction{ + From: remote.From, + To: remote.To, + ServerSide: true, + }) + } + } else { + // this is a normal move (to only one location) + moveRemotes = append(moveRemotes, MoveInstruction{ + From: u.Config.LocalFolder, + To: u.Config.Remotes.Move, + ServerSide: false, + }) + } + + // iterate all remotes and run copy + for _, move := range moveRemotes { + // set variables + attempts := 1 + rLog := u.Log.WithFields(logrus.Fields{ + "move_to": move.To, + "move_from": move.From, + "attempts": attempts, + }) + + // move to remote + for { + // get service account file + var serviceAccount *pathutils.Path + var err error + + if u.ServiceAccountCount > 0 && !serverSide { + // server side moves not supported with service account files + serviceAccount, err = u.getAvailableServiceAccount() + if err != nil { + return errors.WithMessagef(err, + "aborting further move attempts of %q due to serviceAccount exhaustion", + move.From) + } + + // reset log + rLog = u.Log.WithFields(logrus.Fields{ + "move_to": move.To, + "move_from": move.From, + "attempts": attempts, + "service_account": serviceAccount.RealPath, + }) + } + + // move + rLog.Info("Moving...") + success, exitCode, err := rclone.Move(u.Config, move.From, move.To, serviceAccount, serverSide, + additionalRcloneParams) + + // check result + if err != nil { + rLog.WithError(err).Errorf("Failed unexpectedly...") + return errors.WithMessagef(err, "move failed unexpectedly with exit code: %v", exitCode) + } else if success { + // successful exit code + break + } else if serverSide { + // server side moves not supported with service accounts + return fmt.Errorf("failed and cannot proceed with exit code: %v", exitCode) + } + + // are we using service accounts? + if u.ServiceAccountCount == 0 { + return fmt.Errorf("move failed with exit code: %v", exitCode) + } + + // is this an exit code we can retry? + switch exitCode { + case rclone.EXIT_FATAL_ERROR: + // ban this service account + if err := cache.Set(serviceAccount.RealPath, time.Now().UTC().Add(25*time.Hour)); err != nil { + rLog.WithError(err).Error("Failed banning service account, cannot try again...") + return fmt.Errorf("failed banning service account: %v", serviceAccount.RealPath) + } + + // attempt copy again + rLog.Warnf("Move failed with retryable exit code %v, trying again...", exitCode) + attempts++ + continue + default: + return fmt.Errorf("failed and cannot proceed with exit code: %v", exitCode) + } + } + } + + return nil +}