diff --git a/core/ledger/kvledger/kv_ledger_provider.go b/core/ledger/kvledger/kv_ledger_provider.go index 10cf8e67bff..6c6591d883c 100644 --- a/core/ledger/kvledger/kv_ledger_provider.go +++ b/core/ledger/kvledger/kv_ledger_provider.go @@ -17,7 +17,9 @@ limitations under the License. package kvledger import ( + "bytes" "errors" + "fmt" "github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" @@ -29,6 +31,9 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/utils" + "github.com/syndtr/goleveldb/leveldb" ) var ( @@ -38,6 +43,9 @@ var ( ErrNonExistingLedgerID = errors.New("LedgerID does not exist") // ErrLedgerNotOpened is thrown by a CloseLedger call if a ledger with the given id has not been opened ErrLedgerNotOpened = errors.New("Ledger is not opened yet") + + underConstructionLedgerKey = []byte("underConstructionLedgerKey") + ledgerKeyPrefix = []byte("l") ) // Provider implements interface ledger.PeerLedgerProvider @@ -90,7 +98,44 @@ func NewProvider() (ledger.PeerLedgerProvider, error) { historydbProvider = historyleveldb.NewHistoryDBProvider() logger.Info("ledger provider Initialized") - return &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}, nil + provider := &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider} + provider.recoverUnderConstructionLedger() + return provider, nil +} + +// CreateWithGenesisBlock implements the corresponding method from interface ledger.PeerLedgerProvider +// This functions sets a under construction flag before doing any thing related to ledger creation and +// upon a successful ledger creation with the committed genesis block, removes the flag and add entry into +// created ledgers list (atomically). If a crash happens in between, the 'recoverUnderConstructionLedger' +// function is invoked before declaring the provider to be usable +func (provider *Provider) CreateWithGenesisBlock(genesisBlock *common.Block) (ledger.PeerLedger, error) { + ledgerID, err := utils.GetChainIDFromBlock(genesisBlock) + if err != nil { + return nil, err + } + exists, err := provider.idStore.ledgerIDExists(ledgerID) + if err != nil { + return nil, err + } + if exists { + return nil, ErrLedgerIDExists + } + if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil { + return nil, err + } + ledger, err := provider.openInternal(ledgerID) + if err != nil { + logger.Errorf("Error in opening a new empty ledger. Unsetting under construction flag. Err: %s", err) + panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID) + panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag") + return nil, err + } + if err := ledger.Commit(genesisBlock); err != nil { + ledger.Close() + return nil, err + } + panicOnErr(provider.idStore.createLedgerID(ledgerID), "Error while marking ledger as created") + return ledger, nil } // Create implements the corresponding method from interface ledger.PeerLedgerProvider @@ -102,15 +147,20 @@ func (provider *Provider) Create(ledgerID string) (ledger.PeerLedger, error) { if exists { return nil, ErrLedgerIDExists } - provider.idStore.createLedgerID(ledgerID) - return provider.Open(ledgerID) + ledger, err := provider.openInternal(ledgerID) + if err != nil { + return nil, err + } + if err = provider.idStore.createLedgerID(ledgerID); err != nil { + ledger.Close() + return nil, err + } + return ledger, nil } // Open implements the corresponding method from interface ledger.PeerLedgerProvider func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) { - logger.Debugf("Open() opening kvledger: %s", ledgerID) - // Check the ID store to ensure that the chainId/ledgerId exists exists, err := provider.idStore.ledgerIDExists(ledgerID) if err != nil { @@ -119,7 +169,10 @@ func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) { if !exists { return nil, ErrNonExistingLedgerID } + return provider.openInternal(ledgerID) +} +func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) { // Get the block store for a chain/ledger blockStore, err := provider.blockStoreProvider.OpenBlockStore(ledgerID) if err != nil { @@ -165,6 +218,63 @@ func (provider *Provider) Close() { provider.historydbProvider.Close() } +// recoverUnderConstructionLedger checks whether the under construction flag is set - this would be the case +// if a crash had happened during creation of ledger and the ledger creation could have been left in intermediate +// state. Recovery checks if the ledger was created and the genesis block was committed successfully then it completes +// the last step of adding the ledger id to the list of created ledgers. Else, it clears the under construction flag +func (provider *Provider) recoverUnderConstructionLedger() { + logger.Debugf("Recovering under construction ledger") + ledgerID, err := provider.idStore.getUnderConstructionFlag() + panicOnErr(err, "Error while checking whether the under construction flag is set") + if ledgerID == "" { + logger.Debugf("No under construction ledger found. Quitting recovery") + return + } + logger.Infof("ledger [%s] found as under construction", ledgerID) + ledger, err := provider.openInternal(ledgerID) + panicOnErr(err, "Error while opening under construction ledger [%s]", ledgerID) + bcInfo, err := ledger.GetBlockchainInfo() + panicOnErr(err, "Error while getting blockchain info for the under construction ledger [%s]", ledgerID) + ledger.Close() + + switch bcInfo.Height { + case 0: + logger.Infof("Genesis block was not committed. Hence, the peer ledger not created. unsetting the under construction flag") + panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID) + panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag") + case 1: + logger.Infof("Genesis block was committed. Hence, marking the peer ledger as created") + panicOnErr(provider.idStore.createLedgerID(ledgerID), "Error while adding ledgerID [%s] to created list", ledgerID) + default: + panic(fmt.Errorf( + "Data inconsistency: under construction flag is set for ledger [%s] while the height of the blockchain is [%d]", + ledgerID, bcInfo.Height)) + } + return +} + +// runCleanup cleans up blockstorage, statedb, and historydb for what +// may have got created during in-complete ledger creation +func (provider *Provider) runCleanup(ledgerID string) error { + // TODO - though, not having this is harmless for kv ledger. + // If we want, following could be done: + // - blockstorage could remove empty folders + // - couchdb backed statedb could delete the database if got created + // - leveldb backed statedb and history db need not perform anything as it uses a single db shared across ledgers + return nil +} + +func panicOnErr(err error, mgsFormat string, args ...interface{}) { + if err == nil { + return + } + args = append(args, err) + panic(fmt.Sprintf(mgsFormat+" Err:%s ", args...)) +} + +////////////////////////////////////////////////////////////////////// +// Ledger id persistence related code +/////////////////////////////////////////////////////////////////////// type idStore struct { db *leveldbhelper.DB } @@ -175,8 +285,24 @@ func openIDStore(path string) *idStore { return &idStore{db} } +func (s *idStore) setUnderConstructionFlag(ledgerID string) error { + return s.db.Put(underConstructionLedgerKey, []byte(ledgerID), true) +} + +func (s *idStore) unsetUnderConstructionFlag() error { + return s.db.Delete(underConstructionLedgerKey, true) +} + +func (s *idStore) getUnderConstructionFlag() (string, error) { + val, err := s.db.Get(underConstructionLedgerKey) + if err != nil { + return "", err + } + return string(val), nil +} + func (s *idStore) createLedgerID(ledgerID string) error { - key := []byte(ledgerID) + key := s.encodeLedgerKey(ledgerID) val := []byte{} err := error(nil) if val, err = s.db.Get(key); err != nil { @@ -185,11 +311,14 @@ func (s *idStore) createLedgerID(ledgerID string) error { if val != nil { return ErrLedgerIDExists } - return s.db.Put(key, val, true) + batch := &leveldb.Batch{} + batch.Put(key, val) + batch.Delete(underConstructionLedgerKey) + return s.db.WriteBatch(batch, true) } func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) { - key := []byte(ledgerID) + key := s.encodeLedgerKey(ledgerID) val := []byte{} err := error(nil) if val, err = s.db.Get(key); err != nil { @@ -203,8 +332,11 @@ func (s *idStore) getAllLedgerIds() ([]string, error) { itr := s.db.GetIterator(nil, nil) itr.First() for itr.Valid() { - key := string(itr.Key()) - ids = append(ids, key) + if bytes.Equal(itr.Key(), underConstructionLedgerKey) { + continue + } + id := string(s.decodeLedgerID(itr.Key())) + ids = append(ids, id) itr.Next() } return ids, nil @@ -213,3 +345,11 @@ func (s *idStore) getAllLedgerIds() ([]string, error) { func (s *idStore) close() { s.db.Close() } + +func (s *idStore) encodeLedgerKey(ledgerID string) []byte { + return append(ledgerKeyPrefix, []byte(ledgerID)...) +} + +func (s *idStore) decodeLedgerID(key []byte) string { + return string(key[len(ledgerKeyPrefix):]) +} diff --git a/core/ledger/kvledger/kv_ledger_provider_test.go b/core/ledger/kvledger/kv_ledger_provider_test.go index 6a1f4f640dc..48e2774441b 100644 --- a/core/ledger/kvledger/kv_ledger_provider_test.go +++ b/core/ledger/kvledger/kv_ledger_provider_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + configtxtest "github.com/hyperledger/fabric/common/configtx/test" "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/core/ledger" ) @@ -56,6 +57,47 @@ func TestLedgerProvider(t *testing.T) { testutil.AssertEquals(t, err, ErrNonExistingLedgerID) } +func TestLedgerCreationWithGenesisBlock(t *testing.T) { + env := newTestEnv(t) + defer env.cleanup() + numLedgers := 10 + provider, _ := NewProvider() + existingLedgerIDs, err := provider.List() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, len(existingLedgerIDs), 0) + for i := 0; i < numLedgers; i++ { + genesisBlock, _ := configtxtest.MakeGenesisBlock(constructTestLedgerID(i)) + provider.CreateWithGenesisBlock(genesisBlock) + } + existingLedgerIDs, err = provider.List() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, len(existingLedgerIDs), numLedgers) + + provider.Close() + + provider, _ = NewProvider() + defer provider.Close() + ledgerIds, _ := provider.List() + testutil.AssertEquals(t, len(ledgerIds), numLedgers) + t.Logf("ledgerIDs=%#v", ledgerIds) + for i := 0; i < numLedgers; i++ { + testutil.AssertEquals(t, ledgerIds[i], constructTestLedgerID(i)) + } + for i := 0; i < numLedgers; i++ { + ledger, err := provider.Open(constructTestLedgerID(i)) + testutil.AssertNoError(t, err, "") + bcInfo, err := ledger.GetBlockchainInfo() + ledger.Close() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, bcInfo.Height, uint64(1)) + } + _, err = provider.Create(constructTestLedgerID(2)) + testutil.AssertEquals(t, err, ErrLedgerIDExists) + + _, err = provider.Open(constructTestLedgerID(numLedgers)) + testutil.AssertEquals(t, err, ErrNonExistingLedgerID) +} + func TestMultipleLedgerBasicRW(t *testing.T) { env := newTestEnv(t) defer env.cleanup() diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index aafb7d7a09f..4f4311540a2 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -26,6 +26,10 @@ import ( type PeerLedgerProvider interface { // Create creates a new ledger with a given unique id Create(ledgerID string) (PeerLedger, error) + // CreateWithGenesisBlock creates a new ledger with the given genesis block. + // This function guarentees that the creation of ledger and committing the genesis block would an atomic action + // The chain id retrieved from the genesis block is treated as a ledger id + CreateWithGenesisBlock(genesisBlock *common.Block) (PeerLedger, error) // Open opens an already created ledger Open(ledgerID string) (PeerLedger, error) // Exists tells whether the ledger with given id exists diff --git a/core/ledger/ledgermgmt/ledger_mgmt.go b/core/ledger/ledgermgmt/ledger_mgmt.go index 3d6a28901a8..acdb8d6b3cd 100644 --- a/core/ledger/ledgermgmt/ledger_mgmt.go +++ b/core/ledger/ledgermgmt/ledger_mgmt.go @@ -24,6 +24,8 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" ) @@ -80,6 +82,31 @@ func CreateLedger(id string) (ledger.PeerLedger, error) { return l, nil } +// CreateWithGenesisBlock creates a new ledger with the given genesis block. +// This function guarentees that the creation of ledger and committing the genesis block would an atomic action +// The chain id retrieved from the genesis block is treated as a ledger id +func CreateWithGenesisBlock(genesisBlock *common.Block) (ledger.PeerLedger, error) { + lock.Lock() + defer lock.Unlock() + if !initialized { + return nil, ErrLedgerMgmtNotInitialized + } + id, err := utils.GetChainIDFromBlock(genesisBlock) + if err != nil { + return nil, err + } + + logger.Infof("Creating ledger [%s] with genesis block", id) + l, err := ledgerProvider.CreateWithGenesisBlock(genesisBlock) + if err != nil { + return nil, err + } + l = wrapLedger(id, l) + openedLedgers[id] = l + logger.Infof("Created ledger [%s] with genesis block", id) + return l, nil +} + // OpenLedger returns a ledger for the given id func OpenLedger(id string) (ledger.PeerLedger, error) { logger.Infof("Opening ledger with id = %s", id)