Skip to content

Commit

Permalink
create ledger with genesis block
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2676

This CR introduces a new function in the ledgermgmt package
`CreateWithGenesisBlock(genesisBlock *common.Block)` which
allows the ledger creation and committing of genesis block
atomically.

This can be used for peer joining a channel with the genesis block
with the config tx.

Change-Id: I8f90a2b623aabca27c35448bc6eb6bc13e2dbd90
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Apr 1, 2017
1 parent 51b7e85 commit 4e0f96b
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 10 deletions.
160 changes: 150 additions & 10 deletions core/ledger/kvledger/kv_ledger_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package kvledger

import (
"bytes"
"errors"
"fmt"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
Expand All @@ -29,6 +31,9 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/syndtr/goleveldb/leveldb"
)

var (
Expand All @@ -38,6 +43,9 @@ var (
ErrNonExistingLedgerID = errors.New("LedgerID does not exist")
// ErrLedgerNotOpened is thrown by a CloseLedger call if a ledger with the given id has not been opened
ErrLedgerNotOpened = errors.New("Ledger is not opened yet")

underConstructionLedgerKey = []byte("underConstructionLedgerKey")
ledgerKeyPrefix = []byte("l")
)

// Provider implements interface ledger.PeerLedgerProvider
Expand Down Expand Up @@ -90,7 +98,44 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
historydbProvider = historyleveldb.NewHistoryDBProvider()

logger.Info("ledger provider Initialized")
return &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}, nil
provider := &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}
provider.recoverUnderConstructionLedger()
return provider, nil
}

// CreateWithGenesisBlock implements the corresponding method from interface ledger.PeerLedgerProvider
// This functions sets a under construction flag before doing any thing related to ledger creation and
// upon a successful ledger creation with the committed genesis block, removes the flag and add entry into
// created ledgers list (atomically). If a crash happens in between, the 'recoverUnderConstructionLedger'
// function is invoked before declaring the provider to be usable
func (provider *Provider) CreateWithGenesisBlock(genesisBlock *common.Block) (ledger.PeerLedger, error) {
ledgerID, err := utils.GetChainIDFromBlock(genesisBlock)
if err != nil {
return nil, err
}
exists, err := provider.idStore.ledgerIDExists(ledgerID)
if err != nil {
return nil, err
}
if exists {
return nil, ErrLedgerIDExists
}
if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil {
return nil, err
}
ledger, err := provider.openInternal(ledgerID)
if err != nil {
logger.Errorf("Error in opening a new empty ledger. Unsetting under construction flag. Err: %s", err)
panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID)
panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
return nil, err
}
if err := ledger.Commit(genesisBlock); err != nil {
ledger.Close()
return nil, err
}
panicOnErr(provider.idStore.createLedgerID(ledgerID), "Error while marking ledger as created")
return ledger, nil
}

// Create implements the corresponding method from interface ledger.PeerLedgerProvider
Expand All @@ -102,15 +147,20 @@ func (provider *Provider) Create(ledgerID string) (ledger.PeerLedger, error) {
if exists {
return nil, ErrLedgerIDExists
}
provider.idStore.createLedgerID(ledgerID)
return provider.Open(ledgerID)
ledger, err := provider.openInternal(ledgerID)
if err != nil {
return nil, err
}
if err = provider.idStore.createLedgerID(ledgerID); err != nil {
ledger.Close()
return nil, err
}
return ledger, nil
}

// Open implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) {

logger.Debugf("Open() opening kvledger: %s", ledgerID)

// Check the ID store to ensure that the chainId/ledgerId exists
exists, err := provider.idStore.ledgerIDExists(ledgerID)
if err != nil {
Expand All @@ -119,7 +169,10 @@ func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) {
if !exists {
return nil, ErrNonExistingLedgerID
}
return provider.openInternal(ledgerID)
}

func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) {
// Get the block store for a chain/ledger
blockStore, err := provider.blockStoreProvider.OpenBlockStore(ledgerID)
if err != nil {
Expand Down Expand Up @@ -165,6 +218,63 @@ func (provider *Provider) Close() {
provider.historydbProvider.Close()
}

// recoverUnderConstructionLedger checks whether the under construction flag is set - this would be the case
// if a crash had happened during creation of ledger and the ledger creation could have been left in intermediate
// state. Recovery checks if the ledger was created and the genesis block was committed successfully then it completes
// the last step of adding the ledger id to the list of created ledgers. Else, it clears the under construction flag
func (provider *Provider) recoverUnderConstructionLedger() {
logger.Debugf("Recovering under construction ledger")
ledgerID, err := provider.idStore.getUnderConstructionFlag()
panicOnErr(err, "Error while checking whether the under construction flag is set")
if ledgerID == "" {
logger.Debugf("No under construction ledger found. Quitting recovery")
return
}
logger.Infof("ledger [%s] found as under construction", ledgerID)
ledger, err := provider.openInternal(ledgerID)
panicOnErr(err, "Error while opening under construction ledger [%s]", ledgerID)
bcInfo, err := ledger.GetBlockchainInfo()
panicOnErr(err, "Error while getting blockchain info for the under construction ledger [%s]", ledgerID)
ledger.Close()

switch bcInfo.Height {
case 0:
logger.Infof("Genesis block was not committed. Hence, the peer ledger not created. unsetting the under construction flag")
panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID)
panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
case 1:
logger.Infof("Genesis block was committed. Hence, marking the peer ledger as created")
panicOnErr(provider.idStore.createLedgerID(ledgerID), "Error while adding ledgerID [%s] to created list", ledgerID)
default:
panic(fmt.Errorf(
"Data inconsistency: under construction flag is set for ledger [%s] while the height of the blockchain is [%d]",
ledgerID, bcInfo.Height))
}
return
}

// runCleanup cleans up blockstorage, statedb, and historydb for what
// may have got created during in-complete ledger creation
func (provider *Provider) runCleanup(ledgerID string) error {
// TODO - though, not having this is harmless for kv ledger.
// If we want, following could be done:
// - blockstorage could remove empty folders
// - couchdb backed statedb could delete the database if got created
// - leveldb backed statedb and history db need not perform anything as it uses a single db shared across ledgers
return nil
}

func panicOnErr(err error, mgsFormat string, args ...interface{}) {
if err == nil {
return
}
args = append(args, err)
panic(fmt.Sprintf(mgsFormat+" Err:%s ", args...))
}

//////////////////////////////////////////////////////////////////////
// Ledger id persistence related code
///////////////////////////////////////////////////////////////////////
type idStore struct {
db *leveldbhelper.DB
}
Expand All @@ -175,8 +285,24 @@ func openIDStore(path string) *idStore {
return &idStore{db}
}

func (s *idStore) setUnderConstructionFlag(ledgerID string) error {
return s.db.Put(underConstructionLedgerKey, []byte(ledgerID), true)
}

func (s *idStore) unsetUnderConstructionFlag() error {
return s.db.Delete(underConstructionLedgerKey, true)
}

func (s *idStore) getUnderConstructionFlag() (string, error) {
val, err := s.db.Get(underConstructionLedgerKey)
if err != nil {
return "", err
}
return string(val), nil
}

func (s *idStore) createLedgerID(ledgerID string) error {
key := []byte(ledgerID)
key := s.encodeLedgerKey(ledgerID)
val := []byte{}
err := error(nil)
if val, err = s.db.Get(key); err != nil {
Expand All @@ -185,11 +311,14 @@ func (s *idStore) createLedgerID(ledgerID string) error {
if val != nil {
return ErrLedgerIDExists
}
return s.db.Put(key, val, true)
batch := &leveldb.Batch{}
batch.Put(key, val)
batch.Delete(underConstructionLedgerKey)
return s.db.WriteBatch(batch, true)
}

func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) {
key := []byte(ledgerID)
key := s.encodeLedgerKey(ledgerID)
val := []byte{}
err := error(nil)
if val, err = s.db.Get(key); err != nil {
Expand All @@ -203,8 +332,11 @@ func (s *idStore) getAllLedgerIds() ([]string, error) {
itr := s.db.GetIterator(nil, nil)
itr.First()
for itr.Valid() {
key := string(itr.Key())
ids = append(ids, key)
if bytes.Equal(itr.Key(), underConstructionLedgerKey) {
continue
}
id := string(s.decodeLedgerID(itr.Key()))
ids = append(ids, id)
itr.Next()
}
return ids, nil
Expand All @@ -213,3 +345,11 @@ func (s *idStore) getAllLedgerIds() ([]string, error) {
func (s *idStore) close() {
s.db.Close()
}

func (s *idStore) encodeLedgerKey(ledgerID string) []byte {
return append(ledgerKeyPrefix, []byte(ledgerID)...)
}

func (s *idStore) decodeLedgerID(key []byte) string {
return string(key[len(ledgerKeyPrefix):])
}
42 changes: 42 additions & 0 deletions core/ledger/kvledger/kv_ledger_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"testing"

configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
)
Expand Down Expand Up @@ -56,6 +57,47 @@ func TestLedgerProvider(t *testing.T) {
testutil.AssertEquals(t, err, ErrNonExistingLedgerID)
}

func TestLedgerCreationWithGenesisBlock(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
numLedgers := 10
provider, _ := NewProvider()
existingLedgerIDs, err := provider.List()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, len(existingLedgerIDs), 0)
for i := 0; i < numLedgers; i++ {
genesisBlock, _ := configtxtest.MakeGenesisBlock(constructTestLedgerID(i))
provider.CreateWithGenesisBlock(genesisBlock)
}
existingLedgerIDs, err = provider.List()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, len(existingLedgerIDs), numLedgers)

provider.Close()

provider, _ = NewProvider()
defer provider.Close()
ledgerIds, _ := provider.List()
testutil.AssertEquals(t, len(ledgerIds), numLedgers)
t.Logf("ledgerIDs=%#v", ledgerIds)
for i := 0; i < numLedgers; i++ {
testutil.AssertEquals(t, ledgerIds[i], constructTestLedgerID(i))
}
for i := 0; i < numLedgers; i++ {
ledger, err := provider.Open(constructTestLedgerID(i))
testutil.AssertNoError(t, err, "")
bcInfo, err := ledger.GetBlockchainInfo()
ledger.Close()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, bcInfo.Height, uint64(1))
}
_, err = provider.Create(constructTestLedgerID(2))
testutil.AssertEquals(t, err, ErrLedgerIDExists)

_, err = provider.Open(constructTestLedgerID(numLedgers))
testutil.AssertEquals(t, err, ErrNonExistingLedgerID)
}

func TestMultipleLedgerBasicRW(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
Expand Down
4 changes: 4 additions & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
type PeerLedgerProvider interface {
// Create creates a new ledger with a given unique id
Create(ledgerID string) (PeerLedger, error)
// CreateWithGenesisBlock creates a new ledger with the given genesis block.
// This function guarentees that the creation of ledger and committing the genesis block would an atomic action
// The chain id retrieved from the genesis block is treated as a ledger id
CreateWithGenesisBlock(genesisBlock *common.Block) (PeerLedger, error)
// Open opens an already created ledger
Open(ledgerID string) (PeerLedger, error)
// Exists tells whether the ledger with given id exists
Expand Down
27 changes: 27 additions & 0 deletions core/ledger/ledgermgmt/ledger_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
logging "github.com/op/go-logging"
)

Expand Down Expand Up @@ -80,6 +82,31 @@ func CreateLedger(id string) (ledger.PeerLedger, error) {
return l, nil
}

// CreateWithGenesisBlock creates a new ledger with the given genesis block.
// This function guarentees that the creation of ledger and committing the genesis block would an atomic action
// The chain id retrieved from the genesis block is treated as a ledger id
func CreateWithGenesisBlock(genesisBlock *common.Block) (ledger.PeerLedger, error) {
lock.Lock()
defer lock.Unlock()
if !initialized {
return nil, ErrLedgerMgmtNotInitialized
}
id, err := utils.GetChainIDFromBlock(genesisBlock)
if err != nil {
return nil, err
}

logger.Infof("Creating ledger [%s] with genesis block", id)
l, err := ledgerProvider.CreateWithGenesisBlock(genesisBlock)
if err != nil {
return nil, err
}
l = wrapLedger(id, l)
openedLedgers[id] = l
logger.Infof("Created ledger [%s] with genesis block", id)
return l, nil
}

// OpenLedger returns a ledger for the given id
func OpenLedger(id string) (ledger.PeerLedger, error) {
logger.Infof("Opening ledger with id = %s", id)
Expand Down

0 comments on commit 4e0f96b

Please sign in to comment.