This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Merge pull request #2 from deliveroo/LOG-1772/canoe-productionise-paddle
LOG-1772: Canoe: productionise Paddle
pedrocunha authored Oct 16, 2017
2 parents 7ee4657 + 2f696d0 commit d1aba39
Showing 11 changed files with 288 additions and 188 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
tmp
dist
paddle
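
The new `paddle` entry keeps the compiled binary that `go build` drops at the repo root out of version control.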
47 changes: 45 additions & 2 deletions README.md
@@ -1,8 +1,51 @@
# Paddle

Paddle is a command line tool for data archival and processing.
Paddle is a command line tool for Canoe data archival and processing.

Work in progress.
## Setup for local development

Make sure you have Go installed on your machine and that you check out the repo to
the right folder. By default this should be:

```
mkdir -p ~/go/src/github.com/deliveroo
cd ~/go/src/github.com/deliveroo
git clone git@github.com:deliveroo/paddle.git
cd paddle
```

You will need to create a `$HOME/.paddle.yaml` that contains the bucket name, e.g.:

```
> cat $HOME/.paddle.yaml
bucket: roo-bucket
```

or, if you prefer, specify `BUCKET` as an environment variable.
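
For example, using the same bucket name as above:

```
$ export BUCKET=roo-bucket
```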

You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.:

```
> cat $HOME/.aws/credentials
[default]
aws_access_key_id=xxx
aws_secret_access_key=yyy
region=eu-west-1
```

Then build the project:

```
$ go build
```

## Release

In order to release a new version, set up a GitHub token (`export GITHUB_TOKEN=[YOUR_TOKEN]`) and follow these steps:

```
$ git tag -a vX.X.X -m "[Comment]"
$ git push origin vX.X.X
$ goreleaser
```

## Usage

93 changes: 49 additions & 44 deletions cli/data/commit.go
@@ -20,16 +20,17 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/deliveroo/paddle/common"
"github.com/deliveroo/paddle/rand"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
"path/filepath"
"strings"
"time"
)

var commitBranch string
var AppFs = afero.NewOsFs()

var commitCmd = &cobra.Command{
Use: "commit [source path] [version]",
@@ -45,86 +46,90 @@ $ paddle data commit -b experimental source/path trained-model/version1
if !viper.IsSet("bucket") {
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.")
}
commitPath(args[0], viper.GetString("bucket"), args[1], commitBranch)

destination := S3Path{
bucket: viper.GetString("bucket"),
path: fmt.Sprintf("%s/%s", args[1], commitBranch),
}

validatePath(args[0])
commitPath(args[0], destination)
},
}

func init() {
commitCmd.Flags().StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on")
}

func commitPath(path string, bucket string, version string, branch string) {
fd, err := os.Stat(path)
func validatePath(path string) {
fd, err := AppFs.Stat(path)
if err != nil {
exitErrorf("Path %v not found", path)
exitErrorf("Source path %v not found", path)
}
if !fd.Mode().IsDir() {
exitErrorf("Path %v must be a directory", path)
if !fd.IsDir() {
exitErrorf("Source path %v must be a directory", path)
}
}

hash, err := common.DirHash(path)
if err != nil {
exitErrorf("Unable to hash input folder")
}

t := time.Now().UTC()

datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

destPath := fmt.Sprintf("%s/%s/%s_%s", version, branch, datePath, hash)

func commitPath(path string, destination S3Path) {
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

fileList := []string{}
filepath.Walk(path, func(p string, f os.FileInfo, err error) error {
if common.IsDirectory(p) {
return nil
} else {
fileList = append(fileList, p)
return nil
}
})

rootKey := generateRootKey(destination)
keys := filesToKeys(path)
uploader := s3manager.NewUploader(sess)

for _, file := range fileList {
key := destPath + "/" + strings.TrimPrefix(file, path+"/")
for _, file := range keys {
key := fmt.Sprintf("%s/%s", rootKey, strings.TrimPrefix(file, path+"/"))
fmt.Println(file + " -> " + key)
uploadFileToS3(uploader, bucket, key, file)
uploadFileToS3(uploader, destination.bucket, key, file)
}

// Update HEAD
headKey := fmt.Sprintf("%s/HEAD", destination.path)
uploadDataToS3(sess, destination.bucket, headKey, rootKey)
}

func filesToKeys(path string) (keys []string) {
afero.Walk(AppFs, path, func(p string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
}
keys = append(keys, p)
return nil
})
return keys
}

headFile := fmt.Sprintf("%s/%s/HEAD", version, branch)
func generateRootKey(destination S3Path) string {
t := time.Now().UTC()
datePath := fmt.Sprintf("%d/%02d/%02d/%02d/%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

uploadDataToS3(sess, destPath, bucket, headFile)
return fmt.Sprintf("%s/%s_%s", destination.path, datePath, rand.String(10))
}

func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, filePath string) {
file, err := os.Open(filePath)
func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) {
file, err := AppFs.Open(filePath)
if err != nil {
fmt.Println("Failed to open file", file, err)
os.Exit(1)
exitErrorf("Failed to open file", file, err)
}
defer file.Close()

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucketName),
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: file,
})

if err != nil {
exitErrorf("Failed to upload data to %s/%s, %s", bucketName, key, err.Error())
return
exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error())
}
}

func uploadDataToS3(sess *session.Session, data string, bucket string, key string) {
func uploadDataToS3(sess *session.Session, bucket string, key string, data string) {
s3Svc := s3.New(sess)

_, err := s3Svc.PutObject(&s3.PutObjectInput{
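
Putting the pieces together: `commit` now uploads every file under a timestamped root key with a random suffix, then points a `HEAD` object at that key. With the hypothetical bucket and paths from the README, the resulting layout looks roughly like this:

```
s3://roo-bucket/trained-model/version1/master/2017/10/16/14/32_a1b2c3d4e5/weights.bin
s3://roo-bucket/trained-model/version1/master/HEAD
```

Here the `HEAD` object contains the root key (`trained-model/version1/master/2017/10/16/14/32_a1b2c3d4e5`), whose trailing component comes from `rand.String(10)`.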
36 changes: 36 additions & 0 deletions cli/data/commit_test.go
@@ -0,0 +1,36 @@
package data

import (
"github.com/spf13/afero"
"reflect"
"strings"
"testing"
)

func TestFilesToKeys(t *testing.T) {
AppFs = afero.NewMemMapFs()
AppFs.MkdirAll("src/a", 0755)
afero.WriteFile(AppFs, "src/a/b", []byte("file c"), 0644)
afero.WriteFile(AppFs, "src/c", []byte("file c"), 0644)

list := filesToKeys("src")
expectation := []string{
"src/a/b",
"src/c",
}

if !reflect.DeepEqual(list, expectation) {
t.Errorf("list is different got: %s, want: %s.", strings.Join(list, ","), strings.Join(expectation, ","))
}
}

func TestFilesToKeysWhenEmptyFolder(t *testing.T) {
AppFs = afero.NewMemMapFs()
AppFs.MkdirAll("src", 0755)

list := filesToKeys("src")

if len(list) != 0 {
t.Errorf("expecting empty list but got: %s", strings.Join(list, ","))
}
}
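
These tests rely on the new `AppFs` seam: the production code resolves all filesystem access through the package-level `afero` variable, so swapping in `afero.NewMemMapFs()` lets `filesToKeys` run entirely in memory.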
82 changes: 49 additions & 33 deletions cli/data/get.go
@@ -44,7 +44,13 @@ $ paddle data get -b experimental trained-model/version1 dest/path
if !viper.IsSet("bucket") {
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.")
}
fetchPath(viper.GetString("bucket"), args[0], getBranch, getCommitPath, args[1])

source := S3Path{
bucket: viper.GetString("bucket"),
path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath),
}

copyPathToDestination(source, args[1])
},
}

@@ -53,69 +59,79 @@ func init() {
getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)")
}

func fetchPath(bucket string, version string, branch string, path string, destination string) {
sess := session.Must(session.NewSessionWithOptions(session.Options{
func copyPathToDestination(source S3Path, destination string) {
session := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

if path == "HEAD" {
svc := s3.New(sess)
headPath := fmt.Sprintf("%s/%s/HEAD", version, branch)
fmt.Println(headPath)
out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(headPath),
})
if err != nil {
exitErrorf("%v", err)
}
buf := new(bytes.Buffer)
buf.ReadFrom(out.Body)
path = buf.String()
} else {
path = fmt.Sprintf("%s/%s/%s", version, branch, path)
/*
 * HEAD contains the path to the latest folder
 */
if source.Basename() == "HEAD" {
latestFolder := readHEAD(session, source)
source.path = strings.Replace(source.path, "HEAD", latestFolder, 1)
}
fmt.Println("Fetching " + path)
getBucketObjects(sess, bucket, path, destination)

fmt.Println("Copying " + source.path + " to " + destination)
copy(session, source, destination)
}

func getBucketObjects(sess *session.Session, bucket string, prefix string, dest string) {
func readHEAD(session *session.Session, source S3Path) string {
svc := s3.New(session)

out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(source.bucket),
Key: aws.String(source.path),
})

if err != nil {
exitErrorf("%v", err)
}

buf := new(bytes.Buffer)
buf.ReadFrom(out.Body)
return buf.String()
}

func copy(session *session.Session, source S3Path, destination string) {
query := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
Bucket: aws.String(source.bucket),
Prefix: aws.String(source.path),
}
svc := s3.New(sess)
svc := s3.New(session)

truncatedListing := true

for truncatedListing {
resp, err := svc.ListObjectsV2(query)
response, err := svc.ListObjectsV2(query)

if err != nil {
fmt.Println(err.Error())
return
}
getObjectsAll(bucket, resp, svc, prefix, dest)
query.ContinuationToken = resp.NextContinuationToken
truncatedListing = *resp.IsTruncated
copyToLocalFiles(svc, response.Contents, source, destination)

// Check whether more results remain
query.ContinuationToken = response.NextContinuationToken
truncatedListing = *response.IsTruncated
}
}

func getObjectsAll(bucket string, bucketObjectsList *s3.ListObjectsV2Output, s3Client *s3.S3, prefix string, dest string) {
for _, key := range bucketObjectsList.Contents {
func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source S3Path, destination string) {
for _, key := range objects {
destFilename := *key.Key
if strings.HasSuffix(*key.Key, "/") {
fmt.Println("Got a directory")
continue
}
out, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Bucket: aws.String(source.bucket),
Key: key.Key,
})
if err != nil {
exitErrorf("%v", err)
}
destFilePath := dest + "/" + strings.TrimPrefix(destFilename, prefix+"/")
destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname()+"/")
err = os.MkdirAll(filepath.Dir(destFilePath), 0777)
fmt.Print(destFilePath)
destFile, err := os.Create(destFilePath)
23 changes: 23 additions & 0 deletions cli/data/s3path.go
@@ -0,0 +1,23 @@
package data

import (
"strings"
)

type S3Path struct {
bucket string
path string
}

func (p *S3Path) Basename() string {
components := strings.Split(p.path, "/")
return components[len(components)-1]
}

func (p *S3Path) Dirname() string {
components := strings.Split(p.path, "/")
if len(components) == 0 {
return ""
}
return strings.Join(components[:len(components)-1], "/")
}
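
A minimal sketch of how the two helpers behave, in the style of the existing tests (the bucket and path are hypothetical):

```
package data

import "testing"

func TestS3PathHelpers(t *testing.T) {
	p := S3Path{bucket: "roo-bucket", path: "trained-model/version1/HEAD"}

	// Basename returns the final path component.
	if got := p.Basename(); got != "HEAD" {
		t.Errorf("Basename: got %s, want HEAD", got)
	}

	// Dirname returns everything before the final component.
	if got := p.Dirname(); got != "trained-model/version1" {
		t.Errorf("Dirname: got %s, want trained-model/version1", got)
	}
}
```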
