diff --git a/.gitignore b/.gitignore index d6f3562..bfe1862 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ tmp dist +paddle diff --git a/README.md b/README.md index 5438767..3a74b61 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,51 @@ # Paddle -Paddle is a command line tool for data archival and processing. +Paddle is a command line tool for canoe data archival and processing. -Work in progress. +## Setup for local development + +Make sure you have Go installed on your machine and that you check out the repo to +the right folder. By default it should be: + +``` +mkdir -p ~/go/src/github.com/deliveroo +cd ~/go/src/github.com/deliveroo +git clone git@github.com:deliveroo/paddle.git +cd paddle +``` + +You will need to create a `$HOME/.paddle.yaml` that contains the bucket name, e.g.: + +``` +> cat $HOME/.paddle.yaml +bucket: roo-bucket +``` + +or, if you prefer, specify `BUCKET` as an environment variable. + +You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.: + +``` +> cat $HOME/.aws/credentials +[default] +aws_access_key_id=xxx +aws_secret_access_key=yyy +region=eu-west-1 +``` + +``` +$ go build +``` + +## Release + +In order to release a new version, set up a GitHub token (`export GITHUB_TOKEN=[YOUR_TOKEN]`) and do the following steps: + +``` +$ git tag -a vX.X.X -m "[Comment]" +$ git push origin vX.X.X +$ goreleaser +``` ## Usage diff --git a/cli/data/commit.go b/cli/data/commit.go index b61e5a6..0f87428 100644 --- a/cli/data/commit.go +++ b/cli/data/commit.go @@ -20,16 +20,17 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/deliveroo/paddle/common" + "github.com/deliveroo/paddle/rand" + "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/viper" "os" - "path/filepath" "strings" "time" ) var commitBranch string +var AppFs = afero.NewOsFs() var commitCmd = &cobra.Command{ Use: "paddle data commit [source 
path] [version]", @@ -45,7 +46,14 @@ $ paddle data commit -b experimental source/path trained-model/version1 if !viper.IsSet("bucket") { exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") } - commitPath(args[0], viper.GetString("bucket"), args[1], commitBranch) + + destination := S3Path{ + bucket: viper.GetString("bucket"), + path: fmt.Sprintf("%s/%s", args[1], commitBranch), + } + + validatePath(args[0]) + commitPath(args[0], destination) }, } @@ -53,78 +61,75 @@ func init() { commitCmd.Flags().StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on") } -func commitPath(path string, bucket string, version string, branch string) { - fd, err := os.Stat(path) +func validatePath(path string) { + fd, err := AppFs.Stat(path) if err != nil { - exitErrorf("Path %v not found", path) + exitErrorf("Source path %v not found", path) } - if !fd.Mode().IsDir() { - exitErrorf("Path %v must be a directory", path) + if !fd.IsDir() { + exitErrorf("Source path %v must be a directory", path) } +} - hash, err := common.DirHash(path) - if err != nil { - exitErrorf("Unable to hash input folder") - } - - t := time.Now().UTC() - - datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d", - t.Year(), t.Month(), t.Day(), - t.Hour(), t.Minute()) - - destPath := fmt.Sprintf("%s/%s/%s_%s", version, branch, datePath, hash) - +func commitPath(path string, destination S3Path) { sess := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) - fileList := []string{} - filepath.Walk(path, func(p string, f os.FileInfo, err error) error { - if common.IsDirectory(p) { - return nil - } else { - fileList = append(fileList, p) - return nil - } - }) - + rootKey := generateRootKey(destination) + keys := filesToKeys(path) uploader := s3manager.NewUploader(sess) - for _, file := range fileList { - key := destPath + "/" + strings.TrimPrefix(file, path+"/") + for _, file := range keys { + key := fmt.Sprintf("%s/%s", rootKey, 
strings.TrimPrefix(file, path+"/")) fmt.Println(file + " -> " + key) - uploadFileToS3(uploader, bucket, key, file) + uploadFileToS3(uploader, destination.bucket, key, file) } // Update HEAD + headKey := fmt.Sprintf("%s/HEAD", destination.path) + uploadDataToS3(sess, destination.bucket, headKey, rootKey) +} + +func filesToKeys(path string) (keys []string) { + afero.Walk(AppFs, path, func(p string, f os.FileInfo, err error) error { + if f.IsDir() { + return nil + } + keys = append(keys, p) + return nil + }) + return keys +} - headFile := fmt.Sprintf("%s/%s/HEAD", version, branch) +func generateRootKey(destination S3Path) string { + t := time.Now().UTC() + datePath := fmt.Sprintf("%d/%02d/%02d/%02d/%02d", + t.Year(), t.Month(), t.Day(), + t.Hour(), t.Minute()) - uploadDataToS3(sess, destPath, bucket, headFile) + return fmt.Sprintf("%s/%s_%s", destination.path, datePath, rand.String(10)) } -func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, filePath string) { - file, err := os.Open(filePath) +func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) { + file, err := AppFs.Open(filePath) if err != nil { - fmt.Println("Failed to open file", file, err) - os.Exit(1) + exitErrorf("Failed to open file", file, err) } defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(bucketName), + Bucket: aws.String(bucket), Key: aws.String(key), Body: file, }) if err != nil { - exitErrorf("Failed to upload data to %s/%s, %s", bucketName, key, err.Error()) - return + exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error()) } } -func uploadDataToS3(sess *session.Session, data string, bucket string, key string) { +func uploadDataToS3(sess *session.Session, bucket string, key string, data string) { s3Svc := s3.New(sess) _, err := s3Svc.PutObject(&s3.PutObjectInput{ diff --git a/cli/data/commit_test.go b/cli/data/commit_test.go new file mode 100644 index 0000000..74c56d4 --- 
/dev/null +++ b/cli/data/commit_test.go @@ -0,0 +1,36 @@ +package data + +import ( + "github.com/spf13/afero" + "reflect" + "strings" + "testing" +) + +func TestFilesToKeys(t *testing.T) { + AppFs = afero.NewMemMapFs() + AppFs.MkdirAll("src/a", 0755) + afero.WriteFile(AppFs, "src/a/b", []byte("file c"), 0644) + afero.WriteFile(AppFs, "src/c", []byte("file c"), 0644) + + list := filesToKeys("src") + expectation := []string{ + "src/a/b", + "src/c", + } + + if !reflect.DeepEqual(list, expectation) { + t.Errorf("list is different got: %s, want: %s.", strings.Join(list, ","), strings.Join(expectation, ",")) + } +} + +func TestFilesToKeysWhenEmptyFolder(t *testing.T) { + AppFs = afero.NewMemMapFs() + AppFs.MkdirAll("src", 0755) + + list := filesToKeys("src") + + if len(list) != 0 { + t.Errorf("expecting empty list but got: %s", strings.Join(list, ",")) + } +} diff --git a/cli/data/get.go b/cli/data/get.go index 24e0b8b..33185ad 100644 --- a/cli/data/get.go +++ b/cli/data/get.go @@ -44,7 +44,13 @@ $ paddle data get -b experimental trained-model/version1 dest/path if !viper.IsSet("bucket") { exitErrorf("Bucket not defined. 
Please define 'bucket' in your config file.") } - fetchPath(viper.GetString("bucket"), args[0], getBranch, getCommitPath, args[1]) + + source := S3Path{ + bucket: viper.GetString("bucket"), + path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath), + } + + copyPathToDestination(source, args[1]) }, } @@ -53,69 +59,79 @@ func init() { getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)") } -func fetchPath(bucket string, version string, branch string, path string, destination string) { - sess := session.Must(session.NewSessionWithOptions(session.Options{ +func copyPathToDestination(source S3Path, destination string) { + session := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) - if path == "HEAD" { - svc := s3.New(sess) - headPath := fmt.Sprintf("%s/%s/HEAD", version, branch) - fmt.Println(headPath) - out, err := svc.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(headPath), - }) - if err != nil { - exitErrorf("%v", err) - } - buf := new(bytes.Buffer) - buf.ReadFrom(out.Body) - path = buf.String() - } else { - path = fmt.Sprintf("%s/%s/%s", version, branch, path) + /* + * HEAD contains the path to latest folder + */ + if source.Basename() == "HEAD" { + latestFolder := readHEAD(session, source) + source.path = strings.Replace(source.path, "HEAD", latestFolder, 1) } - fmt.Println("Fetching " + path) - getBucketObjects(sess, bucket, path, destination) + + fmt.Println("Copying " + source.path + " to " + destination) + copy(session, source, destination) } -func getBucketObjects(sess *session.Session, bucket string, prefix string, dest string) { +func readHEAD(session *session.Session, source S3Path) string { + svc := s3.New(session) + + out, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(source.bucket), + Key: aws.String(source.path), + }) + + if err != nil { + exitErrorf("%v", err) + } + + buf := 
new(bytes.Buffer) + buf.ReadFrom(out.Body) + return buf.String() +} + +func copy(session *session.Session, source S3Path, destination string) { query := &s3.ListObjectsV2Input{ - Bucket: aws.String(bucket), - Prefix: aws.String(prefix), + Bucket: aws.String(source.bucket), + Prefix: aws.String(source.path), } - svc := s3.New(sess) + svc := s3.New(session) truncatedListing := true for truncatedListing { - resp, err := svc.ListObjectsV2(query) + response, err := svc.ListObjectsV2(query) if err != nil { fmt.Println(err.Error()) return } - getObjectsAll(bucket, resp, svc, prefix, dest) - query.ContinuationToken = resp.NextContinuationToken - truncatedListing = *resp.IsTruncated + copyToLocalFiles(svc, response.Contents, source, destination) + + // Check if more results + query.ContinuationToken = response.NextContinuationToken + truncatedListing = *response.IsTruncated } } -func getObjectsAll(bucket string, bucketObjectsList *s3.ListObjectsV2Output, s3Client *s3.S3, prefix string, dest string) { - for _, key := range bucketObjectsList.Contents { +func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source S3Path, destination string) { + for _, key := range objects { destFilename := *key.Key if strings.HasSuffix(*key.Key, "/") { fmt.Println("Got a directory") continue } out, err := s3Client.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(bucket), + Bucket: aws.String(source.bucket), Key: key.Key, }) if err != nil { exitErrorf("%v", err) } - destFilePath := dest + "/" + strings.TrimPrefix(destFilename, prefix+"/") + destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname()+"/") err = os.MkdirAll(filepath.Dir(destFilePath), 0777) fmt.Print(destFilePath) destFile, err := os.Create(destFilePath) diff --git a/cli/data/s3path.go b/cli/data/s3path.go new file mode 100644 index 0000000..9a48a89 --- /dev/null +++ b/cli/data/s3path.go @@ -0,0 +1,23 @@ +package data + +import ( + "strings" +) + +type S3Path struct { + bucket string + path 
string +} + +func (p *S3Path) Basename() string { + components := strings.Split(p.path, "/") + return components[len(components)-1] +} + +func (p *S3Path) Dirname() string { + components := strings.Split(p.path, "/") + if len(components) == 0 { + return "" + } + return strings.Join(components[:len(components)-1], "/") +} diff --git a/cli/data/s3path_test.go b/cli/data/s3path_test.go new file mode 100644 index 0000000..03f5a11 --- /dev/null +++ b/cli/data/s3path_test.go @@ -0,0 +1,59 @@ +package data + +import "testing" + +func TestBasename(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa/bbb/ccc", + } + + dirname := path.Basename() + expectation := "ccc" + + if dirname != expectation { + t.Errorf("Basename was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestBasenameWithEmptyPath(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "", + } + + dirname := path.Basename() + expectation := "" + + if dirname != expectation { + t.Errorf("Basename was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestDirname(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa/bbb/ccc", + } + + dirname := path.Dirname() + expectation := "aaa/bbb" + + if dirname != expectation { + t.Errorf("Dirname was incorrect, got: %s, want: %s.", dirname, expectation) + } +} + +func TestDirnameOnBasicPath(t *testing.T) { + path := S3Path{ + bucket: "foo", + path: "aaa", + } + + dirname := path.Dirname() + expectation := "" + + if dirname != expectation { + t.Errorf("Dirname was incorrect, got: %s, want: %s.", dirname, expectation) + } +} diff --git a/cli/root.go b/cli/root.go index 00c08f1..ae2c4c7 100644 --- a/cli/root.go +++ b/cli/root.go @@ -28,13 +28,8 @@ var cfgFile string // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "paddle", - Short: "A brief description of your application", - Long: `A longer description that spans multiple lines and likely contains 
-examples and usage of using your application. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, + Short: "Canoe tool for data archival and processing", + Long: "Canoe tool for data archival and processing", // Uncomment the following line if your bare application // has an action associated with it: // Run: func(cmd *cobra.Command, args []string) { }, diff --git a/common/hasher.go b/common/hasher.go deleted file mode 100644 index 81998ee..0000000 --- a/common/hasher.go +++ /dev/null @@ -1,81 +0,0 @@ -package common - -import ( - "crypto/sha1" - "fmt" - "io" - "math" - "os" - "path/filepath" - "strings" -) - -const filechunk = 8192 - -func DirHash(path string) (string, error) { - fileList := []string{} - sha1List := []string{} - filepath.Walk(path, func(p string, f os.FileInfo, err error) error { - if IsDirectory(p) { - return nil - } else { - fileList = append(fileList, p) - return nil - } - }) - for _, file := range fileList { - sha, err := FileHash(file) - if err == nil { - sha1List = append(sha1List, fmt.Sprintf("%s:%s", file, sha)) - } else { - return "", err - } - } - files := strings.Join(sha1List, "\n") - hasher := sha1.New() - hasher.Write([]byte(files)) - return fmt.Sprintf("%x", hasher.Sum(nil)), nil -} - -func FileHash(path string) (string, error) { - // Open the file for reading - file, err := os.Open(path) - if err != nil { - fmt.Println("Cannot find file:", os.Args[1]) - return "", err - } - - defer file.Close() - - // Get file info - info, err := file.Stat() - if err != nil { - fmt.Println("Cannot access file:", os.Args[1]) - return "", err - } - - // Get the filesize - filesize := info.Size() - - // Calculate the number of blocks - blocks := uint64(math.Ceil(float64(filesize) / float64(filechunk))) - - hash := sha1.New() - - // Check each block - for i := uint64(0); i < blocks; i++ { - // Calculate block size - blocksize := 
int(math.Min(filechunk, float64(filesize-int64(i*filechunk)))) - - // Make a buffer - buf := make([]byte, blocksize) - - // Make a buffer - file.Read(buf) - - // Write to the buffer - io.WriteString(hash, string(buf)) - } - - return fmt.Sprintf("%x", hash.Sum(nil)), nil -} diff --git a/common/path.go b/common/path.go deleted file mode 100644 index dbc9286..0000000 --- a/common/path.go +++ /dev/null @@ -1,21 +0,0 @@ -package common - -import ( - "fmt" - "os" -) - -func IsDirectory(path string) bool { - fd, err := os.Stat(path) - if err != nil { - fmt.Println(err) - os.Exit(2) - } - switch mode := fd.Mode(); { - case mode.IsDir(): - return true - case mode.IsRegular(): - return false - } - return false -} diff --git a/rand/strings.go b/rand/strings.go new file mode 100644 index 0000000..7f4acd2 --- /dev/null +++ b/rand/strings.go @@ -0,0 +1,24 @@ +package rand + +import ( + "math/rand" + "time" +) + +const charset = "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +var seededRand *rand.Rand = rand.New( + rand.NewSource(time.Now().UnixNano())) + +func StringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} + +func String(length int) string { + return StringWithCharset(length, charset) +}