diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go new file mode 100644 index 00000000000..57244b618d2 --- /dev/null +++ b/orderer/common/broadcast/broadcast.go @@ -0,0 +1,135 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broadcast + +import ( + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("orderer/common/broadcast") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +// Target defines an interface which the broadcast handler will direct broadcasts to +type Target interface { + // Enqueue accepts a message and returns true on acceptance, or false on shutdown + Enqueue(env *cb.Envelope) bool +} + +// Handler defines an interface which handles broadcasts +type Handler interface { + // Handle starts a service thread for a given gRPC connection and services the broadcast connection + Handle(srv ab.AtomicBroadcast_BroadcastServer) error +} + +type handlerImpl struct { + queueSize int + target Target + filters *broadcastfilter.RuleSet + configManager configtx.Manager + exitChan chan struct{} +} + +// NewHandlerImpl constructs a new implementation of the Handler interface +func NewHandlerImpl(queueSize int, target Target, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Handler { + return &handlerImpl{ + queueSize: queueSize, + filters: filters, + configManager: configManager, + target: target, + exitChan: make(chan struct{}), + } +} + +// Handle starts a service thread for a given gRPC connection and services the broadcast connection +func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { + b := newBroadcaster(bh) + defer close(b.queue) + go b.drainQueue() + return b.queueEnvelopes(srv) +} + +type broadcaster struct { + bs *handlerImpl + queue chan *cb.Envelope +} + +func newBroadcaster(bs *handlerImpl) *broadcaster { + b := &broadcaster{ + bs: bs, + queue: make(chan *cb.Envelope, bs.queueSize), + } + return b +} + +func (b *broadcaster) drainQueue() { + for { + select { + case msg, ok := <-b.queue: + if ok { + if !b.bs.target.Enqueue(msg) { + return + } + } else { + return + } + case <-b.bs.exitChan: + return + } + } +} + +func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error { + + for { + msg, err := srv.Recv() + if err != nil { + return err + } + + action, _ := b.bs.filters.Apply(msg) + + switch action { + case broadcastfilter.Reconfigure: + fallthrough + case broadcastfilter.Accept: + select { + case b.queue <- msg: + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) + default: + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) + } + case broadcastfilter.Forward: + fallthrough + case broadcastfilter.Reject: + err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + default: + logger.Fatalf("Unknown filter action :%v", action) + } + + if err != nil { + return err + } + } +} diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go new file mode 100644 index 00000000000..41fd57597d9 --- /dev/null +++ b/orderer/common/broadcast/broadcast_test.go @@ -0,0 +1,237 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broadcast + +import ( + "bytes" + "fmt" + "testing" + + "google.golang.org/grpc" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var configTx []byte + +func init() { + var err error + configTx, err = proto.Marshal(&cb.ConfigurationEnvelope{}) + if err != nil { + panic("Error marshaling empty config tx") + } +} + +type mockConfigManager struct { + validated bool + applied bool + validateErr error + applyErr error +} + +func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { + mcm.validated = true + return mcm.validateErr +} + +func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { + mcm.applied = true + return mcm.applyErr +} + +type mockConfigFilter struct { + manager configtx.Manager +} + +func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { + if bytes.Equal(msg.Payload, configTx) { + if mcf.manager == nil || mcf.manager.Validate(nil) != nil { + return broadcastfilter.Reject + } + return broadcastfilter.Reconfigure + } + return broadcastfilter.Forward +} + +type mockTarget struct { + queue chan *cb.Envelope + done bool +} + +func (mt *mockTarget) Enqueue(env *cb.Envelope) bool { + mt.queue <- env + return !mt.done +} + +func (mt *mockTarget) halt() { + mt.done = true + select { + case <-mt.queue: + default: + } +} + +type mockB struct { + grpc.ServerStream + recvChan chan *cb.Envelope + sendChan chan *ab.BroadcastResponse +} + +func newMockB() *mockB { + return &mockB{ + recvChan: make(chan *cb.Envelope), + sendChan: make(chan *ab.BroadcastResponse), + } +} + +func (m *mockB) Send(br *ab.BroadcastResponse) error { + m.sendChan <- br + return nil +} + +func (m *mockB) Recv() (*cb.Envelope, error) { + msg, ok := <-m.recvChan + if !ok { + return msg, fmt.Errorf("Channel closed") + } + return msg, nil +} + +func getFiltersConfigMockTarget() (*broadcastfilter.RuleSet, *mockConfigManager, *mockTarget) { + cm := &mockConfigManager{} + filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ + broadcastfilter.EmptyRejectRule, + &mockConfigFilter{cm}, + broadcastfilter.AcceptRule, + }) + mt := &mockTarget{queue: make(chan *cb.Envelope)} + return filters, cm, mt + +} + +func TestQueueOverflow(t *testing.T) { + filters, cm, mt := getFiltersConfigMockTarget() + defer mt.halt() + bh := NewHandlerImpl(2, mt, filters, cm) + m := newMockB() + defer close(m.recvChan) + b := newBroadcaster(bh.(*handlerImpl)) + go b.queueEnvelopes(m) + + for i := 0; i < 2; i++ { + m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != cb.Status_SUCCESS { + t.Fatalf("Should have successfully queued the message") + } + } + + m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != cb.Status_SERVICE_UNAVAILABLE { + t.Fatalf("Should not have successfully queued the message") + } + +} + +func TestMultiQueueOverflow(t *testing.T) { + filters, cm, mt := getFiltersConfigMockTarget() + defer mt.halt() + bh := NewHandlerImpl(2, mt, filters, cm) + ms := []*mockB{newMockB(), newMockB(), newMockB()} + + for _, m := range ms { + defer close(m.recvChan) + b := newBroadcaster(bh.(*handlerImpl)) + go b.queueEnvelopes(m) + } + + for _, m := range ms { + for i := 0; i < 2; i++ { + m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != cb.Status_SUCCESS { + t.Fatalf("Should have successfully queued the message") + } + } + } + + for _, m := range ms { + m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != cb.Status_SERVICE_UNAVAILABLE { + t.Fatalf("Should not have successfully queued the message") + } + } +} + +func TestEmptyEnvelope(t *testing.T) { + filters, cm, mt := getFiltersConfigMockTarget() + defer mt.halt() + bh := NewHandlerImpl(2, mt, filters, cm) + m := newMockB() + defer close(m.recvChan) + go bh.Handle(m) + + m.recvChan <- &cb.Envelope{} + reply := <-m.sendChan + if reply.Status != cb.Status_BAD_REQUEST { + t.Fatalf("Should have rejected the null message") + } + +} + +func TestReconfigureAccept(t *testing.T) { + filters, cm, mt := getFiltersConfigMockTarget() + defer mt.halt() + bh := NewHandlerImpl(2, mt, filters, cm) + m := newMockB() + defer close(m.recvChan) + go bh.Handle(m) + + m.recvChan <- &cb.Envelope{Payload: configTx} + + reply := <-m.sendChan + if reply.Status != cb.Status_SUCCESS { + t.Fatalf("Should have successfully queued the message") + } + + if !cm.validated { + t.Errorf("ConfigTx should have been validated before processing") + } +} + +func TestReconfigureReject(t *testing.T) { + filters, cm, mt := getFiltersConfigMockTarget() + cm.validateErr = fmt.Errorf("Fail to validate") + defer mt.halt() + bh := NewHandlerImpl(2, mt, filters, cm) + m := newMockB() + defer close(m.recvChan) + go bh.Handle(m) + + m.recvChan <- &cb.Envelope{Payload: configTx} + + reply := <-m.sendChan + if reply.Status != cb.Status_BAD_REQUEST { + t.Fatalf("Should have failed to queue the message because it was invalid config") + } +} diff --git a/orderer/solo/broadcast.go b/orderer/solo/broadcast.go index 90e4407c036..4c09a87173e 100644 --- a/orderer/solo/broadcast.go +++ b/orderer/solo/broadcast.go @@ -23,13 +23,11 @@ import ( "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" "github.com/golang/protobuf/proto" ) type broadcastServer struct { - queueSize int batchSize int batchTimeout time.Duration rl rawledger.Writer @@ -39,15 +37,14 @@ type broadcastServer struct { exitChan chan struct{} } -func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer { - bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rl, filters, configManager) +func newBroadcastServer(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer { + bs := newPlainBroadcastServer(batchSize, batchTimeout, rl, filters, configManager) go bs.main() return bs } -func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer { +func newPlainBroadcastServer(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer { bs := &broadcastServer{ - queueSize: queueSize, batchSize: batchSize, batchTimeout: batchTimeout, rl: rl, @@ -63,6 +60,16 @@ func (bs *broadcastServer) halt() { close(bs.exitChan) } +// Enqueue accepts a message and returns true on acceptance, or false on shutdown +func (bs *broadcastServer) Enqueue(env *cb.Envelope) bool { + select { + case bs.sendChan <- env: + return true + case <-bs.exitChan: + return false + } +} + func (bs *broadcastServer) main() { var curBatch []*cb.Envelope var timer <-chan time.Time @@ -130,76 +137,3 @@ func (bs *broadcastServer) main() { } } } - -func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServer) error { - b := newBroadcaster(bs) - defer close(b.queue) - go b.drainQueue() - return b.queueEnvelopes(srv) -} - -type broadcaster struct { - bs *broadcastServer - queue chan *cb.Envelope -} - -func (b *broadcaster) drainQueue() { - for { - select { - case msg, ok := <-b.queue: - if ok { - select { - case b.bs.sendChan <- msg: - case <-b.bs.exitChan: - return - } - } else { - return - } - case <-b.bs.exitChan: - return - } - } -} - -func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error { - - for { - msg, err := srv.Recv() - if err != nil { - return err - } - - action, _ := b.bs.filter.Apply(msg) - - switch action { - case broadcastfilter.Reconfigure: - fallthrough - case broadcastfilter.Accept: - select { - case b.queue <- msg: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) - default: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) - } - case broadcastfilter.Forward: - fallthrough - case broadcastfilter.Reject: - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) - default: - logger.Fatalf("Unknown filter action :%v", action) - } - - if err != nil { - return err - } - } -} - -func newBroadcaster(bs *broadcastServer) *broadcaster { - b := &broadcaster{ - bs: bs, - queue: make(chan *cb.Envelope, bs.queueSize), - } - return b -} diff --git a/orderer/solo/broadcast_test.go b/orderer/solo/broadcast_test.go index 51fac4a4ebe..fe1e6a82d4f 100644 --- a/orderer/solo/broadcast_test.go +++ b/orderer/solo/broadcast_test.go @@ -121,81 +121,9 @@ func (m *mockB) Recv() (*cb.Envelope, error) { return msg, nil } -func TestQueueOverflow(t *testing.T) { - filters, cm := getFiltersAndConfig() - bs := newPlainBroadcastServer(2, 1, time.Second, nil, filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager - m := newMockB() - b := newBroadcaster(bs) - go b.queueEnvelopes(m) - defer close(m.recvChan) - - bs.halt() - - for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} - reply := <-m.sendChan - if reply.Status != cb.Status_SUCCESS { - t.Fatalf("Should have successfully queued the message") - } - } - - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} - reply := <-m.sendChan - if reply.Status != cb.Status_SERVICE_UNAVAILABLE { - t.Fatalf("Should not have successfully queued the message") - } - -} - -func TestMultiQueueOverflow(t *testing.T) { - filters, cm := getFiltersAndConfig() - bs := newPlainBroadcastServer(2, 1, time.Second, nil, filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager - // m := newMockB() - ms := []*mockB{newMockB(), newMockB(), newMockB()} - - for _, m := range ms { - b := newBroadcaster(bs) - go b.queueEnvelopes(m) - defer close(m.recvChan) - } - - for _, m := range ms { - for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} - reply := <-m.sendChan - if reply.Status != cb.Status_SUCCESS { - t.Fatalf("Should have successfully queued the message") - } - } - } - - for _, m := range ms { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} - reply := <-m.sendChan - if reply.Status != cb.Status_SERVICE_UNAVAILABLE { - t.Fatalf("Should not have successfully queued the message") - } - } -} - -func TestEmptyEnvelope(t *testing.T) { - filters, cm := getFiltersAndConfig() - bs := newPlainBroadcastServer(2, 1, time.Second, nil, filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager - m := newMockB() - defer close(m.recvChan) - go bs.handleBroadcast(m) - - m.recvChan <- &cb.Envelope{} - reply := <-m.sendChan - if reply.Status != cb.Status_BAD_REQUEST { - t.Fatalf("Should have rejected the null message") - } - -} - func TestEmptyBatch(t *testing.T) { filters, cm := getFiltersAndConfig() - bs := newPlainBroadcastServer(2, 1, time.Millisecond, ramledger.New(10, genesisBlock), filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager + bs := newPlainBroadcastServer(1, time.Millisecond, ramledger.New(10, genesisBlock), filters, cm) if bs.rl.(rawledger.Reader).Height() != 1 { t.Fatalf("Expected no new blocks created") } @@ -205,7 +133,7 @@ func TestBatchTimer(t *testing.T) { filters, cm := getFiltersAndConfig() batchSize := 2 rl := ramledger.New(10, genesisBlock) - bs := newBroadcastServer(0, batchSize, time.Millisecond, rl, filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager + bs := newBroadcastServer(batchSize, time.Millisecond, rl, filters, cm) defer bs.halt() it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) @@ -222,7 +150,7 @@ func TestFilledBatch(t *testing.T) { filters, cm := getFiltersAndConfig() batchSize := 2 messages := 10 - bs := newPlainBroadcastServer(0, batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager + bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) done := make(chan struct{}) go func() { bs.main() @@ -242,7 +170,7 @@ func TestFilledBatch(t *testing.T) { func TestReconfigureGoodPath(t *testing.T) { filters, cm := getFiltersAndConfig() batchSize := 2 - bs := newPlainBroadcastServer(0, batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager + bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) done := make(chan struct{}) go func() { bs.main() @@ -270,42 +198,11 @@ func TestReconfigureGoodPath(t *testing.T) { } } -func TestReconfigureFailToValidate(t *testing.T) { - filters, cm := getFiltersAndConfig() - cm.validateErr = fmt.Errorf("Fail to validate") - batchSize := 2 - bs := newPlainBroadcastServer(0, batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager - done := make(chan struct{}) - go func() { - bs.main() - close(done) - }() - - bs.sendChan <- &cb.Envelope{Payload: []byte("Msg1")} - bs.sendChan <- &cb.Envelope{Payload: configTx} - bs.sendChan <- &cb.Envelope{Payload: []byte("Msg2")} - - bs.halt() - <-done - expected := uint64(2) - if bs.rl.(rawledger.Reader).Height() != expected { - t.Fatalf("Expected %d blocks but got %d", expected, bs.rl.(rawledger.Reader).Height()) - } - - if !cm.validated { - t.Errorf("ConfigTx should have been validated before processing") - } - - if cm.applied { - t.Errorf("ConfigTx should not have been applied") - } -} - func TestReconfigureFailToApply(t *testing.T) { filters, cm := getFiltersAndConfig() cm.applyErr = fmt.Errorf("Fail to apply") batchSize := 2 - bs := newPlainBroadcastServer(0, batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused), filters, configManager + bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) done := make(chan struct{}) go func() { bs.main() diff --git a/orderer/solo/solo.go b/orderer/solo/solo.go index 7b8e73aa527..62d6afea22b 100644 --- a/orderer/solo/solo.go +++ b/orderer/solo/solo.go @@ -19,6 +19,7 @@ package solo import ( "time" + "github.com/hyperledger/fabric/orderer/common/broadcast" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/rawledger" @@ -35,6 +36,7 @@ func init() { } type server struct { + bh broadcast.Handler bs *broadcastServer ds *DeliverServer } @@ -42,9 +44,14 @@ type server struct { // New creates a ab.AtomicBroadcastServer based on the solo orderer implementation func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer { logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl) + bs := newBroadcastServer(batchSize, batchTimeout, rl, filters, configManager) + ds := NewDeliverServer(rl, maxWindowSize) + bh := broadcast.NewHandlerImpl(queueSize, bs, filters, configManager) + s := &server{ - bs: newBroadcastServer(queueSize, batchSize, batchTimeout, rl, filters, configManager), - ds: NewDeliverServer(rl, maxWindowSize), + bs: bs, + ds: ds, + bh: bh, } ab.RegisterAtomicBroadcastServer(grpcServer, s) return s @@ -52,7 +59,7 @@ func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl // Broadcast receives a stream of messages from a client for ordering func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { - return s.bs.handleBroadcast(srv) + return s.bh.Handle(srv) } // Deliver sends a stream of blocks to a client after ordering