Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAB-18198 Ch.Part.API: Restructure registrar init #2015

Merged
merged 1 commit into from
Oct 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 71 additions & 26 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ func (r *Registrar) init(consenters map[string]consensus.Consenter) {
channelsWithJoinBlock := r.loadJoinBlocks()

// Discover all ledgers. This should already include all channels with join blocks as well.
existingChannels := r.ledgerFactory.ChannelIDs()
// Make sure there are no empty ledgers without a corresponding join-block.
existingChannels := r.discoverLedgers(channelsWithJoinBlock)

// Scan for and initialize the system channel, if it exists.
// Make sure there are no empty ledgers without a corresponding join-block.
// Note that there may be channels with empty ledgers, but always with a join block.
r.initSystemChannel(existingChannels, channelsWithJoinBlock)
r.initSystemChannel(existingChannels)

// Initialize application channels, by creating either a consensus.Chain or a follower.Chain.
r.initAppChannels(existingChannels, channelsWithJoinBlock)
if r.systemChannelID == "" {
r.initAppChannels(existingChannels, channelsWithJoinBlock)
} else {
r.initAppChannelsWhenSystemChannelExists(existingChannels)
}
}

// startChannels starts internal go-routines in chains and followers.
Expand All @@ -171,22 +175,41 @@ func (r *Registrar) startChannels() {
}
}

func (r *Registrar) initSystemChannel(existingChannels []string, channelsWithJoinBlock map[string]*cb.Block) {
func (r *Registrar) discoverLedgers(channelsWithJoinBlock map[string]*cb.Block) []string {
// Discover all ledgers. This should already include all channels with join blocks as well.
existingChannels := r.ledgerFactory.ChannelIDs()

for _, channelID := range existingChannels {
rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
}

// Prune empty ledgers without a join block
if rl.Height() == 0 {
if _, ok := channelsWithJoinBlock[channelID]; !ok {
logger.Warnf("Channel '%s' has an empty ledger without a join-block, removing it", channelID)
if err := r.ledgerFactory.Remove(channelID); err != nil {
logger.Panicf("Ledger factory failed to remove empty ledger '%s', error: %s", channelID, err)
}
}
//TODO currently the system channel cannot be a follower, i.e. start with height==0 and a join-block.
// This may change in FAB-17911, when we allow a system channel to join via the channel participation API.
}
}

return r.ledgerFactory.ChannelIDs()
}

// initSystemChannel scan for and initialize the system channel, if it exists.
func (r *Registrar) initSystemChannel(existingChannels []string) {
for _, channelID := range existingChannels {
rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
}

if rl.Height() == 0 {
// At this point in the initialization flow the system channel cannot be with height==0 and a join-block.
// Even when the system channel is joined via the channel participation API, on-boarding is performed
// prior to this point. Therefore, this is an application channel.
continue // Skip application channels
}

Expand Down Expand Up @@ -234,15 +257,10 @@ func (r *Registrar) initSystemChannel(existingChannels []string, channelsWithJoi
}
}

// initAppChannels initializes application channels, assuming that the system channel does NOT exist.
// This implies that the orderer is using the channel participation API for joins (channel creation).
func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinBlock map[string]*cb.Block) {
var appChannelsWithoutJoinBlock []string
for _, channelID := range existingChannels {
if _, withJoinBlock := channelsWithJoinBlock[channelID]; channelID == r.systemChannelID || withJoinBlock {
continue
}
appChannelsWithoutJoinBlock = append(appChannelsWithoutJoinBlock, channelID)
}

// init app channels with join-blocks
for channelID, joinBlock := range channelsWithJoinBlock {
ledgerRes, clusterConsenter, err := r.initLedgerResourcesClusterConsenter(joinBlock)
if err != nil {
Expand All @@ -268,7 +286,12 @@ func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinB
}
}

for _, channelID := range appChannelsWithoutJoinBlock {
// init app channels without join-blocks
for _, channelID := range existingChannels {
if _, withJoinBlock := channelsWithJoinBlock[channelID]; withJoinBlock {
continue // Skip channels with join-blocks, since they were already initialized above.
}

rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
Expand All @@ -284,15 +307,6 @@ func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinB
logger.Panicf("Error creating ledger resources: %s", err)
}

if r.systemChannel != nil {
chainSupport, err := newChainSupport(r, ledgerRes, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
if err != nil {
logger.Panicf("Failed to create chain support for channel '%s', error: %s", channelID, err)
}
r.chains[channelID] = chainSupport
continue
}

ordererConfig, _ := ledgerRes.OrdererConfig()
consenter, foundConsenter := r.consenters[ordererConfig.ConsensusType()]
if !foundConsenter {
Expand Down Expand Up @@ -324,6 +338,37 @@ func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinB
}
}

// initAppChannelsWhenSystemChannelExists initializes application channels, assuming that the system channel exists.
// This implies that the channel participation API is not used for joins (channel creation). Therefore, there are no
// join-blocks, and follower.Chain(s) are never created. The call to newChainSupport creates a consensus.Chain of the
// appropriate type.
func (r *Registrar) initAppChannelsWhenSystemChannelExists(existingChannels []string) {
for _, channelID := range existingChannels {
if channelID == r.systemChannelID {
continue // Skip system channel
}
rl, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Ledger factory reported channelID %s but could not retrieve it: %s", channelID, err)
}

configTxEnv := configTx(rl)
if configTxEnv == nil {
logger.Panic("Programming error, configTxEnv should never be nil here")
}
ledgerRes, err := r.newLedgerResources(configTxEnv)
if err != nil {
logger.Panicf("Error creating ledger resources: %s", err)
}

chainSupport, err := newChainSupport(r, ledgerRes, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
if err != nil {
logger.Panicf("Failed to create chain support for channel '%s', error: %s", channelID, err)
}
r.chains[channelID] = chainSupport
}
}

func (r *Registrar) initLedgerResourcesClusterConsenter(configBlock *cb.Block) (*ledgerResources, consensus.ClusterConsenter, error) {
configEnv, err := protoutil.ExtractEnvelope(configBlock, 0)
if err != nil {
Expand Down