This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

LOG-1772: Canoe: productionise Paddle #2

Merged
merged 12 commits on Oct 16, 2017
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
tmp
dist
paddle
47 changes: 45 additions & 2 deletions README.md
@@ -1,8 +1,51 @@
# Paddle

Paddle is a command line tool for data archival and processing.
Paddle is a command line tool for canoe data archival and processing.

Work in progress.
## Setup for local development

Make sure you have Go installed on your machine and that you check out the repo into the right folder. By default this should be:

```
mkdir -p ~/go/src/github.com/deliveroo
cd ~/go/src/github.com/deliveroo
git clone git@github.com:deliveroo/paddle.git
cd paddle
```

You will need to create a `$HOME/.paddle.yaml` file that contains the bucket name, e.g.:

```
> cat $HOME/.paddle.yaml
bucket: roo-bucket
```

Collaborator: Might be worth pointing out that these settings can be configured through ENV as well (e.g. `BUCKET`).

Alternatively, you can specify `BUCKET` as an environment variable.
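
For illustration, a minimal sketch of how a bucket setting can be read from either the config file or the environment with spf13/viper (which Paddle already uses); the exact wiring of Paddle's root command may differ:

```
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Read $HOME/.paddle.yaml if present.
	viper.SetConfigName(".paddle")
	viper.SetConfigType("yaml")
	viper.AddConfigPath("$HOME")
	_ = viper.ReadInConfig() // ignore a missing file; BUCKET may be set instead

	// Map config keys to environment variables, so BUCKET can satisfy "bucket".
	viper.AutomaticEnv()

	if !viper.IsSet("bucket") {
		fmt.Println("bucket not configured")
		return
	}
	fmt.Println("bucket:", viper.GetString("bucket"))
}
```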

You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` file so Paddle can connect to AWS, e.g.:

```
> cat $HOME/.aws/credentials
[default]
aws_access_key_id=xxx
aws_secret_access_key=yyy
region=eu-west-1
```
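
Paddle's commands create their AWS session with the SDK's shared-config support (see `commit.go` and `get.go` below); here is a minimal standalone sketch of that pattern, where region and profile resolution follow the SDK's usual rules:

```
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws/session"
)

func main() {
	// SharedConfigEnable tells the SDK to read both ~/.aws/credentials
	// and ~/.aws/config (profiles, region, and so on).
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))

	region := "unset"
	if sess.Config.Region != nil {
		region = *sess.Config.Region
	}
	fmt.Println("region:", region)
}
```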

Collaborator: unnecessary newline

Then build the project:

```
$ go build
```

## Release

In order to release a new version, set up a GitHub token (`export GITHUB_TOKEN=[YOUR_TOKEN]`) and run the following steps:

```
$ git tag -a vX.X.X -m "[Comment]"
$ git push origin vX.X.X
$ goreleaser
```

## Usage

93 changes: 49 additions & 44 deletions cli/data/commit.go
@@ -20,16 +20,17 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/deliveroo/paddle/common"
"github.com/deliveroo/paddle/rand"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
"path/filepath"
"strings"
"time"
)

var commitBranch string
var AppFs = afero.NewOsFs()

var commitCmd = &cobra.Command{
Use: "commit [source path] [version]",
@@ -45,86 +46,90 @@ $ paddle data commit -b experimental source/path trained-model/version1
if !viper.IsSet("bucket") {
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.")
}
commitPath(args[0], viper.GetString("bucket"), args[1], commitBranch)

destination := S3Path{
bucket: viper.GetString("bucket"),
path: fmt.Sprintf("%s/%s", args[1], commitBranch),
}

validatePath(args[0])
commitPath(args[0], destination)
},
}

func init() {
commitCmd.Flags().StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on")
}

func commitPath(path string, bucket string, version string, branch string) {
fd, err := os.Stat(path)
func validatePath(path string) {
fd, err := AppFs.Stat(path)
if err != nil {
exitErrorf("Path %v not found", path)
exitErrorf("Source path %v not found", path)
}
if !fd.Mode().IsDir() {
exitErrorf("Path %v must be a directory", path)
if !fd.IsDir() {
exitErrorf("Source path %v must be a directory", path)
}
}

hash, err := common.DirHash(path)
if err != nil {
exitErrorf("Unable to hash input folder")
}

t := time.Now().UTC()

datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

destPath := fmt.Sprintf("%s/%s/%s_%s", version, branch, datePath, hash)

func commitPath(path string, destination S3Path) {
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

fileList := []string{}
filepath.Walk(path, func(p string, f os.FileInfo, err error) error {
if common.IsDirectory(p) {
return nil
} else {
fileList = append(fileList, p)
return nil
}
})

rootKey := generateRootKey(destination)
keys := filesToKeys(path)
uploader := s3manager.NewUploader(sess)

for _, file := range fileList {
key := destPath + "/" + strings.TrimPrefix(file, path+"/")
for _, file := range keys {
key := fmt.Sprintf("%s/%s", rootKey, strings.TrimPrefix(file, path+"/"))
fmt.Println(file + " -> " + key)
uploadFileToS3(uploader, bucket, key, file)
uploadFileToS3(uploader, destination.bucket, key, file)
}

// Update HEAD
headKey := fmt.Sprintf("%s/HEAD", destination.path)
uploadDataToS3(sess, destination.bucket, headKey, rootKey)
}

func filesToKeys(path string) (keys []string) {
afero.Walk(AppFs, path, func(p string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
}
keys = append(keys, p)
return nil
})
return keys
}

headFile := fmt.Sprintf("%s/%s/HEAD", version, branch)
func generateRootKey(destination S3Path) string {
t := time.Now().UTC()
datePath := fmt.Sprintf("%d/%02d/%02d/%02d/%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

uploadDataToS3(sess, destPath, bucket, headFile)
return fmt.Sprintf("%s/%s_%s", destination.path, datePath, rand.String(10))
}

func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, filePath string) {
file, err := os.Open(filePath)
func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) {
file, err := AppFs.Open(filePath)
if err != nil {
fmt.Println("Failed to open file", file, err)
os.Exit(1)
exitErrorf("Failed to open file", file, err)
}
defer file.Close()

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucketName),
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: file,
})

if err != nil {
exitErrorf("Failed to upload data to %s/%s, %s", bucketName, key, err.Error())
return
exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error())
}
}

func uploadDataToS3(sess *session.Session, data string, bucket string, key string) {
func uploadDataToS3(sess *session.Session, bucket string, key string, data string) {
s3Svc := s3.New(sess)

_, err := s3Svc.PutObject(&s3.PutObjectInput{
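
For reference, a self-contained sketch of the root key layout that the new `generateRootKey` produces: the destination path (`<version>/<branch>`), a UTC timestamp path, and a random suffix. The values below are illustrative, and `randString` is a stand-in for the repo's `rand.String` helper:

```
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randString stands in for the repo's rand.String helper.
func randString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz0123456789"
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func main() {
	// destination.path is "<version>/<branch>", e.g. "trained-model/version1/master".
	destinationPath := "trained-model/version1/master"

	t := time.Now().UTC()
	datePath := fmt.Sprintf("%d/%02d/%02d/%02d/%02d",
		t.Year(), t.Month(), t.Day(),
		t.Hour(), t.Minute())

	// e.g. trained-model/version1/master/2017/10/16/14/05_k3x9q2mzp7
	rootKey := fmt.Sprintf("%s/%s_%s", destinationPath, datePath, randString(10))
	fmt.Println(rootKey)
}
```

The `HEAD` object under `<version>/<branch>` is then overwritten with this root key, which is what `paddle data get` resolves by default.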
36 changes: 36 additions & 0 deletions cli/data/commit_test.go
@@ -0,0 +1,36 @@
package data

import (
"github.com/spf13/afero"
"reflect"
"strings"
"testing"
)

func TestFilesToKeys(t *testing.T) {
AppFs = afero.NewMemMapFs()
AppFs.MkdirAll("src/a", 0755)
afero.WriteFile(AppFs, "src/a/b", []byte("file c"), 0644)
afero.WriteFile(AppFs, "src/c", []byte("file c"), 0644)

list := filesToKeys("src")
expectation := []string{
"src/a/b",
"src/c",
}

if !reflect.DeepEqual(list, expectation) {
t.Errorf("list is different got: %s, want: %s.", strings.Join(list, ","), strings.Join(expectation, ","))
}
}

func TestFilesToKeysWhenEmptyFolder(t *testing.T) {
AppFs = afero.NewMemMapFs()
AppFs.MkdirAll("src", 0755)

list := filesToKeys("src")

if len(list) != 0 {
t.Errorf("expecting empty list but got: %s", strings.Join(list, ","))
}
}
82 changes: 49 additions & 33 deletions cli/data/get.go
@@ -44,7 +44,13 @@ $ paddle data get -b experimental trained-model/version1 dest/path
if !viper.IsSet("bucket") {
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.")
}
fetchPath(viper.GetString("bucket"), args[0], getBranch, getCommitPath, args[1])

source := S3Path{
bucket: viper.GetString("bucket"),
path: fmt.Sprintf("%s/%s/%s", args[0], getBranch, getCommitPath),
}

copyPathToDestination(source, args[1])
},
}

@@ -53,69 +59,79 @@ func init() {
getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)")
}

func fetchPath(bucket string, version string, branch string, path string, destination string) {
sess := session.Must(session.NewSessionWithOptions(session.Options{
func copyPathToDestination(source S3Path, destination string) {
session := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

if path == "HEAD" {
svc := s3.New(sess)
headPath := fmt.Sprintf("%s/%s/HEAD", version, branch)
fmt.Println(headPath)
out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(headPath),
})
if err != nil {
exitErrorf("%v", err)
}
buf := new(bytes.Buffer)
buf.ReadFrom(out.Body)
path = buf.String()
} else {
path = fmt.Sprintf("%s/%s/%s", version, branch, path)
/*
* HEAD contains the path to latest folder
*/
if source.Basename() == "HEAD" {
latestFolder := readHEAD(session, source)
source.path = strings.Replace(source.path, "HEAD", latestFolder, 1)
}
fmt.Println("Fetching " + path)
getBucketObjects(sess, bucket, path, destination)

fmt.Println("Copying " + source.path + " to " + destination)
copy(session, source, destination)
}

func getBucketObjects(sess *session.Session, bucket string, prefix string, dest string) {
func readHEAD(session *session.Session, source S3Path) string {
svc := s3.New(session)

out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(source.bucket),
Key: aws.String(source.path),
})

if err != nil {
exitErrorf("%v", err)
}

buf := new(bytes.Buffer)
buf.ReadFrom(out.Body)
return buf.String()
}

func copy(session *session.Session, source S3Path, destination string) {
query := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
Bucket: aws.String(source.bucket),
Prefix: aws.String(source.path),
}
svc := s3.New(sess)
svc := s3.New(session)

truncatedListing := true

for truncatedListing {
resp, err := svc.ListObjectsV2(query)
response, err := svc.ListObjectsV2(query)

if err != nil {
fmt.Println(err.Error())
return
}
getObjectsAll(bucket, resp, svc, prefix, dest)
query.ContinuationToken = resp.NextContinuationToken
truncatedListing = *resp.IsTruncated
copyToLocalFiles(svc, response.Contents, source, destination)

// Check if more results
query.ContinuationToken = response.NextContinuationToken
truncatedListing = *response.IsTruncated
}
}

func getObjectsAll(bucket string, bucketObjectsList *s3.ListObjectsV2Output, s3Client *s3.S3, prefix string, dest string) {
for _, key := range bucketObjectsList.Contents {
func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source S3Path, destination string) {
for _, key := range objects {
destFilename := *key.Key
if strings.HasSuffix(*key.Key, "/") {
fmt.Println("Got a directory")
continue
}
out, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Bucket: aws.String(source.bucket),
Key: key.Key,
})
if err != nil {
exitErrorf("%v", err)
}
destFilePath := dest + "/" + strings.TrimPrefix(destFilename, prefix+"/")
destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.Dirname()+"/")
err = os.MkdirAll(filepath.Dir(destFilePath), 0777)
fmt.Print(destFilePath)
destFile, err := os.Create(destFilePath)
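
To make the `HEAD` indirection in `copyPathToDestination` concrete, here is a toy sketch of the path substitution it performs; the `latestFolder` value is hypothetical and in reality comes from `readHEAD`, i.e. whatever `paddle data commit` last wrote:

```
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Source path built by `paddle data get` with the default --path of HEAD.
	sourcePath := "trained-model/version1/master/HEAD"

	// Hypothetical body of the HEAD object (fetched via readHEAD in get.go).
	latestFolder := "2017/10/16/14/05_k3x9q2mzp7"

	// Replace the HEAD component with the latest folder before listing and copying.
	resolved := strings.Replace(sourcePath, "HEAD", latestFolder, 1)
	fmt.Println(resolved) // trained-model/version1/master/2017/10/16/14/05_k3x9q2mzp7
}
```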
23 changes: 23 additions & 0 deletions cli/data/s3path.go
@@ -0,0 +1,23 @@
package data

import (
"strings"
)

type S3Path struct {
bucket string
path string
}

func (p *S3Path) Basename() string {
components := strings.Split(p.path, "/")
return components[len(components)-1]
}

func (p *S3Path) Dirname() string {
components := strings.Split(p.path, "/")
if len(components) == 0 {
return ""
}
return strings.Join(components[:len(components)-1], "/")
}
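
A quick illustration of how the new `S3Path` helpers behave, duplicated here as a standalone snippet with made-up values:

```
package main

import (
	"fmt"
	"strings"
)

// S3Path mirrors the struct introduced in cli/data/s3path.go.
type S3Path struct {
	bucket string
	path   string
}

func (p *S3Path) Basename() string {
	components := strings.Split(p.path, "/")
	return components[len(components)-1]
}

func (p *S3Path) Dirname() string {
	components := strings.Split(p.path, "/")
	if len(components) == 0 {
		return ""
	}
	return strings.Join(components[:len(components)-1], "/")
}

func main() {
	source := S3Path{bucket: "roo-bucket", path: "trained-model/version1/master/HEAD"}
	fmt.Println(source.Basename()) // HEAD
	fmt.Println(source.Dirname())  // trained-model/version1/master
}
```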