@@ -17,11 +17,9 @@ limitations under the License.
1717package blockcutter
1818
1919import (
20- "github.com/hyperledger/fabric/orderer/common/broadcastfilter"
21- "github.com/hyperledger/fabric/orderer/common/configtx"
20+ "github.com/hyperledger/fabric/orderer/common/filter"
2221 cb "github.com/hyperledger/fabric/protos/common"
2322
24- "github.com/golang/protobuf/proto"
2523 "github.com/op/go-logging"
2624)
2725
@@ -34,98 +32,79 @@ func init() {
3432// Receiver defines a sink for the ordered broadcast messages
3533type Receiver interface {
3634 // Ordered should be invoked sequentially as messages are ordered
37- // If the message is a valid normal message and does not fill the batch, nil, true is returned
38- // If the message is a valid normal message and fills a batch, the batch, true is returned
35+ // If the message is a valid normal message and does not fill the batch, nil, nil, true is returned
36+ // If the message is a valid normal message and fills a batch, the batch, committers, true is returned
3937 // If the message is a valid special message (like a config message) it terminates the current batch
40- // and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true
41- // If the ordered message is determined to be invalid, then nil, false is returned
42- Ordered (msg * cb.Envelope ) ([][]* cb.Envelope , bool )
38+ // and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true
39+ // If the ordered message is determined to be invalid, then nil, nil, false is returned
40+ Ordered (msg * cb.Envelope ) ([][]* cb.Envelope , [][]filter. Committer , bool )
4341
4442 // Cut returns the current batch and starts a new one
45- Cut () []* cb.Envelope
43+ Cut () ( []* cb.Envelope , []filter. Committer )
4644}
4745
4846type receiver struct {
49- batchSize int
50- filters * broadcastfilter .RuleSet
51- configManager configtx. Manager
52- curBatch [] * cb. Envelope
47+ batchSize int
48+ filters * filter .RuleSet
49+ curBatch [] * cb. Envelope
50+ batchComs []filter. Committer
5351}
5452
5553// NewReceiverImpl creates a Receiver implementation based on the given batchsize, filters, and configtx manager
56- func NewReceiverImpl (batchSize int , filters * broadcastfilter .RuleSet , configManager configtx. Manager ) Receiver {
54+ func NewReceiverImpl (batchSize int , filters * filter .RuleSet ) Receiver {
5755 return & receiver {
58- batchSize : batchSize ,
59- filters : filters ,
60- configManager : configManager ,
56+ batchSize : batchSize ,
57+ filters : filters ,
6158 }
6259}
6360
6461// Ordered should be invoked sequentially as messages are ordered
65- // If the message is a valid normal message and does not fill the batch, nil, true is returned
66- // If the message is a valid normal message and fills a batch, the batch, true is returned
62+ // If the message is a valid normal message and does not fill the batch, nil, nil, true is returned
63+ // If the message is a valid normal message and fills a batch, the batch, committers, true is returned
6764// If the message is a valid special message (like a config message) it terminates the current batch
68- // and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true
69- // If the ordered message is determined to be invalid, then nil, false is returned
70- func (r * receiver ) Ordered (msg * cb.Envelope ) ([][]* cb.Envelope , bool ) {
65+ // and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true
66+ // If the ordered message is determined to be invalid, then nil, nil, false is returned
67+ func (r * receiver ) Ordered (msg * cb.Envelope ) ([][]* cb.Envelope , [][]filter. Committer , bool ) {
7168 // The messages must be filtered a second time in case configuration has changed since the message was received
72- action , _ := r .filters .Apply (msg )
73- switch action {
74- case broadcastfilter .Accept :
75- logger .Debugf ("Enqueuing message into batch" )
76- r .curBatch = append (r .curBatch , msg )
77-
78- if len (r .curBatch ) < r .batchSize {
79- return nil , true
80- }
81-
82- logger .Debugf ("Batch size met, creating block" )
83- newBatch := r .curBatch
84- r .curBatch = nil
85- return [][]* cb.Envelope {newBatch }, true
86- case broadcastfilter .Reconfigure :
87- // TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch
88- payload := & cb.Payload {}
89- if err := proto .Unmarshal (msg .Payload , payload ); err != nil {
90- logger .Errorf ("A change was flagged as configuration, but could not be unmarshaled: %v" , err )
91- return nil , false
92- }
93- newConfig := & cb.ConfigurationEnvelope {}
94- if err := proto .Unmarshal (payload .Data , newConfig ); err != nil {
95- logger .Errorf ("A change was flagged as configuration, but could not be unmarshaled: %v" , err )
96- return nil , false
97- }
98- err := r .configManager .Validate (newConfig )
99- if err != nil {
100- logger .Warningf ("A configuration change made it through the ingress filter but could not be included in a batch: %v" , err )
101- return nil , false
102- }
69+ committer , err := r .filters .Apply (msg )
70+ if err != nil {
71+ logger .Debugf ("Rejecting message: %s" , err )
72+ return nil , nil , false
73+ }
10374
104- logger .Debugf ("Configuration change applied successfully, committing previous block and configuration block" )
75+ if committer .Isolated () {
76+ logger .Debugf ("Found message which requested to be isolated, cutting into its own block" )
10577 firstBatch := r .curBatch
10678 r .curBatch = nil
79+ firstComs := r .batchComs
80+ r .batchComs = nil
10781 secondBatch := []* cb.Envelope {msg }
10882 if firstBatch == nil {
109- return [][]* cb.Envelope {secondBatch }, true
83+ return [][]* cb.Envelope {secondBatch }, [][]filter. Committer {[]filter. Committer { committer }}, true
11084 }
111- return [][]* cb.Envelope {firstBatch , secondBatch }, true
112- case broadcastfilter .Reject :
113- logger .Debugf ("Rejecting message" )
114- return nil , false
115- case broadcastfilter .Forward :
116- logger .Debugf ("Ignoring message because it was not accepted by a filter" )
117- return nil , false
118- default :
119- logger .Fatalf ("Received an unknown rule response: %v" , action )
85+ return [][]* cb.Envelope {firstBatch , secondBatch }, [][]filter.Committer {firstComs , []filter.Committer {committer }}, true
12086 }
12187
122- return nil , false // Unreachable
88+ logger .Debugf ("Enqueuing message into batch" )
89+ r .curBatch = append (r .curBatch , msg )
90+ r .batchComs = append (r .batchComs , committer )
12391
92+ if len (r .curBatch ) < r .batchSize {
93+ return nil , nil , true
94+ }
95+
96+ logger .Debugf ("Batch size met, creating block" )
97+ newBatch := r .curBatch
98+ newComs := r .batchComs
99+ r .curBatch = nil
100+ return [][]* cb.Envelope {newBatch }, [][]filter.Committer {newComs }, true
124101}
125102
126103// Cut returns the current batch and starts a new one
127- func (r * receiver ) Cut () []* cb.Envelope {
104+ func (r * receiver ) Cut () ( []* cb.Envelope , []filter. Committer ) {
128105 batch := r .curBatch
129106 r .curBatch = nil
130- return batch
107+ committers := r .batchComs
108+ r .batchComs = nil
109+ return batch , committers
131110}
0 commit comments