Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
Couple of refactors to commit operation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pedro Cunha committed Sep 26, 2017
1 parent 61c25ae commit dbd350c
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions cli/data/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,29 @@ $ paddle data commit -b experimental source/path trained-model/version1
if !viper.IsSet("bucket") {
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.")
}
commitPath(args[0], viper.GetString("bucket"), args[1], commitBranch)

destination := S3Path{
bucket: viper.GetString("bucket"),
path: fmt.Sprintf("%s/%s", args[1], commitBranch),
}

commitPath(args[0], destination)
},
}

// init registers the command-line flags of the commit command.
//
// The --branch flag (shorthand -b) stores its value in commitBranch and
// defaults to "master" when it is not given.
func init() {
	flags := commitCmd.Flags()
	flags.StringVarP(&commitBranch, "branch", "b", "master", "Branch to work on")
}

func commitPath(path string, bucket string, version string, branch string) {
func commitPath(path string, destination S3Path) {
fd, err := os.Stat(path)
if err != nil {
exitErrorf("Path %v not found", path)
exitErrorf("Source path %v not found", path)
}
if !fd.Mode().IsDir() {
exitErrorf("Path %v must be a directory", path)
}

hash, err := common.DirHash(path)
if err != nil {
exitErrorf("Unable to hash input folder")
exitErrorf("Source path %v must be a directory", path)
}

t := time.Now().UTC()

datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

destPath := fmt.Sprintf("%s/%s/%s_%s", version, branch, datePath, hash)

sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))
Expand All @@ -89,22 +82,31 @@ func commitPath(path string, bucket string, version string, branch string) {
}
})

t := time.Now().UTC()
datePath := fmt.Sprintf("%d/%02d/%02d/%02d%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute())

hash, err := common.DirHash(path)
if err != nil {
exitErrorf("Unable to hash input folder")
}

uploader := s3manager.NewUploader(sess)
folderKey := fmt.Sprintf("%s/%s_%s", destination.path, datePath, hash)

for _, file := range fileList {
key := destPath + "/" + strings.TrimPrefix(file, path+"/")
key := fmt.Sprintf("%s/%s", folderKey, strings.TrimPrefix(file, path + "/"))
fmt.Println(file + " -> " + key)
uploadFileToS3(uploader, bucket, key, file)
uploadFileToS3(uploader, destination.bucket, key, file)
}

// Update HEAD

headFile := fmt.Sprintf("%s/%s/HEAD", version, branch)

uploadDataToS3(sess, destPath, bucket, headFile)
headKey := fmt.Sprintf("%s/HEAD", destination.path)
uploadDataToS3(sess, destination.bucket, headKey, folderKey)
}

func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string, filePath string) {
func uploadFileToS3(uploader *s3manager.Uploader, bucket string, key string, filePath string) {
file, err := os.Open(filePath)
if err != nil {
fmt.Println("Failed to open file", file, err)
Expand All @@ -113,18 +115,18 @@ func uploadFileToS3(uploader *s3manager.Uploader, bucketName string, key string,
defer file.Close()

_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucketName),
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: file,
})

if err != nil {
exitErrorf("Failed to upload data to %s/%s, %s", bucketName, key, err.Error())
exitErrorf("Failed to upload data to %s/%s, %s", bucket, key, err.Error())
return
}
}

func uploadDataToS3(sess *session.Session, data string, bucket string, key string) {
func uploadDataToS3(sess *session.Session, bucket string, key string, data string) {
s3Svc := s3.New(sess)

_, err := s3Svc.PutObject(&s3.PutObjectInput{
Expand Down

0 comments on commit dbd350c

Please sign in to comment.