Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pga: safe downloads #69

Merged
merged 3 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions PublicGitArchive/pga/cmd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"compress/gzip"
"context"
"fmt"
"io"
"os/user"
Expand All @@ -17,32 +18,83 @@ const (

// updateCache brings the copy of name held in dest up to date with the one in
// source. When upToDate reports that dest already matches, nothing is copied.
// Otherwise the file is copied to name+".tmp" in dest and only renamed to name
// once the copy has succeeded. If the copy fails, the temporary file is removed
// (a removal failure is only logged) and an error is returned; a failed rename
// is returned as an error too. ctx is forwarded to copy.
//
// NOTE(review): this span is rendered from a diff without +/- markers; the
// first signature line and the "wc, err := dest.Create(name)" line look like
// the pre-change versions of the lines that follow them — confirm against the
// repository before treating this as compilable code.
func updateCache(dest, source FileSystem, name string) error {
func updateCache(ctx context.Context, dest, source FileSystem, name string) error {
logrus.Debugf("syncing %s to %s", source.Abs(name), dest.Abs(name))
if upToDate(dest, source, name) {
logrus.Debugf("local copy is up to date")
return nil
}

logrus.Debugf("local copy is outdated or non existent")
wc, err := dest.Create(name)
// Write to a temporary name first; name itself is only replaced by the
// Rename below, after copy has returned without error.
tmpName := name + ".tmp"
if err := copy(ctx, source, dest, name, tmpName); err != nil {
// Best-effort cleanup: the copy error is the one reported to the caller.
if cerr := dest.Remove(tmpName); cerr != nil {
logrus.Warningf("error removing temporary file %s: %v",
dest.Abs(tmpName), cerr)
}

return fmt.Errorf("could not copy to temporary file %s: %v",
dest.Abs(tmpName), err)
}

// NOTE(review): when Rename fails the temporary file is left in place.
if err := dest.Rename(tmpName, name); err != nil {
return fmt.Errorf("rename %s to %s failed: %v",
dest.Abs(tmpName), dest.Abs(name), err)
}

return nil
}

// copy creates destName in dest, opens sourceName in source and streams the
// content across with cancelableCopy, so the transfer stops once ctx is done.
// The writer and the reader are both closed before returning; after a
// successful transfer an error from closing either of them is returned.
//
// NOTE(review): declaring a package-level function called copy shadows the
// predeclared copy builtin for the whole package.
// NOTE(review): this span is rendered from a diff without +/- markers and has
// review comments embedded in it; lines that still refer to "name", the
// io.Copy call and the "could not close" branch look like pre-change lines —
// confirm against the repository.
func copy(ctx context.Context, source, dest FileSystem,
sourceName, destName string) (err error) {

wc, err := dest.Create(destName)
if err != nil {
return fmt.Errorf("could not create %s: %v", dest.Abs(name), err)
return fmt.Errorf("could not create %s: %v", dest.Abs(destName), err)
}

rc, err := source.Open(name)
rc, err := source.Open(sourceName)
if err != nil {
// Best-effort close of the already created writer; the Open error is
// the one returned.
_ = wc.Close()
return err
}
defer rc.Close()

if _, err = io.Copy(wc, rc); err != nil {
return fmt.Errorf("could not copy %s to %s: %v", source.Abs(name), dest.Abs(name), err)
if _, err = cancelableCopy(ctx, wc, rc); err != nil {
// Close errors are ignored here: the copy error is the one reported.
_ = rc.Close()
_ = wc.Close()
return fmt.Errorf("could not copy %s to %s: %v",
source.Abs(sourceName), dest.Abs(destName), err)
}
if err := wc.Close(); err != nil {
return fmt.Errorf("could not close %s: %v", dest.Abs(name), err)

// A Close error on the reader is treated as a failed transfer (see the
// review discussion below).
if err := rc.Close(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say that failing to close a reader is not important, so it's ok to simply do defer rc.Close(),
but I'll let you decide on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That depends on the reader implementation. Some of them signal unexpected EOF on Close. You could argue that it is their fault to not use UnexpectedEOF or some other errors on Read though...

_ = wc.Close()
return err
}

// The writer's Close error is the function result.
return wc.Close()
}

// copyBufferSize is the chunk size, in bytes, that cancelableCopy hands to
// io.CopyN between two checks of the context (512 KiB).
const copyBufferSize = 512 << 10

// cancelableCopy copies src to dst in chunks of at most copyBufferSize bytes
// and returns the total number of bytes reported written by io.CopyN.
// Before every chunk it polls ctx without blocking; once ctx is done it
// returns the bytes written so far and a "download interrupted" error.
// io.EOF from io.CopyN marks the normal end of the copy and yields a nil
// error; any other error is returned unchanged.
//
// Cancellation is only checked between chunks, so a chunk that is already
// being copied is not interrupted by ctx.
//
// NOTE(review): this span is rendered from a diff and has review comments
// embedded between the signature and the body; the trailing "return nil"
// looks like a leftover pre-change line — confirm against the repository.
func cancelableCopy(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm almost tempted to propose this function to Go's stdlib.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great, but I doubt it gets accepted, since nothing in io or io/ioutil is cancellable. Also, there are prominent voices against that usage: Context isn’t for cancellation.

var written int64
for {
// Non-blocking poll: the default case lets the loop continue when ctx is
// still live.
select {
case <-ctx.Done():
return written, fmt.Errorf("download interrupted")
default:
}

w, err := io.CopyN(dst, src, copyBufferSize)
written += w
// io.CopyN reports io.EOF when src ends before copyBufferSize bytes were
// copied; that is the normal end of the stream.
if err == io.EOF {
return written, nil
}

if err != nil {
return written, err
}
}
return nil
}

func upToDate(dest, source FileSystem, name string) bool {
Expand Down Expand Up @@ -79,15 +131,15 @@ func matchHash(dest, source FileSystem, name string) (bool, error) {
return localHash == remoteHash, nil
}

func getIndex() (io.ReadCloser, error) {
func getIndex(ctx context.Context) (io.ReadCloser, error) {
usr, err := user.Current()
if err != nil {
return nil, err
}
dest := localFS(filepath.Join(usr.HomeDir, ".pga"))
source := urlFS(indexURL)

if err := updateCache(dest, source, indexName); err != nil {
if err := updateCache(ctx, dest, source, indexName); err != nil {
return nil, err
}

Expand Down
19 changes: 19 additions & 0 deletions PublicGitArchive/pga/cmd/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type FileSystem interface {
ModTime(path string) (time.Time, error)
Size(path string) (int64, error)
MD5(path string) (string, error)
Remove(path string) error
Rename(oldpath, newpath string) error
}

// FileSystemFromFlags returns the correct file system given a set of flags.
Expand Down Expand Up @@ -63,6 +65,10 @@ func (fs localFS) Open(path string) (io.ReadCloser, error) { return os.Open(fs.A
// ModTime stats fs.Abs(path) with os.Stat and hands the result, together with
// any stat error, to modtime.
func (fs localFS) ModTime(path string) (time.Time, error) {
	info, err := os.Stat(fs.Abs(path))
	return modtime(info, err)
}
// Size stats fs.Abs(path) with os.Stat and hands the result, together with
// any stat error, to size.
func (fs localFS) Size(path string) (int64, error) {
	info, err := os.Stat(fs.Abs(path))
	return size(info, err)
}
// MD5 returns whatever md5Hash computes for path on this file system.
func (fs localFS) MD5(path string) (string, error) {
	sum, err := md5Hash(fs, path)
	return sum, err
}
// Remove deletes fs.Abs(path) with os.Remove and returns its error.
func (fs localFS) Remove(path string) error {
	target := fs.Abs(path)
	return os.Remove(target)
}
// Rename passes both paths through fs.Abs and moves the first to the second
// with os.Rename, returning its error.
func (fs localFS) Rename(oldpath, newpath string) error {
	from, to := fs.Abs(oldpath), fs.Abs(newpath)
	return os.Rename(from, to)
}

func md5Hash(fs FileSystem, path string) (string, error) {
rc, err := fs.Open(path)
Expand Down Expand Up @@ -152,6 +158,14 @@ func (fs urlFS) MD5(path string) (string, error) {
return string(bytes.Fields(b)[0]), nil
}

// Remove is not supported on URL-backed file systems; it always returns a
// "not implemented" error and never touches path.
func (fs urlFS) Remove(path string) error {
	err := fmt.Errorf("not implemented for URLs")
	return err
}

// Rename is not supported on URL-backed file systems; it always returns a
// "not implemented" error and ignores both paths.
func (fs urlFS) Rename(oldpath, newpath string) error {
	err := fmt.Errorf("not implemented for URLs")
	return err
}

type hdfsFS struct {
path string
c *hdfs.Client
Expand All @@ -172,6 +186,11 @@ func (fs hdfsFS) Open(path string) (io.ReadCloser, error) { return fs.c.Open(fs.
// ModTime stats fs.Abs(path) through the HDFS client and hands the result,
// together with any stat error, to modtime.
func (fs hdfsFS) ModTime(path string) (time.Time, error) {
	info, err := fs.c.Stat(fs.Abs(path))
	return modtime(info, err)
}
// Size stats fs.Abs(path) through the HDFS client and hands the result,
// together with any stat error, to size.
func (fs hdfsFS) Size(path string) (int64, error) {
	info, err := fs.c.Stat(fs.Abs(path))
	return size(info, err)
}
// MD5 returns whatever md5Hash computes for path on this file system.
func (fs hdfsFS) MD5(path string) (string, error) {
	sum, err := md5Hash(fs, path)
	return sum, err
}
// Remove deletes fs.Abs(path) through the HDFS client and returns its error.
func (fs hdfsFS) Remove(path string) error {
	target := fs.Abs(path)
	return fs.c.Remove(target)
}

// Rename passes both paths through fs.Abs and asks the HDFS client to move
// the first to the second, returning the client's error.
func (fs hdfsFS) Rename(oldpath, newpath string) error {
	from, to := fs.Abs(oldpath), fs.Abs(newpath)
	return fs.c.Rename(from, to)
}

func modtime(fi os.FileInfo, err error) (time.Time, error) {
if err != nil {
Expand Down
61 changes: 48 additions & 13 deletions PublicGitArchive/pga/cmd/get.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package cmd

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/src-d/datasets/PublicGitArchive/pga/pga"
pb "gopkg.in/cheggaaa/pb.v1"
Expand All @@ -21,6 +25,8 @@ var getCmd = &cobra.Command{

Alternatively, a list of .siva filenames can be passed through standard input.`,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := setupContext()

source := urlFS("http://pga.sourced.tech/")

dest, err := FileSystemFromFlags(cmd.Flags())
Expand Down Expand Up @@ -54,7 +60,7 @@ Alternatively, a list of .siva filenames can be passed through standard input.`,
filenames = append(filenames, filename)
}
} else {
f, err := getIndex()
f, err := getIndex(ctx)
if err != nil {
return fmt.Errorf("could not open index file: %v", err)
}
Expand Down Expand Up @@ -82,39 +88,68 @@ Alternatively, a list of .siva filenames can be passed through standard input.`,
}
}

return downloadFilenames(dest, source, filenames, maxDownloads)
return downloadFilenames(ctx, dest, source, filenames, maxDownloads)
},
}

// downloadFilenames fetches every entry of filenames from source into dest
// with updateCache. One goroutine is started per filename, but each one has to
// take a token from a channel pre-filled with maxDownloads tokens before it
// calls updateCache, which bounds the number of simultaneous transfers.
// A goroutine that sees ctx done while waiting for a token reports "canceled"
// instead of downloading. Individual failures are printed to stderr; the
// returned error only says that at least one download failed.
//
// NOTE(review): this span is rendered from a diff without +/- markers; the
// first signature line, "done := make(chan bool)", the bare "<-tokens", the
// "if err := updateCache(dest, ..." line, "done <- true" and the
// "for i := 1; ; i++" loop look like pre-change lines — confirm against the
// repository before treating this as compilable code.
func downloadFilenames(dest, source FileSystem, filenames []string, maxDownloads int) error {
func downloadFilenames(ctx context.Context, dest, source FileSystem,
filenames []string, maxDownloads int) error {

// tokens acts as a counting semaphore holding maxDownloads entries.
// NOTE(review): with maxDownloads == 0 no token is ever available, so every
// goroutine below waits until ctx is done.
tokens := make(chan bool, maxDownloads)
for i := 0; i < maxDownloads; i++ {
tokens <- true
}

done := make(chan bool)
// done is unbuffered; every goroutine sends exactly one value on it and the
// loop at the bottom receives exactly len(filenames) values.
done := make(chan error)
for _, filename := range filenames {
// NOTE(review): filename[:2] panics when a filename is shorter than two
// bytes.
filename := filepath.Join("siva", "latest", filename[:2], filename)
go func() {
<-tokens
select {
case <-tokens:
case <-ctx.Done():
done <- fmt.Errorf("canceled")
return
}
// Hand the token back when this goroutine finishes.
defer func() { tokens <- true }()

if err := updateCache(dest, source, filename); err != nil {
err := updateCache(ctx, dest, source, filename)
if err != nil {
fmt.Fprintf(os.Stderr, "could not get %s: %v\n", filename, err)
}
done <- true

done <- err
}()
}

bar := pb.StartNew(len(filenames))
for i := 1; ; i++ {
<-done
bar.Set(i)
bar.Update()
if i == len(filenames) {
return nil
var finalErr error
for i := 1; i <= len(filenames); i++ {
// NOTE(review): "where" in the message below is presumably a typo for
// "were".
if err := <-done; err != nil {
finalErr = fmt.Errorf("there where failed downloads")
}

// The progress bar stops advancing after the first failed download.
if finalErr == nil {
bar.Set(i)
bar.Update()
}
}

return finalErr
}

// setupContext returns a context derived from context.Background that is
// canceled as soon as the process receives SIGTERM or an interrupt signal.
// A warning is logged when that happens.
func setupContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())

	// signal.Notify never blocks when delivering to the channel, so with an
	// unbuffered channel a signal that arrives before the goroutine below is
	// receiving would be dropped silently. Only the first signal matters
	// here, so a single slot is sufficient.
	term := make(chan os.Signal, 1)
	signal.Notify(term, syscall.SIGTERM, os.Interrupt)

	go func() {
		<-term
		logrus.Warningf("signal received, stopping...")
		cancel()
	}()

	return ctx
}

func init() {
Expand Down
20 changes: 16 additions & 4 deletions PublicGitArchive/pga/cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,45 @@ var listCmd = &cobra.Command{
Use: "list",
Short: "list all the repositories in the index",
Long: `List the repositories in the index, use flags to filter the results.`,
RunE: func(cmd *cobra.Command, args []string) error {
f, err := getIndex()
RunE: func(cmd *cobra.Command, args []string) (err error) {
ctx := setupContext()
f, err := getIndex(ctx)
if err != nil {
return fmt.Errorf("could not open index file: %v", err)
}
defer f.Close()

index, err := pga.IndexFromCSV(f)
if err != nil {
_ = f.Close()
return err
}

filter, err := filterFromFlags(cmd.Flags())
if err != nil {
_ = f.Close()
return err
}

formatter, err := formatterFromFlags(cmd.Flags())
if err != nil {
_ = f.Close()
return err
}

index = pga.WithFilter(index, filter)
for {
select {
case <-ctx.Done():
_ = f.Close()
return fmt.Errorf("command canceled")
default:
}

r, err := index.Next()
if err == io.EOF {
break
} else if err != nil {
_ = f.Close()
return err
}

Expand All @@ -55,7 +66,8 @@ var listCmd = &cobra.Command{
fmt.Print(s)
}
}
return nil

return f.Close()
},
}

Expand Down