diff --git a/cmd/hdfs/chmod.go b/cmd/hdfs/chmod.go index 1bc25671..c55c77ed 100644 --- a/cmd/hdfs/chmod.go +++ b/cmd/hdfs/chmod.go @@ -21,18 +21,20 @@ func chmod(args []string, recursive bool) { fatal(err) } - visit := func(p string, fi os.FileInfo) { - err := client.Chmod(p, os.FileMode(mode)) + visit := func(p string, fi os.FileInfo, err error) error { + err = client.Chmod(p, os.FileMode(mode)) if err != nil { fmt.Fprintln(os.Stderr, err) status = 1 + return err } + return nil } for _, p := range expanded { if recursive { - err = walk(client, p, visit) + err = client.Walk(p, visit) if err != nil { fmt.Fprintln(os.Stderr, err) status = 1 @@ -43,7 +45,7 @@ func chmod(args []string, recursive bool) { fatal(err) } - visit(p, info) + visit(p, info, nil) } } } diff --git a/cmd/hdfs/chown.go b/cmd/hdfs/chown.go index 6afc9214..99adf9e4 100644 --- a/cmd/hdfs/chown.go +++ b/cmd/hdfs/chown.go @@ -30,18 +30,20 @@ func chown(args []string, recursive bool) { fatal(err) } - visit := func(p string, fi os.FileInfo) { - err := client.Chown(p, owner, group) + visit := func(p string, fi os.FileInfo, err error) error { + err = client.Chown(p, owner, group) if err != nil { fmt.Fprintln(os.Stderr, err) status = 1 + return err } + return nil } for _, p := range expanded { if recursive { - err = walk(client, p, visit) + err = client.Walk(p, visit) if err != nil { fmt.Fprintln(os.Stderr, err) status = 1 @@ -52,7 +54,7 @@ func chown(args []string, recursive bool) { fatal(err) } - visit(p, info) + visit(p, info, nil) } } } diff --git a/cmd/hdfs/get.go b/cmd/hdfs/get.go index 64079b8a..59a682d8 100644 --- a/cmd/hdfs/get.go +++ b/cmd/hdfs/get.go @@ -38,7 +38,7 @@ func get(args []string) { fatal(err) } - err = walk(client, source, func(p string, fi os.FileInfo) { + err = client.Walk(source, func(p string, fi os.FileInfo, err error) error { fullDest := filepath.Join(dest, strings.TrimPrefix(p, source)) if fi.IsDir() { @@ -54,6 +54,7 @@ func get(args []string) { fatal(err) } } + return nil }) if err != nil { diff --git a/cmd/hdfs/paths.go b/cmd/hdfs/paths.go index 964eccf1..5a918c46 100644 --- a/cmd/hdfs/paths.go +++ b/cmd/hdfs/paths.go @@ -2,7 +2,6 @@ package main import ( "errors" - "io" "net/url" "os" "path" @@ -163,50 +162,3 @@ func expandPaths(client *hdfs.Client, paths []string) ([]string, error) { return res, nil } - -type walkFunc func(string, os.FileInfo) - -func walk(client *hdfs.Client, root string, visit walkFunc) error { - rootInfo, err := client.Stat(root) - if err != nil { - return err - } - - visit(root, rootInfo) - if rootInfo.IsDir() { - err = walkDir(client, root, visit) - if err != nil { - return err - } - } - - return nil -} - -func walkDir(client *hdfs.Client, dir string, visit walkFunc) error { - dirReader, err := client.Open(dir) - if err != nil { - return err - } - - var partial []os.FileInfo - for ; err != io.EOF; partial, err = dirReader.Readdir(100) { - if err != nil { - return err - } - - for _, child := range partial { - childPath := path.Join(dir, child.Name()) - visit(childPath, child) - - if child.IsDir() { - err = walkDir(client, childPath, visit) - if err != nil { - return err - } - } - } - } - - return nil -} diff --git a/walk.go b/walk.go new file mode 100644 index 00000000..76c8a396 --- /dev/null +++ b/walk.go @@ -0,0 +1,95 @@ +package hdfs + +import ( + "io" + "os" + "path/filepath" + "sort" +) + +// Walk does the exact same thing as filepath.Walk (and is mostly copied from there) +func (client *Client) Walk(root string, visit filepath.WalkFunc) error { + rootInfo, err := client.Stat(root) + if err != nil { + err = visit(root, nil, err) + } else { + err = client.walk(root, rootInfo, visit) + } + if err == filepath.SkipDir { + return nil + } + return err +} + +func (client *Client) walk(path string, info os.FileInfo, visit filepath.WalkFunc) error { + err := visit(path, info, nil) + if err != nil { + if info.IsDir() && err == filepath.SkipDir { + return nil + } + return err + } + + if !info.IsDir() { + return nil + } + + names, err := client.readDirNames(path) + if err != nil { + return visit(path, info, err) + } + + for _, name := range names { + filename := filepath.Join(path, name) + fileInfo, err := client.Stat(filename) + if err != nil { + if err := visit(filename, fileInfo, err); err != nil && err != filepath.SkipDir { + return err + } + } else { + err = client.walk(filename, fileInfo, visit) + if err != nil { + if !fileInfo.IsDir() || err != filepath.SkipDir { + return err + } + } + } + } + return nil +} + +func (client *Client) readDirNames(dir string) ([]string, error) { + dirReader, err := client.Open(dir) + if err != nil { + return nil, err + } + + // read the dir entries by chunks of a few hundreds + names, err := readDirNamesByChunks(dirReader, 500) + + if err != nil { + return nil, err + } + + sort.Strings(names) + return names, nil +} + +func readDirNamesByChunks(dirReader *FileReader, chunkSize int) ([]string, error) { + + var toRet []string + var partial []string + var err error + + if chunkSize <= 0 { + return dirReader.Readdirnames(chunkSize) + } + + for ; err != io.EOF; partial, err = dirReader.Readdirnames(chunkSize) { + if err != nil { + return nil, err + } + toRet = append(toRet, partial...) + } + return toRet, nil +} diff --git a/walk_test.go b/walk_test.go new file mode 100644 index 00000000..97f6aaf6 --- /dev/null +++ b/walk_test.go @@ -0,0 +1,90 @@ +package hdfs + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadDirNames(t *testing.T) { + c := getClient(t) + entries, err := c.readDirNames("/_test/") + assert.Nil(t, err, "unexpected error") + assert.False(t, len(entries) == 0, "did not list anything") + assert.Contains(t, entries, "mobydick.txt", "did not find expected file") + assert.Contains(t, entries, "foo.txt", "did not find expected file") +} + +func TestReadDirNamesByChunks(t *testing.T) { + c := getClient(t) + dirReader, err := c.Open("/_test/") + + assert.Nil(t, err, "unexpected error") + + singleRun, err := readDirNamesByChunks(dirReader, -1) + assert.Nil(t, err, "unexpected error") + + dirReader, err = c.Open("/_test/") + assert.Nil(t, err, "unexpected error") + + byChunks, err := readDirNamesByChunks(dirReader, 1) + assert.Nil(t, err, "unexpected error") + + assert.Equal(t, singleRun, byChunks, "discrepancy in listed content") +} + +func TestWalk(t *testing.T) { + c := getClient(t) + + c.Mkdir("/_test/walk", os.ModePerm) + c.Mkdir("/_test/walk/dir", os.ModePerm) + c.Mkdir("/_test/walk/dir/subdir", os.ModePerm) + c.Create("/_test/walk/walkfile") + c.Create("/_test/walk/dir/walkfile1") + c.Create("/_test/walk/dir/walkfile2") + c.Create("/_test/walk/dir/subdir/walkfile1") + c.Create("/_test/walk/dir/subdir/walkfile2") + + paths := make([]string, 0, 8) + + err := c.Walk("/_test/walk/", walkFnTest(&paths)) + assert.Nil(t, err, "unexpected error") + + expected := []string{ + "/_test/walk/", + "/_test/walk/dir", + "/_test/walk/dir/subdir", + "/_test/walk/dir/subdir/walkfile1", + "/_test/walk/dir/subdir/walkfile2", + "/_test/walk/dir/walkfile1", + "/_test/walk/dir/walkfile2", + "/_test/walk/walkfile"} + + assert.Equal(t, expected, paths, "discrepancy between expected and walked paths.") + +} + +func TestWalkError(t *testing.T) { + c := getClient(t) + errors := make([]error, 0, 1) + c.Walk("/not_existing", walkErrorFn(&errors)) + assert.Equal(t, 1, len(errors), "expected a single error") +} + +func walkFnTest(encounteredPaths *[]string) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + *encounteredPaths = append(*encounteredPaths, path) + return nil + } +} + +func walkErrorFn(errors *[]error) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + *errors = append(*errors, err) + } + return nil + } +}