diff --git a/orderer/rawledger/ramledger/ramledger.go b/orderer/rawledger/ramledger/ramledger.go new file mode 100644 index 00000000000..0fe28904d50 --- /dev/null +++ b/orderer/rawledger/ramledger/ramledger.go @@ -0,0 +1,154 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ramledger + +import ( + ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger" + + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("rawledger/ramledger") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +type cursor struct { + list *simpleList +} + +type simpleList struct { + next *simpleList + signal chan struct{} + block *ab.Block +} + +type ramLedger struct { + maxSize int + size int + oldest *simpleList + newest *simpleList +} + +// New creates a new instance of the ram ledger +func New(maxSize int) rawledger.ReadWriter { + rl := &ramLedger{ + maxSize: maxSize, + size: 1, + oldest: &simpleList{ + signal: make(chan struct{}), + block: &ab.Block{ + Number: 0, + PrevHash: []byte("GENESIS"), + }, + }, + } + rl.newest = rl.oldest + return rl +} + +// Height returns the highest block number in the chain, plus one +func (rl *ramLedger) Height() uint64 { + return rl.newest.block.Number + 1 +} + +// Iterator implements the rawledger.Reader definition +func (rl *ramLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) { + var list *simpleList + switch startType { + case ab.SeekInfo_OLDEST: + oldest := rl.oldest + list = &simpleList{ + block: &ab.Block{Number: oldest.block.Number - 1}, + next: oldest, + signal: make(chan struct{}), + } + close(list.signal) + case ab.SeekInfo_NEWEST: + newest := rl.newest + list = &simpleList{ + block: &ab.Block{Number: newest.block.Number - 1}, + next: newest, + signal: make(chan struct{}), + } + close(list.signal) + case ab.SeekInfo_SPECIFIED: + list = rl.oldest + if specified < list.block.Number || specified > rl.newest.block.Number+1 { + return &rawledger.NotFoundErrorIterator{}, 0 + } + + for { + if list.block.Number == specified-1 { + break + } + list = list.next // No need for nil check, because of range check above + } + } + return &cursor{list: list}, list.block.Number + 1 +} + +// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable +func (cu *cursor) Next() (*ab.Block, ab.Status) { + // This only loops once, as signal reading indicates non-nil next + for { + if cu.list.next != nil { + cu.list = cu.list.next + return cu.list.block, ab.Status_SUCCESS + } + + <-cu.list.signal + } +} + +// ReadyChan returns a channel that will close when Next is ready to be called without blocking +func (cu *cursor) ReadyChan() <-chan struct{} { + return cu.list.signal +} + +// Append creates a new block and appends it to the ledger +func (rl *ramLedger) Append(messages []*ab.BroadcastMessage, proof []byte) *ab.Block { + block := &ab.Block{ + Number: rl.newest.block.Number + 1, + PrevHash: rl.newest.block.Hash(), + Messages: messages, + Proof: proof, + } + rl.appendBlock(block) + return block +} + +func (rl *ramLedger) appendBlock(block *ab.Block) { + rl.newest.next = &simpleList{ + signal: make(chan struct{}), + block: block, + } + + lastSignal := rl.newest.signal + logger.Debugf("Sending signal that block %d has a successor", rl.newest.block.Number) + rl.newest = rl.newest.next + close(lastSignal) + + rl.size++ + + if rl.size > rl.maxSize { + rl.oldest = rl.oldest.next + rl.size-- + } +} diff --git a/orderer/solo/ramledger_test.go b/orderer/rawledger/ramledger/ramledger_test.go similarity index 87% rename from orderer/solo/ramledger_test.go rename to orderer/rawledger/ramledger/ramledger_test.go index e4adc15a5ac..b97515fee27 100644 --- a/orderer/solo/ramledger_test.go +++ b/orderer/rawledger/ramledger/ramledger_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package solo +package ramledger import ( "testing" @@ -23,9 +23,10 @@ import ( ) // TestAppend ensures that appending blocks stores only the maxSize most recent blocks +// Note that 'only' is applicable because the genesis block will be discarded func TestAppend(t *testing.T) { maxSize := 3 - rl := newRAMLedger(maxSize) + rl := New(maxSize).(*ramLedger) var blocks []*ab.Block for i := 0; i < 3; i++ { blocks = append(blocks, &ab.Block{Number: uint64(i + 1)}) @@ -50,7 +51,7 @@ func TestAppend(t *testing.T) { // TestSignal checks if the signal channel closes when an item is appended func TestSignal(t *testing.T) { maxSize := 3 - rl := newRAMLedger(maxSize) + rl := New(maxSize).(*ramLedger) item := rl.newest select { case <-item.signal: @@ -66,12 +67,12 @@ func TestSignal(t *testing.T) { } // TestTruncatingSafety is intended to simulate a reader who fetches a reference to the oldest list item -// which is then pushed off the history by appending greater than the history size (here, 10 appeneds with +// which is then pushed off the history by appending greater than the history size (here, 10 appends with // a maxSize of 3). We let the go garbage collector ensure the references still exist func TestTruncationSafety(t *testing.T) { maxSize := 3 newBlocks := 10 - rl := newRAMLedger(maxSize) + rl := New(maxSize).(*ramLedger) item := rl.oldest for i := 0; i < newBlocks; i++ { rl.appendBlock(&ab.Block{Number: uint64(i + 1)}) @@ -83,6 +84,6 @@ func TestTruncationSafety(t *testing.T) { } if count != newBlocks { - t.Fatalf("The iterator should have found 10 new blocks") + t.Fatalf("The iterator should have found %d new blocks but found %d", newBlocks, count) } } diff --git a/orderer/rawledger/rawledger.go b/orderer/rawledger/rawledger.go new file mode 100644 index 00000000000..2bb1eec2862 --- /dev/null +++ b/orderer/rawledger/rawledger.go @@ -0,0 +1,49 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rawledger + +import ( + ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" +) + +// Iterator is useful for a chain Reader to stream blocks as they are created +type Iterator interface { + // Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable + Next() (*ab.Block, ab.Status) + // ReadyChan supplies a channel which will block until Next will not block + ReadyChan() <-chan struct{} +} + +// Reader allows the caller to inspect the raw ledger +type Reader interface { + // Iterator retrieves an Iterator, as specified by an ab.SeekInfo message, returning an iterator, and it's starting block number + Iterator(startType ab.SeekInfo_StartType, specified uint64) (Iterator, uint64) + // Height returns the highest block number in the chain, plus one + Height() uint64 +} + +// Writer allows the caller to modify the raw ledger +type Writer interface { + // Append a new block to the ledger + Append(blockContents []*ab.BroadcastMessage, proof []byte) *ab.Block +} + +// ReadWriter encapsulated both the reading and writing functions of the rawledger +type ReadWriter interface { + Reader + Writer +} diff --git a/orderer/rawledger/util.go b/orderer/rawledger/util.go new file mode 100644 index 00000000000..9070e2ed0d6 --- /dev/null +++ b/orderer/rawledger/util.go @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rawledger + +import ( + ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" +) + +var closedChan chan struct{} + +func init() { + closedChan = make(chan struct{}) + close(closedChan) +} + +// NotFoundErrorIterator simply always returns an error of ab.Status_NOT_FOUND, and is generally useful for implementations of the Reader interface +type NotFoundErrorIterator struct{} + +// Next returns nil, ab.Status_NOT_FOUND +func (nfei *NotFoundErrorIterator) Next() (*ab.Block, ab.Status) { + return nil, ab.Status_NOT_FOUND +} + +// ReadyChan returns a closed channel +func (nfei *NotFoundErrorIterator) ReadyChan() <-chan struct{} { + return closedChan +} diff --git a/orderer/solo/broadcast.go b/orderer/solo/broadcast.go index 708049a4818..2212e30e0df 100644 --- a/orderer/solo/broadcast.go +++ b/orderer/solo/broadcast.go @@ -20,29 +20,30 @@ import ( "time" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger" ) type broadcastServer struct { queue chan *ab.BroadcastMessage batchSize int batchTimeout time.Duration - rl *ramLedger + rl rawledger.Writer exitChan chan struct{} } -func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rs *ramLedger) *broadcastServer { - bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rs) - bs.exitChan = make(chan struct{}) +func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer { + bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rl) go bs.main() return bs } -func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl *ramLedger) *broadcastServer { +func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer { bs := &broadcastServer{ queue: make(chan *ab.BroadcastMessage, queueSize), batchSize: batchSize, batchTimeout: batchTimeout, rl: rl, + exitChan: make(chan struct{}), } return bs } @@ -56,31 +57,28 @@ func (bs *broadcastServer) main() { outer: for { timer := time.After(bs.batchTimeout) - select { - case msg := <-bs.queue: - curBatch = append(curBatch, msg) - if len(curBatch) < bs.batchSize { - continue - } - logger.Debugf("Batch size met, creating block") - case <-timer: - if len(curBatch) == 0 { - continue outer + for { + select { + case msg := <-bs.queue: + curBatch = append(curBatch, msg) + if len(curBatch) < bs.batchSize { + continue + } + logger.Debugf("Batch size met, creating block") + case <-timer: + if len(curBatch) == 0 { + continue outer + } + logger.Debugf("Batch timer expired, creating block") + case <-bs.exitChan: + logger.Debugf("Exiting") + return } - logger.Debugf("Batch timer expired, creating block") - case <-bs.exitChan: - logger.Debugf("Exiting") - return + break } - block := &ab.Block{ - Number: bs.rl.newest.block.Number + 1, - PrevHash: bs.rl.newest.block.Hash(), - Messages: curBatch, - } + bs.rl.Append(curBatch, nil) curBatch = nil - - bs.rl.appendBlock(block) } } diff --git a/orderer/solo/broadcast_test.go b/orderer/solo/broadcast_test.go index e06bfd68eb6..796fdb71615 100644 --- a/orderer/solo/broadcast_test.go +++ b/orderer/solo/broadcast_test.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger" + "github.com/hyperledger/fabric/orderer/rawledger/ramledger" ) type mockB struct { @@ -88,23 +90,23 @@ func TestEmptyBroadcastMessage(t *testing.T) { } func TestEmptyBatch(t *testing.T) { - bs := newPlainBroadcastServer(2, 1, time.Millisecond, newRAMLedger(10)) + bs := newPlainBroadcastServer(2, 1, time.Millisecond, ramledger.New(10)) time.Sleep(100 * time.Millisecond) // Note, this is not a race, as worst case, the timer does not expire, and the test still passes - if bs.rl.size != 1 { + if bs.rl.(rawledger.Reader).Height() != 1 { t.Fatalf("Expected no new blocks created") } } func TestFilledBatch(t *testing.T) { batchSize := 2 - bs := newBroadcastServer(0, batchSize, time.Hour, newRAMLedger(10)) + bs := newBroadcastServer(0, batchSize, time.Hour, ramledger.New(10)) defer bs.halt() messages := 11 // Sending 11 messages, with a batch size of 2, ensures the 10th message is processed before we proceed for 5 blocks for i := 0; i < messages; i++ { bs.queue <- &ab.BroadcastMessage{[]byte("Some bytes")} } - expected := 1 + messages/batchSize - if bs.rl.size != expected { - t.Fatalf("Expected %d blocks but got %d", expected, bs.rl.size) + expected := uint64(1 + messages/batchSize) + if bs.rl.(rawledger.Reader).Height() != expected { + t.Fatalf("Expected %d blocks but got %d", expected, bs.rl.(rawledger.Reader).Height()) } } diff --git a/orderer/solo/deliver.go b/orderer/solo/deliver.go index db6876c726e..825131537a9 100644 --- a/orderer/solo/deliver.go +++ b/orderer/solo/deliver.go @@ -18,14 +18,15 @@ package solo import ( ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger" ) type deliverServer struct { - rl *ramLedger + rl rawledger.Reader maxWindow int } -func newDeliverServer(rl *ramLedger, maxWindow int) *deliverServer { +func newDeliverServer(rl rawledger.Reader, maxWindow int) *deliverServer { return &deliverServer{ rl: rl, maxWindow: maxWindow, @@ -40,13 +41,14 @@ func (ds *deliverServer) handleDeliver(srv ab.AtomicBroadcast_DeliverServer) err } type deliverer struct { - ds *deliverServer - srv ab.AtomicBroadcast_DeliverServer - cursor *simpleList - windowSize uint64 - lastAck uint64 - recvChan chan *ab.DeliverUpdate - exitChan chan struct{} + ds *deliverServer + srv ab.AtomicBroadcast_DeliverServer + cursor rawledger.Iterator + nextBlockNumber uint64 + windowSize uint64 + lastAck uint64 + recvChan chan *ab.DeliverUpdate + exitChan chan struct{} } func newDeliverer(ds *deliverServer, srv ab.AtomicBroadcast_DeliverServer) *deliverer { @@ -65,7 +67,7 @@ func (d *deliverer) halt() { } func (d *deliverer) main() { - var signal chan struct{} + var signal <-chan struct{} for { select { case update := <-d.recvChan: @@ -83,12 +85,24 @@ func (d *deliverer) main() { close(d.exitChan) return default: - logger.Errorf("Unknown type: %v", t) + logger.Errorf("Unknown type: %T:%v", t, t) close(d.exitChan) return } case <-signal: - logger.Debugf("Signal triggered wakeup") + block, status := d.cursor.Next() + if status != ab.Status_SUCCESS { + logger.Errorf("Error reading from channel, cause was: %v", status) + if !d.sendErrorReply(status) { + return + } + d.cursor = nil + } else { + d.nextBlockNumber = block.Number + 1 + if !d.sendBlockReply(block) { + return + } + } case <-d.exitChan: return } @@ -98,24 +112,13 @@ func (d *deliverer) main() { continue } - for { - if d.cursor.next == nil { - logger.Debugf("Ran out of blocks, blocking for signal") - signal = d.cursor.signal - break - } - - if d.lastAck+d.windowSize < d.cursor.next.block.Number { - signal = nil - break - } - - logger.Debugf("Sending block to client") - d.cursor = d.cursor.next - if !d.sendBlockReply(d.cursor.block) { - return - } + if d.lastAck+d.windowSize < d.nextBlockNumber { + signal = nil + continue } + + logger.Debugf("Room for more blocks, activating channel") + signal = d.cursor.ReadyChan() } } @@ -164,7 +167,9 @@ func (d *deliverer) sendBlockReply(block *ab.Block) bool { } func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { - d.cursor = nil + if d.cursor != nil { + d.cursor = nil + } logger.Debugf("Updating properties for client") if update == nil || update.WindowSize == 0 || update.WindowSize > MagicLargestWindow { @@ -174,39 +179,8 @@ func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { d.windowSize = update.WindowSize - switch update.Start { - case ab.SeekInfo_OLDEST: - oldest := d.ds.rl.oldest - d.cursor = &simpleList{ - block: &ab.Block{Number: oldest.block.Number - 1}, // Potential underflow, so do not use > or <, use == +1 - next: oldest, - signal: make(chan struct{}), - } - close(d.cursor.signal) - case ab.SeekInfo_NEWEST: - newest := d.ds.rl.newest - d.cursor = &simpleList{ - block: &ab.Block{Number: newest.block.Number - 1}, // Potential underflow, so do not use > or <, use == +1 - next: newest, - signal: make(chan struct{}), - } - close(d.cursor.signal) - case ab.SeekInfo_SPECIFIED: - d.cursor = d.ds.rl.oldest - target := update.SpecifiedNumber - if target < d.cursor.block.Number || target > d.ds.rl.newest.block.Number+1 { - d.cursor = nil - return d.sendErrorReply(ab.Status_NOT_FOUND) - } - - for { - if d.cursor.block.Number == target-1 { - break - } - d.cursor = d.cursor.next // No need for nil check, because of range check above - } - } + d.cursor, d.nextBlockNumber = d.ds.rl.Iterator(update.Start, update.SpecifiedNumber) + d.lastAck = d.nextBlockNumber - 1 - d.lastAck = d.cursor.block.Number return true } diff --git a/orderer/solo/deliver_test.go b/orderer/solo/deliver_test.go index 637f982f9d6..6a2991fbce0 100644 --- a/orderer/solo/deliver_test.go +++ b/orderer/solo/deliver_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger/ramledger" ) type mockD struct { @@ -54,9 +55,9 @@ func (m *mockD) Recv() (*ab.DeliverUpdate, error) { func TestOldestSeek(t *testing.T) { ledgerSize := 5 - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) for i := 1; i < ledgerSize; i++ { - rl.appendBlock(&ab.Block{Number: uint64(i)}) + rl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() @@ -86,9 +87,9 @@ func TestOldestSeek(t *testing.T) { func TestNewestSeek(t *testing.T) { ledgerSize := 5 - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) for i := 1; i < ledgerSize; i++ { - rl.appendBlock(&ab.Block{Number: uint64(i)}) + rl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() @@ -115,9 +116,9 @@ func TestNewestSeek(t *testing.T) { func TestSpecificSeek(t *testing.T) { ledgerSize := 5 - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) for i := 1; i < ledgerSize; i++ { - rl.appendBlock(&ab.Block{Number: uint64(i)}) + rl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() @@ -143,9 +144,9 @@ func TestSpecificSeek(t *testing.T) { func TestBadSeek(t *testing.T) { ledgerSize := 5 - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) for i := 1; i < 2*ledgerSize; i++ { - rl.appendBlock(&ab.Block{Number: uint64(i)}) + rl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() @@ -179,7 +180,7 @@ func TestBadSeek(t *testing.T) { func TestBadWindow(t *testing.T) { ledgerSize := 5 - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) m := newMockD() defer close(m.recvChan) @@ -202,9 +203,9 @@ func TestBadWindow(t *testing.T) { func TestAck(t *testing.T) { ledgerSize := 10 windowSize := uint64(2) - rl := newRAMLedger(ledgerSize) + rl := ramledger.New(ledgerSize) for i := 1; i < ledgerSize; i++ { - rl.appendBlock(&ab.Block{Number: uint64(i)}) + rl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() diff --git a/orderer/solo/ramledger.go b/orderer/solo/ramledger.go deleted file mode 100644 index 4ce13b1f93a..00000000000 --- a/orderer/solo/ramledger.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package solo - -import ( - ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" -) - -type simpleList struct { - next *simpleList - signal chan struct{} - block *ab.Block -} - -type ramLedger struct { - maxSize int - size int - oldest *simpleList - newest *simpleList -} - -func newRAMLedger(maxSize int) *ramLedger { - rl := &ramLedger{ - maxSize: maxSize, - size: 1, - oldest: &simpleList{ - signal: make(chan struct{}), - block: &ab.Block{ - Number: 0, - PrevHash: []byte("GENESIS"), - }, - }, - } - rl.newest = rl.oldest - return rl -} - -func (rl *ramLedger) appendBlock(block *ab.Block) { - rl.newest.next = &simpleList{ - signal: make(chan struct{}), - block: block, - } - - lastSignal := rl.newest.signal - logger.Debugf("Sending signal that block %d has a successor", rl.newest.block.Number) - rl.newest = rl.newest.next - close(lastSignal) - - rl.size++ - - if rl.size > rl.maxSize { - rl.oldest = rl.oldest.next - rl.size-- - } -} diff --git a/orderer/solo/solo.go b/orderer/solo/solo.go index 859792519f7..781ffce72d9 100644 --- a/orderer/solo/solo.go +++ b/orderer/solo/solo.go @@ -20,6 +20,9 @@ import ( "time" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/rawledger" + "github.com/hyperledger/fabric/orderer/rawledger/ramledger" + "github.com/op/go-logging" "google.golang.org/grpc" ) @@ -36,13 +39,13 @@ const MagicLargestWindow = 1000 type server struct { bs *broadcastServer ds *deliverServer - rl *ramLedger + rl rawledger.ReadWriter } // New creates a ab.AtomicBroadcastServer based on the solo orderer implementation func New(queueSize, batchSize, historySize int, batchTimeout time.Duration, grpcServer *grpc.Server) ab.AtomicBroadcastServer { logger.Infof("Starting solo with queueSize=%d batchSize=%d historySize=%d batchTimeout=%v", queueSize, batchSize, historySize, batchTimeout) - rl := newRAMLedger(historySize) + rl := ramledger.New(historySize) s := &server{ bs: newBroadcastServer(queueSize, batchSize, batchTimeout, rl), ds: newDeliverServer(rl, MagicLargestWindow),