Skip to content

Commit

Permalink
Return a specific error if a file is still incomplete after FileWrite…
Browse files Browse the repository at this point in the history
…r.Close

Previously, we would simply ignore the result field on CompleteResponse, which
led to a silently hanging lease on the file. A hanging lease isn't that bad -
it expires on its own - but it was causing the sneaky
AlreadyBeingCreatedExceptions we were seeing in append tests.

Exposing the error (and adding a function IsErrReplicating to unwrap the
PathError, for convenience) allows the client to either ignore it, and let the
lease expire on its own, or to retry in a loop, as the java client does, until
the file is complete and the lease is destroyed.

This has the potential to be a breaking change, since we would silently swallow
the error before.
  • Loading branch information
colinmarc committed Feb 9, 2022
1 parent aea93fd commit 50243c4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 92 deletions.
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestCopyToRemote(t *testing.T) {

baleet(t, "/_test/copytoremote.txt")
err := client.CopyToRemote("testdata/foo.txt", "/_test/copytoremote.txt")
require.NoError(t, err)
ignoreErrReplicating(t, err)

bytes, err := client.ReadFile("/_test/copytoremote.txt")
require.NoError(t, err)
Expand Down
34 changes: 20 additions & 14 deletions file_writer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package hdfs

import (
"io"
"errors"
"os"
"time"

Expand All @@ -10,6 +10,15 @@ import (
"google.golang.org/protobuf/proto"
)

// ErrReplicating is the sentinel error reported by FileWriter.Close, wrapped
// in an *os.PathError, when the namenode's "complete" call indicates that the
// file is not yet complete. Use IsErrReplicating to test for it.
var ErrReplicating = errors.New("replication in progress")

// IsErrReplicating reports whether err is, or wraps, an *os.PathError whose
// underlying error is ErrReplicating.
//
// The error chain is searched with errors.As, so the result is still true if
// a caller has added its own context around the error returned by Close (for
// example with fmt.Errorf and %w). A nil error, or a bare ErrReplicating that
// is not carried by an *os.PathError, yields false.
func IsErrReplicating(err error) bool {
	var pe *os.PathError
	return errors.As(err, &pe) && errors.Is(pe.Err, ErrReplicating)
}

// A FileWriter represents a writer for an open file in HDFS. It implements
// Writer and Closer, and can only be used for writes. For reads, see
// FileReader and Client.Open.
Expand All @@ -21,7 +30,6 @@ type FileWriter struct {

blockWriter *transfer.BlockWriter
deadline time.Time
closed bool
}

// Create opens a new file in HDFS with the default replication, block size,
Expand Down Expand Up @@ -168,10 +176,6 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
// of this, it is important that Close is called after all data has been
// written.
func (f *FileWriter) Write(b []byte) (int, error) {
if f.closed {
return 0, io.ErrClosedPipe
}

if f.blockWriter == nil {
err := f.startNewBlock()
if err != nil {
Expand Down Expand Up @@ -199,10 +203,6 @@ func (f *FileWriter) Write(b []byte) (int, error) {
// a call to Flush, it is still necessary to call Close once all data has been
// written.
func (f *FileWriter) Flush() error {
if f.closed {
return io.ErrClosedPipe
}

if f.blockWriter != nil {
return f.blockWriter.Flush()
}
Expand All @@ -213,11 +213,15 @@ func (f *FileWriter) Flush() error {
// Close closes the file, writing any remaining data out to disk and waiting
// for acknowledgements from the datanodes. It is important that Close is called
// after all data has been written.
//
// If the datanodes have acknowledged all writes but have not yet acknowledged
// them to the namenode, Close can return ErrReplicating (wrapped in an
// os.PathError). This indicates that all data has been written, but the lease
// is still open for the file.
// It is safe in this case to either ignore the error (and let the lease expire
// on its own) or to call Close multiple times until it completes without an
// error. The Java client, for context, always chooses to retry, with
// exponential backoff.
func (f *FileWriter) Close() error {
if f.closed {
return io.ErrClosedPipe
}

var lastBlock *hdfs.ExtendedBlockProto
if f.blockWriter != nil {
lastBlock = f.blockWriter.Block.GetB()
Expand All @@ -239,6 +243,8 @@ func (f *FileWriter) Close() error {
err := f.client.namenode.Execute("complete", completeReq, completeResp)
if err != nil {
return &os.PathError{"create", f.name, err}
} else if completeResp.GetResult() == false {
return &os.PathError{"create", f.name, ErrReplicating}
}

return nil
Expand Down
120 changes: 43 additions & 77 deletions file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,27 @@ import (
"github.com/stretchr/testify/require"
)

const abcException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"

func appendIgnoreABC(t *testing.T, client *Client, path string) (*FileWriter, error) {
// This represents a bug in the HDFS append implementation, as far as I can
// tell. Try a few times again, then skip the test.
retries := 0
for {
fw, err := client.Append(path)

if pathErr, ok := err.(*os.PathError); ok {
if nnErr, ok := pathErr.Err.(Error); ok && nnErr.Exception() == abcException {
t.Log("Ignoring AlreadyBeingCreatedException from append")

if retries < 3 {
retries += 1
continue
} else {
t.Skip("skipping Append test because of repeated AlreadyBeingCreatedException")
return fw, nil
}
}
// assertClose closes w, retrying up to five times with a 200ms pause while
// Close keeps reporting ErrReplicating, and then asserts that the last
// attempt returned no error.
func assertClose(t *testing.T, w *FileWriter) {
	t.Helper()

	var err error
	for i := 0; i < 5; i++ {
		// Assign to the outer err (no :=) so the assertion below checks the
		// result of the final Close call rather than an always-nil variable.
		err = w.Close()
		if !IsErrReplicating(err) {
			break
		}

		time.Sleep(200 * time.Millisecond)
	}

	assert.NoError(t, err)
}

// ignoreErrReplicating fails the test immediately if err is non-nil, unless
// IsErrReplicating reports true for it, in which case the error is tolerated.
func ignoreErrReplicating(t *testing.T, err error) {
	// Attribute any failure to the calling test's line, not this helper.
	t.Helper()

	if !IsErrReplicating(err) {
		require.NoError(t, err)
	}
}

func TestFileWrite(t *testing.T) {
Expand All @@ -56,9 +52,7 @@ func TestFileWrite(t *testing.T) {
n, err = writer.Write([]byte("bar"))
require.NoError(t, err)
assert.Equal(t, 3, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/1.txt")
require.NoError(t, err)
Expand Down Expand Up @@ -89,9 +83,7 @@ func TestFileWriteLeaseRenewal(t *testing.T) {
n, err = writer.Write([]byte("bar"))
require.NoError(t, err)
assert.Equal(t, 3, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/1.txt")
require.NoError(t, err)
Expand All @@ -114,9 +106,7 @@ func TestFileBigWrite(t *testing.T) {
n, err := io.Copy(writer, mobydick)
require.NoError(t, err)
assert.EqualValues(t, 1257276, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/2.txt")
require.NoError(t, err)
Expand All @@ -141,9 +131,7 @@ func TestFileBigWriteMultipleBlocks(t *testing.T) {
n, err := io.Copy(writer, mobydick)
require.NoError(t, err)
assert.EqualValues(t, 1257276, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/3.txt")
require.NoError(t, err)
Expand All @@ -168,9 +156,7 @@ func TestFileBigWriteWeirdBlockSize(t *testing.T) {
n, err := io.Copy(writer, mobydick)
require.NoError(t, err)
assert.EqualValues(t, 1257276, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/4.txt")
require.NoError(t, err)
Expand All @@ -195,9 +181,7 @@ func TestFileBigWriteReplication(t *testing.T) {
n, err := io.Copy(writer, mobydick)
require.NoError(t, err)
assert.EqualValues(t, 1257276, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/5.txt")
require.NoError(t, err)
Expand Down Expand Up @@ -241,9 +225,7 @@ func TestFileWriteSmallFlushes(t *testing.T) {
n, err = writer.Write([]byte(s))
require.NoError(t, err, "final write of %d bytes", len(s))
assert.Equal(t, len(s), n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/create/6.txt")
require.NoError(t, err)
Expand All @@ -260,7 +242,7 @@ func TestCreateEmptyFile(t *testing.T) {
baleet(t, "/_test/emptyfile")

err := client.CreateEmptyFile("/_test/emptyfile")
require.NoError(t, err)
ignoreErrReplicating(t, err)

fi, err := client.Stat("/_test/emptyfile")
require.NoError(t, err)
Expand All @@ -281,7 +263,7 @@ func TestCreateFileAlreadyExistsException(t *testing.T) {

f, err := client.CreateFile(filePath, 1, 1048576, 0755)
require.NoError(t, err)
require.NoError(t, f.Close())
assertClose(t, f)

_, err = client.CreateFile(filePath, 1, 1048576, 0755)
assertPathError(t, err, "create", filePath, os.ErrExist) // org.apache.hadoop.fs.FileAlreadyExistsException is received from HDFS
Expand All @@ -298,7 +280,7 @@ func TestCreateFileAlreadyBeingCreatedException(t *testing.T) {
f, err := client.CreateFile(filePath, 1, 1048576, 0755)
require.NoError(t, err)
defer func() {
require.NoError(t, f.Close())
assertClose(t, f)
}()

_, err = client.CreateFile(filePath, 1, 1048576, 0755)
Expand Down Expand Up @@ -340,11 +322,9 @@ func TestFileAppend(t *testing.T) {
n, err := writer.Write([]byte("foobar\n"))
require.NoError(t, err)
assert.Equal(t, 7, n)
assertClose(t, writer)

err = writer.Close()
require.NoError(t, err)

writer, err = appendIgnoreABC(t, client, "/_test/append/1.txt")
writer, err = client.Append("/_test/append/1.txt")
require.NoError(t, err)

n, err = writer.Write([]byte("foo"))
Expand All @@ -354,9 +334,7 @@ func TestFileAppend(t *testing.T) {
n, err = writer.Write([]byte("baz"))
require.NoError(t, err)
assert.Equal(t, 3, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/append/1.txt")
require.NoError(t, err)
Expand All @@ -371,9 +349,9 @@ func TestFileAppendEmptyFile(t *testing.T) {

mkdirp(t, "/_test/append")
err := client.CreateEmptyFile("/_test/append/2.txt")
require.NoError(t, err)
ignoreErrReplicating(t, err)

writer, err := appendIgnoreABC(t, client, "/_test/append/2.txt")
writer, err := client.Append("/_test/append/2.txt")
require.NoError(t, err)

n, err := writer.Write([]byte("foo"))
Expand All @@ -383,9 +361,7 @@ func TestFileAppendEmptyFile(t *testing.T) {
n, err = writer.Write([]byte("bar"))
require.NoError(t, err)
assert.Equal(t, 3, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/append/2.txt")
require.NoError(t, err)
Expand All @@ -409,19 +385,15 @@ func TestFileAppendLastBlockFull(t *testing.T) {
wn, err := io.CopyN(writer, mobydick, 1048576)
require.NoError(t, err)
assert.EqualValues(t, 1048576, wn)
assertClose(t, writer)

err = writer.Close()
require.NoError(t, err)

writer, err = appendIgnoreABC(t, client, "/_test/append/3.txt")
writer, err = client.Append("/_test/append/3.txt")
require.NoError(t, err)

n, err := writer.Write([]byte("\nfoo"))
require.NoError(t, err)
assert.Equal(t, 4, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/append/3.txt")
require.NoError(t, err)
Expand Down Expand Up @@ -449,13 +421,11 @@ func TestFileAppendRepeatedly(t *testing.T) {
n, err := writer.Write([]byte("foo"))
require.NoError(t, err)
assert.Equal(t, 3, n)

err = writer.Close()
require.NoError(t, err)
assertClose(t, writer)

expected := "foo"
for i := 0; i < 20; i++ {
writer, err = appendIgnoreABC(t, client, "/_test/append/4.txt")
writer, err = client.Append("/_test/append/4.txt")
require.NoError(t, err)

s := strings.Repeat("b", rand.Intn(1024)) + "\n"
Expand Down Expand Up @@ -522,11 +492,9 @@ func TestFileAppendDeadline(t *testing.T) {
n, err := writer.Write([]byte("foobar\n"))
require.NoError(t, err)
assert.Equal(t, 7, n)
assertClose(t, writer)

err = writer.Close()
require.NoError(t, err)

writer, err = appendIgnoreABC(t, client, "/_test/append/5.txt")
writer, err = client.Append("/_test/append/5.txt")
require.NoError(t, err)

writer.SetDeadline(time.Now().Add(100 * time.Millisecond))
Expand Down Expand Up @@ -554,11 +522,9 @@ func TestFileAppendDeadlineBefore(t *testing.T) {
n, err := writer.Write([]byte("foobar\n"))
require.NoError(t, err)
assert.Equal(t, 7, n)
assertClose(t, writer)

err = writer.Close()
require.NoError(t, err)

writer, err = appendIgnoreABC(t, client, "/_test/append/6.txt")
writer, err = client.Append("/_test/append/6.txt")
require.NoError(t, err)

writer.SetDeadline(time.Now())
Expand Down

0 comments on commit 50243c4

Please sign in to comment.