Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAB-17890 Ch.Part.API: allow registrar to list a single channel #1349

Merged
merged 3 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
Expand All @@ -34,6 +35,11 @@ type ChainSupport struct {
// that there is a single consensus type at this orderer node and therefore the resolution of
// the consensus type too happens only at the ChainSupport level.
consensus.MetadataValidator

// The registrar is not aware of the exact type that the Chain is, e.g. etcdraft, inactive, or follower.
// Therefore, we let each chain report its cluster relation and status through this interface. Non cluster
// type chains (solo, kafka) are assigned a static reporter.
consensus.StatusReporter
}

func newChainSupport(
Expand Down Expand Up @@ -88,6 +94,11 @@ func newChainSupport(
cs.MetadataValidator = consensus.NoOpMetadataValidator{}
}

cs.StatusReporter, ok = cs.Chain.(consensus.StatusReporter)
if !ok { // Non-cluster types: solo, kafka
cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationNone, Status: types.StatusActive}
}

logger.Debugf("[channel: %s] Done creating channel support resources", cs.ChannelID())

return cs
Expand Down
16 changes: 14 additions & 2 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,20 @@ func (r *Registrar) ChannelList() types.ChannelList {
}

func (r *Registrar) ChannelInfo(channelID string) (types.ChannelInfo, error) {
//TODO
return types.ChannelInfo{}, errors.New("Not implemented yet")
r.lock.RLock()
defer r.lock.RUnlock()

info := types.ChannelInfo{}
cs, ok := r.chains[channelID]
if !ok {
return info, types.ErrChannelNotExist
}

info.Name = channelID
tock-ibm marked this conversation as resolved.
Show resolved Hide resolved
info.Height = cs.Height()
info.ClusterRelation, info.Status = cs.StatusReport()

return info, nil
}

func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block) (types.ChannelInfo, error) {
Expand Down
32 changes: 28 additions & 4 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func TestNewRegistrar(t *testing.T) {
}, "Should not panic when starting without a system channel")
require.NotNil(t, manager)
list := manager.ChannelList()
assert.Equal(t, types.ChannelList{SystemChannel: nil, Channels: nil}, list)
assert.Equal(t, types.ChannelList{}, list)
info, err := manager.ChannelInfo("my-channel")
assert.EqualError(t, err, types.ErrChannelNotExist.Error())
assert.Equal(t, types.ChannelInfo{}, info)
})

// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
Expand Down Expand Up @@ -231,6 +234,13 @@ func TestNewRegistrar(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: "none", Status: "active", Height: 1},
info,
)

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, "testchannelid", chainSupport, rl, t)
})
}
Expand All @@ -251,7 +261,7 @@ func TestCreateChain(t *testing.T) {
lf, _ := newLedgerAndFactory(tmpdir, "testchannelid", genesisBlockSys)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}
consenters[confSys.Orderer.OrdererType] = &mockConsenter{cluster: true}

manager := NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider)
manager.Initialize(consenters)
Expand All @@ -278,18 +288,32 @@ func TestCreateChain(t *testing.T) {
list,
)

info, err := manager.ChannelInfo("testchannelid")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "testchannelid", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

info, err = manager.ChannelInfo("mychannel")
assert.NoError(t, err)
assert.Equal(t,
types.ChannelInfo{Name: "mychannel", URL: "", ClusterRelation: types.ClusterRelationMember, Status: types.StatusActive, Height: 1},
info,
)

// A subsequent creation, replaces the chain.
manager.CreateChain("mychannel")
chain2 := manager.GetChain("mychannel")
assert.NotNil(t, chain2)
// They are not the same
assert.NotEqual(t, chain, chain2)
// The old chain is halted
_, ok := <-chain.Chain.(*mockChain).queue
_, ok := <-chain.Chain.(*mockChainCluster).queue
assert.False(t, ok)

// The new chain is not halted: Close the channel to prove that.
close(chain2.Chain.(*mockChain).queue)
close(chain2.Chain.(*mockChainCluster).queue)
})

// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
Expand Down
22 changes: 20 additions & 2 deletions orderer/common/multichannel/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package multichannel

import (
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"

cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/capabilities"
Expand All @@ -23,16 +24,33 @@ import (
)

type mockConsenter struct {
cluster bool
}

func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
return &mockChain{
chain := &mockChain{
queue: make(chan *cb.Envelope),
cutter: support.BlockCutter(),
support: support,
metadata: metadata,
done: make(chan struct{}),
}, nil
}

if mc.cluster {
clusterChain := &mockChainCluster{}
clusterChain.mockChain = chain
return clusterChain, nil
}

return chain, nil
}

type mockChainCluster struct {
*mockChain
}

func (c *mockChainCluster) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

type mockChain struct {
Expand Down
44 changes: 40 additions & 4 deletions orderer/common/types/channelinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,53 @@ type ChannelInfoShort struct {
URL string `json:"url"`
}

// ClusterRelation represents the relationship between the orderer and the channel's consensus cluster.
type ClusterRelation string

const (
// The orderer is a cluster member of a cluster consensus protocol (e.g. etcdraft) for a specific channel.
// That is, the orderer is in the consenters set of the channel.
ClusterRelationMember ClusterRelation = "member"
// The orderer is following a cluster consensus protocol by pulling blocks from other orderers.
// The orderer is NOT in the consenters set of the channel.
ClusterRelationFollower ClusterRelation = "follower"
// The orderer is NOT in the consenters set of the channel, and is just tracking (polling) the last config block
// of the channel in order to detect when it is added to the channel.
ClusterRelationConfigTracker ClusterRelation = "config-tracker"
tock-ibm marked this conversation as resolved.
Show resolved Hide resolved
// The orderer runs a non-cluster consensus type, solo or kafka.
ClusterRelationNone ClusterRelation = "none"
)

// Status represents the degree by which the orderer had caught up with the rest of the cluster after joining the
// channel (either as a member or a follower).
type Status string

const (
// The orderer is active in the channel's consensus protocol, or following the cluster,
// with block height > the join-block number. (Height is last block number +1).
StatusActive Status = "active"
// The orderer is catching up with the cluster by pulling blocks from other orderers,
// with block height <= the join-block number.
StatusOnBoarding Status = "onboarding"
// The orderer is not storing any blocks for this channel.
StatusInactive Status = "inactive"
)

// ChannelInfo carries the response to an HTTP request to List a single channel.
// This is marshaled into the body of the HTTP response.
type ChannelInfo struct {
// The channel name.
Name string `json:"name"`
// The channel relative URL (no Host:Port, only path), e.g.: "/participation/v1/channels/my-channel".
URL string `json:"url"`
// Whether the orderer is a “member” or ”follower” of the cluster, for this channel. Case insensitive.
ClusterRelation string `json:"clusterRelation"`
// Whether the orderer is ”onboarding” or ”active”, for this channel. Case insensitive.
Status string `json:"status"`
// Whether the orderer is a “member” or ”follower” of the cluster, or "config-tracker" of the cluster, for this channel.
// For non cluster consensus types (solo, kafka) it is "none".
// Possible values: “member”, ”follower”, "config-tracker", "none".
ClusterRelation ClusterRelation `json:"clusterRelation"`
// Whether the orderer is ”onboarding”, ”active”, or "inactive", for this channel.
// For non cluster consensus types (solo, kafka) it is "active".
// Possible values: “onboarding”, ”active”, "inactive".
Status Status `json:"status"`
// Current block height.
Height uint64 `json:"height"`
}
4 changes: 2 additions & 2 deletions orderer/common/types/channelinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestChannelInfo(t *testing.T) {
info := types.ChannelInfo{
Name: "a",
URL: "/api/channels/a",
ClusterRelation: "follower",
Status: "active",
ClusterRelation: types.ClusterRelationFollower,
Status: types.StatusActive,
Height: uint64(1) << 60,
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/common/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var ErrChannelAlreadyExists = errors.New("channel already exists")
// already exist.
var ErrAppChannelsAlreadyExists = errors.New("application channels already exist")

// This error is returned when trying to remove a channel that does not exist
// This error is returned when trying to remove or list a channel that does not exist
var ErrChannelNotExist = errors.New("channel does not exist")
32 changes: 32 additions & 0 deletions orderer/consensus/cluster_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package consensus

import "github.com/hyperledger/fabric/orderer/common/types"

// StatusReporter is implemented by cluster-type Chain implementations.
// It allows the node to report its cluster relation and its status within that relation.
// This information is used to generate the channelparticipation.ChannelInfo in response
// to a "List" request on a particular channel.
//
// Not all chains must implement this, in particular non-cluster-type (solo, kafka) are
// assigned a StaticStatusReporter at construction time.
type StatusReporter interface {
// StatusReport provides the cluster relation and status.
// See: channelparticipation.ChannelInfo for more details.
StatusReport() (types.ClusterRelation, types.Status)
}

// StaticStatusReporter is intended for chains that do not implement the StatusReporter interface.
type StaticStatusReporter struct {
ClusterRelation types.ClusterRelation
Status types.Status
}

func (s StaticStatusReporter) StatusReport() (types.ClusterRelation, types.Status) {
return s.ClusterRelation, s.Status
}
27 changes: 27 additions & 0 deletions orderer/consensus/cluster_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package consensus_test

import (
"github.com/hyperledger/fabric/orderer/common/types"
"testing"

"github.com/hyperledger/fabric/orderer/consensus"
"github.com/stretchr/testify/assert"
)

func TestStaticStatusReporter(t *testing.T) {
staticSR := &consensus.StaticStatusReporter{
ClusterRelation: types.ClusterRelationNone,
Status: types.StatusActive,
}

var sr consensus.StatusReporter = staticSR // make sure it implements this interface
cRel, status := sr.StatusReport()
assert.Equal(t, types.ClusterRelationNone, cRel)
assert.Equal(t, types.StatusActive, status)
}
6 changes: 6 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/pem"
"fmt"
"github.com/hyperledger/fabric/orderer/common/types"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1315,6 +1316,11 @@ func (c *Chain) ValidateConsensusMetadata(oldMetadataBytes, newMetadataBytes []b
return nil
}

// StatusReport returns the ClusterRelation & Status
func (c *Chain) StatusReport() (types.ClusterRelation, types.Status) {
return types.ClusterRelationMember, types.StatusActive
}

func (c *Chain) suspectEviction() bool {
if c.isRunning() != nil {
return false
Expand Down
4 changes: 4 additions & 0 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
orderer_types "github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
Expand Down Expand Up @@ -202,6 +203,9 @@ var _ = Describe("Chain", func() {
Expect(err).NotTo(HaveOccurred())

chain.Start()
cRel, status := chain.StatusReport()
Expect(cRel).To(Equal(orderer_types.ClusterRelationMember))
Expect(status).To(Equal(orderer_types.StatusActive))

// When the Raft node bootstraps, it produces a ConfChange
// to add itself, which needs to be consumed with Ready().
Expand Down
Loading