Skip to content

Commit 30e20a7

Browse files
author
Jason Yellick
committed
[FAB-5262] Rm committer from ProcessConfigMsg
To simplify the removal of the filter.Committer from the blockcutter, it was pushed into the ProcessConfigMsg method, so that the consenter was still able to supply a committer to the WriteBlock method. However, the filter.Committer concept is going away, so this must be removed from the consensus code. This pushes it one level closer to WriteBlock to facilitate its ultimate removal. Change-Id: Icff52bb2b8a9fad96d41487147400abd149ab2f1 Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent dc51be1 commit 30e20a7

File tree

6 files changed

+53
-44
lines changed

6 files changed

+53
-44
lines changed

orderer/kafka/chain.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/Shopify/sarama"
1515
"github.com/golang/protobuf/proto"
16-
"github.com/hyperledger/fabric/orderer/common/filter"
1716
localconfig "github.com/hyperledger/fabric/orderer/localconfig"
1817
"github.com/hyperledger/fabric/orderer/multichain"
1918
cb "github.com/hyperledger/fabric/protos/common"
@@ -382,16 +381,16 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.C
382381
batch := support.BlockCutter().Cut()
383382
if batch != nil {
384383
block := support.CreateNextBlock(batch)
385-
support.WriteBlock(block, nil, nil)
384+
support.WriteBlock(block, nil)
386385
}
387386

388-
committer, _, err := support.ProcessNormalMsg(env)
387+
_, err := support.ProcessNormalMsg(env)
389388
if err != nil {
390389
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
391390
break
392391
}
393392
block := support.CreateNextBlock([]*cb.Envelope{env})
394-
support.WriteBlock(block, []filter.Committer{committer}, nil)
393+
support.WriteConfigBlock(block, nil)
395394
*timer = nil
396395
case multichain.NormalMsg:
397396
batches, ok := support.BlockCutter().Ordered(env)
@@ -409,7 +408,7 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.C
409408
offset := receivedOffset - int64(len(batches)-i-1)
410409
block := support.CreateNextBlock(batch)
411410
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: offset})
412-
support.WriteBlock(block, nil, encodedLastOffsetPersisted)
411+
support.WriteBlock(block, encodedLastOffsetPersisted)
413412
*lastCutBlockNumber++
414413
logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", support.ChainID(), *lastCutBlockNumber, offset)
415414
}
@@ -435,7 +434,7 @@ func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.C
435434
}
436435
block := support.CreateNextBlock(batch)
437436
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset})
438-
support.WriteBlock(block, nil, encodedLastOffsetPersisted)
437+
support.WriteBlock(block, encodedLastOffsetPersisted)
439438
*lastCutBlockNumber++
440439
logger.Debugf("[channel: %s] Proper time-to-cut received, just cut block %d", support.ChainID(), *lastCutBlockNumber)
441440
return nil

orderer/mocks/multichain/multichain.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/hyperledger/fabric/common/config"
2121
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
2222
"github.com/hyperledger/fabric/orderer/common/blockcutter"
23-
"github.com/hyperledger/fabric/orderer/common/filter"
2423
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/blockcutter"
2524
"github.com/hyperledger/fabric/orderer/multichain"
2625
cb "github.com/hyperledger/fabric/protos/common"
@@ -94,8 +93,7 @@ func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block {
9493
}
9594

9695
// WriteBlock writes data to the Blocks channel
97-
// Note that _committers is ignored by this mock implementation
98-
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
96+
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
9997
if encodedMetadataValue != nil {
10098
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
10199
}
@@ -104,6 +102,11 @@ func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Co
104102
return block
105103
}
106104

105+
// WriteConfigBlock calls WriteBlock
106+
func (mcs *ConsenterSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
107+
return mcs.WriteBlock(block, encodedMetadataValue)
108+
}
109+
107110
// ChainID returns the chain ID this specific consenter instance is associated with
108111
func (mcs *ConsenterSupport) ChainID() string {
109112
return mcs.ChainIDVal
@@ -130,8 +133,8 @@ func (mcs *ConsenterSupport) ClassifyMsg(env *cb.Envelope) (multichain.MsgClassi
130133
}
131134

132135
// ProcessNormalMsg returns ConfigSeqVal, ProcessNormalMsgErr
133-
func (mcs *ConsenterSupport) ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error) {
134-
return nil, mcs.ConfigSeqVal, mcs.ProcessNormalMsgErr
136+
func (mcs *ConsenterSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
137+
return mcs.ConfigSeqVal, mcs.ProcessNormalMsgErr
135138
}
136139

137140
// ProcessConfigUpdateMsg returns ProcessConfigUpdateMsgVal, ConfigSeqVal, ProcessConfigUpdateMsgErr

orderer/multichain/chainsupport.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ type ConsenterSupport interface {
9090
BlockCutter() blockcutter.Receiver
9191
SharedConfig() config.Orderer
9292
CreateNextBlock(messages []*cb.Envelope) *cb.Block
93-
WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block
93+
WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
94+
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
9495
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
9596
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
9697
}
@@ -102,7 +103,7 @@ type MsgProcessor interface {
102103

103104
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
104105
// configuration sequence number and nil on success, or an error if the message is not valid
105-
ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error)
106+
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
106107

107108
// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
108109
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
@@ -260,9 +261,9 @@ func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error)
260261

261262
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
262263
// configuration sequence number and nil on success, or an error if the message is not valid
263-
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (committer filter.Committer, configSeq uint64, err error) {
264+
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
264265
configSeq = cs.Sequence()
265-
committer, err = cs.filters.Apply(env)
266+
_, err = cs.filters.Apply(env)
266267
return
267268
}
268269

@@ -336,10 +337,19 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
336337
})
337338
}
338339

339-
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
340-
for _, committer := range committers {
341-
committer.Commit()
340+
func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
341+
// XXX This hacky path is temporary and will be removed by the end of this change series
342+
// The panics here are just fine
343+
committer, err := cs.filters.Apply(utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]))
344+
if err != nil {
345+
logger.Panicf("Config should have already been validated")
342346
}
347+
committer.Commit()
348+
349+
return cs.WriteBlock(block, encodedMetadataValue)
350+
}
351+
352+
func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
343353
// Set the orderer-related metadata field
344354
if encodedMetadataValue != nil {
345355
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})

orderer/multichain/chainsupport_test.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,16 @@ func (mc *mockCommitter) Commit() {
6666
func TestCommitConfig(t *testing.T) {
6767
ml := &mockLedgerReadWriter{}
6868
cm := &mockconfigtx.Manager{}
69-
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
69+
cs := &chainSupport{
70+
ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml},
71+
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
72+
signer: mockCrypto(),
73+
}
7074
assert.Equal(t, uint64(0), cs.Height(), "Should has height of 0")
7175

7276
txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)}
73-
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
7477
block := cs.CreateNextBlock(txs)
75-
cs.WriteBlock(block, committers, nil)
78+
cs.WriteConfigBlock(block, nil)
7679
assert.Equal(t, uint64(1), cs.Height(), "Should has height of 1")
7780

7881
blockTXs := make([]*cb.Envelope, len(ml.data))
@@ -81,18 +84,14 @@ func TestCommitConfig(t *testing.T) {
8184
}
8285

8386
assert.Equal(t, txs, blockTXs, "Should have written input data to ledger but did not")
84-
85-
for _, c := range committers {
86-
assert.EqualValues(t, 1, c.(*mockCommitter).committed, "Expected exactly 1 commits but got %d", c.(*mockCommitter).committed)
87-
}
8887
}
8988

9089
func TestWriteBlockSignatures(t *testing.T) {
9190
ml := &mockLedgerReadWriter{}
9291
cm := &mockconfigtx.Manager{}
9392
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
9493

95-
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil), cb.BlockMetadataIndex_SIGNATURES)
94+
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil), cb.BlockMetadataIndex_SIGNATURES)
9695
assert.NotNil(t, actual, "Block should have block signature")
9796
}
9897

@@ -103,7 +102,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {
103102

104103
value := []byte("foo")
105104
expected := &cb.Metadata{Value: value}
106-
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, value), cb.BlockMetadataIndex_ORDERER)
105+
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), value), cb.BlockMetadataIndex_ORDERER)
107106
assert.NotNil(t, actual, "Block should have orderer metadata written")
108107
assert.True(t, proto.Equal(expected, actual), "Orderer metadata not written to block correctly")
109108
}
@@ -128,28 +127,28 @@ func TestWriteLastConfig(t *testing.T) {
128127
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
129128

130129
expected := uint64(0)
131-
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil, nil))
130+
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil))
132131
assert.Equal(t, expected, lc, "First block should have config block index of %d, but got %d", expected, lc)
133-
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(1, nil), nil, nil))
132+
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(1, nil), nil))
134133
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
135134

136135
cm.SequenceVal = 1
137136
expected = uint64(2)
138-
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(2, nil), nil, nil))
137+
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(2, nil), nil))
139138
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
140139

141-
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(3, nil), nil, nil))
140+
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(3, nil), nil))
142141
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
143142

144143
t.Run("ResetChainSupport", func(t *testing.T) {
145144
cm.SequenceVal = 2
146145
expected = uint64(4)
147146

148147
cs = &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
149-
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil, nil))
148+
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil))
150149
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
151150

152-
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(5, nil), nil, nil))
151+
lc = utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(5, nil), nil))
153152
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d")
154153
})
155154
}

orderer/multichain/util_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/hyperledger/fabric/common/configtx"
2424
"github.com/hyperledger/fabric/common/configtx/tool/provisional"
2525
"github.com/hyperledger/fabric/orderer/common/blockcutter"
26-
"github.com/hyperledger/fabric/orderer/common/filter"
2726
cb "github.com/hyperledger/fabric/protos/common"
2827
"github.com/hyperledger/fabric/protos/utils"
2928
)
@@ -76,21 +75,21 @@ func (mch *mockChain) Start() {
7675
batch := mch.support.BlockCutter().Cut()
7776
if batch != nil {
7877
block := mch.support.CreateNextBlock(batch)
79-
mch.support.WriteBlock(block, nil, nil)
78+
mch.support.WriteBlock(block, nil)
8079
}
8180

82-
committer, _, err := mch.support.ProcessNormalMsg(msg)
81+
_, err := mch.support.ProcessNormalMsg(msg)
8382
if err != nil {
8483
logger.Warningf("Discarding bad config message: %s", err)
8584
continue
8685
}
8786
block := mch.support.CreateNextBlock([]*cb.Envelope{msg})
88-
mch.support.WriteBlock(block, []filter.Committer{committer}, nil)
87+
mch.support.WriteConfigBlock(block, nil)
8988
case NormalMsg:
9089
batches, _ := mch.support.BlockCutter().Ordered(msg)
9190
for _, batch := range batches {
9291
block := mch.support.CreateNextBlock(batch)
93-
mch.support.WriteBlock(block, nil, nil)
92+
mch.support.WriteBlock(block, nil)
9493
}
9594
default:
9695
logger.Panicf("Unsupported msg classification: %v", class)

orderer/solo/consensus.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package solo
1919
import (
2020
"time"
2121

22-
"github.com/hyperledger/fabric/orderer/common/filter"
2322
"github.com/hyperledger/fabric/orderer/multichain"
2423
cb "github.com/hyperledger/fabric/protos/common"
2524
"github.com/op/go-logging"
@@ -98,16 +97,16 @@ func (ch *chain) main() {
9897
batch := ch.support.BlockCutter().Cut()
9998
if batch != nil {
10099
block := ch.support.CreateNextBlock(batch)
101-
ch.support.WriteBlock(block, nil, nil)
100+
ch.support.WriteBlock(block, nil)
102101
}
103102

104-
committer, _, err := ch.support.ProcessNormalMsg(msg)
103+
_, err := ch.support.ProcessNormalMsg(msg)
105104
if err != nil {
106105
logger.Warningf("Discarding bad config message: %s", err)
107106
continue
108107
}
109108
block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
110-
ch.support.WriteBlock(block, []filter.Committer{committer}, nil)
109+
ch.support.WriteConfigBlock(block, nil)
111110
timer = nil
112111
case multichain.NormalMsg:
113112
batches, ok := ch.support.BlockCutter().Ordered(msg)
@@ -117,7 +116,7 @@ func (ch *chain) main() {
117116
}
118117
for _, batch := range batches {
119118
block := ch.support.CreateNextBlock(batch)
120-
ch.support.WriteBlock(block, nil, nil)
119+
ch.support.WriteBlock(block, nil)
121120
}
122121
if len(batches) > 0 {
123122
timer = nil
@@ -136,7 +135,7 @@ func (ch *chain) main() {
136135
}
137136
logger.Debugf("Batch timer expired, creating block")
138137
block := ch.support.CreateNextBlock(batch)
139-
ch.support.WriteBlock(block, nil, nil)
138+
ch.support.WriteBlock(block, nil)
140139
case <-ch.exitChan:
141140
logger.Debugf("Exiting")
142141
return

0 commit comments

Comments
 (0)