Skip to content

Commit

Permalink
Update to new proposed block format
Browse files Browse the repository at this point in the history
This changeset updates ab.proto to use the block format discussed in
JIRA issue https://jira.hyperledger.org/browse/FAB-384

It more closely mirrors the batch format proposed by Simon in the SBFT
work as well as the time tested bitcoin block structure.

Note that the block is split into three pieces, Header, Data, and
Metadata.  This makes the clear distinction that the Headers chain
together, and the Headers have a reference to the hash of the Data.
Therefore the Metadata section can be used for storage of information
which should not be hashed, such as signature collections.  See the JIRA
issue for more details.

Assuming this block format is accepted, it should be removed from
ab.proto and pushed into the fabric.proto / fabric_next.proto.

Change-Id: I9e9f5afa10e29258ba5ad2a8c536a781e765664f
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 1, 2016
1 parent a293bc9 commit ec38c35
Show file tree
Hide file tree
Showing 25 changed files with 1,064 additions and 252 deletions.
748 changes: 724 additions & 24 deletions bddtests/ab_pb2.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bddtests/steps/orderer_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def readDeliveredMessages(self, expectedCount):
numToRead = self.getWindowSize() if self.getWindowSize() < expectedCount else expectedCount
msgsRead.extend(self.readMessages(numToRead))
# send the ack
self.sendAcknowledgment(msgsRead[-1].Block.Number)
self.sendAcknowledgment(msgsRead[-1].Block.Header.Number)
print('SentACK!!')
print('')
return msgsRead
Expand Down
8 changes: 4 additions & 4 deletions core/committer/noopssinglechain/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (r *deliverClient) readUntilClose() {
fmt.Println("Got error ", t)
case *ab.DeliverResponse_Block:
txs := []*pb.Transaction2{}
for _, d := range t.Block.Messages {
if d != nil && d.Data != nil {
for _, d := range t.Block.Data.Data {
if d != nil {
tx := &pb.Transaction2{}
if err = proto.Unmarshal(d.Data, tx); err != nil {
if err = proto.Unmarshal(d, tx); err != nil {
fmt.Printf("Error getting tx(%s)...dropping block\n", err)
continue
}
Expand All @@ -152,7 +152,7 @@ func (r *deliverClient) readUntilClose() {
r.unAcknowledged++
if r.unAcknowledged >= r.windowSize/2 {
fmt.Println("Sending acknowledgement")
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Number}}})
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Header.Number}}})
if err != nil {
return
}
Expand Down
279 changes: 167 additions & 112 deletions orderer/atomicbroadcast/ab.pb.go

Large diffs are not rendered by default.

29 changes: 21 additions & 8 deletions orderer/atomicbroadcast/ab.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,28 @@ message DeliverUpdate {
}
}

// This is a temporary data structure, meant to hold the place of the finalized block structure
// This must be a 'block' structure and not a 'batch' structure, although the terminology is slightly confusing
// The requirement is to allow for a consumer of the orderer to declare the unvalidated blockchain as the definitive
// blockchain, without breaking the hash chain or existing proof
// This is a temporary data structure, but is hopefully very close to the finalized block structure
// Note that the BlockHeader chains to the previous BlockHeader, and the BlockData hash is embedded
// in the BlockHeader. This makes it natural and obvious that the Data is included in the hash, but
// the Metadata is not.
message Block {
uint64 Number = 2;
bytes PrevHash = 3;
bytes Proof = 4;
repeated BroadcastMessage Messages = 5;
BlockHeader Header = 1;
BlockData Data = 2;
BlockMetadata Metadata = 3;
}

message BlockHeader {
uint64 Number = 1; // The position in the blockchain
bytes PreviousHash = 2; // The hash of the previous block header
bytes DataHash = 3; // The hash of the BlockData, by MerkleTree
}

message BlockData {
repeated bytes Data = 1;
}

message BlockMetadata {
repeated bytes Metadata = 1;
}

message DeliverResponse {
Expand Down
11 changes: 10 additions & 1 deletion orderer/atomicbroadcast/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ import (
"github.com/hyperledger/fabric/core/util"
)

func (b *Block) Hash() []byte {
func (b *BlockHeader) Hash() []byte {
data, err := proto.Marshal(b) // XXX this is wrong, protobuf is not the right mechanism to serialize for a hash
if err != nil {
panic("This should never fail and is generally irrecoverable")
}

return util.ComputeCryptoHash(data)
}

func (b *BlockData) Hash() []byte {
data, err := proto.Marshal(b) // XXX this is wrong, protobuf is not the right mechanism to serialize for a hash, AND, it is not a MerkleTree hash
if err != nil {
panic("This should never fail and is generally irrecoverable")
}

return util.ComputeCryptoHash(data)
}
13 changes: 9 additions & 4 deletions orderer/common/bootstrap/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,17 @@ func (b *bootstrapper) GenesisBlock() (*ab.Block, error) {
},
})

data := &ab.BlockData{
Data: [][]byte{initialConfigTX},
}

return &ab.Block{
Number: 0,
PrevHash: []byte("GENESIS"),
Messages: []*ab.BroadcastMessage{
&ab.BroadcastMessage{Data: initialConfigTX},
Header: &ab.BlockHeader{
Number: 0,
PreviousHash: []byte("GENESIS"),
DataHash: data.Hash(),
},
Data: data,
}, nil

}
35 changes: 24 additions & 11 deletions orderer/kafka/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"

"github.com/golang/protobuf/proto"
)

// Broadcaster allows the caller to submit messages to the orderer
Expand All @@ -37,7 +39,7 @@ type broadcasterImpl struct {
once sync.Once

batchChan chan *ab.BroadcastMessage
messages []*ab.BroadcastMessage
messages [][]byte
nextNumber uint64
prevHash []byte
}
Expand All @@ -47,7 +49,7 @@ func newBroadcaster(conf *config.TopLevel) Broadcaster {
producer: newProducer(conf),
config: conf,
batchChan: make(chan *ab.BroadcastMessage, conf.General.BatchSize),
messages: []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("genesis")}},
messages: [][]byte{[]byte("genesis")},
nextNumber: 0,
}
}
Expand Down Expand Up @@ -75,19 +77,30 @@ func (b *broadcasterImpl) Close() error {
}

func (b *broadcasterImpl) sendBlock() error {
data := &ab.BlockData{
Data: b.messages,
}
block := &ab.Block{
Messages: b.messages,
Number: b.nextNumber,
PrevHash: b.prevHash,
Header: &ab.BlockHeader{
Number: b.nextNumber,
PreviousHash: b.prevHash,
DataHash: data.Hash(),
},
Data: data,
}
logger.Debugf("Prepared block %d with %d messages (%+v)", block.Number, len(block.Messages), block)
logger.Debugf("Prepared block %d with %d messages (%+v)", block.Header.Number, len(block.Data.Data), block)

b.messages = []*ab.BroadcastMessage{}
b.messages = [][]byte{}
b.nextNumber++
hash, data := hashBlock(block)
b.prevHash = hash
b.prevHash = block.Header.Hash()

blockBytes, err := proto.Marshal(block)

if err != nil {
logger.Fatalf("Error marshaling block: %s", err)
}

return b.producer.Send(data)
return b.producer.Send(blockBytes)
}

func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
Expand All @@ -96,7 +109,7 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
for {
select {
case msg := <-b.batchChan:
b.messages = append(b.messages, msg)
b.messages = append(b.messages, msg.Data)
if len(b.messages) >= int(maxSize) {
if !timer.Stop() {
<-timer.C
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/broadcast_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk ch
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *ab.BroadcastMessage, conf.General.BatchSize),
messages: []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("checkpoint")}},
messages: [][]byte{[]byte("checkpoint")},
nextNumber: uint64(seek),
}
return mb
Expand Down
18 changes: 9 additions & 9 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestBroadcastInit(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if !(bytes.Equal(block.GetMessages()[0].Data, []byte("checkpoint"))) {
if !(bytes.Equal(block.Data.Data[0], []byte("checkpoint"))) {
t.Fatal("Expected first block to be a checkpoint")
}
return
Expand Down Expand Up @@ -125,8 +125,8 @@ func TestBroadcastBatch(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages))
if len(block.Data.Data) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data))
}
return
case <-time.After(500 * time.Millisecond):
Expand Down Expand Up @@ -176,8 +176,8 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
if len(block.Data.Data) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
case <-time.After(testConf.General.BatchTimeout + timePadding):
Expand Down Expand Up @@ -229,8 +229,8 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
if len(block.Data.Data) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
case <-time.After(testConf.General.BatchTimeout + timePadding):
Expand Down Expand Up @@ -276,8 +276,8 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages))
if len(block.Data.Data) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data))
}
return
case <-time.After(500 * time.Millisecond):
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/client_deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (cd *clientDelivererImpl) sendBlocks(stream ab.AtomicBroadcast_DeliverServe
return fmt.Errorf("Failed to send block to the client: %s", err)
}
logger.Debugf("Sent block %v to client (prevHash: %v, messages: %v)\n",
block.Number, block.PrevHash, block.Messages)
block.Header.Number, block.Header.PreviousHash, block.Data.Data)
default:
// Return the push token if there are no messages
// available from the ordering service.
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/client_deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func testClientDeliverAckFunc(label string, seek, window uint64, threshold, expe
case msg := <-mds.outgoing:
count++
if count == threshold {
mds.incoming <- testNewAckMessage(msg.GetBlock().Number)
mds.incoming <- testNewAckMessage(msg.GetBlock().Header.Number)
}
if count > expected {
t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected)
Expand Down
23 changes: 15 additions & 8 deletions orderer/kafka/consumer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"strconv"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/golang/protobuf/proto"
)

type mockConsumerImpl struct {
Expand Down Expand Up @@ -93,15 +95,20 @@ func (mc *mockConsumerImpl) testFillWithBlocks(seek int64) {
}

func testNewConsumerMessage(offset int64, topic string) *sarama.ConsumerMessage {
blockData := &ab.BlockData{
Data: [][]byte{[]byte(strconv.FormatInt(offset, 10))},
}
block := &ab.Block{
Messages: []*ab.BroadcastMessage{
&ab.BroadcastMessage{
Data: []byte(strconv.FormatInt(offset, 10)),
},
Header: &ab.BlockHeader{
Number: uint64(offset),
},
Number: uint64(offset),
Data: blockData,
}

data, err := proto.Marshal(block)
if err != nil {
panic("Error marshaling block")
}
_, data := hashBlock(block)

return &sarama.ConsumerMessage{
Value: sarama.ByteEncoder(data),
Expand Down
16 changes: 0 additions & 16 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ limitations under the License.
package kafka

import (
"fmt"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"
"golang.org/x/crypto/sha3"
)

const (
Expand All @@ -32,17 +27,6 @@ const (
windowOutOfRangeError = "Window out of range"
)

func hashBlock(block *ab.Block) (hash, data []byte) {
data, err := proto.Marshal(block)
if err != nil {
panic(fmt.Errorf("Failed to marshal block: %v", err))
}

hash = make([]byte, 64)
sha3.ShakeSum256(hash, data)
return
}

func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
brokerConfig := sarama.NewConfig()
brokerConfig.Version = conf.Kafka.Version
Expand Down
4 changes: 2 additions & 2 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func retrieveConfiguration(rl rawledger.Reader) *ab.ConfigurationEnvelope {
panic(fmt.Errorf("Error parsing blockchain at startup: %v", status))
}
// ConfigTxs should always be by themselves
if len(block.Messages) != 1 {
if len(block.Data.Data) != 1 {
continue
}

maybeConfigTx := &ab.ConfigurationEnvelope{}

err := proto.Unmarshal(block.Messages[0].Data, maybeConfigTx)
err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)

if err == nil {
lastConfigTx = maybeConfigTx
Expand Down
Loading

0 comments on commit ec38c35

Please sign in to comment.