Skip to content

Commit

Permalink
Merge "[FAB-1524] Reinitialize chains on orderer restart"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Jan 11, 2017
2 parents 5c8c71c + 141ab4c commit 0f90df8
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 87 deletions.
2 changes: 0 additions & 2 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ func main() {
}
} else {
logger.Infof("Not bootstrapping because of existing chains")
logger.Warningf("XXXXXXX RESTART IS NOT CURRENTLY SUPPORTED XXXXXXXXX")
// XXX Remove this once restart is supported
}

if conf.Kafka.Verbose {
Expand Down
12 changes: 2 additions & 10 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/golang/protobuf/proto"
)

type mockLedgerReadWriter struct {
Expand Down Expand Up @@ -94,17 +92,11 @@ func TestWriteLastConfiguration(t *testing.T) {
cs := &chainSupport{ledger: ml, configManager: cm}

lastConfig := func(block *cb.Block) uint64 {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION], md)
if err != nil {
panic(err)
}
lc := &cb.LastConfiguration{}
err = proto.Unmarshal(md.Value, lc)
index, err := utils.GetLastConfigurationIndexFromBlock(block)
if err != nil {
panic(err)
}
return lc.Index
return index
}

expected := uint64(0)
Expand Down
58 changes: 14 additions & 44 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/sharedconfig"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -59,48 +59,18 @@ type multiLedger struct {
sysChain *systemChain
}

// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one
func getConfigTx(reader rawledger.Reader) *cb.Envelope {
var lastConfigTx *cb.Envelope

it, _ := reader.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
// Iterate over the blockchain, looking for config transactions, track the most recent one encountered
// this will be the transaction which is returned
for {
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
logger.Fatalf("Error parsing blockchain at startup: %v", status)
}
// ConfigTxs should always be by themselves
if len(block.Data.Data) != 1 {
continue
}

maybeConfigTx := &cb.Envelope{}

err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)

if err != nil {
logger.Fatalf("Found data which was not an envelope: %s", err)
}

payload := &cb.Payload{}
if err = proto.Unmarshal(maybeConfigTx.Payload, payload); err != nil {
logger.Fatalf("Unable to unmarshal transaction payload: %s", err)
}

if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
continue
}

logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number)
lastConfigTx = maybeConfigTx
default:
return lastConfigTx
}
lastBlock := rawledger.GetBlock(reader, reader.Height()-1)
index, err := utils.GetLastConfigurationIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last configuration in its latest block: %s", err)
}
configBlock := rawledger.GetBlock(reader, index)
if configBlock == nil {
logger.Panicf("Configuration block does not exist")
}

return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}

// NewManagerImpl produces an instance of a Manager
Expand All @@ -126,16 +96,16 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse

if sharedConfigManager.ChainCreators() != nil {
if ml.sysChain != nil {
logger.Fatalf("There appear to be two system chains %x and %x", ml.sysChain.support.ChainID(), chainID)
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
}
logger.Debugf("Starting with system chain: %x", chainID)
logger.Debugf("Starting with system chain: %s", chainID)
chain := newChainSupport(createSystemChainFilters(ml, configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
ml.chains[string(chainID)] = chain
ml.sysChain = newSystemChain(chain)
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
logger.Debugf("Starting chain: %x", chainID)
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
ml.chains[string(chainID)] = chain
chain.start()
Expand Down
15 changes: 10 additions & 5 deletions orderer/multichain/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func TestGetConfigTx(t *testing.T) {
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)}))
ctx := makeConfigTx(provisional.TestChainID, 6)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{ctx}))
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}))

block := rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)})
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION] = utils.MarshalOrPanic(&cb.Metadata{Value: utils.MarshalOrPanic(&cb.LastConfiguration{Index: 7})})
rl.Append(block)

pctx := getConfigTx(rl)

Expand All @@ -104,11 +107,13 @@ func TestGetConfigTxFailure(t *testing.T) {
}))
}
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)}))
pctx := getConfigTx(rl)
defer func() {
if recover() == nil {
t.Fatalf("Should have panic-ed because there was no configuration tx")
}
}()
getConfigTx(rl)

if pctx != nil {
t.Fatalf("Should not have found a configuration tx")
}
}

// This test essentially brings the entire system up and is ultimately what main.go will replicate
Expand Down
26 changes: 6 additions & 20 deletions orderer/rawledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,6 @@ type ledgerTestFactory interface {

var testables []ledgerTestable

func getBlock(number uint64, li ReadWriter) *cb.Block {
i, _ := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: number}}})
select {
case <-i.ReadyChan():
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
default:
return nil
}
}

func allTest(t *testing.T, test func(ledgerTestFactory, *testing.T)) {
for _, lt := range testables {

Expand Down Expand Up @@ -83,7 +69,7 @@ func testInitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 1 {
t.Fatalf("Block height should be 1")
}
block := getBlock(0, li)
block := GetBlock(li, 0)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand All @@ -109,7 +95,7 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := getBlock(1, li)
block := GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving block 1")
}
Expand All @@ -124,7 +110,7 @@ func TestAddition(t *testing.T) {

func testAddition(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
genesis := getBlock(0, li)
genesis := GetBlock(li, 0)
if genesis == nil {
t.Fatalf("Could not retrieve genesis block")
}
Expand All @@ -134,7 +120,7 @@ func testAddition(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := getBlock(1, li)
block := GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand Down Expand Up @@ -255,7 +241,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain1: %s", err)
}

if b := getBlock(1, c1); !reflect.DeepEqual(c1b1, b) {
if b := GetBlock(c1, 1); !reflect.DeepEqual(c1b1, b) {
t.Fatalf("Did not properly store block 1 on chain 1:")
}

Expand All @@ -264,7 +250,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain2: %s", err)
}

if b := getBlock(0, c2); reflect.DeepEqual(c2b0, b) {
if b := GetBlock(c2, 0); reflect.DeepEqual(c2b0, b) {
t.Fatalf("Did not properly store block 1 on chain 1")
}
}
15 changes: 15 additions & 0 deletions orderer/rawledger/rawledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,18 @@ func CreateNextBlock(rl Reader, messages []*cb.Envelope) *cb.Block {

return block
}

// GetBlock is a utility method for retrieving a single block
func GetBlock(rl Reader, index uint64) *cb.Block {
i, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: index}}})
select {
case <-i.ReadyChan():
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
default:
return nil
}
}
15 changes: 15 additions & 0 deletions protos/utils/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func GetChainIDFromBlock(block *cb.Block) (string, error) {
return payload.Header.ChainHeader.ChainID, nil
}

// GetLastConfigurationIndexFromBlock retrieves the index of the last configuration block as encoded in the block metadata
func GetLastConfigurationIndexFromBlock(block *cb.Block) (uint64, error) {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION], md)
if err != nil {
return 0, err
}
lc := &cb.LastConfiguration{}
err = proto.Unmarshal(md.Value, lc)
if err != nil {
return 0, err
}
return lc.Index, nil
}

// GetBlockFromBlockBytes marshals the bytes into Block
func GetBlockFromBlockBytes(blockBytes []byte) (*cb.Block, error) {
block := &cb.Block{}
Expand Down
20 changes: 14 additions & 6 deletions protos/utils/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,32 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
)

const CreationPolicyKey = "CreationPolicy"
const (
CreationPolicyKey = "CreationPolicy"
ChainCreatorsKey = "ChainCreators"
)

// ChainCreationConfiguration creates a new chain creation configuration envelope from
// the supplied creationPolicy, new chainID, and a template configuration envelope
// The template configuration envelope will have the correct chainID set on all items,
// and the first item will be a CreationPolicy which is ready for the signatures as
// required by the policy
// required by the policy, it also strips out the ChainCreators item as this is invalid
// for the ordering system chain
func ChainCreationConfiguration(creationPolicy, newChainID string, template *cb.ConfigurationEnvelope) *cb.ConfigurationEnvelope {
newConfigItems := make([]*cb.SignedConfigurationItem, len(template.Items))
var newConfigItems []*cb.SignedConfigurationItem
var hashBytes []byte

for i, item := range template.Items {
for _, item := range template.Items {
configItem := UnmarshalConfigurationItemOrPanic(item.ConfigurationItem)
if configItem.Type == cb.ConfigurationItem_Orderer && configItem.Key == ChainCreatorsKey {
continue
}
configItem.Header.ChainID = newChainID
newConfigItems[i] = &cb.SignedConfigurationItem{
newConfigItem := &cb.SignedConfigurationItem{
ConfigurationItem: MarshalOrPanic(configItem),
}
hashBytes = append(hashBytes, newConfigItems[i].ConfigurationItem...)
newConfigItems = append(newConfigItems, newConfigItem)
hashBytes = append(hashBytes, newConfigItem.ConfigurationItem...)
}

digest := cu.ComputeCryptoHash(hashBytes)
Expand Down

0 comments on commit 0f90df8

Please sign in to comment.