Skip to content

Commit

Permalink
[FAB-798] Factor common gRPC components from solo
Browse files Browse the repository at this point in the history
As the latest step in consolidating the common logic between orderers,
this changeset removes the gRPC components from solo, and distills solo
to its most simple core of a 'solo consenter'.

The solo consenter performs batch cutting and ordering, while the
broadcast filtering and deliver logic was removed in previous
changesets.

Change-Id: I5f9dfe6239356279a9625a2c89d327c5da275bbc
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 22, 2016
1 parent 1b5d378 commit b7908a3
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 47 deletions.
21 changes: 14 additions & 7 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,26 @@ func launchSolo(conf *config.TopLevel) {
}

configManager := bootstrapConfigManager(lastConfigTx)
filters := createBroadcastRuleset(configManager)

// XXX actually use the config manager in the future
_ = configManager

solo.New(int(conf.General.QueueSize),
soloConsenter := solo.NewConsenter(
int(conf.General.BatchSize),
int(conf.General.MaxWindowSize),
conf.General.BatchTimeout,
rawledger,
grpcServer,
createBroadcastRuleset(configManager),
filters,
configManager,
)

server := NewServer(
soloConsenter,
rawledger,
int(conf.General.QueueSize),
int(conf.General.MaxWindowSize),
filters,
configManager,
)

ab.RegisterAtomicBroadcastServer(grpcServer, server)
grpcServer.Serve(lis)
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/rawledger/rawledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Iterator interface {

// Reader allows the caller to inspect the raw ledger
type Reader interface {
// Iterator retrieves an Iterator, as specified by an cb.SeekInfo message, returning an iterator, and it's starting block number
// Iterator retrieves an Iterator, as specified by an cb.SeekInfo message, returning an iterator, and its starting block number
Iterator(startType ab.SeekInfo_StartType, specified uint64) (Iterator, uint64)
// Height returns the highest block number in the chain, plus one
Height() uint64
Expand Down
36 changes: 10 additions & 26 deletions orderer/solo/solo.go → orderer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,41 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package solo
package main

import (
"time"

"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/common/deliver"
"github.com/hyperledger/fabric/orderer/rawledger"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/op/go-logging"
"google.golang.org/grpc"
)

var logger = logging.MustGetLogger("orderer/solo")

func init() {
logging.SetLevel(logging.DEBUG, "")
}

type server struct {
bh broadcast.Handler
bs *broadcastServer
ds deliver.Handler
dh deliver.Handler
}

// New creates a ab.AtomicBroadcastServer based on the solo orderer implementation
func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer {
logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl)
bs := newBroadcastServer(batchSize, batchTimeout, rl, filters, configManager)
ds := deliver.NewHandlerImpl(rl, maxWindowSize)
bh := broadcast.NewHandlerImpl(queueSize, bs, filters, configManager)
// NewServer creates a ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(consenter broadcast.Target, rl rawledger.Reader, queueSize, maxWindowSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer {
logger.Infof("Starting orderer with consenter=%T, and ledger=%T", consenter, rl)

s := &server{
bs: bs,
ds: ds,
bh: bh,
dh: deliver.NewHandlerImpl(rl, maxWindowSize),
bh: broadcast.NewHandlerImpl(queueSize, consenter, filters, configManager),
}
ab.RegisterAtomicBroadcastServer(grpcServer, s)
return s
}

// Broadcast receives a stream of messages from a client for ordering
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Debugf("Starting new Broadcast handler")
return s.bh.Handle(srv)
}

// Deliver sends a stream of blocks to a client after ordering
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new Deliver loop")
return s.ds.Handle(srv)
logger.Debugf("Starting new Deliver handler")
return s.dh.Handle(srv)
}
23 changes: 15 additions & 8 deletions orderer/solo/broadcast.go → orderer/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ import (
cb "github.com/hyperledger/fabric/protos/common"

"github.com/golang/protobuf/proto"
"github.com/op/go-logging"
)

type broadcastServer struct {
var logger = logging.MustGetLogger("orderer/solo")

func init() {
logging.SetLevel(logging.DEBUG, "")
}

type consenter struct {
batchSize int
batchTimeout time.Duration
rl rawledger.Writer
Expand All @@ -37,14 +44,14 @@ type broadcastServer struct {
exitChan chan struct{}
}

func newBroadcastServer(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer {
bs := newPlainBroadcastServer(batchSize, batchTimeout, rl, filters, configManager)
func NewConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter {
bs := newPlainConsenter(batchSize, batchTimeout, rl, filters, configManager)
go bs.main()
return bs
}

func newPlainBroadcastServer(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *broadcastServer {
bs := &broadcastServer{
func newPlainConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter {
bs := &consenter{
batchSize: batchSize,
batchTimeout: batchTimeout,
rl: rl,
Expand All @@ -56,12 +63,12 @@ func newPlainBroadcastServer(batchSize int, batchTimeout time.Duration, rl rawle
return bs
}

func (bs *broadcastServer) halt() {
func (bs *consenter) halt() {
close(bs.exitChan)
}

// Enqueue accepts a message and returns true on acceptance, or false on shutdown
func (bs *broadcastServer) Enqueue(env *cb.Envelope) bool {
func (bs *consenter) Enqueue(env *cb.Envelope) bool {
select {
case bs.sendChan <- env:
return true
Expand All @@ -70,7 +77,7 @@ func (bs *broadcastServer) Enqueue(env *cb.Envelope) bool {
}
}

func (bs *broadcastServer) main() {
func (bs *consenter) main() {
var curBatch []*cb.Envelope
var timer <-chan time.Time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *mockB) Recv() (*cb.Envelope, error) {

func TestEmptyBatch(t *testing.T) {
filters, cm := getFiltersAndConfig()
bs := newPlainBroadcastServer(1, time.Millisecond, ramledger.New(10, genesisBlock), filters, cm)
bs := newPlainConsenter(1, time.Millisecond, ramledger.New(10, genesisBlock), filters, cm)
if bs.rl.(rawledger.Reader).Height() != 1 {
t.Fatalf("Expected no new blocks created")
}
Expand All @@ -133,7 +133,7 @@ func TestBatchTimer(t *testing.T) {
filters, cm := getFiltersAndConfig()
batchSize := 2
rl := ramledger.New(10, genesisBlock)
bs := newBroadcastServer(batchSize, time.Millisecond, rl, filters, cm)
bs := NewConsenter(batchSize, time.Millisecond, rl, filters, cm)
defer bs.halt()
it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1)

Expand All @@ -150,7 +150,7 @@ func TestFilledBatch(t *testing.T) {
filters, cm := getFiltersAndConfig()
batchSize := 2
messages := 10
bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
bs := newPlainConsenter(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
done := make(chan struct{})
go func() {
bs.main()
Expand All @@ -170,7 +170,7 @@ func TestFilledBatch(t *testing.T) {
func TestReconfigureGoodPath(t *testing.T) {
filters, cm := getFiltersAndConfig()
batchSize := 2
bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
bs := newPlainConsenter(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
done := make(chan struct{})
go func() {
bs.main()
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestReconfigureFailToApply(t *testing.T) {
filters, cm := getFiltersAndConfig()
cm.applyErr = fmt.Errorf("Fail to apply")
batchSize := 2
bs := newPlainBroadcastServer(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
bs := newPlainConsenter(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm)
done := make(chan struct{})
go func() {
bs.main()
Expand Down

0 comments on commit b7908a3

Please sign in to comment.