diff --git a/orderer/solo/deliver.go b/orderer/common/deliver/deliver.go similarity index 91% rename from orderer/solo/deliver.go rename to orderer/common/deliver/deliver.go index a588f8cce96..7eb87511f5f 100644 --- a/orderer/solo/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -14,27 +14,39 @@ See the License for the specific language governing permissions and limitations under the License. */ -package solo +package deliver import ( "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/op/go-logging" ) +var logger = logging.MustGetLogger("orderer/common/deliver") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +type Handler interface { + Handle(srv ab.AtomicBroadcast_DeliverServer) error +} + type DeliverServer struct { rl rawledger.Reader maxWindow int } -func NewDeliverServer(rl rawledger.Reader, maxWindow int) *DeliverServer { +func NewHandlerImpl(rl rawledger.Reader, maxWindow int) Handler { return &DeliverServer{ rl: rl, maxWindow: maxWindow, } } -func (ds *DeliverServer) HandleDeliver(srv ab.AtomicBroadcast_DeliverServer) error { +func (ds *DeliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Starting new Deliver loop") d := newDeliverer(ds, srv) return d.recv() diff --git a/orderer/solo/deliver_test.go b/orderer/common/deliver/deliver_test.go similarity index 90% rename from orderer/solo/deliver_test.go rename to orderer/common/deliver/deliver_test.go index d9d6e1cf94f..b4d888e9f90 100644 --- a/orderer/solo/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -14,20 +14,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -package solo +package deliver import ( "fmt" "testing" "time" - "google.golang.org/grpc" - + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "google.golang.org/grpc" ) +var genesisBlock *cb.Block + +func init() { + bootstrapper := static.New() + var err error + genesisBlock, err = bootstrapper.GenesisBlock() + if err != nil { + panic("Error intializing static bootstrap genesis block") + } +} + // MagicLargestWindow is used as the default max window size for initializing the deliver service const MagicLargestWindow int = 1000 @@ -66,9 +78,9 @@ func TestOldestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}} @@ -98,9 +110,9 @@ func TestNewestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}} @@ -126,9 +138,9 @@ func TestSpecificSeek(t *testing.T) { } m := newMockD() - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} @@ -155,9 +167,9 @@ func TestBadSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} @@ -188,9 +200,9 @@ func TestBadWindow(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}} @@ -214,9 +226,9 @@ func TestAck(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST}}} diff --git a/orderer/sbft/backend/backendab.go b/orderer/sbft/backend/backendab.go index 1c27384bb8c..7c918453fa3 100644 --- a/orderer/sbft/backend/backendab.go +++ b/orderer/sbft/backend/backendab.go @@ -18,20 +18,20 @@ package backend import ( "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/orderer/solo" + "github.com/hyperledger/fabric/orderer/common/deliver" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" ) type BackendAB struct { backend *Backend - deliverserver *solo.DeliverServer + deliverserver deliver.Handler } func NewBackendAB(backend *Backend) *BackendAB { bab := &BackendAB{ backend: backend, - deliverserver: solo.NewDeliverServer(backend.ledger, 1000), + deliverserver: deliver.NewHandlerImpl(backend.ledger, 1000), } return bab } @@ -64,5 +64,5 @@ func (b *BackendAB) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { // Deliver sends a stream of blocks to a client after ordering func (b *BackendAB) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { - return b.deliverserver.HandleDeliver(srv) + return b.deliverserver.Handle(srv) } diff --git a/orderer/solo/solo.go b/orderer/solo/solo.go index 62d6afea22b..aafbd639bec 100644 --- a/orderer/solo/solo.go +++ b/orderer/solo/solo.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/broadcast" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/deliver" "github.com/hyperledger/fabric/orderer/rawledger" ab "github.com/hyperledger/fabric/protos/orderer" @@ -38,14 +39,14 @@ func init() { type server struct { bh broadcast.Handler bs *broadcastServer - ds *DeliverServer + ds deliver.Handler } // New creates a ab.AtomicBroadcastServer based on the solo orderer implementation func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer { logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl) bs := newBroadcastServer(batchSize, batchTimeout, rl, filters, configManager) - ds := NewDeliverServer(rl, maxWindowSize) + ds := deliver.NewHandlerImpl(rl, maxWindowSize) bh := broadcast.NewHandlerImpl(queueSize, bs, filters, configManager) s := &server{ @@ -65,5 +66,5 @@ func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { // Deliver sends a stream of blocks to a client after ordering func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Starting new Deliver loop") - return s.ds.HandleDeliver(srv) + return s.ds.Handle(srv) }