Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAB-17890 Ch.Part.API: allow registrar to list a single channel #1349

Merged
merged 3 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
Expand All @@ -34,6 +35,11 @@ type ChainSupport struct {
// that there is a single consensus type at this orderer node and therefore the resolution of
// the consensus type too happens only at the ChainSupport level.
consensus.MetadataValidator

// The registrar is not aware of the exact type that the Chain is, e.g. etcdraft, inactive, or follower.
// Therefore, we let each chain report its cluster relation and status through this interface. Non cluster
// type chains (solo, kafka) are assigned a static reporter.
consensus.StatusReporter
}

func newChainSupport(
Expand Down Expand Up @@ -88,6 +94,11 @@ func newChainSupport(
cs.MetadataValidator = consensus.NoOpMetadataValidator{}
}

cs.StatusReporter, ok = cs.Chain.(consensus.StatusReporter)
if !ok { // Non-cluster types: solo, kafka
cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationNone, Status: types.StatusActive}
}

logger.Debugf("[channel: %s] Done creating channel support resources", cs.ChannelID())

return cs
Expand Down
16 changes: 14 additions & 2 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,20 @@ func (r *Registrar) ChannelList() types.ChannelList {
}

func (r *Registrar) ChannelInfo(channelID string) (types.ChannelInfo, error) {
//TODO
return types.ChannelInfo{}, errors.New("Not implemented yet")
r.lock.RLock()
defer r.lock.RUnlock()

info := types.ChannelInfo{}
cs, ok := r.chains[channelID]
if !ok {
return info, types.ErrChannelNotExist
}

info.Name = channelID
tock-ibm marked this conversation as resolved.
Show resolved Hide resolved
info.Height = cs.Height()
info.ClusterRelation, info.Status = cs.StatusReport()

return info, nil
}

func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block) (types.ChannelInfo, error) {
Expand Down
32 changes: 28 additions & 4 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func TestNewRegistrar(t *testing.T) {
}, "Should not panic when starting without a system channel")
require.NotNil(t, manager)
list := manager.ChannelList()
assert.Equal(t, types.ChannelList{SystemChannel: nil, Channels: nil}, list)
assert.Equal(t, types.ChannelList{}, list)
info, err := manager.ChannelInfo("my-channel")
assert.EqualError(t, err, types.ErrChannelNotExist.Error())
assert.Equal(t, types.ChannelInfo{}, info)
})

// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
Expand Down Expand Up @@ -231,6 +234,13 @@ func TestNewRegistrar(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: "none", Status: "active", Height: 1},
info,
)

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, "testchannelid", chainSupport, rl, t)
})
}
Expand All @@ -251,7 +261,7 @@ func TestCreateChain(t *testing.T) {
lf, _ := newLedgerAndFactory(tmpdir, "testchannelid", genesisBlockSys)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}
consenters[confSys.Orderer.OrdererType] = &mockConsenter{cluster: true}

manager := NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider)
manager.Initialize(consenters)
Expand All @@ -278,18 +288,32 @@ func TestCreateChain(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

info, err = manager.ChannelInfo("mychannel")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "mychannel", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

// A subsequent creation, replaces the chain.
manager.CreateChain("mychannel")
chain2 := manager.GetChain("mychannel")
assert.NotNil(t, chain2)
// They are not the same
assert.NotEqual(t, chain, chain2)
// The old chain is halted
_, ok := <-chain.Chain.(*mockChain).queue
_, ok := <-chain.Chain.(*mockChainCluster).queue
assert.False(t, ok)

// The new chain is not halted: Close the channel to prove that.
close(chain2.Chain.(*mockChain).queue)
close(chain2.Chain.(*mockChainCluster).queue)
})

// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
Expand Down
22 changes: 20 additions & 2 deletions orderer/common/multichannel/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package multichannel

import (
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"

cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/capabilities"
Expand All @@ -23,16 +24,33 @@ import (
)

type mockConsenter struct {
cluster bool
}

func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
return &mockChain{
chain := &mockChain{
queue: make(chan *cb.Envelope),
cutter: support.BlockCutter(),
support: support,
metadata: metadata,
done: make(chan struct{}),
}, nil
}

if mc.cluster {
clusterChain := &mockChainCluster{}
clusterChain.mockChain = chain
return clusterChain, nil
}

return chain, nil
}

type mockChainCluster struct {
*mockChain
}

func (c *mockChainCluster) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

type mockChain struct {
Expand Down
29 changes: 25 additions & 4 deletions orderer/common/types/channelinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,38 @@ type ChannelInfoShort struct {
URL string `json:"url"`
}

type ClusterRelation string

const (
ClusterRelationMember ClusterRelation = "member"
ClusterRelationFollower ClusterRelation = "follower"
ClusterRelationConfigTracker ClusterRelation = "config-tracker"
tock-ibm marked this conversation as resolved.
Show resolved Hide resolved
ClusterRelationNone ClusterRelation = "none"
)

type Status string

const (
StatusActive Status = "active"
StatusOnBoarding Status = "onboarding"
StatusInactive Status = "inactive"
)

// ChannelInfo carries the response to an HTTP request to List a single channel.
// This is marshaled into the body of the HTTP response.
type ChannelInfo struct {
// The channel name.
Name string `json:"name"`
// The channel relative URL (no Host:Port, only path), e.g.: "/participation/v1/channels/my-channel".
URL string `json:"url"`
// Whether the orderer is a “member” or ”follower” of the cluster, for this channel. Case insensitive.
ClusterRelation string `json:"clusterRelation"`
// Whether the orderer is ”onboarding” or ”active”, for this channel. Case insensitive.
Status string `json:"status"`
// Whether the orderer is a “member” or ”follower” of the cluster, or "config-tracker" of the cluster, for this channel.
// For non cluster consensus types (solo, kafka) it is "none".
// Possible values: “member”, ”follower”, "config-tracker", "none".
ClusterRelation ClusterRelation `json:"clusterRelation"`
// Whether the orderer is ”onboarding”, ”active”, or "inactive", for this channel.
// For non cluster consensus types (solo, kafka) it is "active".
// Possible values: “onboarding”, ”active”, "inactive".
Status Status `json:"status"`
// Current block height.
Height uint64 `json:"height"`
}
4 changes: 2 additions & 2 deletions orderer/common/types/channelinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestChannelInfo(t *testing.T) {
info := types.ChannelInfo{
Name: "a",
URL: "/api/channels/a",
ClusterRelation: "follower",
Status: "active",
ClusterRelation: types.ClusterRelationFollower,
Status: types.StatusActive,
Height: uint64(1) << 60,
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/common/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var ErrChannelAlreadyExists = errors.New("channel already exists")
// already exist.
var ErrAppChannelsAlreadyExists = errors.New("application channels already exist")

// This error is returned when trying to remove a channel that does not exist
// This error is returned when trying to remove or list a channel that does not exist
var ErrChannelNotExist = errors.New("channel does not exist")
32 changes: 32 additions & 0 deletions orderer/consensus/cluster_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package consensus

import "github.com/hyperledger/fabric/orderer/common/types"

// StatusReporter is implemented by cluster-type Chain implementations.
// It allows the node to report its cluster relation and its status within that relation.
// This information is used to generate the channelparticipation.ChannelInfo in response
// to a "List" request on a particular channel.
//
// Not all chains must implement this, in particular non-cluster-type (solo, kafka) are
// assigned a StaticStatusReporter at construction time.
type StatusReporter interface {
// StatusReport provides the cluster relation and status.
// See: channelparticipation.ChannelInfo for more details.
StatusReport() (types.ClusterRelation, types.Status)
}

// StaticStatusReporter is intended for chains that do not implement the StatusReporter interface.
type StaticStatusReporter struct {
ClusterRelation types.ClusterRelation
Status types.Status
}

func (s StaticStatusReporter) StatusReport() (types.ClusterRelation, types.Status) {
return s.ClusterRelation, s.Status
}
27 changes: 27 additions & 0 deletions orderer/consensus/cluster_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package consensus_test

import (
"github.com/hyperledger/fabric/orderer/common/types"
"testing"

"github.com/hyperledger/fabric/orderer/consensus"
"github.com/stretchr/testify/assert"
)

func TestStaticStatusReporter(t *testing.T) {
staticSR := &consensus.StaticStatusReporter{
ClusterRelation: types.ClusterRelationNone,
Status: types.StatusActive,
}

var sr consensus.StatusReporter = staticSR // make sure it implements this interface
cRel, status := sr.StatusReport()
assert.Equal(t, types.ClusterRelationNone, cRel)
assert.Equal(t, types.StatusActive, status)
}
6 changes: 6 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/pem"
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1315,6 +1316,11 @@ func (c *Chain) ValidateConsensusMetadata(oldMetadataBytes, newMetadataBytes []b
return nil
}

// StatusReport returns the ClusterRelation & Status
func (c *Chain) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

func (c *Chain) suspectEviction() bool {
if c.isRunning() != nil {
return false
Expand Down
4 changes: 4 additions & 0 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
orderer_types "github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
Expand Down Expand Up @@ -202,6 +203,9 @@ var _ = Describe("Chain", func() {
Expect(err).NotTo(HaveOccurred())

chain.Start()
cRel, status := chain.StatusReport()
Expect(cRel).To(Equal(orderer_types.ClusterRelationMember))
Expect(status).To(Equal(orderer_types.StatusActive))

// When the Raft node bootstraps, it produces a ConfChange
// to add itself, which needs to be consumed with Ready().
Expand Down
66 changes: 66 additions & 0 deletions orderer/consensus/follower/follower_chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package follower

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/orderer/common/types"
)

//TODO skeleton

// Chain implements a component that allows the orderer to follow a specific channel when is not a cluster member,
// that is, be a "follower" of the cluster. This means that the current orderer is not a member of the consenters set
// of the channel, and is only pulling blocks from other orderers.
//
// The follower is inspecting config blocks as they are pulled and if it discovers that it was introduced into the
// consenters set, it will trigger the creation of a regular etcdraft.Chain, that is, turn into a "member" of the
// cluster.
//
// The follower is started in one of two ways: 1) following an API Join request with a join-block that does not include
// the orderer in its conseters set, or 2) when the orderer was a cluster member and was removed from the consenters
// set.
//
// The follower is in status "onboarding" when it pulls blocks below the join-block number, or "active" when it
// pulls blocks equal or above the join-block number.
type Chain struct {
Err error
//TODO skeleton
}

func (c *Chain) Order(_ *common.Envelope, _ uint64) error {
return c.Err
}

func (c *Chain) Configure(_ *common.Envelope, _ uint64) error {
return c.Err
}

func (c *Chain) WaitReady() error {
return c.Err
}

func (*Chain) Errored() <-chan struct{} {
closedChannel := make(chan struct{})
close(closedChannel)
return closedChannel
}

func (c *Chain) Start() {
//TODO skeleton
}

func (c *Chain) Halt() {
//TODO skeleton
}

// StatusReport returns the ClusterRelation & Status
func (c *Chain) StatusReport() (types.ClusterRelation, types.Status) {
status := types.StatusActive
//TODO if (height is >= join-block.height) return "active"; else return "onboarding"
return types.ClusterRelationFollower, status
}
Loading