Skip to content

Commit

Permalink
[FAB-2204] File LedgerType w/ fsblkstorage
Browse files Browse the repository at this point in the history
The orderer "file" LedgerType re-introduced with the
leveldb based fsblkstorage backing store.

* I removed the previously provided OrdererLedger and
  OrdererLedgerProvider interfaces, instead choosing
  to implement the ordererledger.Factory interface
  and its corresponding ledger interfaces.
* A Close() method was added to the Factory interface.

Change-Id: I43007f3c570633896ef29de32fcd32c3301efcff
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez authored and kchristidis committed Mar 2, 2017
1 parent 28687ca commit 1ce056a
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 262 deletions.
4 changes: 3 additions & 1 deletion orderer/ledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func testInitialization(lf ledgerTestFactory, t *testing.T) {
if block == nil {
t.Fatalf("Error retrieving genesis block")
}

}

func TestReinitialization(t *testing.T) {
Expand All @@ -84,12 +85,13 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
t.Log("Skipping test as persistence is not available for this ledger type")
return
}
_, oli := lf.New()
olf, oli := lf.New()
aBlock := CreateNextBlock(oli, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})
err := oli.Append(aBlock)
if err != nil {
t.Fatalf("Error appending block: %s", err)
}
olf.Close()

_, li := lf.New()
if li.Height() != 2 {
Expand Down
78 changes: 78 additions & 0 deletions orderer/ledger/file/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fileledger

import (
"sync"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
ordererledger "github.com/hyperledger/fabric/orderer/ledger"
)

type fileLedgerFactory struct {
blkstorageProvider blkstorage.BlockStoreProvider
ledgers map[string]ordererledger.ReadWriter
mutex sync.Mutex
}

// GetOrCreate gets an existing ledger (if it exists) or creates it if it does not
func (lf *fileLedgerFactory) GetOrCreate(chainID string) (ordererledger.ReadWriter, error) {
lf.mutex.Lock()
defer lf.mutex.Unlock()

key := chainID
// check cache
ledger, ok := lf.ledgers[key]
if ok {
return ledger, nil
}
// open fresh
blockStore, err := lf.blkstorageProvider.OpenBlockStore(key)
if err != nil {
return nil, err
}
ledger = &fileLedger{blockStore: blockStore, signal: make(chan struct{})}
lf.ledgers[key] = ledger
return ledger, nil
}

// ChainIDs returns the chain IDs the Factory is aware of
func (lf *fileLedgerFactory) ChainIDs() []string {
chainIDs, err := lf.blkstorageProvider.List()
if err != nil {
panic(err)
}
return chainIDs
}

// Close closes the file ledgers served by this factory
func (lf *fileLedgerFactory) Close() {
lf.blkstorageProvider.Close()
}

// New creates a new ledger factory
func New(directory string) ordererledger.Factory {
return &fileLedgerFactory{
blkstorageProvider: fsblkstorage.NewProvider(
fsblkstorage.NewConf(directory, -1),
&blkstorage.IndexConfig{
AttrsToIndex: []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum}},
),
ledgers: make(map[string]ordererledger.ReadWriter),
}
}
111 changes: 111 additions & 0 deletions orderer/ledger/file/fileledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fileledger

import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
ordererledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("ordererledger/fileledger")
var closedChan chan struct{}

func init() {
closedChan = make(chan struct{})
close(closedChan)
}

type fileLedger struct {
blockStore blkstorage.BlockStore
signal chan struct{}
}

type fileLedgerIterator struct {
ledger *fileLedger
blockNumber uint64
}

// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
for {
if i.blockNumber < i.ledger.Height() {
block, err := i.ledger.blockStore.RetrieveBlockByNumber(i.blockNumber)
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
i.blockNumber++
return block, cb.Status_SUCCESS
}
<-i.ledger.signal
}
}

// ReadyChan supplies a channel which will block until Next will not block
func (i *fileLedgerIterator) ReadyChan() <-chan struct{} {
signal := i.ledger.signal
if i.blockNumber > i.ledger.Height()-1 {
return signal
}
return closedChan
}

// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its
// starting block number
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Iterator, uint64) {
switch start := startPosition.Type.(type) {
case *ab.SeekPosition_Oldest:
return &fileLedgerIterator{ledger: fl, blockNumber: 0}, 0
case *ab.SeekPosition_Newest:
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
panic(err)
}
newestBlockNumber := info.Height - 1
return &fileLedgerIterator{ledger: fl, blockNumber: newestBlockNumber}, newestBlockNumber
case *ab.SeekPosition_Specified:
height := fl.Height()
if start.Specified.Number > height {
return &ordererledger.NotFoundErrorIterator{}, 0
}
return &fileLedgerIterator{ledger: fl, blockNumber: start.Specified.Number}, start.Specified.Number
}
// This line should be unreachable, but the compiler requires it
return &ordererledger.NotFoundErrorIterator{}, 0
}

// Height returns the number of blocks on the ledger
func (fl *fileLedger) Height() uint64 {
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
panic(err)
}
return info.Height
}

// Append a new block to the ledger
func (fl *fileLedger) Append(block *cb.Block) error {
err := fl.blockStore.AddBlock(block)
if err == nil {
close(fl.signal)
fl.signal = make(chan struct{})
}
return err
}
Loading

0 comments on commit 1ce056a

Please sign in to comment.