Skip to content

Commit

Permalink
Ch.Part.API: Save joinblock on JoinChannel (#1608)
Browse files Browse the repository at this point in the history
Remove joinblock upon completion of JoinChannel (succcess
or failure)..

FAB-18008 #done

Signed-off-by: Danny Cao <dcao@us.ibm.com>
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>

Co-authored-by: Danny Cao <dcao@us.ibm.com>
  • Loading branch information
wlahti and caod123 authored Sep 15, 2020
1 parent 4ce4e10 commit ca9b032
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 59 deletions.
2 changes: 0 additions & 2 deletions orderer/common/follower/follower_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ func (c *Chain) pull() error {
return errors.Wrap(err, "failed to pull up to join block")
}

// TODO remove the join-block from the file repo

// The join block never returns an error. This is checked before the follower is started.
if isMem, _ := c.clusterConsenter.IsChannelMember(c.joinBlock); isMem {
c.setStatusReport(types.ClusterRelationMember, types.StatusActive)
Expand Down
88 changes: 69 additions & 19 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"sync"

"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
Expand Down Expand Up @@ -105,24 +106,29 @@ func NewRegistrar(
}

if config.ChannelParticipation.Enabled {
r.initializeJoinBlockFileRepo()
err := r.initializeJoinBlockFileRepo()
if err != nil {
logger.Panicf("Error initializing joinblock file repo: %s", err)
}
}

return r
}

// initialize the channel participation API joinblock file repo. This creates
// the fileRepoDir on the filesystem if it does not already exist.
func (r *Registrar) initializeJoinBlockFileRepo() {
func (r *Registrar) initializeJoinBlockFileRepo() error {
fileRepoDir := filepath.Join(r.config.FileLedger.Location, "filerepo")
logger.Infof("Channel Participation API enabled, registrar initializing with file repo %s", fileRepoDir)

joinBlockFileRepo, err := filerepo.New(fileRepoDir, "joinblock")
if err != nil {
logger.Panicf("Error initializing join block file repo: %s", err)
return err
}

r.joinBlockFileRepo = joinBlockFileRepo

return nil
}

func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
Expand Down Expand Up @@ -255,7 +261,9 @@ func (r *Registrar) initAppChannels(existingChannels []string, channelsWithJoinB
if _, _, err := r.createAsMember(ledgerRes, joinBlock, channelID); err != nil {
logger.Panicf("Failed to createAsMember, error: %s", err)
}
// TODO remove the join block
if err := r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed to remove join-block, channel: %s, error: %s", channelID, err)
}
} else {
if _, _, err = r.createFollower(ledgerRes, clusterConsenter, joinBlock, channelID); err != nil {
logger.Panicf("Failed to createFollower, error: %s", err)
Expand Down Expand Up @@ -512,22 +520,25 @@ func (r *Registrar) createNewChain(configtx *cb.Envelope) *ChainSupport {
// SwitchFollowerToChain creates a consensus.Chain from the tip of the ledger, and removes the follower.
// It is called when a follower detects a config block that indicates cluster membership and halts, transferring
// execution to the consensus.Chain.
func (r *Registrar) SwitchFollowerToChain(chainName string) {
func (r *Registrar) SwitchFollowerToChain(channelID string) {
r.lock.Lock()
defer r.lock.Unlock()

lf, err := r.ledgerFactory.GetOrCreate(chainName)
lf, err := r.ledgerFactory.GetOrCreate(channelID)
if err != nil {
logger.Panicf("Failed obtaining ledger factory for %s: %v", chainName, err)
logger.Panicf("Failed obtaining ledger factory for channel %s: %v", channelID, err)
}

if _, chainExists := r.chains[chainName]; chainExists {
logger.Panicf("Programming error, chain already exists: %s", chainName)
if _, chainExists := r.chains[channelID]; chainExists {
logger.Panicf("Programming error, channel already exists: %s", channelID)
}

delete(r.followers, chainName)
logger.Debugf("Removed follower for channel %s", chainName)
delete(r.followers, channelID)
logger.Debugf("Removed follower for channel %s", channelID)
cs := r.createNewChain(configTx(lf))
if err := r.removeJoinBlock(channelID); err != nil {
logger.Panicf("Failed removing join-block for channel: %s: %v", channelID, err)
}
cs.start()
logger.Infof("Created and started channel %s", cs.ChannelID())
}
Expand Down Expand Up @@ -639,7 +650,7 @@ func (r *Registrar) ChannelInfo(channelID string) (types.ChannelInfo, error) {

// JoinChannel instructs the orderer to create a channel and join it with the provided config block.
// The URL field is empty, and is to be completed by the caller.
func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppChannel bool) (types.ChannelInfo, error) {
func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppChannel bool) (info types.ChannelInfo, err error) {
r.lock.RLock()
defer r.lock.RUnlock()

Expand All @@ -659,23 +670,45 @@ func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppCh
return types.ChannelInfo{}, types.ErrAppChannelsAlreadyExists
}

defer func() {
if err != nil {
if err2 := r.ledgerFactory.Remove(channelID); err2 != nil {
logger.Warningf("Failed to cleanup ledger: %v", err2)
}
}
}()
ledgerRes, clusterConsenter, err := r.initLedgerResourcesClusterConsenter(configBlock)
if err != nil {
return types.ChannelInfo{}, err
}

//TODO save the join-block in the file repo to make this action crash tolerant.
//TODO remove join block & ledger if things go bad below, (using defer that identifies an error?)
blockBytes, err := proto.Marshal(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "failed marshaling joinblock")
}

if err := r.joinBlockFileRepo.Save(channelID, blockBytes); err != nil {
return types.ChannelInfo{}, errors.Wrapf(err, "failed saving joinblock to file repo for channel %s", channelID)
}
defer func() {
if err != nil {
if err2 := r.removeJoinBlock(channelID); err2 != nil {
logger.Warningf("Failed to cleanup joinblock for channel %s: %v", channelID, err2)
}
}
}()

isMember, err := clusterConsenter.IsChannelMember(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "failed to determine cluster membership from join-block ")
return types.ChannelInfo{}, errors.Wrap(err, "failed to determine cluster membership from join-block")
}

if configBlock.Header.Number == 0 && isMember {
chain, info, err := r.createAsMember(ledgerRes, configBlock, channelID)
if err == nil {
//TODO remove the join block
if err := r.removeJoinBlock(channelID); err != nil {
return types.ChannelInfo{}, err
}
chain.start()
}
return info, err
Expand All @@ -694,7 +727,7 @@ func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppCh
func (r *Registrar) createAsMember(ledgerRes *ledgerResources, configBlock *cb.Block, channelID string) (*ChainSupport, types.ChannelInfo, error) {
if ledgerRes.Height() == 0 {
if err := ledgerRes.Append(configBlock); err != nil {
return nil, types.ChannelInfo{}, errors.Wrap(err, "error appending join block to the ledger")
return nil, types.ChannelInfo{}, errors.Wrap(err, "failed to append join block to the ledger")
}
}
chain, err := newChainSupport(
Expand All @@ -706,7 +739,7 @@ func (r *Registrar) createAsMember(ledgerRes *ledgerResources, configBlock *cb.B
r.bccsp,
)
if err != nil {
return nil, types.ChannelInfo{}, errors.Wrap(err, "error creating chain support")
return nil, types.ChannelInfo{}, errors.Wrap(err, "failed to create chain support")
}

info := types.ChannelInfo{
Expand Down Expand Up @@ -804,7 +837,16 @@ func (r *Registrar) removeMember(channelID string, cs *ChainSupport) error {
}

func (r *Registrar) removeFollower(channelID string, follower *follower.Chain) error {
// TODO if follower is onboarding, remove the joinblock from file repo
follower.Halt()

// join block may still exist if the follower is:
// 1) still onboarding
// 2) active but not yet called registrar.SwitchFollowerToChain()
// NOTE: if the join block does not exist, os.RemoveAll returns nil
// so there is no harm attempting to remove a non-existent join block.
if err := r.removeJoinBlock(channelID); err != nil {
return err
}

err := r.ledgerFactory.Remove(channelID)
if err != nil {
Expand Down Expand Up @@ -852,6 +894,14 @@ func (r *Registrar) loadJoinBlocks() map[string]*cb.Block {
return channelToBlockMap
}

func (r *Registrar) removeJoinBlock(channelID string) error {
if err := r.joinBlockFileRepo.Remove(channelID); err != nil {
return errors.Wrapf(err, "failed removing joinblock for channel %s", channelID)
}

return nil
}

func (r *Registrar) removeSystemChannel() error {
systemChannelID := r.systemChannelID
consensusType := r.systemChannel.SharedConfig().ConsensusType()
Expand Down
Loading

0 comments on commit ca9b032

Please sign in to comment.