Skip to content

Commit

Permalink
[FAB-7224] Enhance custom tx processor
Browse files Browse the repository at this point in the history
This CR enhances custom transaction processor in ledger such that
the processor is able to distinguish whether it is being invoked
during ledger initialization or for on-line tx processing.

In the former situation, the processor needs to just rely on the
states stored previously in the ledger and in the latter it can assume that
the peer is up and running and hence other peer structures can be used.

Other than this, in the initialization situation only valid transactions are
passed to the processor which makes the custom validation in the processor
optional.

Change-Id: Idff54ab124dd588dfa5172a5b882e521217ca62a
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Dec 6, 2017
1 parent 3974f15 commit adf7475
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 66 deletions.
4 changes: 2 additions & 2 deletions core/aclmgmt/aclmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ var aclMgmtCfgTxProcessor = &AclMgmtConfigTxProcessor{}

//GenerateSimulationResults this is just a proxy to delegate registered aclProvider.
//Need this as aclmgmt is initialized with ledger initialization as required by ledger
func (*AclMgmtConfigTxProcessor) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator) error {
func (*AclMgmtConfigTxProcessor) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error {
configtxLock.RLock()
defer configtxLock.RUnlock()

//this should not be nil (aclProvider is initialized at the outset to either
//rscc or default)
if aclProvider != nil {
return aclProvider.GenerateSimulationResults(txEnvelop, simulator)
return aclProvider.GenerateSimulationResults(txEnvelop, simulator, initializingLedger)
}

return fmt.Errorf("warning! call to handle config tx before setting ACL provider")
Expand Down
4 changes: 2 additions & 2 deletions core/aclmgmt/aclmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func registerACLProvider() *mocks.MockACLProvider {

func TestACLProcessor(t *testing.T) {
reinit()
assert.NotNil(t, GetConfigTxProcessor().GenerateSimulationResults(nil, nil), "Expected non-nil error")
assert.NotNil(t, GetConfigTxProcessor().GenerateSimulationResults(nil, nil, false), "Expected non-nil error")
RegisterACLProvider(nil)
assert.Nil(t, GetConfigTxProcessor().GenerateSimulationResults(nil, nil), "Expected nil error")
assert.Nil(t, GetConfigTxProcessor().GenerateSimulationResults(nil, nil, false), "Expected nil error")
}

func TestPanicOnUnregistered(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions core/aclmgmt/aclmgmtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func newACLMgmt(r ACLProvider) ACLProvider {
return &aclMgmtImpl{aclOverrides: make(map[string]aclMethod)}
}

func (am *aclMgmtImpl) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator) error {
func (am *aclMgmtImpl) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error {
if rscc == nil {
panic("-----RegisterACLProvider not called ----")
}

return rscc.GenerateSimulationResults(txEnvelop, simulator)
return rscc.GenerateSimulationResults(txEnvelop, simulator, initializingLedger)
}
2 changes: 1 addition & 1 deletion core/aclmgmt/defaultaclprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,6 @@ func (d *defaultACLProvider) CheckACL(resName string, channelID string, idinfo i

//GenerateSimulationResults does nothing for default provider currently as it defaults to
//1.0 behavior
func (d *defaultACLProvider) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator) error {
func (d *defaultACLProvider) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error {
return nil
}
2 changes: 1 addition & 1 deletion core/aclmgmt/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *MockACLProvider) CheckACL(resName string, channelID string, idinfo inte
return args.Error(0)
}

func (m *MockACLProvider) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator) error {
func (m *MockACLProvider) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error {
return nil
}

Expand Down
10 changes: 0 additions & 10 deletions core/ledger/customtx/custom_tx_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package customtx
import (
"sync"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
)

Expand All @@ -19,15 +18,6 @@ var once sync.Once
// Processors maintains the association between a custom transaction type to its corresponding tx processor
type Processors map[common.HeaderType]Processor

// Processor allows to generate simulation results during commit time for custom transactions.
// A custom processor may represent the information in a propriety fashion and can use this process to translate
// the information into the form of `TxSimulationResults`. Because, the original information is signed in a
// custom representation, an implementation of a `Processor` should be cautious that the custom representation
// is used for simulation in an deterministic fashion and should take care of compatibility cross fabric versions.
type Processor interface {
GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator) error
}

// Initialize sets the custom processors. This function is expected to be invoked only during ledgermgmt.Initialize() function.
func Initialize(customTxProcessors Processors) {
once.Do(func() {
Expand Down
35 changes: 35 additions & 0 deletions core/ledger/customtx/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package customtx

import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
)

// InvalidTxError is expected to be thrown by a custom transaction processor (an implementation of interface `Processor`)
// if it wants the ledger to record a particular transaction as invalid
type InvalidTxError struct {
Msg string
}

func (e *InvalidTxError) Error() string {
return e.Msg
}

// Processor allows to generate simulation results during commit time for custom transactions.
// A custom processor may represent the information in a propriety fashion and can use this process to translate
// the information into the form of `TxSimulationResults`. Because, the original information is signed in a
// custom representation, an implementation of a `Processor` should be cautious that the custom representation
// is used for simulation in an deterministic fashion and should take care of compatibility cross fabric versions.
// 'initializingLedger' true indicates that either the transaction being processed is from the genesis block or the ledger is
// synching the state (which could happen during peer startup if the statedb is found to be lagging behind the blockchain).
// In the former case, the transactions processed are expected to be valid and in the latter case, only valid transactions
// are reprocessed and hence any validation can be skipped.
type Processor interface {
GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error
}
12 changes: 12 additions & 0 deletions core/ledger/customtx/test_export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package customtx

// InitializeTestEnv initializes custom tx processors for test
func InitializeTestEnv(processors Processors) {
initialize(processors)
}
91 changes: 67 additions & 24 deletions core/ledger/kvledger/custom_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,35 @@ package kvledger
import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/protos/peer"

"github.com/hyperledger/fabric/common/ledger/testutil"
lgrutil "github.com/hyperledger/fabric/core/ledger/util"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/customtx"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/stretchr/testify/assert"
)

type customTxProcessor struct {
namespace string
initData map[string][]byte
}

func (ctp *customTxProcessor) GenerateSimulationResults(
txEnvelop *common.Envelope, simulator ledger.TxSimulator) error {
for k, v := range ctp.initData {
simulator.SetState(ctp.namespace, k, []byte(v))
func (ctp *customTxProcessor) GenerateSimulationResults(txEnvelop *common.Envelope, simulator ledger.TxSimulator, initializingLedger bool) error {
payload := utils.UnmarshalPayloadOrPanic(txEnvelop.Payload)
chHdr, _ := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
chainid := chHdr.ChannelId
kvw := &kvrwset.KVWrite{}
if err := proto.Unmarshal(payload.Data, kvw); err != nil {
return err
}
return nil
if len(kvw.Key) == 0 {
return &customtx.InvalidTxError{Msg: "Nil key"}
}
return simulator.SetState(chainid, kvw.Key, kvw.Value)
}

func TestCustomProcessor(t *testing.T) {
Expand All @@ -35,27 +46,59 @@ func TestCustomProcessor(t *testing.T) {
provider, _ := NewProvider()
defer provider.Close()

// create a custom tx processor and register it to handle 'common.HeaderType_CONFIG' type of transaction
testNamespace := "TestNamespace"
testKVs := map[string][]byte{"one": []byte("1"), "two": []byte("2")}
customtx.Initialize(customtx.Processors{
common.HeaderType_CONFIG: &customTxProcessor{
testNamespace,
testKVs,
}})
// create a custom tx processor and register it to handle '100 and 101' type of transaction
chainid := "testLedger"
customTxProcessor := &customTxProcessor{}
customtx.InitializeTestEnv(customtx.Processors{
100: customTxProcessor,
101: customTxProcessor})

// Create a genesis block with a common.HeaderType_CONFIG transaction
_, gb := testutil.NewBlockGenerator(t, "testLedger", false)
ledger, err := provider.Create(gb)
defer ledger.Close()
_, gb := testutil.NewBlockGenerator(t, chainid, false)
lgr, err := provider.Create(gb)
defer lgr.Close()
assert.NoError(t, err)

// commit a block with three custom trans
tx1 := createCustomTx(t, 100, chainid, "custom_key1", "value1")
tx2 := createCustomTx(t, 101, chainid, "custom_key2", "value2")
tx3 := createCustomTx(t, 101, chainid, "", "")
blk1 := testutil.NewBlock([]*common.Envelope{tx1, tx2, tx3}, 1, gb.Header.Hash())
assert.NoError(t, lgr.CommitWithPvtData(&ledger.BlockAndPvtData{Block: blk1}))
// verify that the state changes caused by the custom processor took place during ledger creation
txSim, err := ledger.NewTxSimulator("testTxid")
qe, err := lgr.NewQueryExecutor()
assert.NoError(t, err)
for k, v := range testKVs {
value, err := txSim.GetState(testNamespace, k)
assert.NoError(t, err)
assert.Equal(t, v, value)
}
val, err := qe.GetState(chainid, "custom_key1")
assert.NoError(t, err)
assert.Equal(t, "value1", string(val))

val, err = qe.GetState(chainid, "custom_key2")
assert.NoError(t, err)
assert.Equal(t, "value2", string(val))
qe.Done()

blockPersisted, err := lgr.GetBlockByNumber(1)
assert.NoError(t, err)
var txFilter lgrutil.TxValidationFlags
txFilter = blockPersisted.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]
assert.Equal(t, peer.TxValidationCode_VALID, txFilter.Flag(0))
assert.Equal(t, peer.TxValidationCode_VALID, txFilter.Flag(1))
assert.Equal(t, peer.TxValidationCode_INVALID_OTHER_REASON, txFilter.Flag(2))

tx4 := createCustomTx(t, 100, chainid, "custom_key4", "value4")
blk2 := testutil.NewBlock([]*common.Envelope{tx4}, 2, blk1.Header.Hash())
assert.NoError(t, lgr.CommitWithPvtData(&ledger.BlockAndPvtData{Block: blk2}))
qe, err = lgr.NewQueryExecutor()
assert.NoError(t, err)
val, err = qe.GetState(chainid, "custom_key4")
qe.Done()
assert.NoError(t, err)
assert.Equal(t, "value4", string(val))
}

func createCustomTx(t *testing.T, txType common.HeaderType, chainid, key, val string) *common.Envelope {
kvWrite := &kvrwset.KVWrite{Key: key, Value: []byte(val)}
txEnv, err := utils.CreateSignedEnvelope(txType, chainid, nil, kvWrite, 0, 0)
assert.NoError(t, err)
return txEnv
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (impl *DefaultImpl) ValidateAndPrepareBatch(blockAndPvtdata *ledger.BlockAn
var err error

logger.Debug("preprocessing ProtoBlock...")
if internalBlock, err = preprocessProtoBlock(impl.txmgr, block); err != nil {
if internalBlock, err = preprocessProtoBlock(impl.txmgr, block, doMVCCValidation); err != nil {
return nil, err
}

Expand Down
16 changes: 10 additions & 6 deletions core/ledger/kvledger/txmgmt/validator/valimpl/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) err

// preprocessProtoBlock parses the proto instance of block into 'Block' structure.
// The retuned 'Block' structure contains only transactions that are endorser transactions and are not alredy marked as invalid
func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block) (*valinternal.Block, error) {
func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error) {
b := &valinternal.Block{Num: block.Header.Number}
// Committer validator has already set validation flags based on well formed tran checks
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
Expand Down Expand Up @@ -135,7 +135,11 @@ func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block) (*valinternal.
continue
}
} else {
rwsetProto, err := processNonEndorserTx(env, chdr.TxId, txType, txmgr)
rwsetProto, err := processNonEndorserTx(env, chdr.TxId, txType, txmgr, !doMVCCValidation)
if _, ok := err.(*customtx.InvalidTxError); ok {
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
if err != nil {
return nil, err
}
Expand All @@ -152,22 +156,22 @@ func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block) (*valinternal.
return b, nil
}

func processNonEndorserTx(txEnv *common.Envelope, txid string, txType common.HeaderType, txmgr txmgr.TxMgr) (*rwset.TxReadWriteSet, error) {
func processNonEndorserTx(txEnv *common.Envelope, txid string, txType common.HeaderType, txmgr txmgr.TxMgr, synchingState bool) (*rwset.TxReadWriteSet, error) {
logger.Debugf("Performing custom processing for transaction [txid=%s], [txType=%s]", txid, txType)
processor := customtx.GetProcessor(txType)
logger.Debug("Processor for custom tx processing:%#v", processor)
logger.Debugf("Processor for custom tx processing:%#v", processor)
if processor == nil {
return nil, nil
}

var err error
var sim ledger.TxSimulator
var simRes *ledger.TxSimulationResults

if sim, err = txmgr.NewTxSimulator(txid); err != nil {
return nil, err
}
if err = processor.GenerateSimulationResults(txEnv, sim); err != nil {
defer sim.Done()
if err = processor.GenerateSimulationResults(txEnv, sim, synchingState); err != nil {
return nil, err
}
if simRes, err = sim.GetTxSimulationResults(); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions core/ledger/kvledger/txmgmt/validator/valimpl/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ func TestPreprocessProtoBlock(t *testing.T) {
// good block
//_, gb := testutil.NewBlockGenerator(t, "testLedger", false)
gb := testutil.ConstructTestBlock(t, 10, 1, 1)
_, err := preprocessProtoBlock(nil, gb)
_, err := preprocessProtoBlock(nil, gb, false)
assert.NoError(t, err)
// bad envelope
gb = testutil.ConstructTestBlock(t, 11, 1, 1)
gb.Data = &common.BlockData{Data: [][]byte{[]byte{123}}}
gb.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] =
lutils.NewTxValidationFlags(len(gb.Data.Data))
_, err = preprocessProtoBlock(nil, gb)
_, err = preprocessProtoBlock(nil, gb, false)
assert.Error(t, err)
t.Log(err)
// bad payload
gb = testutil.ConstructTestBlock(t, 12, 1, 1)
envBytes, _ := putils.GetBytesEnvelope(&common.Envelope{Payload: []byte{123}})
gb.Data = &common.BlockData{Data: [][]byte{envBytes}}
_, err = preprocessProtoBlock(nil, gb)
_, err = preprocessProtoBlock(nil, gb, false)
assert.Error(t, err)
t.Log(err)
// bad channel header
Expand All @@ -49,7 +49,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
})
envBytes, _ = putils.GetBytesEnvelope(&common.Envelope{Payload: payloadBytes})
gb.Data = &common.BlockData{Data: [][]byte{envBytes}}
_, err = preprocessProtoBlock(nil, gb)
_, err = preprocessProtoBlock(nil, gb, false)
assert.Error(t, err)
t.Log(err)

Expand All @@ -63,7 +63,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
flags := lutils.NewTxValidationFlags(len(gb.Data.Data))
flags.SetFlag(0, peer.TxValidationCode_BAD_CHANNEL_HEADER)
gb.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = flags
_, err = preprocessProtoBlock(nil, gb)
_, err = preprocessProtoBlock(nil, gb, false)
assert.NoError(t, err) // invalid filter should take precendence

// new block
Expand All @@ -77,7 +77,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
// set logging backend for test
backend := logging.NewMemoryBackend(1)
logging.SetBackend(backend)
_, err = preprocessProtoBlock(nil, gb)
_, err = preprocessProtoBlock(nil, gb, false)
assert.NoError(t, err)
expected := fmt.Sprintf("Channel [%s]: Block [%d] Transaction index [%d] TxId [%s]"+
" marked as invalid by committer. Reason code [%s]",
Expand Down
9 changes: 9 additions & 0 deletions core/ledger/ledgermgmt/ledger_mgmt_test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package ledgermgmt
import (
"os"

"github.com/hyperledger/fabric/core/ledger/customtx"

"github.com/hyperledger/fabric/core/ledger/ledgerconfig"

"fmt"
Expand All @@ -30,6 +32,13 @@ func InitializeTestEnv() {
initialize(nil)
}

// InitializeTestEnvWithCustomProcessors initializes ledgermgmt for tests with the supplied custom tx processors
func InitializeTestEnvWithCustomProcessors(customTxProcessors customtx.Processors) {
remove()
customtx.InitializeTestEnv(customTxProcessors)
initialize(customTxProcessors)
}

// CleanupTestEnv closes the ledgermagmt and removes the store directory
func CleanupTestEnv() {
Close()
Expand Down
2 changes: 1 addition & 1 deletion core/scc/rscc/rscc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (rscc *Rscc) CheckACL(resName string, channelID string, idinfo interface{})

//GenerateSimulationResults called to add config state. Currently only handles "join" requests.
//Note that this is just a ledger hook and does not modify RSCC data structures
func (rscc *Rscc) GenerateSimulationResults(txEnv *common.Envelope, sim ledger.TxSimulator) error {
func (rscc *Rscc) GenerateSimulationResults(txEnv *common.Envelope, sim ledger.TxSimulator, initializingLedger bool) error {
//should never happen, but check anyway
if txEnv == nil || sim == nil {
return fmt.Errorf("nil parameters")
Expand Down
Loading

0 comments on commit adf7475

Please sign in to comment.