Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement recursive uploads #415

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 78 additions & 5 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc

if recursive {
var err error
files, err = walkDavDir(sourceUrl, namespace, token)
files, err = walkDavDir(sourceUrl, namespace, token, "", false)
if err != nil {
log.Errorln("Error from walkDavDir", err)
return 0, err
Expand Down Expand Up @@ -744,6 +744,37 @@ func (pr *ProgressReader) Size() int64 {
return pr.sizer.Size()
}

// Recursively uploads a directory with all files and nested dirs, keeping file structure on server side
func UploadDirectory(src string, dest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {
var files []string
var amountDownloaded int64
srcUrl := url.URL{Path: src}
// Get the list of files as well as make any directories on the server end
files, err := walkDavDir(&srcUrl, namespace, token, dest.Path, true)
if err != nil {
return 0, err
}

// Upload all of our files within the proper directories
for _, file := range files {
tempDest := url.URL{}
tempDest.Path, err = url.JoinPath(dest.Path, file)
if err != nil {
return 0, err
}
downloaded, err := UploadFile(file, &tempDest, token, namespace)
if err != nil {
return 0, err
}
amountDownloaded += downloaded
}
// Close progress bar container
if ObjectClientOptions.ProgressBars {
progressContainer.Wait()
}
return amountDownloaded, err
}

// UploadFile Uploads a file using HTTP
func UploadFile(src string, origDest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {

Expand Down Expand Up @@ -846,7 +877,12 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
} else {
progressBar.Abort(true)
}
progressContainer.Wait()
// If it is recursive, we need to reuse the mpb instance. Closed later
if ObjectClientOptions.Recursive {
progressBar.Wait()
} else { // If not recursive, go ahead and close it
progressContainer.Wait()
}
}()
}
tickerDuration := 500 * time.Millisecond
Expand Down Expand Up @@ -936,7 +972,7 @@ func doPut(request *http.Request, responseChan chan<- *http.Response, errorChan

}

func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string) ([]string, error) {
func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string, destPath string, upload bool) ([]string, error) {

// Create the client to walk the filesystem
rootUrl := *url
Expand All @@ -961,13 +997,50 @@ func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string) ([]s
// XRootD does not like keep alives and kills things, so turn them off.
transport := config.GetTransport()
c.SetTransport(transport)

files, err := walkDir(url.Path, c)
var files []string
var err error
if upload {
files, err = walkDirUpload(url.Path, c, destPath)
} else {
files, err = walkDir(url.Path, c)
}
log.Debugln("Found files:", files)
return files, err

}

// For uploads, we want to make directories on the server end
func walkDirUpload(path string, client *gowebdav.Client, destPath string) ([]string, error) {
// List of files to return
var files []string
// Whenever this function is called, we should create a new dir on the server side for uploads
err := client.Mkdir(destPath+path, 0755)
if err != nil {
return nil, err
}

// Get our list of files
infos, err := os.ReadDir(path)
if err != nil {
return nil, err
}
for _, info := range infos {
newPath := path + "/" + info.Name()
if info.IsDir() {
// Recursively call this function to create any nested dir's as well as list their files
returnedFiles, err := walkDirUpload(newPath, client, destPath)
if err != nil {
return nil, err
}
files = append(files, returnedFiles...)
} else {
// It is a normal file
files = append(files, newPath)
}
}
return files, err
}

func walkDir(path string, client *gowebdav.Client) ([]string, error) {
var files []string
log.Debugln("Reading directory: ", path)
Expand Down
11 changes: 7 additions & 4 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,17 @@ func getTokenName(destination *url.URL) (scheme, tokenName string) {
}

// Do writeback to stash using SciTokens
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace) (int64, error) {
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace, recursive bool) (int64, error) {

scitoken_contents, err := getToken(destination, namespace, true, "")
if err != nil {
return 0, err
}
return UploadFile(source, destination, scitoken_contents, namespace)

if recursive {
return UploadDirectory(source, destination, scitoken_contents, namespace)
} else {
return UploadFile(source, destination, scitoken_contents, namespace)
}
}

// getToken returns the token to use for the given destination
Expand Down Expand Up @@ -503,7 +506,7 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
AddError(err)
return 0, err
}
_, err = doWriteBack(source_url.Path, dest_url, ns)
_, err = doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/object_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func copyMain(cmd *cobra.Command, args []string) {
for _, src := range source {
var tmpDownloaded int64
isRecursive, _ := cmd.Flags().GetBool("recursive")
client.ObjectClientOptions.Recursive = isRecursive
tmpDownloaded, result = client.DoStashCPSingle(src, dest, splitMethods, isRecursive)
downloaded += tmpDownloaded
if result != nil {
Expand Down
Loading