Skip to content

Commit

Permalink
[FAB-798] Abstract out the solo broadcast handler
Browse files Browse the repository at this point in the history
As a first step of consolidating the common logic of the atomicbroadcast
api between components, this changeset pulls out the logic which is not
solo specific and moves it into the common/broadcast package.

This begins, but does not satisfy FAB-798.

Change-Id: I084ba83832c6986c5f5fb64b5f2cd16d4ab2ff68
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 22, 2016
1 parent 77c7323 commit 73c501c
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 190 deletions.
135 changes: 135 additions & 0 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broadcast

import (
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("orderer/common/broadcast")

func init() {
logging.SetLevel(logging.DEBUG, "")
}

// Target defines an interface which the broadcast handler will direct broadcasts to
type Target interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
}

// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}

type handlerImpl struct {
queueSize int
target Target
filters *broadcastfilter.RuleSet
configManager configtx.Manager
exitChan chan struct{}
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(queueSize int, target Target, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Handler {
return &handlerImpl{
queueSize: queueSize,
filters: filters,
configManager: configManager,
target: target,
exitChan: make(chan struct{}),
}
}

// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
b := newBroadcaster(bh)
defer close(b.queue)
go b.drainQueue()
return b.queueEnvelopes(srv)
}

type broadcaster struct {
bs *handlerImpl
queue chan *cb.Envelope
}

func newBroadcaster(bs *handlerImpl) *broadcaster {
b := &broadcaster{
bs: bs,
queue: make(chan *cb.Envelope, bs.queueSize),
}
return b
}

func (b *broadcaster) drainQueue() {
for {
select {
case msg, ok := <-b.queue:
if ok {
if !b.bs.target.Enqueue(msg) {
return
}
} else {
return
}
case <-b.bs.exitChan:
return
}
}
}

func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error {

for {
msg, err := srv.Recv()
if err != nil {
return err
}

action, _ := b.bs.filters.Apply(msg)

switch action {
case broadcastfilter.Reconfigure:
fallthrough
case broadcastfilter.Accept:
select {
case b.queue <- msg:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
case broadcastfilter.Forward:
fallthrough
case broadcastfilter.Reject:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
default:
logger.Fatalf("Unknown filter action :%v", action)
}

if err != nil {
return err
}
}
}
237 changes: 237 additions & 0 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broadcast

import (
"bytes"
"fmt"
"testing"

"google.golang.org/grpc"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)

var configTx []byte

func init() {
var err error
configTx, err = proto.Marshal(&cb.ConfigurationEnvelope{})
if err != nil {
panic("Error marshaling empty config tx")
}
}

type mockConfigManager struct {
validated bool
applied bool
validateErr error
applyErr error
}

func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error {
mcm.validated = true
return mcm.validateErr
}

func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error {
mcm.applied = true
return mcm.applyErr
}

type mockConfigFilter struct {
manager configtx.Manager
}

func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action {
if bytes.Equal(msg.Payload, configTx) {
if mcf.manager == nil || mcf.manager.Validate(nil) != nil {
return broadcastfilter.Reject
}
return broadcastfilter.Reconfigure
}
return broadcastfilter.Forward
}

type mockTarget struct {
queue chan *cb.Envelope
done bool
}

func (mt *mockTarget) Enqueue(env *cb.Envelope) bool {
mt.queue <- env
return !mt.done
}

func (mt *mockTarget) halt() {
mt.done = true
select {
case <-mt.queue:
default:
}
}

type mockB struct {
grpc.ServerStream
recvChan chan *cb.Envelope
sendChan chan *ab.BroadcastResponse
}

func newMockB() *mockB {
return &mockB{
recvChan: make(chan *cb.Envelope),
sendChan: make(chan *ab.BroadcastResponse),
}
}

func (m *mockB) Send(br *ab.BroadcastResponse) error {
m.sendChan <- br
return nil
}

func (m *mockB) Recv() (*cb.Envelope, error) {
msg, ok := <-m.recvChan
if !ok {
return msg, fmt.Errorf("Channel closed")
}
return msg, nil
}

func getFiltersConfigMockTarget() (*broadcastfilter.RuleSet, *mockConfigManager, *mockTarget) {
cm := &mockConfigManager{}
filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{
broadcastfilter.EmptyRejectRule,
&mockConfigFilter{cm},
broadcastfilter.AcceptRule,
})
mt := &mockTarget{queue: make(chan *cb.Envelope)}
return filters, cm, mt

}

func TestQueueOverflow(t *testing.T) {
filters, cm, mt := getFiltersConfigMockTarget()
defer mt.halt()
bh := NewHandlerImpl(2, mt, filters, cm)
m := newMockB()
defer close(m.recvChan)
b := newBroadcaster(bh.(*handlerImpl))
go b.queueEnvelopes(m)

for i := 0; i < 2; i++ {
m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}
}

m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}

}

func TestMultiQueueOverflow(t *testing.T) {
filters, cm, mt := getFiltersConfigMockTarget()
defer mt.halt()
bh := NewHandlerImpl(2, mt, filters, cm)
ms := []*mockB{newMockB(), newMockB(), newMockB()}

for _, m := range ms {
defer close(m.recvChan)
b := newBroadcaster(bh.(*handlerImpl))
go b.queueEnvelopes(m)
}

for _, m := range ms {
for i := 0; i < 2; i++ {
m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}
}
}

for _, m := range ms {
m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}
}
}

func TestEmptyEnvelope(t *testing.T) {
filters, cm, mt := getFiltersConfigMockTarget()
defer mt.halt()
bh := NewHandlerImpl(2, mt, filters, cm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)

m.recvChan <- &cb.Envelope{}
reply := <-m.sendChan
if reply.Status != cb.Status_BAD_REQUEST {
t.Fatalf("Should have rejected the null message")
}

}

func TestReconfigureAccept(t *testing.T) {
filters, cm, mt := getFiltersConfigMockTarget()
defer mt.halt()
bh := NewHandlerImpl(2, mt, filters, cm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)

m.recvChan <- &cb.Envelope{Payload: configTx}

reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}

if !cm.validated {
t.Errorf("ConfigTx should have been validated before processing")
}
}

func TestReconfigureReject(t *testing.T) {
filters, cm, mt := getFiltersConfigMockTarget()
cm.validateErr = fmt.Errorf("Fail to validate")
defer mt.halt()
bh := NewHandlerImpl(2, mt, filters, cm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)

m.recvChan <- &cb.Envelope{Payload: configTx}

reply := <-m.sendChan
if reply.Status != cb.Status_BAD_REQUEST {
t.Fatalf("Should have failed to queue the message because it was invalid config")
}
}
Loading

0 comments on commit 73c501c

Please sign in to comment.