Skip to content

Commit

Permalink
support for truncate on hdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
junjieqian committed Mar 29, 2017
1 parent d961456 commit 48afdbc
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/hdfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Valid commands:
getmerge SOURCE DEST
put SOURCE DEST
df [-h]
truncate -s SIZE FILE
`, os.Args[0])

lsOpts = getopt.New()
Expand Down Expand Up @@ -74,6 +75,9 @@ Valid commands:
dfOpts = getopt.New()
dfh = dfOpts.Bool('h')

truncateOpts = getopt.New()
truncateSize = truncateOpts.Int64('s', -1)

cachedClient *hdfs.Client
status = 0
)
Expand All @@ -89,6 +93,7 @@ func init() {
duOpts.SetUsage(printHelp)
getmergeOpts.SetUsage(printHelp)
dfOpts.SetUsage(printHelp)
truncateOpts.SetUsage(printHelp)
}

func main() {
Expand Down Expand Up @@ -142,6 +147,9 @@ func main() {
case "df":
dfOpts.Parse(argv)
df(*dfh)
case "truncate":
truncateOpts.Parse(argv)
truncate(truncateOpts.Args()[0], uint64(*truncateSize))
// it's a seeeeecret command
case "complete":
complete(argv)
Expand Down
31 changes: 31 additions & 0 deletions cmd/hdfs/test/truncate.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bats

load helper

setup() {
$HDFS mkdir -p /_test_cmd/truncate
$HDFS touch /_test_cmd/truncate/a
}

@test "truncate larger" {
skip "Not support until Hadoop-2.7.2"
run $HDFS truncate -s 10 /_test_cmd/a
assert_failure
}

@test "truncate nonexistent" {
skip "Not support until Hadoop-2.7.2"
run $HDSF truncate -s 10 /_test_cmd/nonexistent
assert_failure
}

@test "truncate" {
skip "Not support until Hadoop-2.7.2"
run $HDFS put $ROOT_TEST_DIR/test/foo.txt /_test_cmd/truncate/1
run $HDFS truncate -s 2 /_test_cmd/truncate/1
assert_success
}

teardown() {
$HDFS rm -r /_test_cmd/truncate
}
13 changes: 13 additions & 0 deletions cmd/hdfs/truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

func truncate(name string, newLength uint64) {
client, err := getClient("")
if err != nil {
fatal(err)
}

err = client.Truncate(name, newLength)
if err != nil {
fatal(err)
}
}
8 changes: 8 additions & 0 deletions file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,11 @@ func (f *FileWriter) startNewBlock() error {
f.blockWriter = rpc.NewBlockWriter(f.block, f.client.namenode, f.blockSize)
return nil
}

func (f *FileWriter) Truncate(newLength ...uint64) error {
newsize := uint64(0)
if len(newLength) > 0 {
newsize = newLength[0]
}
return f.client.Truncate(f.name, newsize)
}
23 changes: 23 additions & 0 deletions file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -345,3 +346,25 @@ func TestFileAppendRepeatedly(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, expected, string(bytes))
}

func TestFileTruncate(t *testing.T) {
t.Skip("Truncate not support in Hadoop-2.6")
client := getClient(t)

baleet(t, "/_test/create/5.txt")
mkdirp(t, "/_test/create")
writer, err := client.Create("/_test/create/5.txt")
require.NoError(t, err)

n, err := writer.Write([]byte("foobar"))
require.NoError(t, err)
assert.Equal(t, 6, n)

writer.Close()
err = writer.Truncate(3)
require.NoError(t, err)
stat, err := client.Stat("/_test/create/5.txt")
require.NoError(t, err)
time.Sleep(time.Second)
assert.EqualValues(t, 3, stat.Size())
}
37 changes: 37 additions & 0 deletions truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package hdfs

import (
"errors"
"os"

hdfs "github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/rpc"
"github.com/golang/protobuf/proto"
)

func (c *Client) Truncate(name string, newLength uint64) error {
req := &hdfs.TruncateRequestProto {
Src: proto.String(name),
NewLength: proto.Uint64(newLength),
ClientName: proto.String(c.namenode.ClientName()),
}
resp := &hdfs.TruncateResponseProto{}

err := c.namenode.Execute("truncate", req, resp)
if err != nil {
if nnErr, ok := err.(*rpc.NamenodeError); ok {
err = interpretException(nnErr.Exception, err)
}

return &os.PathError{"truncate", name, err}
} else if resp.Result == nil {
return &os.PathError{
"truncate",
name,
errors.New("Unexpected empty response to 'truncate' rpc call"),
}
}

return nil
}

40 changes: 40 additions & 0 deletions truncate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package hdfs

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTruncate(t *testing.T) {
t.Skip("Truncate not support in Hadoop-2.6")
client := getClient(t)

baleet(t, "/_test/truncate/1.txt")
mkdirp(t, "/_test/truncate")
writer, _ := client.Create("/_test/truncate/1.txt")
_, _ = writer.Write([]byte("foobar\nfoobar\n"))
_ = writer.Close()

err := client.Truncate("/_test/truncate/1.txt", 4)
require.NoError(t, err)

stat, err := client.Stat("/_test/truncate/1.txt")
require.NoError(t, err)
assert.EqualValues(t, 4, stat.Size())

err = client.Truncate("/_test/truncate/1.txt", 10)
assert.NotNil(t, err)
}

func TestTruncateNoExistent(t *testing.T) {
if os.Getenv("HADOOP_DISTRO") == "cdh" {
t.Skip("Truncate not support in Hadoop-2.6")
}
client := getClient(t)

err := client.Truncate("/_test/nonexistent", 100)
assertPathError(t, err, "truncate", "/_test/nonexistent", os.ErrNotExist)
}

0 comments on commit 48afdbc

Please sign in to comment.