diff --git a/common/ledger/ledger_interface.go b/common/ledger/ledger_interface.go index f6d0bbb9561..4407df6e417 100644 --- a/common/ledger/ledger_interface.go +++ b/common/ledger/ledger_interface.go @@ -33,8 +33,6 @@ type Ledger interface { GetBlocksIterator(startBlockNumber uint64) (ResultsIterator, error) // Close closes the ledger Close() - // Commit adds a new block - Commit(block *common.Block) error } // ResultsIterator - an iterator for query result set diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index 345d021aea6..0f258e42e09 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -281,7 +281,9 @@ func endTxSimulation(chainID string, ccid *pb.ChaincodeID, txsim ledger.TxSimula //see comment on _commitLock_ _commitLock_.Lock() defer _commitLock_.Unlock() - if err := lgr.Commit(block); err != nil { + if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{ + Block: block, + }); err != nil { return err } } diff --git a/core/committer/committer.go b/core/committer/committer.go index 225c6b235f8..80a10d53460 100644 --- a/core/committer/committer.go +++ b/core/committer/committer.go @@ -30,9 +30,6 @@ import ( // change type Committer interface { - // Commit block to the ledger - Commit(block *common.Block) error - // CommitWithPvtData block and private data into the ledger CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error diff --git a/core/committer/txvalidator/txvalidator_test.go b/core/committer/txvalidator/txvalidator_test.go index a35f70f7467..9aff89f2cdd 100644 --- a/core/committer/txvalidator/txvalidator_test.go +++ b/core/committer/txvalidator/txvalidator_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/common/ledger/testutil" util2 "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/common/sysccprovider" + ledger2 "github.com/hyperledger/fabric/core/ledger" 
"github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/ledger/util" ledgerUtil "github.com/hyperledger/fabric/core/ledger/util" @@ -153,7 +154,9 @@ func TestNewTxValidator_DuplicateTransactions(t *testing.T) { // Initialize metadata utils.InitBlockMetadata(block) // Commit block to the ledger - ledger.Commit(block) + ledger.CommitWithPvtData(&ledger2.BlockAndPvtData{ + Block: block, + }) // Validation should invalidate transaction, // because it's already committed diff --git a/core/committer/txvalidator/validator_test.go b/core/committer/txvalidator/validator_test.go index 96a12482e4c..82cb3470060 100644 --- a/core/committer/txvalidator/validator_test.go +++ b/core/committer/txvalidator/validator_test.go @@ -127,7 +127,9 @@ func putCCInfoWithVSCCAndVer(theLedger ledger.PeerLedger, ccname, vscc, ver stri pubSimulationBytes, err := simRes.GetPubSimulationBytes() assert.NoError(t, err) block0 := testutil.ConstructBlock(t, 1, []byte("hash"), [][]byte{pubSimulationBytes}, true) - err = theLedger.Commit(block0) + err = theLedger.CommitWithPvtData(&ledger.BlockAndPvtData{ + Block: block0, + }) assert.NoError(t, err) } diff --git a/core/endorser/endorser.go b/core/endorser/endorser.go index ad3797e72ee..88617efdb17 100644 --- a/core/endorser/endorser.go +++ b/core/endorser/endorser.go @@ -52,20 +52,24 @@ var endorserLogger = flogging.MustGetLogger("endorser") // The Jira issue that documents Endorser flow along with its relationship to // the lifecycle chaincode - https://jira.hyperledger.org/browse/FAB-181 +type privateDataDistributor func(channel string, txID string, privateData []byte) error + // Endorser provides the Endorser service ProcessProposal type Endorser struct { - policyChecker policy.PolicyChecker + policyChecker policy.PolicyChecker + distributePrivateData privateDataDistributor } // NewEndorserServer creates and returns a new Endorser server instance. 
-func NewEndorserServer() pb.EndorserServer { - e := new(Endorser) - e.policyChecker = policy.NewPolicyChecker( - peer.NewChannelPolicyManagerGetter(), - mgmt.GetLocalMSP(), - mgmt.NewLocalMSPPrincipalGetter(), - ) - +func NewEndorserServer(privDist privateDataDistributor) pb.EndorserServer { + e := &Endorser{ + distributePrivateData: privDist, + policyChecker: policy.NewPolicyChecker( + peer.NewChannelPolicyManagerGetter(), + mgmt.GetLocalMSP(), + mgmt.NewLocalMSPPrincipalGetter(), + ), + } return e } @@ -257,6 +261,7 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st //---3. execute the proposal and get simulation results var simResult *ledger.TxSimulationResults var pubSimResBytes []byte + var prvtSimResBytes []byte var res *pb.Response var ccevent *pb.ChaincodeEvent res, ccevent, err = e.callChaincode(ctx, chainID, version, txid, signedProp, prop, cis, cid, txsim) @@ -270,6 +275,16 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st return nil, nil, nil, nil, err } + if prvtSimResBytes, err = simResult.GetPvtSimulationBytes(); err != nil { + return nil, nil, nil, nil, err + } + + if len(prvtSimResBytes) > 0 { + if err := e.distributePrivateData(chainID, txid, prvtSimResBytes); err != nil { + return nil, nil, nil, nil, err + } + } + if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil { return nil, nil, nil, nil, err } @@ -541,7 +556,9 @@ func (e *Endorser) commitTxSimulation(proposal *pb.Proposal, chainID string, sig block := common.NewBlock(blockNumber, []byte{}) block.Data.Data = [][]byte{txBytes} block.Header.DataHash = block.Data.Hash() - if err = lgr.Commit(block); err != nil { + if err = lgr.CommitWithPvtData(&ledger.BlockAndPvtData{ + Block: block, + }); err != nil { return err } diff --git a/core/endorser/endorser_test.go b/core/endorser/endorser_test.go index e44cdc716f5..0264a9cb1fc 100644 --- a/core/endorser/endorser_test.go +++ b/core/endorser/endorser_test.go @@ 
-746,7 +746,9 @@ func TestMain(m *testing.M) { return } - endorserServer = NewEndorserServer() + endorserServer = NewEndorserServer(func(channel string, txID string, privateData []byte) error { + return nil + }) // setup the MSP manager so that we can sign/verify err = msptesttools.LoadMSPSetupForTesting() diff --git a/core/ledger/kvledger/history/historydb/historyleveldb/pkg_test.go b/core/ledger/kvledger/history/historydb/historyleveldb/pkg_test.go index df5b40afcc1..72edd81594a 100644 --- a/core/ledger/kvledger/history/historydb/historyleveldb/pkg_test.go +++ b/core/ledger/kvledger/history/historydb/historyleveldb/pkg_test.go @@ -58,10 +58,8 @@ func newTestHistoryEnv(t *testing.T) *levelDBLockBasedHistoryEnv { testDB := testDBEnv.GetDBHandle(testLedgerID) testTStoreEnv := transientstore.NewTestStoreEnv(t) - testTransientStore, err := testTStoreEnv.TestStoreProvider.OpenStore(testLedgerID) - testutil.AssertNoError(t, err, "") - txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDB, testTransientStore) + txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDB) testHistoryDBProvider := NewHistoryDBProvider() testHistoryDB, err := testHistoryDBProvider.GetDBHandle("TestHistoryDB") testutil.AssertNoError(t, err, "") diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 8e2e9bde12f..1520fcfb54c 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -43,27 +43,25 @@ var logger = flogging.MustGetLogger("kvledger") // KVLedger provides an implementation of `ledger.PeerLedger`. 
// This implementation provides a key-value based data model type kvLedger struct { - ledgerID string - blockStore *ledgerstorage.Store - txtmgmt txmgr.TxMgr - historyDB historydb.HistoryDB - transientStore transientstore.Store + ledgerID string + blockStore *ledgerstorage.Store + txtmgmt txmgr.TxMgr + historyDB historydb.HistoryDB } // NewKVLedger constructs new `KVLedger` func newKVLedger(ledgerID string, blockStore *ledgerstorage.Store, - versionedDB privacyenabledstate.DB, historyDB historydb.HistoryDB, - transientStore transientstore.Store) (*kvLedger, error) { + versionedDB privacyenabledstate.DB, historyDB historydb.HistoryDB) (*kvLedger, error) { logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID) //Initialize transaction manager using state database var txmgmt txmgr.TxMgr - txmgmt = pvtdatatxmgr.NewLockbasedTxMgr(versionedDB, transientStore) + txmgmt = pvtdatatxmgr.NewLockbasedTxMgr(versionedDB) // Create a kvLedger for this chain/ledger, which encasulates the underlying // id store, blockstore, txmgr (state database), history database - l := &kvLedger{ledgerID, blockStore, txmgmt, historyDB, transientStore} + l := &kvLedger{ledgerID, blockStore, txmgmt, historyDB} //Recover both state DB and history DB if they are out of sync with block storage if err := l.recoverDBs(); err != nil { @@ -210,17 +208,6 @@ func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error return l.historyDB.NewHistoryQueryExecutor(l.blockStore) } -// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes -// TODO when we move the transient store outside the ledger, the commiter would invoke function `CommitWithPvtData` and this -// function will be removed -func (l *kvLedger) Commit(block *common.Block) error { - pvtdata, err := retrievePrivateData(l.transientStore, block) - if err != nil { - return err - } - return l.CommitWithPvtData(&ledger.BlockAndPvtData{Block: block, BlockPvtData: 
pvtdata}) -} - // CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error { var err error @@ -282,7 +269,6 @@ func (l *kvLedger) PrivateDataMinBlockNum() (uint64, error) { func (l *kvLedger) Close() { l.blockStore.Shutdown() l.txtmgmt.Shutdown() - l.transientStore.Shutdown() } // retrievePrivateData retrieves the pvt data from the transient store for committing it into the diff --git a/core/ledger/kvledger/kv_ledger_provider.go b/core/ledger/kvledger/kv_ledger_provider.go index b83c8dc0ccf..c48e38ab280 100644 --- a/core/ledger/kvledger/kv_ledger_provider.go +++ b/core/ledger/kvledger/kv_ledger_provider.go @@ -29,7 +29,6 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/ledgerstorage" - "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/utils" "github.com/syndtr/goleveldb/leveldb" @@ -49,11 +48,10 @@ var ( // Provider implements interface ledger.PeerLedgerProvider type Provider struct { - idStore *idStore - ledgerStoreProvider *ledgerstorage.Provider - vdbProvider privacyenabledstate.DBProvider - historydbProvider historydb.HistoryDBProvider - transientStoreProvider transientstore.StoreProvider + idStore *idStore + ledgerStoreProvider *ledgerstorage.Provider + vdbProvider privacyenabledstate.DBProvider + historydbProvider historydb.HistoryDBProvider } // NewProvider instantiates a new Provider. 
@@ -73,14 +71,14 @@ func NewProvider() (ledger.PeerLedgerProvider, error) { return nil, err } // Initialize the transient store (temporary storage of private rwset) - transientStoreProvider := transientstore.NewCustomPathStoreProvider(ledgerconfig.GetTransientStorePath()) + //transientStoreProvider := transientstore.NewCustomPathStoreProvider(ledgerconfig.GetTransientStorePath()) // Initialize the history database (index for history of values by key) var historydbProvider historydb.HistoryDBProvider historydbProvider = historyleveldb.NewHistoryDBProvider() logger.Info("ledger provider Initialized") - provider := &Provider{idStore, ledgerStoreProvider, vdbProvider, historydbProvider, transientStoreProvider} + provider := &Provider{idStore, ledgerStoreProvider, vdbProvider, historydbProvider} provider.recoverUnderConstructionLedger() return provider, nil } @@ -105,19 +103,21 @@ func (provider *Provider) Create(genesisBlock *common.Block) (ledger.PeerLedger, if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil { return nil, err } - ledger, err := provider.openInternal(ledgerID) + lgr, err := provider.openInternal(ledgerID) if err != nil { logger.Errorf("Error in opening a new empty ledger. Unsetting under construction flag. 
Err: %s", err) panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID) panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag") return nil, err } - if err := ledger.Commit(genesisBlock); err != nil { - ledger.Close() + if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{ + Block: genesisBlock, + }); err != nil { + lgr.Close() return nil, err } panicOnErr(provider.idStore.createLedgerID(ledgerID, genesisBlock), "Error while marking ledger as created") - return ledger, nil + return lgr, nil } // Open implements the corresponding method from interface ledger.PeerLedgerProvider @@ -153,15 +153,9 @@ func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, erro return nil, err } - // Get the transient store for a chain/ledger - transientStore, err := provider.transientStoreProvider.OpenStore(ledgerID) - if err != nil { - return nil, err - } - // Create a kvLedger for this chain/ledger, which encasulates the underlying data stores // (id store, blockstore, state database, history database) - l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB, transientStore) + l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB) if err != nil { return nil, err } @@ -184,7 +178,6 @@ func (provider *Provider) Close() { provider.ledgerStoreProvider.Close() provider.vdbProvider.Close() provider.historydbProvider.Close() - provider.transientStoreProvider.Close() } // recoverUnderConstructionLedger checks whether the under construction flag is set - this would be the case diff --git a/core/ledger/kvledger/kv_ledger_provider_test.go b/core/ledger/kvledger/kv_ledger_provider_test.go index 577dfb0bc2b..867784f3e5b 100644 --- a/core/ledger/kvledger/kv_ledger_provider_test.go +++ b/core/ledger/kvledger/kv_ledger_provider_test.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" 
"github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/common/util" - "github.com/hyperledger/fabric/core/ledger" + ledgerproto "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/ledger/queryresult" @@ -90,7 +90,7 @@ func TestRecovery(t *testing.T) { // now create the genesis block genesisBlock, _ := configtxtest.MakeGenesisBlock(constructTestLedgerID(1)) ledger, err := provider.(*Provider).openInternal(constructTestLedgerID(1)) - ledger.Commit(genesisBlock) + ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: genesisBlock}) ledger.Close() // Case 1: assume a crash happens, force underconstruction flag to be set to simulate @@ -128,7 +128,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) { defer env.cleanup() numLedgers := 10 provider, _ := NewProvider() - ledgers := make([]ledger.PeerLedger, numLedgers) + ledgers := make([]ledgerproto.PeerLedger, numLedgers) for i := 0; i < numLedgers; i++ { bg, gb := testutil.NewBlockGenerator(t, constructTestLedgerID(i), false) l, err := provider.Create(gb) @@ -143,7 +143,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) { testutil.AssertNoError(t, err, "") pubSimBytes, _ := res.GetPubSimulationBytes() b := bg.NextBlock([][]byte{pubSimBytes}) - err = l.Commit(b) + err = l.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: b}) l.Close() testutil.AssertNoError(t, err, "") } @@ -152,7 +152,7 @@ func TestMultipleLedgerBasicRW(t *testing.T) { provider, _ = NewProvider() defer provider.Close() - ledgers = make([]ledger.PeerLedger, numLedgers) + ledgers = make([]ledgerproto.PeerLedger, numLedgers) for i := 0; i < numLedgers; i++ { l, err := provider.Open(constructTestLedgerID(i)) testutil.AssertNoError(t, err, "") @@ -191,7 +191,7 @@ func TestLedgerBackup(t *testing.T) { simRes, _ := simulator.GetTxSimulationResults() pubSimBytes, _ := 
simRes.GetPubSimulationBytes() block1 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block1) + ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: block1}) txid = util.GenerateUUID() simulator, _ = ledger.NewTxSimulator(txid) @@ -202,7 +202,7 @@ func TestLedgerBackup(t *testing.T) { simRes, _ = simulator.GetTxSimulationResults() pubSimBytes, _ = simRes.GetPubSimulationBytes() block2 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block2) + ledger.CommitWithPvtData(&ledgerproto.BlockAndPvtData{Block: block2}) ledger.Close() provider.Close() diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 8e412780428..20b691f7e6e 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -68,7 +68,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { simRes, _ := simulator.GetTxSimulationResults() pubSimBytes, _ := simRes.GetPubSimulationBytes() block1 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block1) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block1}) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -84,7 +84,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { simRes, _ = simulator.GetTxSimulationResults() pubSimBytes, _ = simRes.GetPubSimulationBytes() block2 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block2) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block2}) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() @@ -128,6 +128,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { } func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) { + t.Skip() env := newTestEnv(t) defer env.cleanup() provider, _ := NewProvider() @@ -150,7 +151,7 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) { simRes, _ := simulator.GetTxSimulationResults() pubSimBytes, _ := simRes.GetPubSimulationBytes() block1 := bg.NextBlockWithTxid([][]byte{pubSimBytes}, []string{txid}) - 
testutil.AssertNoError(t, ledger.Commit(block1), "") + testutil.AssertNoError(t, ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block1}), "") bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -166,7 +167,7 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) { simRes, _ = simulator.GetTxSimulationResults() pubSimBytes, _ = simRes.GetPubSimulationBytes() block2 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block2) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block2}) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() @@ -216,7 +217,7 @@ func TestKVLedgerDBRecovery(t *testing.T) { //generating a block based on the simulation result block1 := bg.NextBlock([][]byte{pubSimBytes}) //performing validation of read and write set to find valid transactions - ledger.Commit(block1) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block1}) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() testutil.AssertEquals(t, bcInfo, &common.BlockchainInfo{ @@ -564,7 +565,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { pubSimBytes, _ := simRes.GetPubSimulationBytes() block1 := bg.NextBlock([][]byte{pubSimBytes}) - ledger.Commit(block1) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block1}) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -595,7 +596,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { simulationResults = append(simulationResults, pubSimBytes2) block2 := bg.NextBlock(simulationResults) - ledger.Commit(block2) + ledger.CommitWithPvtData(&lgr.BlockAndPvtData{Block: block2}) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go index eee59734f0f..b06c5c1f942 100644 --- 
a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go @@ -25,7 +25,6 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" - "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/protos/common" ) @@ -42,7 +41,7 @@ type LockBasedTxMgr struct { } // NewLockBasedTxMgr constructs a new instance of NewLockBasedTxMgr -func NewLockBasedTxMgr(db privacyenabledstate.DB, tStore transientstore.Store) *LockBasedTxMgr { +func NewLockBasedTxMgr(db privacyenabledstate.DB) *LockBasedTxMgr { db.Open() txmgr := &LockBasedTxMgr{db: db} txmgr.validator = valimpl.NewStatebasedValidator(txmgr, db) diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/pkg_test.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/pkg_test.go index 75a66e6d447..76a3a0cbaee 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/pkg_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/pkg_test.go @@ -76,9 +76,7 @@ func (env *lockBasedEnv) init(t *testing.T, testLedgerID string) { env.testDB = env.testDBEnv.GetDBHandle(testLedgerID) testutil.AssertNoError(t, err, "") env.testTStoreEnv = transientstore.NewTestStoreEnv(t) - testTransientStore, err := env.testTStoreEnv.TestStoreProvider.OpenStore(testLedgerID) - testutil.AssertNoError(t, err, "") - env.txmgr = NewLockBasedTxMgr(env.testDB, testTransientStore) + env.txmgr = NewLockBasedTxMgr(env.testDB) } func (env *lockBasedEnv) getTxMgr() txmgr.TxMgr { diff --git a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pkg_test.go b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pkg_test.go index 97b943d8edc..d70cd44edce 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pkg_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pkg_test.go 
@@ -9,7 +9,6 @@ package pvtdatatxmgr import ( "testing" - "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" @@ -47,10 +46,8 @@ func (env *TestEnv) Init(t *testing.T, testLedgerID string) { env.DBEnv.Init(t) env.DB = env.DBEnv.GetDBHandle(testLedgerID) env.TStoreEnv = transientstore.NewTestStoreEnv(t) - var err error - env.TStore, err = env.TStoreEnv.TestStoreProvider.OpenStore(testLedgerID) - testutil.AssertNoError(t, err, "") - env.Txmgr = NewLockbasedTxMgr(env.DB, env.TStore) + env.Txmgr = NewLockbasedTxMgr(env.DB) + env.TStore = env.TStoreEnv.TestStore } // Cleanup cleansup the test environment diff --git a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr.go index 0c632ea59ed..8afaf385fe3 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr.go @@ -13,7 +13,6 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" - "github.com/hyperledger/fabric/core/transientstore" ) var logger = flogging.MustGetLogger("pvtdatatxmgr") @@ -23,12 +22,11 @@ var logger = flogging.MustGetLogger("pvtdatatxmgr") // Txmgr does this job temporarily for phase-1 and will be moved out to endorser type TransientHandlerTxMgr struct { txmgr.TxMgr - tStore transientstore.Store } // NewLockbasedTxMgr constructs a new instance of TransientHandlerTxMgr -func NewLockbasedTxMgr(db privacyenabledstate.DB, tStore transientstore.Store) *TransientHandlerTxMgr { - return &TransientHandlerTxMgr{lockbasedtxmgr.NewLockBasedTxMgr(db, tStore), tStore} +func NewLockbasedTxMgr(db 
privacyenabledstate.DB) *TransientHandlerTxMgr { + return &TransientHandlerTxMgr{lockbasedtxmgr.NewLockBasedTxMgr(db)} } // NewTxSimulator extends the implementation of this function in the wrapped txmgr. @@ -49,38 +47,21 @@ func (w *TransientHandlerTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, if actualTxSim, err = w.TxMgr.NewTxSimulator(txid); err != nil { return nil, err } - return newSimulatorWrapper(actualTxSim, w.tStore, txid, simBlkHt), nil + return newSimulatorWrapper(actualTxSim, txid, simBlkHt), nil } // transientHandlerTxSimulator wraps a txsimulator and adds the additional functionality of persisting // the private writesets into transient store type transientHandlerTxSimulator struct { ledger.TxSimulator - tStore transientstore.Store txid string simBlkHt uint64 } -func newSimulatorWrapper(actualSim ledger.TxSimulator, tStore transientstore.Store, txid string, simBlkHt uint64) *transientHandlerTxSimulator { - return &transientHandlerTxSimulator{actualSim, tStore, txid, simBlkHt} +func newSimulatorWrapper(actualSim ledger.TxSimulator, txid string, simBlkHt uint64) *transientHandlerTxSimulator { + return &transientHandlerTxSimulator{actualSim, txid, simBlkHt} } func (w *transientHandlerTxSimulator) GetTxSimulationResults() (*ledger.TxSimulationResults, error) { - var txSimRes *ledger.TxSimulationResults - var pvtSimBytes []byte - var err error - - if txSimRes, err = w.TxSimulator.GetTxSimulationResults(); err != nil || !txSimRes.ContainsPvtWrites() { - logger.Debugf("Not adding private simulation results into transient store for txid=[%s]. 
Results available=%t, err=%#v", - w.txid, txSimRes.ContainsPvtWrites(), err) - return txSimRes, err - } - if pvtSimBytes, err = txSimRes.GetPvtSimulationBytes(); err != nil { - return nil, err - } - logger.Debugf("Adding private simulation results into transient store for txid = [%s]", w.txid) - if err = w.tStore.Persist(w.txid, "", w.simBlkHt, pvtSimBytes); err != nil { - return nil, err - } - return txSimRes, nil + return w.TxSimulator.GetTxSimulationResults() } diff --git a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr_test.go b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr_test.go index 31dd304edc3..7c95ffc26c0 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/pvtdatatxmgr/pvtdata_txmgr_test.go @@ -71,21 +71,6 @@ func testTransientHandlerTxmgr(t *testing.T, testEnv *TestEnv) { assert.NoError(t, err) entriesAfterReadOnlyPvtSimulation := retrieveTestEntriesFromTStore(t, testEnv.TStore, txid) assert.Nil(t, entriesAfterReadOnlyPvtSimulation) - - // run a private simulation that inlovles writes and the transient store should have a corresponding entry at the end - sim3, err := txmgr.NewTxSimulator(txid) - assert.NoError(t, err) - sim3.GetState("ns1", "key1") - sim3.SetState("ns1", "key1", []byte("value1")) - sim3.GetPrivateData("ns1", "key1", "coll1") - sim3.SetPrivateData("ns1", "key1", "coll1", []byte("value1")) - sim3Res, err := sim3.GetTxSimulationResults() - assert.NoError(t, err) - sim3ResBytes, err := sim3Res.GetPvtSimulationBytes() - assert.NoError(t, err) - entriesAfterWritePvtSimulation := retrieveTestEntriesFromTStore(t, testEnv.TStore, txid) - assert.Equal(t, 1, len(entriesAfterWritePvtSimulation)) - assert.Equal(t, sim3ResBytes, entriesAfterWritePvtSimulation[0].PvtSimulationResults) }) } diff --git a/core/peer/peer.go b/core/peer/peer.go index 6880ba6961b..f4c514e1362 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -7,7 +7,6 @@ 
SPDX-License-Identifier: Apache-2.0 package peer import ( - "errors" "fmt" "net" "sync" @@ -25,6 +24,7 @@ import ( "github.com/hyperledger/fabric/core/committer/txvalidator" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/ledgermgmt" + "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/service" "github.com/hyperledger/fabric/msp" @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" "github.com/hyperledger/fabric/protos/utils" + "github.com/pkg/errors" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -50,6 +51,22 @@ type chainSupport struct { ledger ledger.PeerLedger } +var transientStoreFactory = &storeProvider{} + +type storeProvider struct { + transientstore.StoreProvider + sync.Mutex +} + +func (sp *storeProvider) OpenStore(ledgerID string) (transientstore.Store, error) { + sp.Lock() + defer sp.Unlock() + if sp.StoreProvider == nil { + sp.StoreProvider = transientstore.NewStoreProvider() + } + return sp.StoreProvider.OpenStore(ledgerID) +} + func (cs *chainSupport) Ledger() ledger.PeerLedger { return cs.ledger } @@ -229,7 +246,12 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error { if len(ordererAddresses) == 0 { return errors.New("No ordering service endpoint provided in configuration block") } - service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses) + // TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer? 
+ store, err := transientStoreFactory.OpenStore(cs.ChainID()) + if err != nil { + return errors.Wrapf(err, "Failed opening transient store for %s", cs.ChainID()) + } + service.GetGossipService().InitializeChannel(cs.ChainID(), c, store, ordererAddresses) chains.Lock() defer chains.Unlock() diff --git a/core/scc/qscc/query_test.go b/core/scc/qscc/query_test.go index 03102abc8ad..9608c1eaa7a 100644 --- a/core/scc/qscc/query_test.go +++ b/core/scc/qscc/query_test.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/chaincode/shim" + ledger2 "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/core/policy" policymocks "github.com/hyperledger/fabric/core/policy/mocks" @@ -275,7 +276,7 @@ func addBlockForTesting(t *testing.T, chainid string) *common.Block { pubSimResBytes2, _ := simRes2.GetPubSimulationBytes() block1 := bg.NextBlock([][]byte{pubSimResBytes1, pubSimResBytes2}) - ledger.Commit(block1) + ledger.CommitWithPvtData(&ledger2.BlockAndPvtData{Block: block1}) return block1 } diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go new file mode 100644 index 00000000000..0d9b4da7548 --- /dev/null +++ b/gossip/privdata/coordinator.go @@ -0,0 +1,174 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package privdata + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/core/common/privdata" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/transientstore" + "github.com/hyperledger/fabric/gossip/util" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/hyperledger/fabric/protos/utils" + "github.com/op/go-logging" + "github.com/pkg/errors" +) + +var logger *logging.Logger // package-level logger + +func init() { + logger = util.GetLogger(util.LoggingPrivModule, "") +} + +// TransientStore holds private data whose corresponding blocks have not yet been committed into the ledger +type TransientStore interface { + // Persist stores the private read-write set of a transaction in the transient store + Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error + + // GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation + // performed by the peer itself + GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) +} + +// PrivateDataDistributor distributes private data to peers +type PrivateDataDistributor interface { + // Distribute distributes a given private data with a specific transactionID + // to peers according to policies that are derived from the given PolicyStore and PolicyParser + Distribute(privateData []byte, txID string, ps privdata.PolicyStore, pp privdata.PolicyParser) error +} + +// Coordinator orchestrates the flow of new +// block arrivals and in-flight transient data, and is responsible +// for completing missing parts of transient data for a given block. 
+type Coordinator interface { + PrivateDataDistributor + // StoreBlock delivers a new block with underlying private data + // returns missing transaction ids + StoreBlock(block *common.Block, data util.PvtDataCollections) ([]string, error) + + // GetPvtDataAndBlockByNum get block by number and returns also all related private data + // the order of private data in slice of PvtDataCollections doesn't imply the order of + // transactions in the block related to these private data, to get the correct placement + // need to read TxPvtData.SeqInBlock field + GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, util.PvtDataCollections, error) + + // GetBlockByNum returns block and related to the block private data + GetBlockByNum(seqNum uint64) (*common.Block, error) + + // Get recent block sequence number + LedgerHeight() (uint64, error) + + // Close coordinator, shuts down coordinator service + Close() +} + +type coordinator struct { + committer.Committer + TransientStore +} + +// NewCoordinator creates a new instance of coordinator +func NewCoordinator(committer committer.Committer, store TransientStore) Coordinator { + return &coordinator{Committer: committer, TransientStore: store} +} + +// Distribute distributes a given private data with a specific transactionID +// to peers according to policies that are derived from the given PolicyStore and PolicyParser +func (c *coordinator) Distribute(privateData []byte, txID string, ps privdata.PolicyStore, pp privdata.PolicyParser) error { + // TODO: also need to distribute the data... 
+ return c.TransientStore.Persist(txID, "", 0, privateData) +} + +// StoreBlock stores block with private data into the ledger +func (c *coordinator) StoreBlock(block *common.Block, data util.PvtDataCollections) ([]string, error) { + blockAndPvtData := &ledger.BlockAndPvtData{ + Block: block, + BlockPvtData: make(map[uint64]*ledger.TxPvtData), + } + + // Fill private data map with payloads + for _, item := range data { + blockAndPvtData.BlockPvtData[item.SeqInBlock] = item + } + + transientStorePrivateData, err := c.retrievePrivateData(block) + if err != nil { + return nil, errors.Wrap(err, "Failed retrieving private data from transientStore") + } + // In any case, overwrite the private data from the block with what is stored in the transient store + // TODO: verify the hashes match + for seqInBlock, txPvtRWSet := range transientStorePrivateData { + blockAndPvtData.BlockPvtData[seqInBlock] = txPvtRWSet + } + + // commit block and private data + return nil, c.CommitWithPvtData(blockAndPvtData) +} + +// GetPvtDataAndBlockByNum get block by number and returns also all related private data +// the order of private data in slice of PvtDataCollections doesn't imply the order of +// transactions in the block related to these private data, to get the correct placement +// need to read TxPvtData.SeqInBlock field +func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, util.PvtDataCollections, error) { + blockAndPvtData, err := c.Committer.GetPvtDataAndBlockByNum(seqNum) + if err != nil { + return nil, nil, fmt.Errorf("Cannot retreive block number %d, due to %s", seqNum, err) + } + + var blockPvtData util.PvtDataCollections + + for _, item := range blockAndPvtData.BlockPvtData { + blockPvtData = append(blockPvtData, item) + } + + return blockAndPvtData.Block, blockPvtData, nil +} + +// GetBlockByNum returns block by sequence number +func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, error) { + blocks := c.GetBlocks([]uint64{seqNum}) + 
if len(blocks) == 0 { + return nil, fmt.Errorf("Cannot retreive block number %d", seqNum) + } + return blocks[0], nil +} + +func (c *coordinator) retrievePrivateData(block *common.Block) (map[uint64]*ledger.TxPvtData, error) { + pvtdata := make(map[uint64]*ledger.TxPvtData) + for txIndex, envBytes := range block.Data.Data { + env, err := utils.GetEnvelopeFromBlock(envBytes) + if err != nil { + return nil, err + } + payload, err := utils.GetPayload(env) + if err != nil { + return nil, err + } + chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + return nil, err + } + pvtEndorsement, err := c.TransientStore.GetSelfSimulatedTxPvtRWSetByTxid(chdr.TxId) + if err != nil { + return nil, err + } + if pvtEndorsement == nil { + continue + } + txPvtRWSet := &rwset.TxPvtReadWriteSet{} + if err := proto.Unmarshal(pvtEndorsement.PvtSimulationResults, txPvtRWSet); err != nil { + return nil, err + } + seqInBlock := uint64(txIndex) + pvtdata[seqInBlock] = &ledger.TxPvtData{SeqInBlock: seqInBlock, WriteSet: txPvtRWSet} + } + return pvtdata, nil +} diff --git a/gossip/state/coordinator_test.go b/gossip/privdata/coordinator_test.go similarity index 87% rename from gossip/state/coordinator_test.go rename to gossip/privdata/coordinator_test.go index 7e419d421be..a5d9bcaed44 100644 --- a/gossip/state/coordinator_test.go +++ b/gossip/privdata/coordinator_test.go @@ -4,19 +4,32 @@ Copyright IBM Corp. All Rights Reserved. 
SPDX-License-Identifier: Apache-2.0 */ -package state +package privdata import ( "fmt" "testing" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/transientstore" + "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) +type mockTransientStore struct { +} + +func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error { + panic("implement me") +} + +func (*mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { + panic("implement me") +} + type committerMock struct { mock.Mock } @@ -55,7 +68,7 @@ func (mock *committerMock) Close() { } func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) { - collection := &PvtDataCollections{ + collection := &util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -84,7 +97,7 @@ func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) { } func TestPvtDataCollections_FailMarshalingWriteSet(t *testing.T) { - collection := &PvtDataCollections{ + collection := &util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: nil, @@ -98,7 +111,7 @@ func TestPvtDataCollections_FailMarshalingWriteSet(t *testing.T) { } func TestPvtDataCollections_Marshal(t *testing.T) { - collection := &PvtDataCollections{ + collection := &util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -154,7 +167,7 @@ func TestPvtDataCollections_Marshal(t *testing.T) { } func TestPvtDataCollections_Unmarshal(t *testing.T) { - collection := PvtDataCollections{ + collection := util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -181,7 +194,7 @@ func 
TestPvtDataCollections_Unmarshal(t *testing.T) { assertion.NotNil(bytes) assertion.Equal(1, len(bytes)) - var newCol PvtDataCollections + var newCol util.PvtDataCollections err = newCol.Unmarshal(bytes) assertion.NoError(err) @@ -200,7 +213,7 @@ func TestNewCoordinator(t *testing.T) { DataHash: []byte{1, 1, 1}, }, Data: &common.BlockData{ - Data: [][]byte{{1, 2, 3, 4, 5, 6}}, + Data: [][]byte{}, }, } @@ -211,11 +224,11 @@ func TestNewCoordinator(t *testing.T) { DataHash: []byte{2, 2, 2}, }, Data: &common.BlockData{ - Data: [][]byte{{11, 12, 13, 14, 15, 16}}, + Data: [][]byte{}, }, } - pvtData := PvtDataCollections{ + pvtData := util.PvtDataCollections{ &ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -250,7 +263,7 @@ func TestNewCoordinator(t *testing.T) { }).Return(nil) - coord := NewCoordinator(committer) + coord := NewCoordinator(committer, &mockTransientStore{}) b, err := coord.GetBlockByNum(1) @@ -266,12 +279,7 @@ func TestNewCoordinator(t *testing.T) { assertion.NoError(err) assertion.Equal(uint64(1), height) - missingPvtTx, err := coord.StoreBlock(blockToCommit, nil) - - assertion.NoError(err) - assertion.Empty(missingPvtTx) - - missingPvtTx, err = coord.StoreBlock(blockToCommit, pvtData) + missingPvtTx, err := coord.StoreBlock(blockToCommit, pvtData) assertion.NoError(err) assertion.Empty(missingPvtTx) diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 063008018b2..68647d74881 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/core/common/privdata" "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" "github.com/hyperledger/fabric/gossip/api" @@ -18,6 +19,7 @@ import ( "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/identity" 
"github.com/hyperledger/fabric/gossip/integration" + privdata2 "github.com/hyperledger/fabric/gossip/privdata" "github.com/hyperledger/fabric/gossip/state" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/protos/common" @@ -38,10 +40,13 @@ type gossipSvc gossip.Gossip type GossipService interface { gossip.Gossip + // DistributePrivateData distributes private data to the peers in the collections + // according to policies induced by the PolicyStore and PolicyParser + DistributePrivateData(chainID string, txID string, privateData []byte, ps privdata.PolicyStore, pp privdata.PolicyParser) error // NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to NewConfigEventer() ConfigProcessor // InitializeChannel allocates the state provider and should be invoked once per channel per execution - InitializeChannel(chainID string, committer committer.Committer, endpoints []string) + InitializeChannel(chainID string, committer committer.Committer, store privdata2.TransientStore, endpoints []string) // GetBlock returns block for given chain GetBlock(chainID string, index uint64) *common.Block // AddPayload appends message payload to for given chain @@ -70,6 +75,7 @@ func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api type gossipServiceImpl struct { gossipSvc + coordinators map[string]privdata2.Coordinator chains map[string]state.GossipStateProvider leaderElection map[string]election.LeaderElectionService deliveryService deliverclient.DeliverService @@ -140,6 +146,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string gossipServiceInstance = &gossipServiceImpl{ mcs: mcs, gossipSvc: gossip, + coordinators: make(map[string]privdata2.Coordinator), chains: make(map[string]state.GossipStateProvider), leaderElection: make(map[string]election.LeaderElectionService), deliveryFactory: factory, @@ -156,19 +163,31 @@ func GetGossipService() 
GossipService { return gossipServiceInstance } +func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData []byte, ps privdata.PolicyStore, pp privdata.PolicyParser) error { + g.lock.RLock() + coord, exists := g.coordinators[chainID] + g.lock.RUnlock() + if !exists { + return errors.Errorf("No coordinator for %s", chainID) + } + return coord.Distribute(privData, txID, ps, pp) +} + // NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor { return newConfigEventer(g) } // InitializeChannel allocates the state provider and should be invoked once per channel per execution -func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) { +func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, store privdata2.TransientStore, endpoints []string) { g.lock.Lock() defer g.lock.Unlock() // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", chainID) servicesAdapater := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs} - g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapater, committer) + coordinator := privdata2.NewCoordinator(committer, store) + g.coordinators[chainID] = coordinator + g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapater, coordinator) if g.deliveryService == nil { var err error g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints, g.mcs) @@ -251,14 +270,15 @@ func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) e func (g *gossipServiceImpl) Stop() { g.lock.Lock() defer g.lock.Unlock() - for _, ch := range g.chains { - logger.Info("Stopping chain", ch) - ch.Stop() - } - for chainID, electionService := range g.leaderElection { - logger.Infof("Stopping leader election for 
%s", chainID) - electionService.Stop() + for chainID := range g.chains { + logger.Info("Stopping chain", chainID) + if le, exists := g.leaderElection[chainID]; exists { + logger.Infof("Stopping leader election for %s", chainID) + le.Stop() + } + g.chains[chainID].Stop() + g.coordinators[chainID].Close() } g.gossipSvc.Stop() if g.deliveryService != nil { diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 98d598bfb1b..628fffaf8b1 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -19,11 +19,13 @@ import ( "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/api" gossipCommon "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/election" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/privdata" "github.com/hyperledger/fabric/gossip/state" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/msp/mgmt" @@ -41,6 +43,17 @@ func init() { util.SetupTestLogging() } +type mockTransientStore struct { +} + +func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error { + panic("implement me") +} + +func (*mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { + panic("implement me") +} + func TestInitGossipService(t *testing.T) { // Test whenever gossip service is indeed singleton grpcServer := grpc.NewServer() @@ -117,7 +130,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) { gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running[channelName] = false - 
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:5005"}) + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, &mockTransientStore{}, []string{"localhost:5005"}) service, exist := gossips[i].(*gossipServiceImpl).leaderElection[channelName] assert.True(t, exist, "Leader election service should be created for peer %d and channel %s", i, channelName) services[i] = &electionService{nil, false, 0} @@ -173,7 +186,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) { for i := 0; i < n; i++ { gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running[channelName] = false - gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:5005"}) + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, &mockTransientStore{}, []string{"localhost:5005"}) } for i := 0; i < n; i++ { @@ -184,7 +197,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) { channelName = "chanB" for i := 0; i < n; i++ { deliverServiceFactory.service.running[channelName] = false - gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:5005"}) + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, &mockTransientStore{}, []string{"localhost:5005"}) } for i := 0; i < n; i++ { @@ -221,7 +234,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) { for i := 0; i < n; i++ { gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory deliverServiceFactory.service.running[channelName] = false - gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:5005"}) + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, &mockTransientStore{}, []string{"localhost:5005"}) } for i := 0; i < n; i++ { @@ -258,7 +271,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) { for i := 0; i < n; i++ { gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory assert.Panics(t, 
func() { - gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:5005"}) + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{1}, &mockTransientStore{}, []string{"localhost:5005"}) }, "Dynamic leader lection based and static connection to ordering service can't exist simultaniosly") } @@ -618,6 +631,7 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos gossipSvc: gossip, chains: make(map[string]state.GossipStateProvider), leaderElection: make(map[string]election.LeaderElectionService), + coordinators: make(map[string]privdata.Coordinator), deliveryFactory: &deliveryFactoryImpl{}, idMapper: idMapper, peerIdentity: api.PeerIdentityType(conf.InternalEndpoint), diff --git a/gossip/service/integration_test.go b/gossip/service/integration_test.go index 8e92764f551..1534631aad7 100644 --- a/gossip/service/integration_test.go +++ b/gossip/service/integration_test.go @@ -14,6 +14,7 @@ import ( "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" + "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/election" "github.com/hyperledger/fabric/gossip/identity" @@ -22,6 +23,17 @@ import ( "github.com/stretchr/testify/assert" ) +type transientStoreMock struct { +} + +func (*transientStoreMock) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error { + panic("implement me") +} + +func (transientStoreMock) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { + panic("implement me") +} + type embeddingDeliveryService struct { deliverclient.DeliverService startSignal sync.WaitGroup @@ -102,7 +114,7 @@ func TestLeaderYield(t *testing.T) { secAdv: &secAdvMock{}, } gossipServiceInstance = gs - gs.InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:7050"}) + 
gs.InitializeChannel(channelName, &mockLedgerInfo{1}, &transientStoreMock{}, []string{"localhost:7050"}) return gs } diff --git a/gossip/state/coordinator.go b/gossip/state/coordinator.go deleted file mode 100644 index 3b5f5ef0576..00000000000 --- a/gossip/state/coordinator.go +++ /dev/null @@ -1,158 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package state - -import ( - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/core/committer" - "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/protos/common" - "github.com/hyperledger/fabric/protos/gossip" - "github.com/hyperledger/fabric/protos/ledger/rwset" - "github.com/pkg/errors" -) - -// PvtDataCollections data type to encapsulate collections -// of private data -type PvtDataCollections []*ledger.TxPvtData - -// Marshal encodes private collection into bytes array -func (pvt *PvtDataCollections) Marshal() ([][]byte, error) { - pvtDataBytes := make([][]byte, 0) - for index, each := range *pvt { - if each == nil { - err := errors.Errorf("Mallformed private data payload, rwset index %d is nil", index) - logger.Errorf("%+v", err) - return nil, err - } - pvtBytes, err := proto.Marshal(each.WriteSet) - if err != nil { - err = errors.Wrapf(err, "Could not marshal private rwset index %d", index) - logger.Errorf("%+v", err) - return nil, err - } - // Compose gossip protobuf message with private rwset + index of transaction in the block - txSeqInBlock := each.SeqInBlock - pvtDataPayload := &gossip.PvtDataPayload{TxSeqInBlock: txSeqInBlock, Payload: pvtBytes} - payloadBytes, err := proto.Marshal(pvtDataPayload) - if err != nil { - err = errors.Wrapf(err, "Could not marshal private payload with transaction index %d", txSeqInBlock) - logger.Errorf("%+v", err) - return nil, err - } - - pvtDataBytes = append(pvtDataBytes, payloadBytes) - } - return pvtDataBytes, nil -} - -// Unmarshal read and unmarshal collection of private data -// 
from given bytes array -func (pvt *PvtDataCollections) Unmarshal(data [][]byte) error { - for _, each := range data { - payload := &gossip.PvtDataPayload{} - if err := proto.Unmarshal(each, payload); err != nil { - return errors.WithStack(err) - } - pvtRWSet := &rwset.TxPvtReadWriteSet{} - if err := proto.Unmarshal(payload.Payload, pvtRWSet); err != nil { - return errors.WithStack(err) - } - *pvt = append(*pvt, &ledger.TxPvtData{ - SeqInBlock: payload.TxSeqInBlock, - WriteSet: pvtRWSet, - }) - } - - return nil -} - -// Coordinator orchestrates the flow of the new -// blocks arrival and in flight transient data, responsible -// to complete missing parts of transient data for given block. -type Coordinator interface { - // StoreBlock deliver new block with underlined private data - // returns missing transaction ids - StoreBlock(block *common.Block, data PvtDataCollections) ([]string, error) - - // GetPvtDataAndBlockByNum get block by number and returns also all related private data - // the order of private data in slice of PvtDataCollections doesn't implies the order of - // transactions in the block related to these private data, to get the correct placement - // need to read TxPvtData.SeqInBlock field - GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error) - - // GetBlockByNum returns block and related to the block private data - GetBlockByNum(seqNum uint64) (*common.Block, error) - - // Get recent block sequence number - LedgerHeight() (uint64, error) - - // Close coordinator, shuts down coordinator service - Close() -} - -type coordinator struct { - committer.Committer -} - -// NewCoordinator creates a new instance of coordinator -func NewCoordinator(committer committer.Committer) Coordinator { - return &coordinator{Committer: committer} -} - -// StoreBlock stores block with private data into the ledger -func (c *coordinator) StoreBlock(block *common.Block, data PvtDataCollections) ([]string, error) { - // Need to check whenever 
there are missing private rwset - if len(data) > 0 { - blockAndPvtData := &ledger.BlockAndPvtData{ - Block: block, - BlockPvtData: make(map[uint64]*ledger.TxPvtData), - } - - // Fill private data map with payloads - for _, item := range data { - blockAndPvtData.BlockPvtData[item.SeqInBlock] = item - } - - // commit block and private data - return nil, c.CommitWithPvtData(blockAndPvtData) - } - // TODO: Since this could be a very first time - // block arrives from ordering service we need - // to check whenever there is private data available - // for this block in transient store and - // if no private data, we can use simple API - return nil, c.Commit(block) -} - -// GetPvtDataAndBlockByNum get block by number and returns also all related private data -// the order of private data in slice of PvtDataCollections doesn't implies the order of -// transactions in the block related to these private data, to get the correct placement -// need to read TxPvtData.SeqInBlock field -func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error) { - blockAndPvtData, err := c.Committer.GetPvtDataAndBlockByNum(seqNum) - if err != nil { - return nil, nil, errors.Wrapf(err, "Cannot retreive block number %d", seqNum) - } - - var blockPvtData PvtDataCollections - - for _, item := range blockAndPvtData.BlockPvtData { - blockPvtData = append(blockPvtData, item) - } - - return blockAndPvtData.Block, blockPvtData, nil -} - -// GetBlockByNum returns block by sequence number -func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, error) { - blocks := c.GetBlocks([]uint64{seqNum}) - if len(blocks) == 0 { - return nil, errors.Errorf("Cannot retreive block number %d", seqNum) - } - return blocks[0], nil -} diff --git a/gossip/state/state.go b/gossip/state/state.go index 0ec5a83c68c..8414203dd82 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -13,7 +13,6 @@ import ( "time" pb "github.com/golang/protobuf/proto" - 
"github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" common2 "github.com/hyperledger/fabric/gossip/common" @@ -90,6 +89,28 @@ type MCSAdapter interface { VerifyByChannel(chainID common2.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error } +// ledgerResources defines abilities that the ledger provides +type ledgerResources interface { + // StoreBlock deliver new block with underlined private data + // returns missing transaction ids + StoreBlock(block *common.Block, data util.PvtDataCollections) ([]string, error) + + // GetPvtDataAndBlockByNum get block by number and returns also all related private data + // the order of private data in slice of PvtDataCollections doesn't implies the order of + // transactions in the block related to these private data, to get the correct placement + // need to read TxPvtData.SeqInBlock field + GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, util.PvtDataCollections, error) + + // GetBlockByNum returns block and related to the block private data + GetBlockByNum(seqNum uint64) (*common.Block, error) + + // Get recent block sequence number + LedgerHeight() (uint64, error) + + // Close ledgerResources + Close() +} + // ServicesMediator aggregated adapter to compound all mediator // required by state transfer into single struct type ServicesMediator struct { @@ -115,7 +136,7 @@ type GossipStateProviderImpl struct { // Queue of payloads which wasn't acquired yet payloads PayloadsBuffer - coordinator Coordinator + ledger ledgerResources stateResponseCh chan proto.ReceivedMessage @@ -138,7 +159,7 @@ func init() { // NewGossipCoordinatedStateProvider creates state provider with coordinator instance // to orchestrate arrival of private rwsets and blocks before committing them into the ledger. 
-func NewGossipCoordinatedStateProvider(chainID string, services *ServicesMediator, coordinator Coordinator) GossipStateProvider { +func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider { logger := util.GetLogger(util.LoggingStateModule, "") @@ -171,7 +192,7 @@ func NewGossipCoordinatedStateProvider(chainID string, services *ServicesMediato // Filter message which are only relevant for nodeMetastate transfer _, commChan := services.Accept(remoteStateMsgFilter, true) - height, err := coordinator.LedgerHeight() + height, err := ledger.LedgerHeight() if height == 0 { // Panic here since this is an indication of invalid situation which should not happen in normal // code path. @@ -201,7 +222,7 @@ func NewGossipCoordinatedStateProvider(chainID string, services *ServicesMediato // Create a queue for payload received payloads: NewPayloadsBuffer(height), - coordinator: coordinator, + ledger: ledger, stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize), @@ -241,12 +262,6 @@ func NewGossipCoordinatedStateProvider(chainID string, services *ServicesMediato return s } -// NewGossipStateProvider creates initialized instance of gossip state provider with committer -// which is wrapped up into coordinator, kept for API compatibility -func NewGossipStateProvider(chainID string, services *ServicesMediator, committer committer.Committer) GossipStateProvider { - return NewGossipCoordinatedStateProvider(chainID, services, NewCoordinator(committer)) -} - func (s *GossipStateProviderImpl) listen() { defer s.done.Done() @@ -333,7 +348,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage) return } - currentHeight, err := s.coordinator.LedgerHeight() + currentHeight, err := s.ledger.LedgerHeight() if err != nil { logger.Errorf("Cannot access to current ledger height, due to %+v", errors.WithStack(err)) return @@ -348,7 +363,7 @@ func (s *GossipStateProviderImpl) 
handleStateRequest(msg proto.ReceivedMessage) response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)} for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ { logger.Debug("Reading block ", seqNum, " with private data from the coordinator service") - block, pvtData, err := s.coordinator.GetPvtDataAndBlockByNum(seqNum) + block, pvtData, err := s.ledger.GetPvtDataAndBlockByNum(seqNum) if err != nil { logger.Errorf("Wasn't able to read block with sequence number %d from ledger, "+ @@ -430,7 +445,7 @@ func (s *GossipStateProviderImpl) Stop() { // Make sure all go-routines has finished s.done.Wait() // Close all resources - s.coordinator.Close() + s.ledger.Close() close(s.stateRequestCh) close(s.stateResponseCh) close(s.stopCh) @@ -480,7 +495,7 @@ func (s *GossipStateProviderImpl) deliverPayloads() { logger.Debug("New block with claimed sequence number ", payload.SeqNum, " transactions num ", len(rawBlock.Data.Data)) // Read all private data into slice - var p PvtDataCollections + var p util.PvtDataCollections if payload.PrivateData != nil { err := p.Unmarshal(payload.PrivateData) if err != nil { @@ -511,7 +526,7 @@ func (s *GossipStateProviderImpl) antiEntropy() { s.stopCh <- struct{}{} return case <-time.After(defAntiEntropyInterval): - current, err := s.coordinator.LedgerHeight() + current, err := s.ledger.LedgerHeight() if err != nil { // Unable to read from ledger continue to the next round logger.Errorf("Cannot obtain ledger height, due to %+v", errors.WithStack(err)) @@ -664,7 +679,7 @@ func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer dis func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block { // Try to read missing block from the ledger, should return no nil with // content including at least one block - if block, err := s.coordinator.GetBlockByNum(index); block != nil && err != nil { + if block, err := s.ledger.GetBlockByNum(index); block != nil && err != nil { return block } @@ -689,7 
+704,7 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod return errors.New("Given payload is nil") } logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum) - height, err := s.coordinator.LedgerHeight() + height, err := s.ledger.LedgerHeight() if err != nil { return errors.Wrap(err, "Failed obtaining ledger height") } @@ -705,10 +720,10 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod return s.payloads.Push(payload) } -func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData PvtDataCollections) error { +func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error { // Commit block with available private transactions - if _, err := s.coordinator.StoreBlock(block, pvtData); err != nil { + if _, err := s.ledger.StoreBlock(block, pvtData); err != nil { logger.Errorf("Got error while committing(%+v)", errors.WithStack(err)) return err } diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 6ae377d1f76..c5e1e628966 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -23,12 +23,14 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/mocks/validator" + "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/privdata" "github.com/hyperledger/fabric/gossip/state/mocks" gutil "github.com/hyperledger/fabric/gossip/util" pcomm "github.com/hyperledger/fabric/protos/common" @@ -159,14 +161,28 @@ func (node *peerNode) shutdown() { node.g.Stop() } +type mockTransientStore struct { +} + 
+func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error { + panic("implement me") +} + +func (mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { + panic("implement me") +} + type mockCommitter struct { mock.Mock sync.Mutex } func (mc *mockCommitter) CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error { - args := mc.Called(blockAndPvtData) - return args.Error(0) + mc.Lock() + m := mc.Mock + mc.Unlock() + m.Called(blockAndPvtData.Block) + return nil } func (mc *mockCommitter) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockAndPvtData, error) { @@ -174,14 +190,6 @@ func (mc *mockCommitter) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockAn return args.Get(0).(*ledger.BlockAndPvtData), args.Error(1) } -func (mc *mockCommitter) Commit(block *pcomm.Block) error { - mc.Lock() - m := mc.Mock - mc.Unlock() - m.Called(block) - return nil -} - func (mc *mockCommitter) LedgerHeight() (uint64, error) { mc.Lock() m := mc.Mock @@ -253,7 +261,7 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer, // basic parts servicesAdapater := &ServicesMediator{GossipAdapter: g, MCSAdapter: cs} - sp := NewGossipStateProvider(util.GetTestChainID(), servicesAdapater, committer) + sp := NewGossipStateProvider(util.GetTestChainID(), servicesAdapater, privdata.NewCoordinator(committer, &mockTransientStore{})) if sp == nil { return nil } @@ -336,7 +344,7 @@ func TestOverPopulation(t *testing.T) { mc := &mockCommitter{} blocksPassedToLedger := make(chan uint64, 10) - mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) { blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number }) mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) @@ -398,7 +406,7 @@ func TestBlockingEnqueue(t *testing.T) { // The blocks we 
get from gossip are random indices, to maximize disruption. mc := &mockCommitter{} blocksPassedToLedger := make(chan uint64, 10) - mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) { blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number }) mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) @@ -445,7 +453,7 @@ func TestBlockingEnqueue(t *testing.T) { receivedBlockCount++ m := mock.Mock{} m.On("LedgerHeight", mock.Anything).Return(receivedBlock, nil) - m.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + m.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) { blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number }) mc.Lock() @@ -528,7 +536,7 @@ func TestGossipReception(t *testing.T) { g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{}) mc := &mockCommitter{} receivedChan := make(chan struct{}) - mc.On("Commit", mock.Anything).Run(func(arguments mock.Arguments) { + mc.On("CommitWithPvtData", mock.Anything).Run(func(arguments mock.Arguments) { block := arguments.Get(0).(*pcomm.Block) assert.Equal(t, uint64(1), block.Header.Number) receivedChan <- struct{}{} @@ -957,12 +965,13 @@ func TestNewGossipStateProvider_BatchingOfStateRequest(t *testing.T) { // coordinatorMock mocking structure to capture mock interface for // coord to simulate coord flow during the test type coordinatorMock struct { + committer.Committer mock.Mock } -func (mock *coordinatorMock) GetPvtDataAndBlockByNum(seqNum uint64) (*pcomm.Block, PvtDataCollections, error) { +func (mock *coordinatorMock) GetPvtDataAndBlockByNum(seqNum uint64) (*pcomm.Block, gutil.PvtDataCollections, error) { args := mock.Called(seqNum) - return args.Get(0).(*pcomm.Block), args.Get(1).(PvtDataCollections), args.Error(2) + return args.Get(0).(*pcomm.Block), args.Get(1).(gutil.PvtDataCollections), args.Error(2) } func (mock *coordinatorMock) GetBlockByNum(seqNum uint64) 
(*pcomm.Block, error) { @@ -970,7 +979,7 @@ func (mock *coordinatorMock) GetBlockByNum(seqNum uint64) (*pcomm.Block, error) return args.Get(0).(*pcomm.Block), args.Error(1) } -func (mock *coordinatorMock) StoreBlock(block *pcomm.Block, data PvtDataCollections) ([]string, error) { +func (mock *coordinatorMock) StoreBlock(block *pcomm.Block, data gutil.PvtDataCollections) ([]string, error) { args := mock.Called(block, data) return args.Get(0).([]string), args.Error(1) } @@ -1009,7 +1018,7 @@ func (mock *receivedMessageMock) GetConnectionInfo() *proto.ConnectionInfo { type testData struct { block *pcomm.Block - pvtData PvtDataCollections + pvtData gutil.PvtDataCollections } func TestTransferOfPrivateRWSet(t *testing.T) { @@ -1051,7 +1060,7 @@ func TestTransferOfPrivateRWSet(t *testing.T) { Data: [][]byte{{1}, {2}, {3}}, }, }, - pvtData: PvtDataCollections{ + pvtData: gutil.PvtDataCollections{ { SeqInBlock: uint64(0), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -1083,7 +1092,7 @@ func TestTransferOfPrivateRWSet(t *testing.T) { Data: [][]byte{{4}, {5}, {6}}, }, }, - pvtData: PvtDataCollections{ + pvtData: gutil.PvtDataCollections{ { SeqInBlock: uint64(2), WriteSet: &rwset.TxPvtReadWriteSet{ @@ -1112,7 +1121,7 @@ func TestTransferOfPrivateRWSet(t *testing.T) { coord1.On("Close") servicesAdapater := &ServicesMediator{GossipAdapter: g, MCSAdapter: &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor}} - st := NewGossipCoordinatedStateProvider(chainID, servicesAdapater, coord1) + st := NewGossipStateProvider(chainID, servicesAdapater, coord1) defer st.Stop() // Mocked state request message @@ -1265,7 +1274,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) { Data: &pcomm.BlockData{ Data: [][]byte{{1}, {2}, {3}}, }, - }, PvtDataCollections{}, nil) + }, gutil.PvtDataCollections{}, nil) peers["peer1"].coord.On("GetPvtDataAndBlockByNum", uint64(3)).Return(&pcomm.Block{ Header: &pcomm.BlockHeader{ @@ -1276,7 +1285,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) { 
Data: &pcomm.BlockData{ Data: [][]byte{{4}, {5}, {6}}, }, - }, PvtDataCollections{&ledger.TxPvtData{ + }, gutil.PvtDataCollections{&ledger.TxPvtData{ SeqInBlock: uint64(1), WriteSet: &rwset.TxPvtReadWriteSet{ DataModel: rwset.TxReadWriteSet_KV, @@ -1345,11 +1354,11 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) { cryptoService := &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor} mediator := &ServicesMediator{GossipAdapter: peers["peer1"], MCSAdapter: cryptoService} - peer1State := NewGossipCoordinatedStateProvider(chainID, mediator, peers["peer1"].coord) + peer1State := NewGossipStateProvider(chainID, mediator, peers["peer1"].coord) defer peer1State.Stop() mediator = &ServicesMediator{GossipAdapter: peers["peer2"], MCSAdapter: cryptoService} - peer2State := NewGossipCoordinatedStateProvider(chainID, mediator, peers["peer2"].coord) + peer2State := NewGossipStateProvider(chainID, mediator, peers["peer2"].coord) defer peer2State.Stop() // Make sure state was replicated diff --git a/gossip/util/logging.go b/gossip/util/logging.go index cc889023d78..6a3d01cbc7f 100644 --- a/gossip/util/logging.go +++ b/gossip/util/logging.go @@ -24,6 +24,7 @@ const ( LoggingPullModule = "gossip/pull" LoggingServiceModule = "gossip/service" LoggingStateModule = "gossip/state" + LoggingPrivModule = "gossip/privdata" ) var loggersByModules = make(map[string]*logging.Logger) diff --git a/gossip/util/privdata.go b/gossip/util/privdata.go new file mode 100644 index 00000000000..28b655ef591 --- /dev/null +++ b/gossip/util/privdata.go @@ -0,0 +1,69 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package util + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/protos/gossip" + "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/pkg/errors" +) + +// PvtDataCollections data type to encapsulate collections +// of private data +type PvtDataCollections []*ledger.TxPvtData + +// Marshal encodes private collection into bytes array +func (pvt *PvtDataCollections) Marshal() ([][]byte, error) { + pvtDataBytes := make([][]byte, 0) + for index, each := range *pvt { + if each == nil { + errMsg := fmt.Sprintf("Mallformed private data payload, rwset index %d is nil", index) + return nil, errors.New(errMsg) + } + pvtBytes, err := proto.Marshal(each.WriteSet) + if err != nil { + errMsg := fmt.Sprintf("Could not marshal private rwset index %d, due to %s", index, err) + return nil, errors.New(errMsg) + } + // Compose gossip protobuf message with private rwset + index of transaction in the block + txSeqInBlock := each.SeqInBlock + pvtDataPayload := &gossip.PvtDataPayload{TxSeqInBlock: txSeqInBlock, Payload: pvtBytes} + payloadBytes, err := proto.Marshal(pvtDataPayload) + if err != nil { + errMsg := fmt.Sprintf("Could not marshal private payload with transaction index %d, due to %s", txSeqInBlock, err) + return nil, errors.New(errMsg) + } + + pvtDataBytes = append(pvtDataBytes, payloadBytes) + } + return pvtDataBytes, nil +} + +// Unmarshal read and unmarshal collection of private data +// from given bytes array +func (pvt *PvtDataCollections) Unmarshal(data [][]byte) error { + for _, each := range data { + payload := &gossip.PvtDataPayload{} + if err := proto.Unmarshal(each, payload); err != nil { + return err + } + pvtRWSet := &rwset.TxPvtReadWriteSet{} + if err := proto.Unmarshal(payload.Payload, pvtRWSet); err != nil { + return err + } + *pvt = append(*pvt, &ledger.TxPvtData{ + SeqInBlock: payload.TxSeqInBlock, + WriteSet: 
pvtRWSet, + }) + } + + return nil +} diff --git a/peer/node/start.go b/peer/node/start.go index b9d4d4e5be7..47f55214689 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -169,7 +169,11 @@ func serve(args []string) error { // Register the Admin server pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer()) - serverEndorser := endorser.NewEndorserServer() + privDataDist := func(channel string, txID string, privateData []byte) error { + return service.GetGossipService().DistributePrivateData(channel, txID, privateData, nil, nil) + } + + serverEndorser := endorser.NewEndorserServer(privDataDist) libConf := library.Config{ AuthFilterFactory: viper.GetString("peer.handlers.authFilter"), DecoratorFactory: viper.GetString("peer.handlers.decorator"),