Skip to content

Commit

Permalink
[FAB-5262] Rm committer from ProcessConfigMsg
Browse files Browse the repository at this point in the history
To simplify the removal of the filter.Committer from the blockcutter, it
was pushed into the ProcessConfigMsg method, so that the consenter was
still able to supply a committer to the WriteBlock method.

However, the filter.Committer concept is going away, so this must be
removed from the consensus code.  This pushes it one level closer to
WriteBlock to facilitate its ultimate removal.

Change-Id: Icff52bb2b8a9fad96d41487147400abd149ab2f1
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jul 19, 2017
1 parent dc51be1 commit 30e20a7
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 44 deletions.
11 changes: 5 additions & 6 deletions orderer/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/filter"
localconfig "github.com/hyperledger/fabric/orderer/localconfig"
"github.com/hyperledger/fabric/orderer/multichain"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -382,16 +381,16 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.C
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil, nil)
support.WriteBlock(block, nil)
}

committer, _, err := support.ProcessNormalMsg(env)
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
break
}
block := support.CreateNextBlock([]*cb.Envelope{env})
support.WriteBlock(block, []filter.Committer{committer}, nil)
support.WriteConfigBlock(block, nil)
*timer = nil
case multichain.NormalMsg:
batches, ok := support.BlockCutter().Ordered(env)
Expand All @@ -409,7 +408,7 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.C
offset := receivedOffset - int64(len(batches)-i-1)
block := support.CreateNextBlock(batch)
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: offset})
support.WriteBlock(block, nil, encodedLastOffsetPersisted)
support.WriteBlock(block, encodedLastOffsetPersisted)
*lastCutBlockNumber++
logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", support.ChainID(), *lastCutBlockNumber, offset)
}
Expand All @@ -435,7 +434,7 @@ func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.C
}
block := support.CreateNextBlock(batch)
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset})
support.WriteBlock(block, nil, encodedLastOffsetPersisted)
support.WriteBlock(block, encodedLastOffsetPersisted)
*lastCutBlockNumber++
logger.Debugf("[channel: %s] Proper time-to-cut received, just cut block %d", support.ChainID(), *lastCutBlockNumber)
return nil
Expand Down
13 changes: 8 additions & 5 deletions orderer/mocks/multichain/multichain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/hyperledger/fabric/common/config"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/filter"
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/blockcutter"
"github.com/hyperledger/fabric/orderer/multichain"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -94,8 +93,7 @@ func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block {
}

// WriteBlock writes data to the Blocks channel
// Note that _committers is ignored by this mock implementation
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
Expand All @@ -104,6 +102,11 @@ func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Co
return block
}

// WriteConfigBlock calls WriteBlock
func (mcs *ConsenterSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
return mcs.WriteBlock(block, encodedMetadataValue)
}

// ChainID returns the chain ID this specific consenter instance is associated with
func (mcs *ConsenterSupport) ChainID() string {
return mcs.ChainIDVal
Expand All @@ -130,8 +133,8 @@ func (mcs *ConsenterSupport) ClassifyMsg(env *cb.Envelope) (multichain.MsgClassi
}

// ProcessNormalMsg returns ConfigSeqVal, ProcessNormalMsgErr
func (mcs *ConsenterSupport) ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error) {
return nil, mcs.ConfigSeqVal, mcs.ProcessNormalMsgErr
func (mcs *ConsenterSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
return mcs.ConfigSeqVal, mcs.ProcessNormalMsgErr
}

// ProcessConfigUpdateMsg returns ProcessConfigUpdateMsgVal, ConfigSeqVal, ProcessConfigUpdateMsgErr
Expand Down
24 changes: 17 additions & 7 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ type ConsenterSupport interface {
BlockCutter() blockcutter.Receiver
SharedConfig() config.Orderer
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block
WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}
Expand All @@ -102,7 +103,7 @@ type MsgProcessor interface {

// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error)
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)

// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
Expand Down Expand Up @@ -260,9 +261,9 @@ func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error)

// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error) {
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = cs.Sequence()
committer, err = cs.filters.Apply(env)
_, err = cs.filters.Apply(env)
return
}

Expand Down Expand Up @@ -336,10 +337,19 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
})
}

func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
for _, committer := range committers {
committer.Commit()
func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// XXX This hacky path is temporary and will be removed by the end of this change series
// The panics here are just fine
committer, err := cs.filters.Apply(utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]))
if err != nil {
logger.Panicf("Config should have already been validated")
}
committer.Commit()

return cs.WriteBlock(block, encodedMetadataValue)
}

func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
Expand Down
29 changes: 14 additions & 15 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &chainSupport{
ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
signer: mockCrypto(),
}
assert.Equal(t, uint64(0), cs.Height(), "Should has height of 0")

txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)}
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
block := cs.CreateNextBlock(txs)
cs.WriteBlock(block, committers, nil)
cs.WriteConfigBlock(block, nil)
assert.Equal(t, uint64(1), cs.Height(), "Should has height of 1")

blockTXs := make([]*cb.Envelope, len(ml.data))
Expand All @@ -81,18 +84,14 @@ func TestCommitConfig(t *testing.T) {
}

assert.Equal(t, txs, blockTXs, "Should have written input data to ledger but did not")

for _, c := range committers {
assert.EqualValues(t, 1, c.(*mockCommitter).committed, "Expected exactly 1 commits but got %d", c.(*mockCommitter).committed)
}
}

func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil), cb.BlockMetadataIndex_SIGNATURES)
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil), cb.BlockMetadataIndex_SIGNATURES)
assert.NotNil(t, actual, "Block should have block signature")
}

Expand All @@ -103,7 +102,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {

value := []byte("foo")
expected := &cb.Metadata{Value: value}
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, value), cb.BlockMetadataIndex_ORDERER)
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), value), cb.BlockMetadataIndex_ORDERER)
assert.NotNil(t, actual, "Block should have orderer metadata written")
assert.True(t, proto.Equal(expected, actual), "Orderer metadata not written to block correctly")
}
Expand All @@ -128,28 +127,28 @@ func TestWriteLastConfig(t *testing.T) {
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}

expected := uint64(0)
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil))
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil))
assert.Equal(t, expected, lc, "First block should have config block index of %d, but got %d", expected, lc)
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(1, nil), nil, nil))
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(1, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)

cm.SequenceVal = 1
expected = uint64(2)
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(2, nil), nil, nil))
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(2, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)

lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(3, nil), nil, nil))
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(3, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)

t.Run("ResetChainSupport", func(t *testing.T) {
cm.SequenceVal = 2
expected = uint64(4)

cs = &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil, nil))
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)

lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(5, nil), nil, nil))
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(5, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d")
})
}
9 changes: 4 additions & 5 deletions orderer/multichain/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/configtx/tool/provisional"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
Expand Down Expand Up @@ -76,21 +75,21 @@ func (mch *mockChain) Start() {
batch := mch.support.BlockCutter().Cut()
if batch != nil {
block := mch.support.CreateNextBlock(batch)
mch.support.WriteBlock(block, nil, nil)
mch.support.WriteBlock(block, nil)
}

committer, _, err := mch.support.ProcessNormalMsg(msg)
_, err := mch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
block := mch.support.CreateNextBlock([]*cb.Envelope{msg})
mch.support.WriteBlock(block, []filter.Committer{committer}, nil)
mch.support.WriteConfigBlock(block, nil)
case NormalMsg:
batches, _ := mch.support.BlockCutter().Ordered(msg)
for _, batch := range batches {
block := mch.support.CreateNextBlock(batch)
mch.support.WriteBlock(block, nil, nil)
mch.support.WriteBlock(block, nil)
}
default:
logger.Panicf("Unsupported msg classification: %v", class)
Expand Down
11 changes: 5 additions & 6 deletions orderer/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package solo
import (
"time"

"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/multichain"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/op/go-logging"
Expand Down Expand Up @@ -98,16 +97,16 @@ func (ch *chain) main() {
batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil, nil)
ch.support.WriteBlock(block, nil)
}

committer, _, err := ch.support.ProcessNormalMsg(msg)
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
ch.support.WriteBlock(block, []filter.Committer{committer}, nil)
ch.support.WriteConfigBlock(block, nil)
timer = nil
case multichain.NormalMsg:
batches, ok := ch.support.BlockCutter().Ordered(msg)
Expand All @@ -117,7 +116,7 @@ func (ch *chain) main() {
}
for _, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil, nil)
ch.support.WriteBlock(block, nil)
}
if len(batches) > 0 {
timer = nil
Expand All @@ -136,7 +135,7 @@ func (ch *chain) main() {
}
logger.Debugf("Batch timer expired, creating block")
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil, nil)
ch.support.WriteBlock(block, nil)
case <-ch.exitChan:
logger.Debugf("Exiting")
return
Expand Down

0 comments on commit 30e20a7

Please sign in to comment.