Skip to content

Commit b22e211

Browse files
authored
feat(fs): Add skipExisting option to move and copy, merge option to copy (#1556)
* fix(fs): Add skipExisting option to move and copy. * feat(fs): Add merge option to copy. * feat(fs): Code smell. * feat(fs): Code smell.
1 parent ca401b9 commit b22e211

File tree

3 files changed

+66
-16
lines changed

3 files changed

+66
-16
lines changed

internal/fs/copy_move.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@ type taskType uint8
2424
func (t taskType) String() string {
2525
if t == 0 {
2626
return "copy"
27-
} else {
27+
} else if t == 1 {
2828
return "move"
29+
} else {
30+
return "merge"
2931
}
3032
}
3133

3234
const (
3335
copy taskType = iota
3436
move
37+
merge
3538
)
3639

3740
type FileTransferTask struct {
@@ -67,7 +70,7 @@ func (t *FileTransferTask) Run() error {
6770
return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error {
6871
nextTask.groupID = t.groupID
6972
task_group.TransferCoordinator.AddTask(t.groupID, nil)
70-
if t.TaskType == copy {
73+
if t.TaskType == copy || t.TaskType == merge {
7174
CopyTaskManager.Add(nextTask)
7275
} else {
7376
MoveTaskManager.Add(nextTask)
@@ -109,7 +112,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
109112
}
110113

111114
if srcStorage.GetStorage() == dstStorage.GetStorage() {
112-
if taskType == copy {
115+
if taskType == copy || taskType == merge {
113116
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
114117
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
115118
return nil, err
@@ -161,7 +164,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
161164
t.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
162165
t.ApiUrl = common.GetApiUrl(ctx)
163166
t.groupID = dstDirPath
164-
if taskType == copy {
167+
if taskType == copy || taskType == merge {
165168
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
166169
CopyTaskManager.Add(t)
167170
} else {
@@ -177,24 +180,42 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer
177180
if err != nil {
178181
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
179182
}
183+
180184
if srcObj.IsDir() {
181185
t.Status = "src object is dir, listing objs"
182186
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
183187
if err != nil {
184188
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
185189
}
186190
dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
187-
if t.TaskType == copy {
191+
if t.TaskType == copy || t.TaskType == merge {
188192
if t.Ctx().Value(conf.NoTaskKey) != nil {
189193
defer op.Cache.DeleteDirectory(t.DstStorage, dstActualPath)
190194
} else {
191195
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
192196
}
193197
}
198+
199+
existedObjs := make(map[string]bool)
200+
if t.TaskType == merge {
201+
dstObjs, _ := op.List(t.Ctx(), t.DstStorage, dstActualPath, model.ListArgs{})
202+
for _, obj := range dstObjs {
203+
if !obj.IsDir() {
204+
existedObjs[obj.GetName()] = true
205+
}
206+
}
207+
}
208+
194209
for _, obj := range objs {
195210
if utils.IsCanceled(t.Ctx()) {
196211
return nil
197212
}
213+
214+
if t.TaskType == merge && !obj.IsDir() && existedObjs[obj.GetName()] {
215+
// skip existed file
216+
continue
217+
}
218+
198219
err = f(&FileTransferTask{
199220
TaskType: t.TaskType,
200221
TaskData: TaskData{

internal/fs/fs.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool)
8484
return res, err
8585
}
8686

87+
func Merge(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
88+
res, err := transfer(ctx, merge, srcObjPath, dstDirPath, lazyCache...)
89+
if err != nil {
90+
log.Errorf("failed merge %s to %s: %+v", srcObjPath, dstDirPath, err)
91+
}
92+
return res, err
93+
}
94+
8795
func Rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error {
8896
err := rename(ctx, srcPath, dstName, lazyCache...)
8997
if err != nil {

server/handles/fsmanage.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ func FsMkdir(c *gin.Context) {
5757
}
5858

5959
type MoveCopyReq struct {
60-
SrcDir string `json:"src_dir"`
61-
DstDir string `json:"dst_dir"`
62-
Names []string `json:"names"`
63-
Overwrite bool `json:"overwrite"`
60+
SrcDir string `json:"src_dir"`
61+
DstDir string `json:"dst_dir"`
62+
Names []string `json:"names"`
63+
Overwrite bool `json:"overwrite"`
64+
SkipExisting bool `json:"skip_existing"`
65+
Merge bool `json:"merge"`
6466
}
6567

6668
func FsMove(c *gin.Context) {
@@ -89,20 +91,25 @@ func FsMove(c *gin.Context) {
8991
return
9092
}
9193

94+
var validNames []string
9295
if !req.Overwrite {
9396
for _, name := range req.Names {
94-
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
97+
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil && !req.SkipExisting {
9598
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
9699
return
100+
} else if res == nil {
101+
validNames = append(validNames, name)
97102
}
98103
}
104+
} else {
105+
validNames = req.Names
99106
}
100107

101108
// Create all tasks immediately without any synchronous validation
102109
// All validation will be done asynchronously in the background
103110
var addedTasks []task.TaskExtensionInfo
104-
for i, name := range req.Names {
105-
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
111+
for i, name := range validNames {
112+
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
106113
if t != nil {
107114
addedTasks = append(addedTasks, t)
108115
}
@@ -151,20 +158,34 @@ func FsCopy(c *gin.Context) {
151158
return
152159
}
153160

161+
var validNames []string
154162
if !req.Overwrite {
155163
for _, name := range req.Names {
156164
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
157-
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
158-
return
165+
if !req.SkipExisting && !req.Merge {
166+
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
167+
return
168+
} else if req.Merge && res.IsDir() {
169+
validNames = append(validNames, name)
170+
}
171+
} else {
172+
validNames = append(validNames, name)
159173
}
160174
}
175+
} else {
176+
validNames = req.Names
161177
}
162178

163179
// Create all tasks immediately without any synchronous validation
164180
// All validation will be done asynchronously in the background
165181
var addedTasks []task.TaskExtensionInfo
166-
for i, name := range req.Names {
167-
t, err := fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
182+
for i, name := range validNames {
183+
var t task.TaskExtensionInfo
184+
if req.Merge {
185+
t, err = fs.Merge(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
186+
} else {
187+
t, err = fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
188+
}
168189
if t != nil {
169190
addedTasks = append(addedTasks, t)
170191
}

0 commit comments

Comments
 (0)