Skip to content

Commit

Permalink
Abstract out a rawledger interface
Browse files Browse the repository at this point in the history
This also removes the ramledger implementation from solo,
and turns it into its own package, utilizing the new
rawledger interface.

https://jira.hyperledger.org/browse/FAB-325

Change-Id: I1993349e096fe1664089a919f955da1ab57e58a0
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Sep 14, 2016
1 parent 543baa3 commit 45bd645
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 183 deletions.
154 changes: 154 additions & 0 deletions orderer/rawledger/ramledger/ramledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ramledger

import (
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/rawledger"

"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("rawledger/ramledger")

func init() {
logging.SetLevel(logging.DEBUG, "")
}

type cursor struct {
list *simpleList
}

type simpleList struct {
next *simpleList
signal chan struct{}
block *ab.Block
}

type ramLedger struct {
maxSize int
size int
oldest *simpleList
newest *simpleList
}

// New creates a new instance of the ram ledger
func New(maxSize int) rawledger.ReadWriter {
rl := &ramLedger{
maxSize: maxSize,
size: 1,
oldest: &simpleList{
signal: make(chan struct{}),
block: &ab.Block{
Number: 0,
PrevHash: []byte("GENESIS"),
},
},
}
rl.newest = rl.oldest
return rl
}

// Height returns the highest block number in the chain, plus one
func (rl *ramLedger) Height() uint64 {
return rl.newest.block.Number + 1
}

// Iterator implements the rawledger.Reader definition
func (rl *ramLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) {
var list *simpleList
switch startType {
case ab.SeekInfo_OLDEST:
oldest := rl.oldest
list = &simpleList{
block: &ab.Block{Number: oldest.block.Number - 1},
next: oldest,
signal: make(chan struct{}),
}
close(list.signal)
case ab.SeekInfo_NEWEST:
newest := rl.newest
list = &simpleList{
block: &ab.Block{Number: newest.block.Number - 1},
next: newest,
signal: make(chan struct{}),
}
close(list.signal)
case ab.SeekInfo_SPECIFIED:
list = rl.oldest
if specified < list.block.Number || specified > rl.newest.block.Number+1 {
return &rawledger.NotFoundErrorIterator{}, 0
}

for {
if list.block.Number == specified-1 {
break
}
list = list.next // No need for nil check, because of range check above
}
}
return &cursor{list: list}, list.block.Number + 1
}

// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable
func (cu *cursor) Next() (*ab.Block, ab.Status) {
// This only loops once, as signal reading indicates non-nil next
for {
if cu.list.next != nil {
cu.list = cu.list.next
return cu.list.block, ab.Status_SUCCESS
}

<-cu.list.signal
}
}

// ReadyChan returns a channel that will close when Next is ready to be called without blocking
func (cu *cursor) ReadyChan() <-chan struct{} {
return cu.list.signal
}

// Append creates a new block and appends it to the ledger
func (rl *ramLedger) Append(messages []*ab.BroadcastMessage, proof []byte) *ab.Block {
block := &ab.Block{
Number: rl.newest.block.Number + 1,
PrevHash: rl.newest.block.Hash(),
Messages: messages,
Proof: proof,
}
rl.appendBlock(block)
return block
}

func (rl *ramLedger) appendBlock(block *ab.Block) {
rl.newest.next = &simpleList{
signal: make(chan struct{}),
block: block,
}

lastSignal := rl.newest.signal
logger.Debugf("Sending signal that block %d has a successor", rl.newest.block.Number)
rl.newest = rl.newest.next
close(lastSignal)

rl.size++

if rl.size > rl.maxSize {
rl.oldest = rl.oldest.next
rl.size--
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package solo
package ramledger

import (
"testing"
Expand All @@ -23,9 +23,10 @@ import (
)

// TestAppend ensures that appending blocks stores only the maxSize most recent blocks
// Note that 'only' is applicable because the genesis block will be discarded
func TestAppend(t *testing.T) {
maxSize := 3
rl := newRAMLedger(maxSize)
rl := New(maxSize).(*ramLedger)
var blocks []*ab.Block
for i := 0; i < 3; i++ {
blocks = append(blocks, &ab.Block{Number: uint64(i + 1)})
Expand All @@ -50,7 +51,7 @@ func TestAppend(t *testing.T) {
// TestSignal checks if the signal channel closes when an item is appended
func TestSignal(t *testing.T) {
maxSize := 3
rl := newRAMLedger(maxSize)
rl := New(maxSize).(*ramLedger)
item := rl.newest
select {
case <-item.signal:
Expand All @@ -66,12 +67,12 @@ func TestSignal(t *testing.T) {
}

// TestTruncatingSafety is intended to simulate a reader who fetches a reference to the oldest list item
// which is then pushed off the history by appending greater than the history size (here, 10 appeneds with
// which is then pushed off the history by appending greater than the history size (here, 10 appends with
// a maxSize of 3). We let the go garbage collector ensure the references still exist
func TestTruncationSafety(t *testing.T) {
maxSize := 3
newBlocks := 10
rl := newRAMLedger(maxSize)
rl := New(maxSize).(*ramLedger)
item := rl.oldest
for i := 0; i < newBlocks; i++ {
rl.appendBlock(&ab.Block{Number: uint64(i + 1)})
Expand All @@ -83,6 +84,6 @@ func TestTruncationSafety(t *testing.T) {
}

if count != newBlocks {
t.Fatalf("The iterator should have found 10 new blocks")
t.Fatalf("The iterator should have found %d new blocks but found %d", newBlocks, count)
}
}
49 changes: 49 additions & 0 deletions orderer/rawledger/rawledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rawledger

import (
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)

// Iterator is useful for a chain Reader to stream blocks as they are created
type Iterator interface {
// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable
Next() (*ab.Block, ab.Status)
// ReadyChan supplies a channel which will block until Next will not block
ReadyChan() <-chan struct{}
}

// Reader allows the caller to inspect the raw ledger
type Reader interface {
// Iterator retrieves an Iterator, as specified by an ab.SeekInfo message, returning an iterator, and it's starting block number
Iterator(startType ab.SeekInfo_StartType, specified uint64) (Iterator, uint64)
// Height returns the highest block number in the chain, plus one
Height() uint64
}

// Writer allows the caller to modify the raw ledger
type Writer interface {
// Append a new block to the ledger
Append(blockContents []*ab.BroadcastMessage, proof []byte) *ab.Block
}

// ReadWriter encapsulated both the reading and writing functions of the rawledger
type ReadWriter interface {
Reader
Writer
}
41 changes: 41 additions & 0 deletions orderer/rawledger/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rawledger

import (
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)

var closedChan chan struct{}

func init() {
closedChan = make(chan struct{})
close(closedChan)
}

// NotFoundErrorIterator simply always returns an error of ab.Status_NOT_FOUND, and is generally useful for implementations of the Reader interface
type NotFoundErrorIterator struct{}

// Next returns nil, ab.Status_NOT_FOUND
func (nfei *NotFoundErrorIterator) Next() (*ab.Block, ab.Status) {
return nil, ab.Status_NOT_FOUND
}

// ReadyChan returns a closed channel
func (nfei *NotFoundErrorIterator) ReadyChan() <-chan struct{} {
return closedChan
}
50 changes: 24 additions & 26 deletions orderer/solo/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,30 @@ import (
"time"

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/rawledger"
)

type broadcastServer struct {
queue chan *ab.BroadcastMessage
batchSize int
batchTimeout time.Duration
rl *ramLedger
rl rawledger.Writer
exitChan chan struct{}
}

func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rs *ramLedger) *broadcastServer {
bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rs)
bs.exitChan = make(chan struct{})
func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer {
bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rl)
go bs.main()
return bs
}

func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl *ramLedger) *broadcastServer {
func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer {
bs := &broadcastServer{
queue: make(chan *ab.BroadcastMessage, queueSize),
batchSize: batchSize,
batchTimeout: batchTimeout,
rl: rl,
exitChan: make(chan struct{}),
}
return bs
}
Expand All @@ -56,31 +57,28 @@ func (bs *broadcastServer) main() {
outer:
for {
timer := time.After(bs.batchTimeout)
select {
case msg := <-bs.queue:
curBatch = append(curBatch, msg)
if len(curBatch) < bs.batchSize {
continue
}
logger.Debugf("Batch size met, creating block")
case <-timer:
if len(curBatch) == 0 {
continue outer
for {
select {
case msg := <-bs.queue:
curBatch = append(curBatch, msg)
if len(curBatch) < bs.batchSize {
continue
}
logger.Debugf("Batch size met, creating block")
case <-timer:
if len(curBatch) == 0 {
continue outer
}
logger.Debugf("Batch timer expired, creating block")
case <-bs.exitChan:
logger.Debugf("Exiting")
return
}
logger.Debugf("Batch timer expired, creating block")
case <-bs.exitChan:
logger.Debugf("Exiting")
return
break
}

block := &ab.Block{
Number: bs.rl.newest.block.Number + 1,
PrevHash: bs.rl.newest.block.Hash(),
Messages: curBatch,
}
bs.rl.Append(curBatch, nil)
curBatch = nil

bs.rl.appendBlock(block)
}
}

Expand Down
Loading

0 comments on commit 45bd645

Please sign in to comment.