From 595d01b103374f7e6442f2581453c2fa51515f5a Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Wed, 13 Sep 2017 19:29:52 +0100 Subject: [PATCH 01/12] Improves README --- .gitignore | 1 + README.md | 36 ++++++++++++++++++++++++++++++++++-- cli/root.go | 9 ++------- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index d6f3562..bfe1862 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ tmp dist +paddle diff --git a/README.md b/README.md index 5438767..1ac3496 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,40 @@ # Paddle -Paddle is a command line tool for data archival and processing. +Paddle is a command line tool for canoe data archival and processing. -Work in progress. +## Setup + +Make sure you have Go installed on your machine and that you checkout the repo to +the right folder. By default should be: + +``` +mkdir -p ~/go/src/github.com/deliveroo +cd ~/go/src/github.com/deliveroo +git clone git@github.com:deliveroo/paddle.git +cd paddle +``` + +You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: + +``` +> cat $HOME/.paddle.yaml +bucket: roo-bucket +``` + +You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.: + +``` +> cat $HOME/.aws/credentials +[default] +aws_access_key_id=xxx +aws_secret_access_key=yyy +region=eu-west-1 +``` + + +``` +$ go build +``` ## Usage diff --git a/cli/root.go b/cli/root.go index 00c08f1..ae2c4c7 100644 --- a/cli/root.go +++ b/cli/root.go @@ -28,13 +28,8 @@ var cfgFile string // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "paddle", - Short: "A brief description of your application", - Long: `A longer description that spans multiple lines and likely contains -examples and usage of using your application. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, + Short: "Canoe tool for data archival and processing", + Long: "Canoe tool for data archival and processing", // Uncomment the following line if your bare application // has an action associated with it: // Run: func(cmd *cobra.Command, args []string) { }, From 8bf1d6ba65c092448b92730f5a44cb70ee29c061 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 25 Sep 2017 16:01:08 +0100 Subject: [PATCH 02/12] Refactor of get --- cli/data/get.go | 99 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 33 deletions(-) diff --git a/cli/data/get.go b/cli/data/get.go index 24e0b8b..eb6b857 100644 --- a/cli/data/get.go +++ b/cli/data/get.go @@ -27,6 +27,21 @@ import ( "strings" ) +type S3Target struct { + bucket string + prefix string + path string +} + +func (s *S3Target) copy() *S3Target { + clone := *s + return &clone +} + +func (t *S3Target) fullPath() string { + return fmt.Sprintf("%s/%s/%s", t.bucket, t.prefix, t.path); +} + var getBranch string var getCommitPath string @@ -44,7 +59,14 @@ $ paddle data get -b experimental trained-model/version1 dest/path if !viper.IsSet("bucket") { exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") } - fetchPath(viper.GetString("bucket"), args[0], getBranch, getCommitPath, args[1]) + + source := S3Target{ + bucket: viper.GetString("bucket"), + prefix: fmt.Sprintf("%s/%s", args[0], getBranch), + path: getCommitPath, + } + + copyPathToDestination(&source, args[1]) }, } @@ -53,69 +75,80 @@ func init() { getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)") } -func fetchPath(bucket string, version string, branch string, path string, destination string) { - sess := session.Must(session.NewSessionWithOptions(session.Options{ +func copyPathToDestination(source *S3Target, destination string) { + session := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) - if path == "HEAD" { - svc := s3.New(sess) - headPath := fmt.Sprintf("%s/%s/HEAD", version, branch) - fmt.Println(headPath) - out, err := svc.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(headPath), - }) - if err != nil { - exitErrorf("%v", err) - } - buf := new(bytes.Buffer) - buf.ReadFrom(out.Body) - path = buf.String() - } else { - path = fmt.Sprintf("%s/%s/%s", version, branch, path) + /* + * HEAD contains the path to latest folder + */ + if source.path == "HEAD" { + source = source.copy() + source.path = readHEAD(session, source) + } + + fmt.Println("Copying " + source.fullPath() + " to " + destination) + copy(session, source, destination) +} + +func readHEAD(session *session.Session, source *S3Target) string { + svc := s3.New(session) + key := fmt.Sprintf("%s/HEAD", source.prefix) + + out, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(source.bucket), + Key: aws.String(key), + }) + + if err != nil { + exitErrorf("%v", err) } - fmt.Println("Fetching " + path) - getBucketObjects(sess, bucket, path, destination) + + buf := new(bytes.Buffer) + buf.ReadFrom(out.Body) + return buf.String() } -func getBucketObjects(sess *session.Session, bucket string, prefix string, dest string) { +func copy(session *session.Session, source *S3Target, destination string) { query := &s3.ListObjectsV2Input{ - Bucket: aws.String(bucket), - Prefix: aws.String(prefix), + Bucket: aws.String(source.bucket), + Prefix: aws.String(source.prefix + "/" + source.path), } - svc := s3.New(sess) + svc := s3.New(session) truncatedListing := true for truncatedListing { - resp, err := svc.ListObjectsV2(query) + response, err := svc.ListObjectsV2(query) if err != nil { fmt.Println(err.Error()) return } - getObjectsAll(bucket, resp, svc, prefix, dest) - query.ContinuationToken = resp.NextContinuationToken - truncatedListing = *resp.IsTruncated + copyToLocalFiles(svc, response.Contents, source, destination) + + // Check if more results + query.ContinuationToken = response.NextContinuationToken + truncatedListing = *response.IsTruncated } } -func getObjectsAll(bucket string, bucketObjectsList *s3.ListObjectsV2Output, s3Client *s3.S3, prefix string, dest string) { - for _, key := range bucketObjectsList.Contents { +func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Target, destination string) { + for _, key := range objects { destFilename := *key.Key if strings.HasSuffix(*key.Key, "/") { fmt.Println("Got a directory") continue } out, err := s3Client.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(bucket), + Bucket: aws.String(source.bucket), Key: key.Key, }) if err != nil { exitErrorf("%v", err) } - destFilePath := dest + "/" + strings.TrimPrefix(destFilename, prefix+"/") + destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.prefix + "/") err = os.MkdirAll(filepath.Dir(destFilePath), 0777) fmt.Print(destFilePath) destFile, err := os.Create(destFilePath) From 48fd3a67b3c09249e3f7ee35c101e272077e7ca6 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 25 Sep 2017 17:07:19 +0100 Subject: [PATCH 03/12] Refactor struct to its own file --- cli/data/get.go | 25 +++++-------------------- cli/data/s3source.go | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 20 deletions(-) create mode 100644 cli/data/s3source.go diff --git a/cli/data/get.go b/cli/data/get.go index eb6b857..a3bd501 100644 --- a/cli/data/get.go +++ b/cli/data/get.go @@ -27,21 +27,6 @@ import ( "strings" ) -type S3Target struct { - bucket string - prefix string - path string -} - -func (s *S3Target) copy() *S3Target { - clone := *s - return &clone -} - -func (t *S3Target) fullPath() string { - return fmt.Sprintf("%s/%s/%s", t.bucket, t.prefix, t.path); -} - var getBranch string var getCommitPath string @@ -60,7 +45,7 @@ $ paddle data get -b experimental trained-model/version1 dest/path exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") } - source := S3Target{ + source := S3Source{ bucket: viper.GetString("bucket"), prefix: fmt.Sprintf("%s/%s", args[0], getBranch), path: getCommitPath, @@ -75,7 +60,7 @@ func init() { getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)") } -func copyPathToDestination(source *S3Target, destination string) { +func copyPathToDestination(source *S3Source, destination string) { session := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) @@ -92,7 +77,7 @@ func copyPathToDestination(source *S3Target, destination string) { copy(session, source, destination) } -func readHEAD(session *session.Session, source *S3Target) string { +func readHEAD(session *session.Session, source *S3Source) string { svc := s3.New(session) key := fmt.Sprintf("%s/HEAD", source.prefix) @@ -110,7 +95,7 @@ func readHEAD(session *session.Session, source *S3Target) string { return buf.String() } -func copy(session *session.Session, source *S3Target, destination string) { +func copy(session *session.Session, source *S3Source, destination string) { query := &s3.ListObjectsV2Input{ Bucket: aws.String(source.bucket), Prefix: aws.String(source.prefix + "/" + source.path), @@ -134,7 +119,7 @@ func copy(session *session.Session, source *S3Target, destination string) { } } -func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Target, destination string) { +func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Source, destination string) { for _, key := range objects { destFilename := *key.Key if strings.HasSuffix(*key.Key, "/") { diff --git a/cli/data/s3source.go b/cli/data/s3source.go new file mode 100644 index 0000000..b7abe54 --- /dev/null +++ b/cli/data/s3source.go @@ -0,0 +1,20 @@ +package data + +import ( + "fmt" +) + +type S3Source struct { + bucket string + prefix string + path string +} + +func (s *S3Source) copy() *S3Source { + clone := *s + return &clone +} + +func (t *S3Source) fullPath() string { + return fmt.Sprintf("%s/%s/%s", t.bucket, t.prefix, t.path); +} From 61c25ae607b46621131ba600951a28942eeacc93 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 25 Sep 2017 18:15:46 +0100 Subject: [PATCH 04/12] Better S3 interface + tests --- cli/data/get.go | 30 ++++++++++----------- cli/data/s3path.go | 23 ++++++++++++++++ cli/data/s3path_test.go | 59 +++++++++++++++++++++++++++++++++++++++++ cli/data/s3source.go | 20 -------------- 4 files changed, 96 insertions(+), 36 deletions(-) create mode 100644 cli/data/s3path.go create mode 100644 cli/data/s3path_test.go delete mode 100644 cli/data/s3source.go diff --git a/cli/data/get.go b/cli/data/get.go index a3bd501..5c55772 100644 --- a/cli/data/get.go +++ b/cli/data/get.go @@ -45,13 +45,12 @@ $ paddle data get -b experimental trained-model/version1 dest/path exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") } - source := S3Source{ + source := S3Path{ bucket: viper.GetString("bucket"), - prefix: fmt.Sprintf("%s/%s", args[0], getBranch), - path: getCommitPath, + path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath), } - copyPathToDestination(&source, args[1]) + copyPathToDestination(source, args[1]) }, } @@ -60,7 +59,7 @@ func init() { getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)") } -func copyPathToDestination(source *S3Source, destination string) { +func copyPathToDestination(source S3Path, destination string) { session := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) @@ -68,22 +67,21 @@ func copyPathToDestination(source *S3Source, destination string) { /* * HEAD contains the path to latest folder */ - if source.path == "HEAD" { - source = source.copy() - source.path = readHEAD(session, source) + if source.Basename() == "HEAD" { + latestFolder := readHEAD(session, source) + source.path = strings.Replace(source.path, "HEAD", latestFolder, 1) } - fmt.Println("Copying " + source.fullPath() + " to " + destination) + fmt.Println("Copying " + source.path + " to " + destination) copy(session, source, destination) } -func readHEAD(session *session.Session, source *S3Source) string { +func readHEAD(session *session.Session, source S3Path) string { svc := s3.New(session) - key := fmt.Sprintf("%s/HEAD", source.prefix) out, err := svc.GetObject(&s3.GetObjectInput{ Bucket: aws.String(source.bucket), - Key: aws.String(key), + Key: aws.String(source.path), }) if err != nil { @@ -95,10 +93,10 @@ func readHEAD(session *session.Session, source *S3Source) string { return buf.String() } -func copy(session *session.Session, source *S3Source, destination string) { +func copy(session *session.Session, source S3Path, destination string) { query := &s3.ListObjectsV2Input{ Bucket: aws.String(source.bucket), - Prefix: aws.String(source.prefix + "/" + source.path), + Prefix: aws.String(source.path), } svc := s3.New(session) @@ -119,7 +117,7 @@ func copy(session *session.Session, source *S3Source, destination string) { } } -func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Source, destination string) { +func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source S3Path, destination string) { for _, key := range objects { destFilename := *key.Key if strings.HasSuffix(*key.Key, "/") { @@ -133,7 +131,7 @@ func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Source, d if err != nil { exitErrorf("%v", err) } - destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.prefix + "/") + destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname() + "/") err = os.MkdirAll(filepath.Dir(destFilePath), 0777) fmt.Print(destFilePath) destFile, err := os.Create(destFilePath) diff --git a/cli/data/s3path.go b/cli/data/s3path.go new file mode 100644 index 0000000..9cdfcd4 --- /dev/null +++ b/cli/data/s3path.go @@ -0,0 +1,23 @@ +package data + +import ( + "strings" +) + +type S3Path struct { + bucket string + path string +} + +func (p *S3Path) Basename() string { + components := strings.Split(p.path, "/") + return components[len(components)-1] +} + +func (p *S3Path) Dirname() string { + components := strings.Split(p.path, "/") + if len(components) == 0 { + return "" + } + return strings.Join(components[:len(components)-1], "/") +} diff --git a/cli/data/s3path_test.go b/cli/data/s3path_test.go new file mode 100644 index 0000000..29d8be1 --- /dev/null +++ b/cli/data/s3path_test.go @@ -0,0 +1,59 @@ +package data + +import "testing" + +func TestBasename(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa/bbb/ccc", + } + + dirname := path.Basename() + expectation := "ccc" + + if dirname != expectation { + t.Errorf("Basename was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestBasenameWithEmptyPath(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "", + } + + dirname := path.Basename() + expectation := "" + + if dirname != expectation { + t.Errorf("Basename was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestDirname(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa/bbb/ccc", + } + + dirname := path.Dirname() + expectation := "aaa/bbb" + + if dirname != expectation { + t.Errorf("Dirname was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestDirnameOnBasicPath(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa", + } + + dirname := path.Dirname() + expectation := "" + + if dirname != expectation { + t.Errorf("Dirname was incorrect, got: %s, want: %s.", dirname, expectation) + } +} diff --git a/cli/data/s3source.go b/cli/data/s3source.go deleted file mode 100644 index b7abe54..0000000 --- a/cli/data/s3source.go +++ /dev/null @@ -1,20 +0,0 @@ -package data - -import ( - "fmt" -) - -type S3Source struct { - bucket string - prefix string - path string -} - -func (s *S3Source) copy() *S3Source { - clone := *s - return &clone -} - -func (t *S3Source) fullPath() string { - return fmt.Sprintf("%s/%s/%s", t.bucket, t.prefix, t.path); -} From dbd350c3d075786cb3117ce93775a2cf38af1cc6 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Tue, 26 Sep 2017 16:14:50 +0100 Subject: [PATCH 05/12] Couple of refactors to commit operation --- cli/data/commit.go | 56 ++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/cli/data/commit.go b/cli/data/commit.go index b61e5a6..5e6cc1d 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -45,7 +45,13 @@ $ paddle data commit -b experimental source/path trained-model/version1 if !viper.IsSet("bucket") { exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") } - commitPath(args[0], viper.GetString("bucket"), args[1], commitBranch) + + destination := S3Path{ + bucket: viper.GetString("bucket"), + path: fmt.Sprintf("%s/%s", args[1], commitBranch), + } + + commitPath(args[0], destination) }, } @@ -53,28 +59,15 @@ func init() { commitCmd.Flags().StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on") } -func commitPath(path string, bucket string, version string, branch string) { +func commitPath(path string, destination S3Path) { fd, err := os.Stat(path) if err != nil { - exitErrorf("Path %v not found", path) + exitErrorf("Source path %v not found", path) } if !fd.Mode().IsDir() { - exitErrorf("Path %v must be a directory", path) - } - - hash, err := common.DirHash(path) - if err != nil { - exitErrorf("Unable to hash input folder") + exitErrorf("Source path %v must be a directory", path) } - t := time.Now().UTC() - - datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d", - t.Year(), t.Month(), t.Day(), - t.Hour(), t.Minute()) - - destPath := fmt.Sprintf("%s/%s/%s_%s", version, branch, datePath, hash) - sess := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) @@ -89,22 +82,31 @@ func commitPath(path string, bucket string, version string, branch string) { } }) + t := time.Now().UTC() + datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d", + t.Year(), t.Month(), t.Day(), + t.Hour(), t.Minute()) + + hash, err := common.DirHash(path) + if err != nil { + exitErrorf("Unable to hash input folder") + } + uploader := s3manager.NewUploader(sess) + folderKey := fmt.Sprintf("%s/%s_%s", destination.path, datePath, hash) for _, file := range fileList { - key := destPath + "/" + strings.TrimPrefix(file, path+"/") + key := fmt.Sprintf("%s/%s", folderKey, strings.TrimPrefix(file, path + "/")) fmt.Println(file + " -> " + key) - uploadFileToS3(uploader, bucket, key, file) + uploadFileToS3(uploader, destination.bucket, key, file) } // Update HEAD - - headFile := fmt.Sprintf("%s/%s/HEAD", version, branch) - - uploadDataToS3(sess, destPath, bucket, headFile) + headKey := fmt.Sprintf("%s/HEAD", destination.path) + uploadDataToS3(sess, destination.bucket, headKey, folderKey) } -func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, filePath string) { +func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) { file, err := os.Open(filePath) if err != nil { fmt.Println("Failed to open file", file, err) @@ -113,18 +115,18 @@ func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(bucketName), + Bucket: aws.String(bucket), Key: aws.String(key), Body: file, }) if err != nil { - exitErrorf("Failed to upload data to %s/%s, %s", bucketName, key, err.Error()) + exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error()) return } } -func uploadDataToS3(sess *session.Session, data string, bucket string, key string) { +func uploadDataToS3(sess *session.Session, bucket string, key string, data string) { s3Svc := s3.New(sess) _, err := s3Svc.PutObject(&s3.PutObjectInput{ From 3309911e331c7198d70a5f1d77787b21f6d0d340 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Wed, 11 Oct 2017 15:03:44 +0100 Subject: [PATCH 06/12] Remove redudant return --- cli/data/commit.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cli/data/commit.go b/cli/data/commit.go index 5e6cc1d..200504f 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -109,8 +109,7 @@ func commitPath(path string, destination S3Path) { func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) { file, err := os.Open(filePath) if err != nil { - fmt.Println("Failed to open file", file, err) - os.Exit(1) + exitErrorf("Failed to open file", file, err) } defer file.Close() @@ -122,7 +121,6 @@ func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, fil if err != nil { exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error()) - return } } From b4e255d1c8c3e8eca32c46d37076dab948ad7bd8 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Fri, 13 Oct 2017 15:42:17 +0100 Subject: [PATCH 07/12] FilesToKeys as a function + unit tests --- cli/data/commit.go | 58 ++++++++++++++++++++++++----------------- cli/data/commit_test.go | 36 +++++++++++++++++++++++++ cli/data/get.go | 4 +-- cli/data/s3path.go | 2 +- cli/data/s3path_test.go | 8 +++--- 5 files changed, 77 insertions(+), 31 deletions(-) create mode 100644 cli/data/commit_test.go diff --git a/cli/data/commit.go b/cli/data/commit.go index 200504f..5c9920d 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -21,15 +21,16 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/deliveroo/paddle/common" + "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/viper" "os" - "path/filepath" "strings" "time" ) var commitBranch string +var AppFs = afero.NewOsFs() var commitCmd = &cobra.Command{ Use: "commit [source path] [version]", @@ -48,9 +49,10 @@ $ paddle data commit -b experimental source/path trained-model/version1 destination := S3Path{ bucket: viper.GetString("bucket"), - path: fmt.Sprintf("%s/%s", args[1], commitBranch), + path: fmt.Sprintf("%s/%s", args[1], commitBranch), } + validatePath(args[0]) commitPath(args[0], destination) }, } @@ -59,55 +61,63 @@ func init() { commitCmd.Flags().StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on") } -func commitPath(path string, destination S3Path) { - fd, err := os.Stat(path) +func validatePath(path string) { + fd, err := AppFs.Stat(path) if err != nil { exitErrorf("Source path %v not found", path) } if !fd.Mode().IsDir() { exitErrorf("Source path %v must be a directory", path) } +} +func commitPath(path string, destination S3Path) { sess := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) - fileList := []string{} - filepath.Walk(path, func(p string, f os.FileInfo, err error) error { - if common.IsDirectory(p) { - return nil - } else { - fileList = append(fileList, p) + rootKey := generateRootKey(path, destination) + keys := filesToKeys(path) + uploader := s3manager.NewUploader(sess) + + for _, file := range keys { + key := fmt.Sprintf("%s/%s", rootKey, strings.TrimPrefix(file, path+"/")) + fmt.Println(file + " -> " + key) + uploadFileToS3(uploader, destination.bucket, key, file) + } + + // Update HEAD + headKey := fmt.Sprintf("%s/HEAD", destination.path) + uploadDataToS3(sess, destination.bucket, headKey, rootKey) +} + +func filesToKeys(path string) (keys []string) { + afero.Walk(AppFs, path, func(p string, f os.FileInfo, err error) error { + if f.IsDir() { return nil } + keys = append(keys, p) + return nil }) + return keys +} +func generateRootKey(source string, destination S3Path) string { t := time.Now().UTC() datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()) - hash, err := common.DirHash(path) + hash, err := common.DirHash(source) if err != nil { exitErrorf("Unable to hash input folder") } - uploader := s3manager.NewUploader(sess) - folderKey := fmt.Sprintf("%s/%s_%s", destination.path, datePath, hash) - - for _, file := range fileList { - key := fmt.Sprintf("%s/%s", folderKey, strings.TrimPrefix(file, path + "/")) - fmt.Println(file + " -> " + key) - uploadFileToS3(uploader, destination.bucket, key, file) - } - - // Update HEAD - headKey := fmt.Sprintf("%s/HEAD", destination.path) - uploadDataToS3(sess, destination.bucket, headKey, folderKey) + return fmt.Sprintf("%s/%s_%s", destination.path, datePath, hash) } func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) { - file, err := os.Open(filePath) + file, err := AppFs.Open(filePath) if err != nil { exitErrorf("Failed to open file", file, err) } diff --git a/cli/data/commit_test.go b/cli/data/commit_test.go new file mode 100644 index 0000000..9be8b57 --- /dev/null +++ b/cli/data/commit_test.go @@ -0,0 +1,36 @@ +package data + +import ( + "github.com/spf13/afero" + "reflect" + "strings" + "testing" +) + +func TestFilesToKeys(t *testing.T) { + AppFs = afero.NewMemMapFs() + AppFs.MkdirAll("src/", 0755) + afero.WriteFile(AppFs, "src/a/b", []byte("file c"), 0644) + afero.WriteFile(AppFs, "src/c", []byte("file c"), 0644) + + list := filesToKeys("src") + expectation := []string{ + "src/a/b", + "src/c", + } + + if !reflect.DeepEqual(list, expectation) { + t.Errorf("list is different got: %s, want: %s.", strings.Join(list, ","), strings.Join(expectation, ",")) + } +} + +func TestFilesToKeysWhenEmptyFolder(t *testing.T) { + AppFs = afero.NewMemMapFs() + AppFs.MkdirAll("src", 0755) + + list := filesToKeys("src") + + if len(list) != 0 { + t.Errorf("expecting empty list but got: %s", strings.Join(list, ",")) + } +} diff --git a/cli/data/get.go b/cli/data/get.go index 5c55772..33185ad 100644 --- a/cli/data/get.go +++ b/cli/data/get.go @@ -47,7 +47,7 @@ $ paddle data get -b experimental trained-model/version1 dest/path source := S3Path{ bucket: viper.GetString("bucket"), - path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath), + path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath), } copyPathToDestination(source, args[1]) @@ -131,7 +131,7 @@ func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source S3Path, dest if err != nil { exitErrorf("%v", err) } - destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname() + "/") + destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname()+"/") err = os.MkdirAll(filepath.Dir(destFilePath), 0777) fmt.Print(destFilePath) destFile, err := os.Create(destFilePath) diff --git a/cli/data/s3path.go b/cli/data/s3path.go index 9cdfcd4..9a48a89 100644 --- a/cli/data/s3path.go +++ b/cli/data/s3path.go @@ -6,7 +6,7 @@ import ( type S3Path struct { bucket string - path string + path string } func (p *S3Path) Basename() string { diff --git a/cli/data/s3path_test.go b/cli/data/s3path_test.go index 29d8be1..03f5a11 100644 --- a/cli/data/s3path_test.go +++ b/cli/data/s3path_test.go @@ -5,7 +5,7 @@ import "testing" func TestBasename(t *testing.T) { path := S3Path{ bucket: "foo", - path: "aaa/bbb/ccc", + path: "aaa/bbb/ccc", } dirname := path.Basename() @@ -19,7 +19,7 @@ func TestBasename(t *testing.T) { func TestBasenameWithEmptyPath(t *testing.T) { path := S3Path{ bucket: "foo", - path: "", + path: "", } dirname := path.Basename() @@ -33,7 +33,7 @@ func TestBasenameWithEmptyPath(t *testing.T) { func TestDirname(t *testing.T) { path := S3Path{ bucket: "foo", - path: "aaa/bbb/ccc", + path: "aaa/bbb/ccc", } dirname := path.Dirname() @@ -47,7 +47,7 @@ func TestDirname(t *testing.T) { func TestDirnameOnBasicPath(t *testing.T) { path := S3Path{ bucket: "foo", - path: "aaa", + path: "aaa", } dirname := path.Dirname() From 422e487286939f08833ca8af1782e4f01a8cb299 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Fri, 13 Oct 2017 16:06:27 +0100 Subject: [PATCH 08/12] Simplify hasher with just a random generated string --- cli/data/commit.go | 15 +++------ common/hasher.go | 81 ---------------------------------------------- common/path.go | 21 ------------ rand/strings.go | 24 ++++++++++++++ 4 files changed, 29 insertions(+), 112 deletions(-) delete mode 100644 common/hasher.go delete mode 100644 common/path.go create mode 100644 rand/strings.go diff --git a/cli/data/commit.go b/cli/data/commit.go index 5c9920d..e926a2f 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -20,7 +20,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/deliveroo/paddle/common" + "github.com/deliveroo/paddle/rand" "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -76,7 +76,7 @@ func commitPath(path string, destination S3Path) { SharedConfigState: session.SharedConfigEnable, })) - rootKey := generateRootKey(path, destination) + rootKey := generateRootKey(destination) keys := filesToKeys(path) uploader := s3manager.NewUploader(sess) @@ -102,18 +102,13 @@ func filesToKeys(path string) (keys []string) { return keys } -func generateRootKey(source string, destination S3Path) string { +func generateRootKey(destination S3Path) string { t := time.Now().UTC() - datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d", + datePath := fmt.Sprintf("%d/%02d/%02d/%02d/%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()) - hash, err := common.DirHash(source) - if err != nil { - exitErrorf("Unable to hash input folder") - } - - return fmt.Sprintf("%s/%s_%s", destination.path, datePath, hash) + return fmt.Sprintf("%s/%s_%s", destination.path, datePath, rand.String(10)) } func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) { diff --git a/common/hasher.go b/common/hasher.go deleted file mode 100644 index 81998ee..0000000 --- a/common/hasher.go +++ /dev/null @@ -1,81 +0,0 @@ -package common - -import ( - "crypto/sha1" - "fmt" - "io" - "math" - "os" - "path/filepath" - "strings" -) - -const filechunk = 8192 - -func DirHash(path string) (string, error) { - fileList := []string{} - sha1List := []string{} - filepath.Walk(path, func(p string, f os.FileInfo, err error) error { - if IsDirectory(p) { - return nil - } else { - fileList = append(fileList, p) - return nil - } - }) - for _, file := range fileList { - sha, err := FileHash(file) - if err == nil { - sha1List = append(sha1List, fmt.Sprintf("%s:%s", file, sha)) - } else { - return "", err - } - } - files := strings.Join(sha1List, "\n") - hasher := sha1.New() - hasher.Write([]byte(files)) - return fmt.Sprintf("%x", hasher.Sum(nil)), nil -} - -func FileHash(path string) (string, error) { - // Open the file for reading - file, err := os.Open(path) - if err != nil { - fmt.Println("Cannot find file:", os.Args[1]) - return "", err - } - - defer file.Close() - - // Get file info - info, err := file.Stat() - if err != nil { - fmt.Println("Cannot access file:", os.Args[1]) - return "", err - } - - // Get the filesize - filesize := info.Size() - - // Calculate the number of blocks - blocks := uint64(math.Ceil(float64(filesize) / float64(filechunk))) - - hash := sha1.New() - - // Check each block - for i := uint64(0); i < blocks; i++ { - // Calculate block size - blocksize := int(math.Min(filechunk, float64(filesize-int64(i*filechunk)))) - - // Make a buffer - buf := make([]byte, blocksize) - - // Make a buffer - file.Read(buf) - - // Write to the buffer - io.WriteString(hash, string(buf)) - } - - return fmt.Sprintf("%x", hash.Sum(nil)), nil -} diff --git a/common/path.go b/common/path.go deleted file mode 100644 index dbc9286..0000000 --- a/common/path.go +++ /dev/null @@ -1,21 +0,0 @@ -package common - -import ( - "fmt" - "os" -) - -func IsDirectory(path string) bool { - fd, err := os.Stat(path) - if err != nil { - fmt.Println(err) - os.Exit(2) - } - switch mode := fd.Mode(); { - case mode.IsDir(): - return true - case mode.IsRegular(): - return false - } - return false -} diff --git a/rand/strings.go b/rand/strings.go new file mode 100644 index 0000000..7f4acd2 --- /dev/null +++ b/rand/strings.go @@ -0,0 +1,24 @@ +package rand + +import ( + "math/rand" + "time" +) + +const charset = "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +var seededRand *rand.Rand = rand.New( + rand.NewSource(time.Now().UnixNano())) + +func StringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} + +func String(length int) string { + return StringWithCharset(length, charset) +} From 16ac408387fe5a1fa5b636bb0dea4b64a06ebcff Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Fri, 13 Oct 2017 16:18:44 +0100 Subject: [PATCH 09/12] Small improvement --- cli/data/commit.go | 2 +- cli/data/commit_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/data/commit.go b/cli/data/commit.go index e926a2f..0f87428 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -66,7 +66,7 @@ func validatePath(path string) { if err != nil { exitErrorf("Source path %v not found", path) } - if !fd.Mode().IsDir() { + if !fd.IsDir() { exitErrorf("Source path %v must be a directory", path) } } diff --git a/cli/data/commit_test.go b/cli/data/commit_test.go index 9be8b57..74c56d4 100644 --- a/cli/data/commit_test.go +++ b/cli/data/commit_test.go @@ -9,7 +9,7 @@ import ( func TestFilesToKeys(t *testing.T) { AppFs = afero.NewMemMapFs() - AppFs.MkdirAll("src/", 0755) + AppFs.MkdirAll("src/a", 0755) afero.WriteFile(AppFs, "src/a/b", []byte("file c"), 0644) afero.WriteFile(AppFs, "src/c", []byte("file c"), 0644) From a358f626ceb65991be0f627a2743d6f8acbb663a Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 16 Oct 2017 10:58:51 +0100 Subject: [PATCH 10/12] Better readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1ac3496..76385cd 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Paddle is a command line tool for canoe data archival and processing. -## Setup +## Setup for local development Make sure you have Go installed on your machine and that you checkout the repo to the right folder. By default should be: From b2f065e5fa22745d78b4f610acc9c0fe194a0299 Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 16 Oct 2017 14:18:48 +0100 Subject: [PATCH 11/12] Release instructions --- README.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 76385cd..195dc14 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,8 @@ You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: bucket: roo-bucket ``` +or if you prefer specific `BUCKET` as an environment variable + You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.: ``` @@ -31,11 +33,20 @@ aws_secret_access_key=yyy region=eu-west-1 ``` - ``` $ go build ``` +## Release + +In order to release a new version, set up github export GITHUB_TOKEN=[YOUR_TOKEN] and do the following steps: + +``` +$ git tag -a vX.X.X -m "[Comment]" +$ git push origin vX.X.X +$ goreleaser +``` + ## Usage ``` From 2f696d02af45ddc1cef18527543b77c94245cceb Mon Sep 17 00:00:00 2001 From: Pedro Cunha Date: Mon, 16 Oct 2017 15:26:34 +0100 Subject: [PATCH 12/12] Fixes typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 195dc14..3a74b61 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: bucket: roo-bucket ``` -or if you prefer specific `BUCKET` as an environment variable +or if you prefer specify `BUCKET` as an environment variable You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.: