Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move walk() to the library, make it behave exactly like filepath's walk. #101

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/hdfs/chmod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ func chmod(args []string, recursive bool) {
fatal(err)
}

visit := func(p string, fi os.FileInfo) {
err := client.Chmod(p, os.FileMode(mode))
visit := func(p string, fi os.FileInfo, err error) error {
err = client.Chmod(p, os.FileMode(mode))

if err != nil {
fmt.Fprintln(os.Stderr, err)
status = 1
return err
}
return nil
}

for _, p := range expanded {
if recursive {
err = walk(client, p, visit)
err = client.Walk(p, visit)
if err != nil {
fmt.Fprintln(os.Stderr, err)
status = 1
Expand All @@ -43,7 +45,7 @@ func chmod(args []string, recursive bool) {
fatal(err)
}

visit(p, info)
visit(p, info, nil)
}
}
}
10 changes: 6 additions & 4 deletions cmd/hdfs/chown.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ func chown(args []string, recursive bool) {
fatal(err)
}

visit := func(p string, fi os.FileInfo) {
err := client.Chown(p, owner, group)
visit := func(p string, fi os.FileInfo, err error) error {
err = client.Chown(p, owner, group)

if err != nil {
fmt.Fprintln(os.Stderr, err)
status = 1
return err
}
return nil
}

for _, p := range expanded {
if recursive {
err = walk(client, p, visit)
err = client.Walk(p, visit)
if err != nil {
fmt.Fprintln(os.Stderr, err)
status = 1
Expand All @@ -52,7 +54,7 @@ func chown(args []string, recursive bool) {
fatal(err)
}

visit(p, info)
visit(p, info, nil)
}
}
}
3 changes: 2 additions & 1 deletion cmd/hdfs/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func get(args []string) {
fatal(err)
}

err = walk(client, source, func(p string, fi os.FileInfo) {
err = client.Walk(source, func(p string, fi os.FileInfo, err error) error {
fullDest := filepath.Join(dest, strings.TrimPrefix(p, source))

if fi.IsDir() {
Expand All @@ -54,6 +54,7 @@ func get(args []string) {
fatal(err)
}
}
return nil
})

if err != nil {
Expand Down
48 changes: 0 additions & 48 deletions cmd/hdfs/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"errors"
"io"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -163,50 +162,3 @@ func expandPaths(client *hdfs.Client, paths []string) ([]string, error) {

return res, nil
}

type walkFunc func(string, os.FileInfo)

func walk(client *hdfs.Client, root string, visit walkFunc) error {
rootInfo, err := client.Stat(root)
if err != nil {
return err
}

visit(root, rootInfo)
if rootInfo.IsDir() {
err = walkDir(client, root, visit)
if err != nil {
return err
}
}

return nil
}

func walkDir(client *hdfs.Client, dir string, visit walkFunc) error {
dirReader, err := client.Open(dir)
if err != nil {
return err
}

var partial []os.FileInfo
for ; err != io.EOF; partial, err = dirReader.Readdir(100) {
if err != nil {
return err
}

for _, child := range partial {
childPath := path.Join(dir, child.Name())
visit(childPath, child)

if child.IsDir() {
err = walkDir(client, childPath, visit)
if err != nil {
return err
}
}
}
}

return nil
}
95 changes: 95 additions & 0 deletions walk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package hdfs

import (
"io"
"os"
"path/filepath"
"sort"
)

// Walk does the exact same thing as filepath.Walk (and is mostly copied from there)
func (client *Client) Walk(root string, visit filepath.WalkFunc) error {
rootInfo, err := client.Stat(root)
if err != nil {
err = visit(root, nil, err)
} else {
err = client.walk(root, rootInfo, visit)
}
if err == filepath.SkipDir {
return nil
}
return err
}

func (client *Client) walk(path string, info os.FileInfo, visit filepath.WalkFunc) error {
err := visit(path, info, nil)
if err != nil {
if info.IsDir() && err == filepath.SkipDir {
return nil
}
return err
}

if !info.IsDir() {
return nil
}

names, err := client.readDirNames(path)
if err != nil {
return visit(path, info, err)
}

for _, name := range names {
filename := filepath.Join(path, name)
fileInfo, err := client.Stat(filename)
if err != nil {
if err := visit(filename, fileInfo, err); err != nil && err != filepath.SkipDir {
return err
}
} else {
err = client.walk(filename, fileInfo, visit)
if err != nil {
if !fileInfo.IsDir() || err != filepath.SkipDir {
return err
}
}
}
}
return nil
}

func (client *Client) readDirNames(dir string) ([]string, error) {
dirReader, err := client.Open(dir)
if err != nil {
return nil, err
}

// read the dir entries by chunks of a few hundreds
names, err := readDirNamesByChunks(dirReader, 500)

if err != nil {
return nil, err
}

sort.Strings(names)
return names, nil
}

func readDirNamesByChunks(dirReader *FileReader, chunkSize int) ([]string, error) {

var toRet []string
var partial []string
var err error

if chunkSize <= 0 {
return dirReader.Readdirnames(chunkSize)
}

for ; err != io.EOF; partial, err = dirReader.Readdirnames(chunkSize) {
if err != nil {
return nil, err
}
toRet = append(toRet, partial...)
}
return toRet, nil
}
90 changes: 90 additions & 0 deletions walk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package hdfs

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

func TestReadDirNames(t *testing.T) {
c := getClient(t)
entries, err := c.readDirNames("/_test/")
assert.Nil(t, err, "unexpected error")
assert.False(t, len(entries) == 0, "did not list anything")
assert.Contains(t, entries, "mobydick.txt", "did not find expected file")
assert.Contains(t, entries, "foo.txt", "did not find expected file")
}

func TestReadDirNamesByChunks(t *testing.T) {
c := getClient(t)
dirReader, err := c.Open("/_test/")

assert.Nil(t, err, "unexpected error")

singleRun, err := readDirNamesByChunks(dirReader, -1)
assert.Nil(t, err, "unexpected error")

dirReader, err = c.Open("/_test/")
assert.Nil(t, err, "unexpected error")

byChunks, err := readDirNamesByChunks(dirReader, 1)
assert.Nil(t, err, "unexpected error")

assert.Equal(t, singleRun, byChunks, "discrepancy in listed content")
}

func TestWalk(t *testing.T) {
c := getClient(t)

c.Mkdir("/_test/walk", os.ModePerm)
c.Mkdir("/_test/walk/dir", os.ModePerm)
c.Mkdir("/_test/walk/dir/subdir", os.ModePerm)
c.Create("/_test/walk/walkfile")
c.Create("/_test/walk/dir/walkfile1")
c.Create("/_test/walk/dir/walkfile2")
c.Create("/_test/walk/dir/subdir/walkfile1")
c.Create("/_test/walk/dir/subdir/walkfile2")

paths := make([]string, 0, 8)

err := c.Walk("/_test/walk/", walkFnTest(&paths))
assert.Nil(t, err, "unexpected error")

expected := []string{
"/_test/walk/",
"/_test/walk/dir",
"/_test/walk/dir/subdir",
"/_test/walk/dir/subdir/walkfile1",
"/_test/walk/dir/subdir/walkfile2",
"/_test/walk/dir/walkfile1",
"/_test/walk/dir/walkfile2",
"/_test/walk/walkfile"}

assert.Equal(t, expected, paths, "discrepancy between expected and walked paths.")

}

func TestWalkError(t *testing.T) {
c := getClient(t)
errors := make([]error, 0, 1)
c.Walk("/not_existing", walkErrorFn(&errors))
assert.Equal(t, 1, len(errors), "expected a single error")
}

func walkFnTest(encounteredPaths *[]string) filepath.WalkFunc {
return func(path string, info os.FileInfo, err error) error {
*encounteredPaths = append(*encounteredPaths, path)
return nil
}
}

func walkErrorFn(errors *[]error) filepath.WalkFunc {
return func(path string, info os.FileInfo, err error) error {
if err != nil {
*errors = append(*errors, err)
}
return nil
}
}