Skip to content

Commit

Permalink
Merge "Update to new proposed block format"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Nov 2, 2016
2 parents 8590dbb + ec38c35 commit 37b1168
Show file tree
Hide file tree
Showing 25 changed files with 1,064 additions and 252 deletions.
748 changes: 724 additions & 24 deletions bddtests/ab_pb2.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bddtests/steps/orderer_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def readDeliveredMessages(self, expectedCount):
numToRead = self.getWindowSize() if self.getWindowSize() < expectedCount else expectedCount
msgsRead.extend(self.readMessages(numToRead))
# send the ack
self.sendAcknowledgment(msgsRead[-1].Block.Number)
self.sendAcknowledgment(msgsRead[-1].Block.Header.Number)
print('SentACK!!')
print('')
return msgsRead
Expand Down
8 changes: 4 additions & 4 deletions core/committer/noopssinglechain/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (r *deliverClient) readUntilClose() {
fmt.Println("Got error ", t)
case *ab.DeliverResponse_Block:
txs := []*pb.Transaction2{}
for _, d := range t.Block.Messages {
if d != nil && d.Data != nil {
for _, d := range t.Block.Data.Data {
if d != nil {
tx := &pb.Transaction2{}
if err = proto.Unmarshal(d.Data, tx); err != nil {
if err = proto.Unmarshal(d, tx); err != nil {
fmt.Printf("Error getting tx(%s)...dropping block\n", err)
continue
}
Expand All @@ -152,7 +152,7 @@ func (r *deliverClient) readUntilClose() {
r.unAcknowledged++
if r.unAcknowledged >= r.windowSize/2 {
fmt.Println("Sending acknowledgement")
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Number}}})
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Header.Number}}})
if err != nil {
return
}
Expand Down
279 changes: 167 additions & 112 deletions orderer/atomicbroadcast/ab.pb.go

Large diffs are not rendered by default.

29 changes: 21 additions & 8 deletions orderer/atomicbroadcast/ab.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,28 @@ message DeliverUpdate {
}
}

// This is a temporary data structure, meant to hold the place of the finalized block structure
// This must be a 'block' structure and not a 'batch' structure, although the terminology is slightly confusing
// The requirement is to allow for a consumer of the orderer to declare the unvalidated blockchain as the definitive
// blockchain, without breaking the hash chain or existing proof
// This is a temporary data structure, but is hopefully very close to the finalized block structure
// Note that the BlockHeader chains to the previous BlockHeader, and the BlockData hash is embedded
// in the BlockHeader. This makes it natural and obvious that the Data is included in the hash, but
// the Metadata is not.
message Block {
uint64 Number = 2;
bytes PrevHash = 3;
bytes Proof = 4;
repeated BroadcastMessage Messages = 5;
BlockHeader Header = 1;
BlockData Data = 2;
BlockMetadata Metadata = 3;
}

message BlockHeader {
uint64 Number = 1; // The position in the blockchain
bytes PreviousHash = 2; // The hash of the previous block header
bytes DataHash = 3; // The hash of the BlockData, by MerkleTree
}

message BlockData {
repeated bytes Data = 1;
}

message BlockMetadata {
repeated bytes Metadata = 1;
}

message DeliverResponse {
Expand Down
11 changes: 10 additions & 1 deletion orderer/atomicbroadcast/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ import (
"github.com/hyperledger/fabric/core/util"
)

func (b *Block) Hash() []byte {
func (b *BlockHeader) Hash() []byte {
data, err := proto.Marshal(b) // XXX this is wrong, protobuf is not the right mechanism to serialize for a hash
if err != nil {
panic("This should never fail and is generally irrecoverable")
}

return util.ComputeCryptoHash(data)
}

func (b *BlockData) Hash() []byte {
data, err := proto.Marshal(b) // XXX this is wrong, protobuf is not the right mechanism to serialize for a hash, AND, it is not a MerkleTree hash
if err != nil {
panic("This should never fail and is generally irrecoverable")
}

return util.ComputeCryptoHash(data)
}
13 changes: 9 additions & 4 deletions orderer/common/bootstrap/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,17 @@ func (b *bootstrapper) GenesisBlock() (*ab.Block, error) {
},
})

data := &ab.BlockData{
Data: [][]byte{initialConfigTX},
}

return &ab.Block{
Number: 0,
PrevHash: []byte("GENESIS"),
Messages: []*ab.BroadcastMessage{
&ab.BroadcastMessage{Data: initialConfigTX},
Header: &ab.BlockHeader{
Number: 0,
PreviousHash: []byte("GENESIS"),
DataHash: data.Hash(),
},
Data: data,
}, nil

}
35 changes: 24 additions & 11 deletions orderer/kafka/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"

"github.com/golang/protobuf/proto"
)

// Broadcaster allows the caller to submit messages to the orderer
Expand All @@ -37,7 +39,7 @@ type broadcasterImpl struct {
once sync.Once

batchChan chan *ab.BroadcastMessage
messages []*ab.BroadcastMessage
messages [][]byte
nextNumber uint64
prevHash []byte
}
Expand All @@ -47,7 +49,7 @@ func newBroadcaster(conf *config.TopLevel) Broadcaster {
producer: newProducer(conf),
config: conf,
batchChan: make(chan *ab.BroadcastMessage, conf.General.BatchSize),
messages: []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("genesis")}},
messages: [][]byte{[]byte("genesis")},
nextNumber: 0,
}
}
Expand Down Expand Up @@ -75,19 +77,30 @@ func (b *broadcasterImpl) Close() error {
}

func (b *broadcasterImpl) sendBlock() error {
data := &ab.BlockData{
Data: b.messages,
}
block := &ab.Block{
Messages: b.messages,
Number: b.nextNumber,
PrevHash: b.prevHash,
Header: &ab.BlockHeader{
Number: b.nextNumber,
PreviousHash: b.prevHash,
DataHash: data.Hash(),
},
Data: data,
}
logger.Debugf("Prepared block %d with %d messages (%+v)", block.Number, len(block.Messages), block)
logger.Debugf("Prepared block %d with %d messages (%+v)", block.Header.Number, len(block.Data.Data), block)

b.messages = []*ab.BroadcastMessage{}
b.messages = [][]byte{}
b.nextNumber++
hash, data := hashBlock(block)
b.prevHash = hash
b.prevHash = block.Header.Hash()

blockBytes, err := proto.Marshal(block)

if err != nil {
logger.Fatalf("Error marshaling block: %s", err)
}

return b.producer.Send(data)
return b.producer.Send(blockBytes)
}

func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
Expand All @@ -96,7 +109,7 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
for {
select {
case msg := <-b.batchChan:
b.messages = append(b.messages, msg)
b.messages = append(b.messages, msg.Data)
if len(b.messages) >= int(maxSize) {
if !timer.Stop() {
<-timer.C
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/broadcast_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk ch
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *ab.BroadcastMessage, conf.General.BatchSize),
messages: []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("checkpoint")}},
messages: [][]byte{[]byte("checkpoint")},
nextNumber: uint64(seek),
}
return mb
Expand Down
18 changes: 9 additions & 9 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestBroadcastInit(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if !(bytes.Equal(block.GetMessages()[0].Data, []byte("checkpoint"))) {
if !(bytes.Equal(block.Data.Data[0], []byte("checkpoint"))) {
t.Fatal("Expected first block to be a checkpoint")
}
return
Expand Down Expand Up @@ -125,8 +125,8 @@ func TestBroadcastBatch(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages))
if len(block.Data.Data) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data))
}
return
case <-time.After(500 * time.Millisecond):
Expand Down Expand Up @@ -176,8 +176,8 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
if len(block.Data.Data) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
case <-time.After(testConf.General.BatchTimeout + timePadding):
Expand Down Expand Up @@ -229,8 +229,8 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
if len(block.Data.Data) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Data.Data))
}
return
case <-time.After(testConf.General.BatchTimeout + timePadding):
Expand Down Expand Up @@ -276,8 +276,8 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages))
if len(block.Data.Data) != int(testConf.General.BatchSize) {
t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Data.Data))
}
return
case <-time.After(500 * time.Millisecond):
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/client_deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (cd *clientDelivererImpl) sendBlocks(stream ab.AtomicBroadcast_DeliverServe
return fmt.Errorf("Failed to send block to the client: %s", err)
}
logger.Debugf("Sent block %v to client (prevHash: %v, messages: %v)\n",
block.Number, block.PrevHash, block.Messages)
block.Header.Number, block.Header.PreviousHash, block.Data.Data)
default:
// Return the push token if there are no messages
// available from the ordering service.
Expand Down
2 changes: 1 addition & 1 deletion orderer/kafka/client_deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func testClientDeliverAckFunc(label string, seek, window uint64, threshold, expe
case msg := <-mds.outgoing:
count++
if count == threshold {
mds.incoming <- testNewAckMessage(msg.GetBlock().Number)
mds.incoming <- testNewAckMessage(msg.GetBlock().Header.Number)
}
if count > expected {
t.Fatalf("Delivered %d blocks to the client w/o ACK, expected %d", count, expected)
Expand Down
23 changes: 15 additions & 8 deletions orderer/kafka/consumer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"strconv"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/golang/protobuf/proto"
)

type mockConsumerImpl struct {
Expand Down Expand Up @@ -93,15 +95,20 @@ func (mc *mockConsumerImpl) testFillWithBlocks(seek int64) {
}

func testNewConsumerMessage(offset int64, topic string) *sarama.ConsumerMessage {
blockData := &ab.BlockData{
Data: [][]byte{[]byte(strconv.FormatInt(offset, 10))},
}
block := &ab.Block{
Messages: []*ab.BroadcastMessage{
&ab.BroadcastMessage{
Data: []byte(strconv.FormatInt(offset, 10)),
},
Header: &ab.BlockHeader{
Number: uint64(offset),
},
Number: uint64(offset),
Data: blockData,
}

data, err := proto.Marshal(block)
if err != nil {
panic("Error marshaling block")
}
_, data := hashBlock(block)

return &sarama.ConsumerMessage{
Value: sarama.ByteEncoder(data),
Expand Down
16 changes: 0 additions & 16 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ limitations under the License.
package kafka

import (
"fmt"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/config"
"golang.org/x/crypto/sha3"
)

const (
Expand All @@ -32,17 +27,6 @@ const (
windowOutOfRangeError = "Window out of range"
)

func hashBlock(block *ab.Block) (hash, data []byte) {
data, err := proto.Marshal(block)
if err != nil {
panic(fmt.Errorf("Failed to marshal block: %v", err))
}

hash = make([]byte, 64)
sha3.ShakeSum256(hash, data)
return
}

func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
brokerConfig := sarama.NewConfig()
brokerConfig.Version = conf.Kafka.Version
Expand Down
4 changes: 2 additions & 2 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func retrieveConfiguration(rl rawledger.Reader) *ab.ConfigurationEnvelope {
panic(fmt.Errorf("Error parsing blockchain at startup: %v", status))
}
// ConfigTxs should always be by themselves
if len(block.Messages) != 1 {
if len(block.Data.Data) != 1 {
continue
}

maybeConfigTx := &ab.ConfigurationEnvelope{}

err := proto.Unmarshal(block.Messages[0].Data, maybeConfigTx)
err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)

if err == nil {
lastConfigTx = maybeConfigTx
Expand Down
Loading

0 comments on commit 37b1168

Please sign in to comment.