Skip to content

Commit

Permalink
Add "lakectl fs upload --recursive" flag (#979)
Browse files Browse the repository at this point in the history
* [refactor] extract "upload" function

* Add "lakectl fs upload --recursive"
  • Loading branch information
arielshaqed authored Dec 2, 2020
1 parent 18bf790 commit 3aa396f
Showing 1 changed file with 61 additions and 14 deletions.
75 changes: 61 additions & 14 deletions cmd/lakectl/cmd/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package cmd

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/api"
"github.com/treeverse/lakefs/api/gen/models"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/uri"
)
Expand All @@ -19,6 +23,11 @@ Human Size: {{ .SizeBytes|human_bytes }}
Checksum: {{.Checksum}}
`

const fsRecursiveTemplate = `Files: {{.Count}}
Total Size: {{.Bytes}} bytes
Human Total Size: {{.Bytes|human_bytes}}
`

var fsStatCmd = &cobra.Command{
Use: "stat <path uri>",
Short: "view object metadata",
Expand Down Expand Up @@ -90,6 +99,28 @@ var fsCatCmd = &cobra.Command{
},
}

func upload(client api.Client, sourcePathname string, destURI *uri.URI) (*models.ObjectStats, error) {
var fp io.Reader
if strings.EqualFold(sourcePathname, "-") {
// upload from stdin
fp = os.Stdin
} else {
file, err := os.Open(sourcePathname)
if err != nil {
return nil, err
}
defer func() {
_ = file.Close()
}()
fp = file
}

// read
stat, err := client.UploadObject(context.Background(), destURI.Repository, destURI.Ref, destURI.Path, fp)

return stat, err
}

var fsUploadCmd = &cobra.Command{
Use: "upload <path uri>",
Short: "upload a local file to the specified URI",
Expand All @@ -101,27 +132,42 @@ var fsUploadCmd = &cobra.Command{
client := getClient()
pathURI := uri.Must(uri.Parse(args[0]))
source, _ := cmd.Flags().GetString("source")
var fp io.Reader
if strings.EqualFold(source, "-") {
// upload from stdin
fp = os.Stdin
} else {
file, err := os.Open(source)
recursive, _ := cmd.Flags().GetBool("recursive")
if !recursive {
stat, err := upload(client, source, pathURI)
if err != nil {
DieErr(err)
}
defer func() {
_ = file.Close()
}()
fp = file
Write(fsStatTemplate, stat)
return
}

// read
stat, err := client.UploadObject(context.Background(), pathURI.Repository, pathURI.Ref, pathURI.Path, fp)
// copy recursively
var totals struct {
Bytes int64
Count int64
}
err := filepath.Walk(source, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("traverse %s: %w", path, err)
}
if info.IsDir() {
return nil
}
relPath := strings.TrimPrefix(path, source)
uri := *pathURI
uri.Path = filepath.Join(uri.Path, relPath)
stat, err := upload(client, path, &uri)
if err != nil {
return fmt.Errorf("upload %s: %w", path, err)
}
totals.Bytes += stat.SizeBytes
totals.Count++
return nil
})
if err != nil {
DieErr(err)
}
Write(fsStatTemplate, stat)
Write(fsRecursiveTemplate, totals)
},
}

Expand Down Expand Up @@ -158,5 +204,6 @@ func init() {
fsCmd.AddCommand(fsRmCmd)

fsUploadCmd.Flags().StringP("source", "s", "", "local file to upload, or \"-\" for stdin")
fsUploadCmd.Flags().BoolP("recursive", "r", false, "recursively copy all files under local source")
_ = fsUploadCmd.MarkFlagRequired("source")
}

0 comments on commit 3aa396f

Please sign in to comment.