From bc3b3c0efc936e2669a064152671f4476978e301 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Mon, 28 Nov 2016 17:12:50 -0500 Subject: [PATCH] [FAB-1278] Generalize Config Execution Path For the impending chain creation work, a second type of transaction will need to be 'executed' beyond a configuration transaction. This changeset generalizes the old configuration specific code paths into a re-usable path. In particular, the broadcast filters have been generalized to be a more generic filtering mechanism. Instead of replying with the matched rule type, and then having the invoker make decisions based on the match, the filters now return a Committer which can perform those actions with no specific knowledge from the caller. The configuration transaction was being treated specially in several parts of the orderer code. This created ugly case switching and ultimately produced more code than with a more generic approach. So this changeset produces a net decrease in overall lines of code. Change-Id: Ifaa769a0303f828de1d76e0a014e47d9044756ce Signed-off-by: Jason Yellick --- orderer/common/blockcutter/blockcutter.go | 113 ++++----- .../common/blockcutter/blockcutter_test.go | 223 ++++++------------ orderer/common/broadcast/broadcast.go | 20 +- orderer/common/broadcast/broadcast_test.go | 114 +-------- orderer/common/configtx/filter.go | 37 ++- orderer/common/configtx/filter_test.go | 40 +++- .../{broadcastfilter => filter}/filter.go | 52 ++-- .../filter_test.go | 61 ++--- orderer/multichain/chainsupport.go | 76 ++---- orderer/multichain/chainsupport_mock_test.go | 19 +- orderer/multichain/chainsupport_test.go | 74 ++++-- orderer/solo/consensus.go | 10 +- orderer/solo/consensus_test.go | 97 ++++---- 13 files changed, 372 insertions(+), 564 deletions(-) rename orderer/common/{broadcastfilter => filter}/filter.go (56%) rename orderer/common/{broadcastfilter => filter}/filter_test.go (53%) diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index de4354b7c80..22a1abcbf09 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -17,11 +17,9 @@ limitations under the License. package blockcutter import ( - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" - "github.com/golang/protobuf/proto" "github.com/op/go-logging" ) @@ -34,98 +32,79 @@ func init() { // Receiver defines a sink for the ordered broadcast messages type Receiver interface { // Ordered should be invoked sequentially as messages are ordered - // If the message is a valid normal message and does not fill the batch, nil, true is returned - // If the message is a valid normal message and fills a batch, the batch, true is returned + // If the message is a valid normal message and does not fill the batch, nil, nil, true is returned + // If the message is a valid normal message and fills a batch, the batch, committers, true is returned // If the message is a valid special message (like a config message) it terminates the current batch - // and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true - // If the ordered message is determined to be invalid, then nil, false is returned - Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) + // and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true + // If the ordered message is determined to be invalid, then nil, nil, false is returned + Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) // Cut returns the current batch and starts a new one - Cut() []*cb.Envelope + Cut() ([]*cb.Envelope, []filter.Committer) } type receiver struct { - batchSize int - filters *broadcastfilter.RuleSet - configManager configtx.Manager - curBatch []*cb.Envelope + batchSize int + filters *filter.RuleSet + curBatch []*cb.Envelope + batchComs []filter.Committer } // NewReceiverImpl creates a Receiver implementation based on the given batchsize, filters, and configtx manager -func NewReceiverImpl(batchSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Receiver { +func NewReceiverImpl(batchSize int, filters *filter.RuleSet) Receiver { return &receiver{ - batchSize: batchSize, - filters: filters, - configManager: configManager, + batchSize: batchSize, + filters: filters, } } // Ordered should be invoked sequentially as messages are ordered -// If the message is a valid normal message and does not fill the batch, nil, true is returned -// If the message is a valid normal message and fills a batch, the batch, true is returned +// If the message is a valid normal message and does not fill the batch, nil, nil, true is returned +// If the message is a valid normal message and fills a batch, the batch, committers, true is returned // If the message is a valid special message (like a config message) it terminates the current batch -// and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true -// If the ordered message is determined to be invalid, then nil, false is returned -func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) { +// and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true +// If the ordered message is determined to be invalid, then nil, nil, false is returned +func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) { // The messages must be filtered a second time in case configuration has changed since the message was received - action, _ := r.filters.Apply(msg) - switch action { - case broadcastfilter.Accept: - logger.Debugf("Enqueuing message into batch") - r.curBatch = append(r.curBatch, msg) - - if len(r.curBatch) < r.batchSize { - return nil, true - } - - logger.Debugf("Batch size met, creating block") - newBatch := r.curBatch - r.curBatch = nil - return [][]*cb.Envelope{newBatch}, true - case broadcastfilter.Reconfigure: - // TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch - payload := &cb.Payload{} - if err := proto.Unmarshal(msg.Payload, payload); err != nil { - logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) - return nil, false - } - newConfig := &cb.ConfigurationEnvelope{} - if err := proto.Unmarshal(payload.Data, newConfig); err != nil { - logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) - return nil, false - } - err := r.configManager.Validate(newConfig) - if err != nil { - logger.Warningf("A configuration change made it through the ingress filter but could not be included in a batch: %v", err) - return nil, false - } + committer, err := r.filters.Apply(msg) + if err != nil { + logger.Debugf("Rejecting message: %s", err) + return nil, nil, false + } - logger.Debugf("Configuration change applied successfully, committing previous block and configuration block") + if committer.Isolated() { + logger.Debugf("Found message which requested to be isolated, cutting into its own block") firstBatch := r.curBatch r.curBatch = nil + firstComs := r.batchComs + r.batchComs = nil secondBatch := []*cb.Envelope{msg} if firstBatch == nil { - return [][]*cb.Envelope{secondBatch}, true + return [][]*cb.Envelope{secondBatch}, [][]filter.Committer{[]filter.Committer{committer}}, true } - return [][]*cb.Envelope{firstBatch, secondBatch}, true - case broadcastfilter.Reject: - logger.Debugf("Rejecting message") - return nil, false - case broadcastfilter.Forward: - logger.Debugf("Ignoring message because it was not accepted by a filter") - return nil, false - default: - logger.Fatalf("Received an unknown rule response: %v", action) + return [][]*cb.Envelope{firstBatch, secondBatch}, [][]filter.Committer{firstComs, []filter.Committer{committer}}, true } - return nil, false // Unreachable + logger.Debugf("Enqueuing message into batch") + r.curBatch = append(r.curBatch, msg) + r.batchComs = append(r.batchComs, committer) + if len(r.curBatch) < r.batchSize { + return nil, nil, true + } + + logger.Debugf("Batch size met, creating block") + newBatch := r.curBatch + newComs := r.batchComs + r.curBatch = nil + return [][]*cb.Envelope{newBatch}, [][]filter.Committer{newComs}, true } // Cut returns the current batch and starts a new one -func (r *receiver) Cut() []*cb.Envelope { +func (r *receiver) Cut() ([]*cb.Envelope, []filter.Committer) { batch := r.curBatch r.curBatch = nil - return batch + committers := r.batchComs + r.batchComs = nil + return batch, committers } diff --git a/orderer/common/blockcutter/blockcutter_test.go b/orderer/common/blockcutter/blockcutter_test.go index 426273d3be1..9e41a3259e6 100644 --- a/orderer/common/blockcutter/blockcutter_test.go +++ b/orderer/common/blockcutter/blockcutter_test.go @@ -18,98 +18,66 @@ package blockcutter import ( "bytes" - "fmt" "testing" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" - - "github.com/golang/protobuf/proto" ) -type mockConfigManager struct { - validated bool - validateErr error -} +type isolatedCommitter struct{} -func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { - mcm.validated = true - return mcm.validateErr -} +func (ic isolatedCommitter) Isolated() bool { return true } -func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { - panic("Unimplemented") -} +func (ic isolatedCommitter) Commit() {} -func (mcm *mockConfigManager) ChainID() string { - panic("Unimplemented") -} - -type mockConfigFilter struct { - manager configtx.Manager -} +type mockIsolatedFilter struct{} -func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { - if bytes.Equal(msg.Payload, configTx.Payload) { - if mcf.manager == nil || mcf.manager.Validate(nil) != nil { - return broadcastfilter.Reject - } - return broadcastfilter.Reconfigure +func (mif *mockIsolatedFilter) Apply(msg *cb.Envelope) (filter.Action, filter.Committer) { + if bytes.Equal(msg.Payload, isolatedTx.Payload) { + return filter.Accept, isolatedCommitter{} } - return broadcastfilter.Forward + return filter.Forward, nil } type mockRejectFilter struct{} -func (mrf mockRejectFilter) Apply(message *cb.Envelope) broadcastfilter.Action { +func (mrf mockRejectFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) { if bytes.Equal(message.Payload, badTx.Payload) { - return broadcastfilter.Reject + return filter.Reject, nil } - return broadcastfilter.Forward + return filter.Forward, nil } type mockAcceptFilter struct{} -func (mrf mockAcceptFilter) Apply(message *cb.Envelope) broadcastfilter.Action { +func (mrf mockAcceptFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) { if bytes.Equal(message.Payload, goodTx.Payload) { - return broadcastfilter.Accept + return filter.Accept, filter.NoopCommitter } - return broadcastfilter.Forward + return filter.Forward, nil } -func getFiltersAndConfig() (*broadcastfilter.RuleSet, *mockConfigManager) { - cm := &mockConfigManager{} - filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ - &mockConfigFilter{cm}, +func getFilters() *filter.RuleSet { + return filter.NewRuleSet([]filter.Rule{ + &mockIsolatedFilter{}, &mockRejectFilter{}, &mockAcceptFilter{}, }) - return filters, cm } var badTx = &cb.Envelope{Payload: []byte("BAD")} var goodTx = &cb.Envelope{Payload: []byte("GOOD")} -var configTx = &cb.Envelope{Payload: []byte("CONFIG")} +var isolatedTx = &cb.Envelope{Payload: []byte("ISOLATED")} var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")} -func init() { - configBytes, err := proto.Marshal(&cb.ConfigurationEnvelope{}) - if err != nil { - panic("Error marshaling empty config tx") - } - configTx = &cb.Envelope{Payload: configBytes} - -} - func TestNormalBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() + filters := getFilters() batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) + r := NewReceiverImpl(batchSize, filters) - batches, ok := r.Ordered(goodTx) + batches, committers, ok := r.Ordered(goodTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -117,9 +85,9 @@ func TestNormalBatch(t *testing.T) { t.Fatalf("Should have enqueued message into batch") } - batches, ok = r.Ordered(goodTx) + batches, committers, ok = r.Ordered(goodTx) - if batches == nil { + if batches == nil || committers == nil { t.Fatalf("Should have created batch") } @@ -130,13 +98,13 @@ func TestNormalBatch(t *testing.T) { } func TestBadMessageInBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() + filters := getFilters() batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) + r := NewReceiverImpl(batchSize, filters) - batches, ok := r.Ordered(badTx) + batches, committers, ok := r.Ordered(badTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -144,9 +112,9 @@ func TestBadMessageInBatch(t *testing.T) { t.Fatalf("Should not have enqueued bad message into batch") } - batches, ok = r.Ordered(goodTx) + batches, committers, ok = r.Ordered(goodTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -154,9 +122,9 @@ func TestBadMessageInBatch(t *testing.T) { t.Fatalf("Should have enqueued good message into batch") } - batches, ok = r.Ordered(badTx) + batches, committers, ok = r.Ordered(badTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -166,13 +134,13 @@ func TestBadMessageInBatch(t *testing.T) { } func TestUnmatchedMessageInBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() + filters := getFilters() batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) + r := NewReceiverImpl(batchSize, filters) - batches, ok := r.Ordered(unmatchedTx) + batches, committers, ok := r.Ordered(unmatchedTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -180,9 +148,9 @@ func TestUnmatchedMessageInBatch(t *testing.T) { t.Fatalf("Should not have enqueued unmatched message into batch") } - batches, ok = r.Ordered(goodTx) + batches, committers, ok = r.Ordered(goodTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -190,9 +158,9 @@ func TestUnmatchedMessageInBatch(t *testing.T) { t.Fatalf("Should have enqueued good message into batch") } - batches, ok = r.Ordered(unmatchedTx) + batches, committers, ok = r.Ordered(unmatchedTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch from unmatched message") } @@ -201,42 +169,38 @@ func TestUnmatchedMessageInBatch(t *testing.T) { } } -func TestReconfigureEmptyBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() +func TestIsolatedEmptyBatch(t *testing.T) { + filters := getFilters() batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) + r := NewReceiverImpl(batchSize, filters) - batches, ok := r.Ordered(configTx) + batches, committers, ok := r.Ordered(isolatedTx) if !ok { - t.Fatalf("Should have enqueued config message") + t.Fatalf("Should have enqueued isolated message") } - if !cm.validated { - t.Errorf("ConfigTx should have been validated before processing") + if len(batches) != 1 || len(committers) != 1 { + t.Fatalf("Should created new batch, got %d and %d", len(batches), len(committers)) } - if len(batches) != 1 { - t.Fatalf("Should created new batch, got %d", len(batches)) + if len(batches[0]) != 1 || len(committers[0]) != 1 { + t.Fatalf("Should have had one isolatedTx in the second batch got %d and %d", len(batches[1]), len(committers[0])) } - if len(batches[0]) != 1 { - t.Fatalf("Should have had one config tx in the second batch got %d", len(batches[1])) - } - - if !bytes.Equal(batches[0][0].Payload, configTx.Payload) { - t.Fatalf("Should have had the normal tx in the first batch") + if !bytes.Equal(batches[0][0].Payload, isolatedTx.Payload) { + t.Fatalf("Should have had the isolated tx in the first batch") } } -func TestReconfigurePartialBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() +func TestIsolatedPartialBatch(t *testing.T) { + filters := getFilters() batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) + r := NewReceiverImpl(batchSize, filters) - batches, ok := r.Ordered(goodTx) + batches, committers, ok := r.Ordered(goodTx) - if batches != nil { + if batches != nil || committers != nil { t.Fatalf("Should not have created batch") } @@ -244,82 +208,29 @@ func TestReconfigurePartialBatch(t *testing.T) { t.Fatalf("Should have enqueued good message into batch") } - batches, ok = r.Ordered(configTx) + batches, committers, ok = r.Ordered(isolatedTx) if !ok { - t.Fatalf("Should have enqueued config message") - } - - if !cm.validated { - t.Errorf("ConfigTx should have been validated before processing") + t.Fatalf("Should have enqueued isolated message") } - if len(batches) != 2 { - t.Fatalf("Should have created two batches, got %d", len(batches)) + if len(batches) != 2 || len(committers) != 2 { + t.Fatalf("Should have created two batches, got %d and %d", len(batches), len(committers)) } - if len(batches[0]) != 1 { - t.Fatalf("Should have had one normal tx in the first batch got %d", len(batches[0])) + if len(batches[0]) != 1 || len(committers[0]) != 1 { + t.Fatalf("Should have had one normal tx in the first batch got %d and %d committers", len(batches[0]), len(committers[0])) } if !bytes.Equal(batches[0][0].Payload, goodTx.Payload) { t.Fatalf("Should have had the normal tx in the first batch") } - if len(batches[1]) != 1 { - t.Fatalf("Should have had one config tx in the second batch got %d", len(batches[1])) - } - - if !bytes.Equal(batches[1][0].Payload, configTx.Payload) { - t.Fatalf("Should have had the normal tx in the first batch") + if len(batches[1]) != 1 || len(committers[1]) != 1 { + t.Fatalf("Should have had one isolated tx in the second batch got %d and %d committers", len(batches[1]), len(committers[1])) } -} - -func TestReconfigureFailToVerify(t *testing.T) { - filters, cm := getFiltersAndConfig() - cm.validateErr = fmt.Errorf("Fail to apply") - batchSize := 2 - r := NewReceiverImpl(batchSize, filters, cm) - batches, ok := r.Ordered(goodTx) - - if batches != nil { - t.Fatalf("Should not have created batch") - } - - if !ok { - t.Fatalf("Should have enqueued good message into batch") - } - - batches, ok = r.Ordered(configTx) - - if !cm.validated { - t.Errorf("ConfigTx should have been validated before processing") - } - - if batches != nil { - t.Fatalf("Should not have created batch") - } - - if ok { - t.Fatalf("Should not have enqueued bad config message into batch") - } - - batches, ok = r.Ordered(goodTx) - - if batches == nil { - t.Fatalf("Should have created batch") - } - - if len(batches) != 1 { - t.Fatalf("Batches should only have had one batch") - } - - if len(batches[0]) != 2 { - t.Fatalf("Should have had full batch") - } - - if !ok { - t.Fatalf("Should have enqueued good message into batch") + if !bytes.Equal(batches[1][0].Payload, isolatedTx.Payload) { + t.Fatalf("Should have had the isolated tx in the second batch") } } diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index 5a651ddb35a..aff005d7607 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -17,7 +17,7 @@ limitations under the License. package broadcast import ( - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" @@ -48,7 +48,7 @@ type Support interface { Enqueue(env *cb.Envelope) bool // Filters returns the set of broadcast filters for this chain - Filters() *broadcastfilter.RuleSet + Filters() *filter.RuleSet } type handlerImpl struct { @@ -129,24 +129,18 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err panic("Unimplemented") } - action, _ := support.Filters().Apply(msg) + _, filterErr := support.Filters().Apply(msg) - switch action { - case broadcastfilter.Reconfigure: - fallthrough - case broadcastfilter.Accept: + if filterErr != nil { + logger.Debugf("Rejecting broadcast message") + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + } else { select { case b.queue <- &msgAndSupport{msg: msg, support: support}: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) default: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) } - case broadcastfilter.Forward: - fallthrough - case broadcastfilter.Reject: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) - default: - logger.Fatalf("Unknown filter action :%v", action) } if err != nil { diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index bbbae9bc2f4..b42df39cbdc 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -17,13 +17,11 @@ limitations under the License. package broadcast import ( - "bytes" "fmt" "testing" "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -32,53 +30,6 @@ import ( var systemChain = "systemChain" -var configTx []byte - -func init() { - var err error - configTx, err = proto.Marshal(&cb.ConfigurationEnvelope{}) - if err != nil { - panic("Error marshaling empty config tx") - } -} - -type mockConfigManager struct { - validated bool - validateErr error -} - -func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { - mcm.validated = true - return mcm.validateErr -} - -func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { - panic("Unimplemented") -} - -func (mcm *mockConfigManager) ChainID() string { - panic("Unimplemented") -} - -type mockConfigFilter struct { - manager configtx.Manager -} - -func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { - payload := &cb.Payload{} - err := proto.Unmarshal(msg.Payload, payload) - if err != nil { - panic(err) - } - if bytes.Equal(payload.Data, configTx) { - if mcf.manager == nil || mcf.manager.Validate(nil) != nil { - return broadcastfilter.Reject - } - return broadcastfilter.Reconfigure - } - return broadcastfilter.Forward -} - type mockB struct { grpc.ServerStream recvChan chan *cb.Envelope @@ -121,17 +72,12 @@ func (mm *mockSupportManager) halt() { } type mockSupport struct { - configManager *mockConfigManager - filters *broadcastfilter.RuleSet - queue chan *cb.Envelope - done bool -} - -func (ms *mockSupport) ConfigManager() configtx.Manager { - return ms.configManager + filters *filter.RuleSet + queue chan *cb.Envelope + done bool } -func (ms *mockSupport) Filters() *broadcastfilter.RuleSet { +func (ms *mockSupport) Filters() *filter.RuleSet { return ms.filters } @@ -169,19 +115,16 @@ func makeMessage(chainID string, data []byte) *cb.Envelope { } func getMultichainManager() *mockSupportManager { - cm := &mockConfigManager{} - filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ - broadcastfilter.EmptyRejectRule, - &mockConfigFilter{cm}, - broadcastfilter.AcceptRule, + filters := filter.NewRuleSet([]filter.Rule{ + filter.EmptyRejectRule, + filter.AcceptRule, }) mm := &mockSupportManager{ chains: make(map[string]*mockSupport), } mm.chains[string(systemChain)] = &mockSupport{ - filters: filters, - configManager: cm, - queue: make(chan *cb.Envelope), + filters: filters, + queue: make(chan *cb.Envelope), } return mm } @@ -257,40 +200,3 @@ func TestEmptyEnvelope(t *testing.T) { } } - -func TestReconfigureAccept(t *testing.T) { - mm := getMultichainManager() - defer mm.halt() - bh := NewHandlerImpl(mm, 2) - m := newMockB() - defer close(m.recvChan) - go bh.Handle(m) - - m.recvChan <- makeMessage(systemChain, configTx) - - reply := <-m.sendChan - if reply.Status != cb.Status_SUCCESS { - t.Fatalf("Should have successfully queued the message") - } - - if !mm.chains[string(systemChain)].configManager.validated { - t.Errorf("ConfigTx should have been validated before processing") - } -} - -func TestReconfigureReject(t *testing.T) { - mm := getMultichainManager() - mm.chains[string(systemChain)].configManager.validateErr = fmt.Errorf("Fail to validate") - defer mm.halt() - bh := NewHandlerImpl(mm, 2) - m := newMockB() - defer close(m.recvChan) - go bh.Handle(m) - - m.recvChan <- makeMessage(systemChain, configTx) - - reply := <-m.sendChan - if reply.Status != cb.Status_BAD_REQUEST { - t.Fatalf("Should have failed to queue the message because it was invalid config") - } -} diff --git a/orderer/common/configtx/filter.go b/orderer/common/configtx/filter.go index dc684c9fe20..b26523ab254 100644 --- a/orderer/common/configtx/filter.go +++ b/orderer/common/configtx/filter.go @@ -17,7 +17,9 @@ limitations under the License. package configtx import ( - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "fmt" + + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" "github.com/golang/protobuf/proto" @@ -28,35 +30,54 @@ type configFilter struct { } // New creates a new configfilter Rule based on the given Manager -func NewFilter(manager Manager) broadcastfilter.Rule { +func NewFilter(manager Manager) filter.Rule { return &configFilter{ configManager: manager, } } +type configCommitter struct { + manager Manager + configEnvelope *cb.ConfigurationEnvelope +} + +func (cc *configCommitter) Commit() { + err := cc.manager.Apply(cc.configEnvelope) + if err != nil { + panic(fmt.Errorf("Could not apply configuration transaction which should have already been validated: %s", err)) + } +} + +func (cc *configCommitter) Isolated() bool { + return true +} + // Apply applies the rule to the given Envelope, replying with the Action to take for the message -func (cf *configFilter) Apply(message *cb.Envelope) broadcastfilter.Action { +func (cf *configFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) { msgData := &cb.Payload{} err := proto.Unmarshal(message.Payload, msgData) if err != nil { - return broadcastfilter.Forward + return filter.Forward, nil } if msgData.Header == nil || msgData.Header.ChainHeader == nil || msgData.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { - return broadcastfilter.Forward + return filter.Forward, nil } config := &cb.ConfigurationEnvelope{} err = proto.Unmarshal(msgData.Data, config) if err != nil { - return broadcastfilter.Reject + return filter.Reject, nil } err = cf.configManager.Validate(config) if err != nil { - return broadcastfilter.Reject + return filter.Reject, nil } - return broadcastfilter.Reconfigure + return filter.Accept, &configCommitter{ + manager: cf.configManager, + configEnvelope: config, + } } diff --git a/orderer/common/configtx/filter_test.go b/orderer/common/configtx/filter_test.go index 336bc29fdcd..70305e3a6c8 100644 --- a/orderer/common/configtx/filter_test.go +++ b/orderer/common/configtx/filter_test.go @@ -18,16 +18,18 @@ package configtx import ( "fmt" + "reflect" "testing" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/filter" cb "github.com/hyperledger/fabric/protos/common" "github.com/golang/protobuf/proto" ) type mockConfigManager struct { - err error + applied *cb.ConfigurationEnvelope + err error } func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { @@ -35,6 +37,7 @@ func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error } func (mcm *mockConfigManager) Apply(configtx *cb.ConfigurationEnvelope) error { + mcm.applied = configtx return mcm.err } @@ -44,23 +47,36 @@ func (mcm *mockConfigManager) ChainID() string { func TestForwardNonConfig(t *testing.T) { cf := NewFilter(&mockConfigManager{}) - result := cf.Apply(&cb.Envelope{ + result, _ := cf.Apply(&cb.Envelope{ Payload: []byte("Opaque"), }) - if result != broadcastfilter.Forward { + if result != filter.Forward { t.Fatalf("Should have forwarded opaque message") } } func TestAcceptGoodConfig(t *testing.T) { - cf := NewFilter(&mockConfigManager{}) - config, _ := proto.Marshal(&cb.ConfigurationEnvelope{}) + mcm := &mockConfigManager{} + cf := NewFilter(mcm) + configEnv := &cb.ConfigurationEnvelope{} + config, _ := proto.Marshal(configEnv) configBytes, _ := proto.Marshal(&cb.Payload{Header: &cb.Header{ChainHeader: &cb.ChainHeader{Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION)}}, Data: config}) - result := cf.Apply(&cb.Envelope{ + configEnvelope := &cb.Envelope{ Payload: configBytes, - }) - if result != broadcastfilter.Reconfigure { - t.Fatalf("Should have indiated a good config message causes a reconfiguration") + } + result, committer := cf.Apply(configEnvelope) + if result != filter.Accept { + t.Fatalf("Should have indicated a good config message causes a reconfiguration") + } + + if !committer.Isolated() { + t.Fatalf("Configuration transactions should be isolated to their own block") + } + + committer.Commit() + + if !reflect.DeepEqual(mcm.applied, configEnv) { + t.Fatalf("Should have applied new configuration on commit got %v and %v", mcm.applied, configEnv) } } @@ -68,10 +84,10 @@ func TestRejectBadConfig(t *testing.T) { cf := NewFilter(&mockConfigManager{err: fmt.Errorf("Error")}) config, _ := proto.Marshal(&cb.ConfigurationEnvelope{}) configBytes, _ := proto.Marshal(&cb.Payload{Header: &cb.Header{ChainHeader: &cb.ChainHeader{Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION)}}, Data: config}) - result := cf.Apply(&cb.Envelope{ + result, _ := cf.Apply(&cb.Envelope{ Payload: configBytes, }) - if result != broadcastfilter.Reject { + if result != filter.Reject { t.Fatalf("Should have rejected bad config message") } } diff --git a/orderer/common/broadcastfilter/filter.go b/orderer/common/filter/filter.go similarity index 56% rename from orderer/common/broadcastfilter/filter.go rename to orderer/common/filter/filter.go index 3313bcd2aaf..eb78192a8b6 100644 --- a/orderer/common/broadcastfilter/filter.go +++ b/orderer/common/filter/filter.go @@ -14,9 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package broadcastfilter +package filter import ( + "fmt" + ab "github.com/hyperledger/fabric/protos/common" ) @@ -26,8 +28,6 @@ type Action int const ( // Accept indicates that the message should be processed Accept = iota - // Reconfigure indicates that this message modifies this rule, and should therefore be processed in a batch by itself - Reconfigure // Reject indicates that the message should not be processed Reject // Forward indicates that the rule could not determine the correct course of action @@ -37,19 +37,37 @@ const ( // Rule defines a filter function which accepts, rejects, or forwards (to the next rule) a Envelope type Rule interface { // Apply applies the rule to the given Envelope, replying with the Action to take for the message - Apply(message *ab.Envelope) Action + // If the filter Accepts a message, it should provide a committer to use when writing the message to the chain + Apply(message *ab.Envelope) (Action, Committer) } +// Committer is returned by postfiltering and should be invoked once the message has been written to the blockchain +type Committer interface { + // Commit performs whatever action should be performed upon commiting of a message + Commit() + + // Isolated returns whether this transaction should have a block to itself or may be mixed with other transactions + Isolated() bool +} + +type noopCommitter struct{} + +func (nc noopCommitter) Commit() {} +func (nc noopCommitter) Isolated() bool { return false } + +// NoopCommitter does nothing on commit and is not isolate +var NoopCommitter = Committer(noopCommitter{}) + // EmptyRejectRule rejects empty messages var EmptyRejectRule = Rule(emptyRejectRule{}) type emptyRejectRule struct{} -func (a emptyRejectRule) Apply(message *ab.Envelope) Action { +func (a emptyRejectRule) Apply(message *ab.Envelope) (Action, Committer) { if message.Payload == nil { - return Reject + return Reject, nil } - return Forward + return Forward, nil } // AcceptRule always returns Accept as a result for Apply @@ -57,8 +75,8 @@ var AcceptRule = Rule(acceptRule{}) type acceptRule struct{} -func (a acceptRule) Apply(message *ab.Envelope) Action { - return Accept +func (a acceptRule) Apply(message *ab.Envelope) (Action, Committer) { + return Accept, NoopCommitter } // RuleSet is used to apply a collection of rules @@ -73,17 +91,17 @@ func NewRuleSet(rules []Rule) *RuleSet { } } -// Apply applies the rules given for this set in order, returning the first non-Forward result and the Rule which generated it -// or returning Forward, nil if no rules accept or reject it -func (rs *RuleSet) Apply(message *ab.Envelope) (Action, Rule) { +// Filter applies the rules given for this set in order, returning the committer, nil on valid, or nil, err on invalid +func (rs *RuleSet) Apply(message *ab.Envelope) (Committer, error) { for _, rule := range rs.rules { - action := rule.Apply(message) + action, committer := rule.Apply(message) switch action { - case Forward: - continue + case Accept: + return committer, nil + case Reject: + return nil, fmt.Errorf("Rejected by rule: %T", rule) default: - return action, rule } } - return Forward, nil + return nil, fmt.Errorf("No matching filter found") } diff --git a/orderer/common/broadcastfilter/filter_test.go b/orderer/common/filter/filter_test.go similarity index 53% rename from orderer/common/broadcastfilter/filter_test.go rename to orderer/common/filter/filter_test.go index f751fff022d..4e20a8b9891 100644 --- a/orderer/common/broadcastfilter/filter_test.go +++ b/orderer/common/filter/filter_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package broadcastfilter +package filter import ( "testing" @@ -26,28 +26,24 @@ var RejectRule = Rule(rejectRule{}) type rejectRule struct{} -func (r rejectRule) Apply(message *cb.Envelope) Action { - return Reject +func (r rejectRule) Apply(message *cb.Envelope) (Action, Committer) { + return Reject, nil } var ForwardRule = Rule(forwardRule{}) type forwardRule struct{} -func (r forwardRule) Apply(message *cb.Envelope) Action { - return Forward +func (r forwardRule) Apply(message *cb.Envelope) (Action, Committer) { + return Forward, nil } func TestEmptyRejectRule(t *testing.T) { - rs := NewRuleSet([]Rule{EmptyRejectRule}) - result, rule := rs.Apply(&cb.Envelope{}) + result, _ := EmptyRejectRule.Apply(&cb.Envelope{}) if result != Reject { t.Fatalf("Should have rejected") } - if rule != EmptyRejectRule { - t.Fatalf("Rejected but not for the right rule") - } - result, _ = rs.Apply(&cb.Envelope{Payload: []byte("fakedata")}) + result, _ = EmptyRejectRule.Apply(&cb.Envelope{Payload: []byte("fakedata")}) if result != Forward { t.Fatalf("Should have forwarded") } @@ -55,55 +51,40 @@ func TestEmptyRejectRule(t *testing.T) { func TestAcceptReject(t *testing.T) { rs := NewRuleSet([]Rule{AcceptRule, RejectRule}) - result, rule := rs.Apply(&cb.Envelope{}) - if result != Accept { - t.Fatalf("Should have accepted") - } - if rule != AcceptRule { - t.Fatalf("Accepted but not for the right rule") + _, err := rs.Apply(&cb.Envelope{}) + if err != nil { + t.Fatalf("Should have accepted: %s", err) } } func TestRejectAccept(t *testing.T) { rs := NewRuleSet([]Rule{RejectRule, AcceptRule}) - result, rule := rs.Apply(&cb.Envelope{}) - if result != Reject { + _, err := rs.Apply(&cb.Envelope{}) + if err == nil { t.Fatalf("Should have rejected") } - if rule != RejectRule { - t.Fatalf("Rejected but not for the right rule") - } } func TestForwardAccept(t *testing.T) { rs := NewRuleSet([]Rule{ForwardRule, AcceptRule}) - result, rule := rs.Apply(&cb.Envelope{}) - if result != Accept { - t.Fatalf("Should have accepted") - } - if rule != AcceptRule { - t.Fatalf("Accepted but not for the right rule") + _, err := rs.Apply(&cb.Envelope{}) + if err != nil { + t.Fatalf("Should have accepted: %s ", err) } } func TestForward(t *testing.T) { rs := NewRuleSet([]Rule{ForwardRule}) - result, rule := rs.Apply(&cb.Envelope{}) - if result != Forward { - t.Fatalf("Should have forwarded") - } - if rule != nil { - t.Fatalf("Forwarded but rule is set") + _, err := rs.Apply(&cb.Envelope{}) + if err == nil { + t.Fatalf("Should have rejected") } } func TestNoRule(t *testing.T) { rs := NewRuleSet([]Rule{}) - result, rule := rs.Apply(&cb.Envelope{}) - if result != Forward { - t.Fatalf("Should have forwarded") - } - if rule != nil { - t.Fatalf("Forwarded but rule is set") + _, err := rs.Apply(&cb.Envelope{}) + if err == nil { + t.Fatalf("Should have rejected") } } diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index 1a48b2da2bd..c7bd6e0d424 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -19,15 +19,13 @@ package multichain import ( "github.com/hyperledger/fabric/orderer/common/blockcutter" "github.com/hyperledger/fabric/orderer/common/broadcast" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/deliver" + "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/policies" "github.com/hyperledger/fabric/orderer/common/sharedconfig" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" - - "github.com/golang/protobuf/proto" ) // Consenter defines the backing ordering mechanism @@ -61,7 +59,7 @@ type Chain interface { type ConsenterSupport interface { BlockCutter() blockcutter.Receiver SharedConfig() sharedconfig.Manager - Writer() rawledger.Writer + WriteBlock(data []*cb.Envelope, metadata [][]byte, committers []filter.Committer) } // ChainSupport provides a wrapper for the resources backing a chain @@ -77,15 +75,14 @@ type chainSupport struct { configManager configtx.Manager policyManager policies.Manager sharedConfigManager sharedconfig.Manager - reader rawledger.Reader - writer rawledger.Writer - filters *broadcastfilter.RuleSet + ledger rawledger.ReadWriter + filters *filter.RuleSet } func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, sharedConfigManager sharedconfig.Manager, consenters map[string]Consenter) *chainSupport { - batchSize := sharedConfigManager.BatchSize() + batchSize := sharedConfigManager.BatchSize() // XXX this needs to be pushed deeper so that the blockcutter queries it after each write for reconfiguration support filters := createBroadcastRuleset(configManager) - cutter := blockcutter.NewReceiverImpl(batchSize, filters, configManager) + cutter := blockcutter.NewReceiverImpl(batchSize, filters) consenterType := sharedConfigManager.ConsensusType() consenter, ok := consenters[consenterType] if !ok { @@ -98,8 +95,7 @@ func newChainSupport(configManager configtx.Manager, policyManager policies.Mana sharedConfigManager: sharedConfigManager, cutter: cutter, filters: filters, - reader: backing, - writer: newWriteInterceptor(configManager, backing), + ledger: backing, } var err error @@ -111,11 +107,11 @@ func newChainSupport(configManager configtx.Manager, policyManager policies.Mana return cs } -func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet { - return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ - broadcastfilter.EmptyRejectRule, +func createBroadcastRuleset(configManager configtx.Manager) *filter.RuleSet { + return filter.NewRuleSet([]filter.Rule{ + filter.EmptyRejectRule, configtx.NewFilter(configManager), - broadcastfilter.AcceptRule, + filter.AcceptRule, }) } @@ -135,7 +131,7 @@ func (cs *chainSupport) PolicyManager() policies.Manager { return cs.policyManager } -func (cs *chainSupport) Filters() *broadcastfilter.RuleSet { +func (cs *chainSupport) Filters() *filter.RuleSet { return cs.filters } @@ -144,55 +140,17 @@ func (cs *chainSupport) BlockCutter() blockcutter.Receiver { } func (cs *chainSupport) Reader() rawledger.Reader { - return cs.reader -} - -func (cs *chainSupport) Writer() rawledger.Writer { - return cs.writer + return cs.ledger } func (cs *chainSupport) Enqueue(env *cb.Envelope) bool { return cs.chain.Enqueue(env) } -// writeInterceptor performs 'execution/processing' of blockContents before committing them to the normal passive ledger -// This is intended to support reconfiguration transactions, and ultimately chain creation -type writeInterceptor struct { - configtxManager configtx.Manager - backing rawledger.Writer -} - -func newWriteInterceptor(configtxManager configtx.Manager, backing rawledger.Writer) *writeInterceptor { - return &writeInterceptor{ - backing: backing, - configtxManager: configtxManager, +func (cs *chainSupport) WriteBlock(data []*cb.Envelope, metadata [][]byte, committers []filter.Committer) { + for _, committer := range committers { + committer.Commit() } -} -func (wi *writeInterceptor) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block { - // Note that in general any errors encountered in this path are fatal. - // The previous layers (broadcastfilters, blockcutter) should have scrubbed any invalid - // 'executable' transactions like config before committing via Append - - if len(blockContents) == 1 { - payload := &cb.Payload{} - err := proto.Unmarshal(blockContents[0].Payload, payload) - if err != nil { - logger.Fatalf("Asked to write a malformed envelope to the chain: %s", err) - } - - if payload.Header.ChainHeader.Type == int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { - configEnvelope := &cb.ConfigurationEnvelope{} - err = proto.Unmarshal(payload.Data, configEnvelope) - if err != nil { - logger.Fatalf("Configuration envelope was malformed: %s", err) - } - - err = wi.configtxManager.Apply(configEnvelope) - if err != nil { - logger.Fatalf("Error applying configuration transaction which was already validated: %s", err) - } - } - } - return wi.backing.Append(blockContents, metadata) + cs.ledger.Append(data, metadata) } diff --git a/orderer/multichain/chainsupport_mock_test.go b/orderer/multichain/chainsupport_mock_test.go index 8a68f8c9786..e7d2219c3b9 100644 --- a/orderer/multichain/chainsupport_mock_test.go +++ b/orderer/multichain/chainsupport_mock_test.go @@ -18,7 +18,6 @@ package multichain import ( "github.com/hyperledger/fabric/orderer/common/blockcutter" - "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ) @@ -27,16 +26,16 @@ type mockConsenter struct { func (mc *mockConsenter) HandleChain(support ConsenterSupport) (Chain, error) { return &mockChain{ - queue: make(chan *cb.Envelope), - ledger: support.Writer(), - cutter: support.BlockCutter(), + queue: make(chan *cb.Envelope), + cutter: support.BlockCutter(), + support: support, }, nil } type mockChain struct { - queue chan *cb.Envelope - ledger rawledger.Writer - cutter blockcutter.Receiver + queue chan *cb.Envelope + support ConsenterSupport + cutter blockcutter.Receiver } func (mch *mockChain) Enqueue(env *cb.Envelope) bool { @@ -51,9 +50,9 @@ func (mch *mockChain) Start() { if !ok { return } - batches, _ := mch.cutter.Ordered(msg) - for _, batch := range batches { - mch.ledger.Append(batch, nil) + batches, committers, _ := mch.cutter.Ordered(msg) + for i, batch := range batches { + mch.support.WriteBlock(batch, nil, committers[i]) } } }() diff --git a/orderer/multichain/chainsupport_test.go b/orderer/multichain/chainsupport_test.go index 963c9f66216..9ab3e078381 100644 --- a/orderer/multichain/chainsupport_test.go +++ b/orderer/multichain/chainsupport_test.go @@ -17,39 +17,65 @@ limitations under the License. package multichain import ( + "reflect" "testing" + "github.com/hyperledger/fabric/orderer/common/filter" + "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" ) -func TestCommitConfig(t *testing.T) { - mcm := &mockConfigtxManager{} - ctx1 := makeConfigTx("foo", 0) - wi := newWriteInterceptor(mcm, &mockLedgerWriter{}) - wi.Append([]*cb.Envelope{ctx1}, nil) - if mcm.config == nil { - t.Fatalf("Should have applied configuration") - } +type mockLedgerReadWriter struct { + data []*cb.Envelope + metadata [][]byte } -func TestIgnoreMultiConfig(t *testing.T) { - mcm := &mockConfigtxManager{} - ctx1 := makeConfigTx("foo", 0) - ctx2 := makeConfigTx("foo", 1) - wi := newWriteInterceptor(mcm, &mockLedgerWriter{}) - wi.Append([]*cb.Envelope{ctx1, ctx2}, nil) - if mcm.config != nil { - t.Fatalf("Should not have applied configuration, we should only check batches with a single tx") - } +func (mlw *mockLedgerReadWriter) Append(data []*cb.Envelope, metadata [][]byte) *cb.Block { + mlw.data = data + mlw.metadata = metadata + return nil +} + +func (mlw *mockLedgerReadWriter) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) { + panic("Unimplemented") +} + +func (mlw *mockLedgerReadWriter) Height() uint64 { + panic("Unimplemented") +} + +type mockCommitter struct { + committed int } -func TestIgnoreSingleNonConfig(t *testing.T) { - mcm := &mockConfigtxManager{} - ctx1 := makeNormalTx("foo", 0) - wi := newWriteInterceptor(mcm, &mockLedgerWriter{}) - wi.Append([]*cb.Envelope{ctx1}, nil) - if mcm.config != nil { - t.Fatalf("Should not have applied configuration, it was a normal transaction") +func (mc *mockCommitter) Isolated() bool { + panic("Unimplemented") +} + +func (mc *mockCommitter) Commit() { + mc.committed++ +} + +func TestCommitConfig(t *testing.T) { + ml := &mockLedgerReadWriter{} + cs := &chainSupport{ledger: ml} + txs := []*cb.Envelope{makeNormalTx("foo", 0), makeNormalTx("bar", 1)} + md := [][]byte{[]byte("foometa"), []byte("barmeta")} + committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}} + cs.WriteBlock(txs, md, committers) + + if !reflect.DeepEqual(ml.data, txs) { + t.Errorf("Should have written input data to ledger but did not") + } + + if !reflect.DeepEqual(ml.metadata, md) { + t.Errorf("Should have written input metadata to ledger but did not") } + for _, c := range committers { + if c.(*mockCommitter).committed != 1 { + t.Errorf("Expected exactly 1 commits but got %d", c.(*mockCommitter).committed) + } + } } diff --git a/orderer/solo/consensus.go b/orderer/solo/consensus.go index e7d79e8cc45..12fad36b1af 100644 --- a/orderer/solo/consensus.go +++ b/orderer/solo/consensus.go @@ -94,13 +94,13 @@ func (ch *chain) main() { for { select { case msg := <-ch.sendChan: - batches, ok := ch.support.BlockCutter().Ordered(msg) + batches, committers, ok := ch.support.BlockCutter().Ordered(msg) if ok && len(batches) == 0 && timer == nil { timer = time.After(ch.batchTimeout) continue } - for _, batch := range batches { - ch.support.Writer().Append(batch, nil) + for i, batch := range batches { + ch.support.WriteBlock(batch, nil, committers[i]) } if len(batches) > 0 { timer = nil @@ -109,13 +109,13 @@ func (ch *chain) main() { //clear the timer timer = nil - batch := ch.support.BlockCutter().Cut() + batch, committers := ch.support.BlockCutter().Cut() if len(batch) == 0 { logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug") continue } logger.Debugf("Batch timer expired, creating block") - ch.support.Writer().Append(batch, nil) + ch.support.WriteBlock(batch, nil, committers) case <-ch.exitChan: logger.Debugf("Exiting") return diff --git a/orderer/solo/consensus_test.go b/orderer/solo/consensus_test.go index 8e613b49f0e..cbba3e1acc4 100644 --- a/orderer/solo/consensus_test.go +++ b/orderer/solo/consensus_test.go @@ -21,44 +21,52 @@ import ( "time" "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sharedconfig" "github.com/hyperledger/fabric/orderer/multichain" - "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ) type mockBlockCutter struct { - queueNext bool // Ordered returns nil false when not set to true - configTx bool // Ordered returns [][]{curBatch, []{newTx}}, true when set to true - cutNext bool // Ordered returns [][]{append(curBatch, newTx)}, true when set to true - curBatch []*cb.Envelope - block chan struct{} + queueNext bool // Ordered returns nil false when not set to true + isolatedTx bool // Ordered returns [][]{curBatch, []{newTx}}, true when set to true + cutNext bool // Ordered returns [][]{append(curBatch, newTx)}, true when set to true + curBatch []*cb.Envelope + block chan struct{} } func newMockBlockCutter() *mockBlockCutter { return &mockBlockCutter{ - queueNext: true, - configTx: false, - cutNext: false, - block: make(chan struct{}), + queueNext: true, + isolatedTx: false, + cutNext: false, + block: make(chan struct{}), } } -func (mbc *mockBlockCutter) Ordered(env *cb.Envelope) ([][]*cb.Envelope, bool) { +func noopCommitters(size int) []filter.Committer { + res := make([]filter.Committer, size) + for i := range res { + res[i] = filter.NoopCommitter + } + return res +} + +func (mbc *mockBlockCutter) Ordered(env *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) { defer func() { <-mbc.block }() if !mbc.queueNext { logger.Debugf("mockBlockCutter: Not queueing message") - return nil, false + return nil, nil, false } - if mbc.configTx { + if mbc.isolatedTx { logger.Debugf("mockBlockCutter: Returning dual batch") res := [][]*cb.Envelope{mbc.curBatch, []*cb.Envelope{env}} mbc.curBatch = nil - return res, true + return res, [][]filter.Committer{noopCommitters(len(res[0])), noopCommitters(len(res[1]))}, true } mbc.curBatch = append(mbc.curBatch, env) @@ -67,33 +75,23 @@ func (mbc *mockBlockCutter) Ordered(env *cb.Envelope) ([][]*cb.Envelope, bool) { logger.Debugf("mockBlockCutter: Returning regular batch") res := [][]*cb.Envelope{mbc.curBatch} mbc.curBatch = nil - return res, true + return res, [][]filter.Committer{noopCommitters(len(res))}, true } logger.Debugf("mockBlockCutter: Appending to batch") - return nil, true + return nil, nil, true } -func (mbc *mockBlockCutter) Cut() []*cb.Envelope { +func (mbc *mockBlockCutter) Cut() ([]*cb.Envelope, []filter.Committer) { logger.Debugf("mockBlockCutter: Cutting batch") res := mbc.curBatch mbc.curBatch = nil - return res -} - -type mockWriter struct { - batches chan []*cb.Envelope -} - -func (mw *mockWriter) Append(data []*cb.Envelope, metadata [][]byte) *cb.Block { - logger.Debugf("mockWriter: attempting to write batch") - mw.batches <- data - return nil + return res, noopCommitters(len(res)) } type mockConsenterSupport struct { - cutter *mockBlockCutter - writer *mockWriter + cutter *mockBlockCutter + batches chan []*cb.Envelope } func (mcs *mockConsenterSupport) BlockCutter() blockcutter.Receiver { @@ -102,8 +100,9 @@ func (mcs *mockConsenterSupport) BlockCutter() blockcutter.Receiver { func (mcs *mockConsenterSupport) SharedConfig() sharedconfig.Manager { panic("Unimplemented") } -func (mcs *mockConsenterSupport) Writer() rawledger.Writer { - return mcs.writer +func (mcs *mockConsenterSupport) WriteBlock(data []*cb.Envelope, metadata [][]byte, committers []filter.Committer) { + logger.Debugf("mockWriter: attempting to write batch") + mcs.batches <- data } var testMessage = &cb.Envelope{Payload: []byte("TEST_MESSAGE")} @@ -130,8 +129,8 @@ func goWithWait(target func()) *waitableGo { func TestEmptyBatch(t *testing.T) { support := &mockConsenterSupport{ - writer: &mockWriter{batches: make(chan []*cb.Envelope)}, - cutter: newMockBlockCutter(), + batches: make(chan []*cb.Envelope), + cutter: newMockBlockCutter(), } defer close(support.cutter.block) bs := newChain(time.Millisecond, support) @@ -141,7 +140,7 @@ func TestEmptyBatch(t *testing.T) { syncQueueMessage(testMessage, bs, support.cutter) bs.Halt() select { - case <-support.writer.batches: + case <-support.batches: t.Fatalf("Expected no invocations of Append") case <-wg.done: } @@ -149,8 +148,8 @@ func TestEmptyBatch(t *testing.T) { func TestBatchTimer(t *testing.T) { support := &mockConsenterSupport{ - writer: &mockWriter{batches: make(chan []*cb.Envelope)}, - cutter: newMockBlockCutter(), + batches: make(chan []*cb.Envelope), + cutter: newMockBlockCutter(), } defer close(support.cutter.block) bs := newChain(time.Millisecond, support) @@ -160,21 +159,21 @@ func TestBatchTimer(t *testing.T) { syncQueueMessage(testMessage, bs, support.cutter) select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Expected a block to be cut because of batch timer expiration but did not") } syncQueueMessage(testMessage, bs, support.cutter) select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Did not create the second batch, indicating that the timer was not appopriately reset") } bs.Halt() select { - case <-support.writer.batches: + case <-support.batches: t.Fatalf("Expected no invocations of Append") case <-wg.done: } @@ -182,8 +181,8 @@ func TestBatchTimer(t *testing.T) { func TestBatchTimerHaltOnFilledBatch(t *testing.T) { support := &mockConsenterSupport{ - writer: &mockWriter{batches: make(chan []*cb.Envelope)}, - cutter: newMockBlockCutter(), + batches: make(chan []*cb.Envelope), + cutter: newMockBlockCutter(), } defer close(support.cutter.block) @@ -196,7 +195,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) { syncQueueMessage(testMessage, bs, support.cutter) select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Expected a block to be cut because the batch was filled, but did not") } @@ -208,7 +207,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) { syncQueueMessage(testMessage, bs, support.cutter) select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Did not create the second batch, indicating that the old timer was still running") } @@ -223,8 +222,8 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) { func TestConfigStyleMultiBatch(t *testing.T) { support := &mockConsenterSupport{ - writer: &mockWriter{batches: make(chan []*cb.Envelope)}, - cutter: newMockBlockCutter(), + batches: make(chan []*cb.Envelope), + cutter: newMockBlockCutter(), } defer close(support.cutter.block) bs := newChain(time.Hour, support) @@ -232,17 +231,17 @@ func TestConfigStyleMultiBatch(t *testing.T) { defer bs.Halt() syncQueueMessage(testMessage, bs, support.cutter) - support.cutter.configTx = true + support.cutter.isolatedTx = true syncQueueMessage(testMessage, bs, support.cutter) select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Expected two blocks to be cut but never got the first") } select { - case <-support.writer.batches: + case <-support.batches: case <-time.After(time.Second): t.Fatalf("Expected the config type tx to create two blocks, but only go the first") }