Skip to content

Commit

Permalink
Merge "[FAB-4202] Fix race condition in orderer json ledger."
Browse files Browse the repository at this point in the history
  • Loading branch information
kchristidis authored and Gerrit Code Review committed Jun 1, 2017
2 parents db813a4 + 9433734 commit ebc4d60
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
24 changes: 19 additions & 5 deletions orderer/ledger/json/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

ledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
Expand All @@ -32,6 +33,7 @@ import (

var logger = logging.MustGetLogger("orderer/jsonledger")
var closedChan chan struct{}
var fileLock sync.Mutex

func init() {
closedChan = make(chan struct{})
Expand All @@ -58,7 +60,14 @@ type jsonLedger struct {

// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem
func (jl *jsonLedger) readBlock(number uint64) (*cb.Block, bool) {
file, err := os.Open(jl.blockFilename(number))
name := jl.blockFilename(number)

// In case of ongoing write, reading the block file may result in `unexpected EOF` error.
// Therefore, we use file mutex here to prevent this race condition.
fileLock.Lock()
defer fileLock.Unlock()

file, err := os.Open(name)
if err == nil {
defer file.Close()
block := &cb.Block{}
Expand Down Expand Up @@ -113,10 +122,9 @@ func (jl *jsonLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator,
return &ledger.NotFoundErrorIterator{}, 0
}
return &cursor{jl: jl, blockNumber: start.Specified.Number}, start.Specified.Number
default:
return &ledger.NotFoundErrorIterator{}, 0
}

// This line should be unreachable, but the compiler requires it
return &ledger.NotFoundErrorIterator{}, 0
}

// Height returns the number of blocks on the ledger
Expand Down Expand Up @@ -144,11 +152,17 @@ func (jl *jsonLedger) Append(block *cb.Block) error {

// writeBlock commits a block to disk
func (jl *jsonLedger) writeBlock(block *cb.Block) {
file, err := os.Create(jl.blockFilename(block.Header.Number))
name := jl.blockFilename(block.Header.Number)

fileLock.Lock()
defer fileLock.Unlock()

file, err := os.Create(name)
if err != nil {
panic(err)
}
defer file.Close()

err = jl.marshaler.Marshal(file, block)
logger.Debugf("Wrote block %d", block.Header.Number)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions orderer/ledger/json/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ func TestRetrieval(t *testing.T) {
}
}

// Without file lock in the implementation, this test is flaky due to
// a race condition of concurrent write and read of block file. With
// the fix, running this test repeatedly should not yield failure.
func TestRaceCondition(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()

it, _ := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})

var block *cb.Block
var status cb.Status

complete := make(chan struct{})
go func() {
block, status = it.Next()
close(complete)
}()

fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{{Payload: []byte("My Data")}}))
<-complete

if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the block")
}
}

func TestBlockedRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
Expand Down

0 comments on commit ebc4d60

Please sign in to comment.