Skip to content

Commit

Permalink
[FAB-8802] Integrate discovery client
Browse files Browse the repository at this point in the history
Created a DiscoveryClient that interacts with
fabric's discovery client.

Change-Id: Ia57e335399c3a2d43e21a40576502f84a083842f
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Apr 20, 2018
1 parent d87362e commit 8b4777e
Show file tree
Hide file tree
Showing 17 changed files with 718 additions and 129 deletions.
154 changes: 154 additions & 0 deletions internal/github.com/hyperledger/fabric/discovery/client/selection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
/*
Notice: This file has been modified for Hyperledger Fabric SDK Go usage.
Please review third_party pinning scripts and patches for more details.
*/

package discovery

import (
"math/rand"
"sort"
"time"
)

// ExclusionFilter returns true if the given Peer
// is not to be considered when selecting peers
type ExclusionFilter interface {
// Exclude returns whether the given Peer is to be excluded or not
Exclude(Peer) bool
}

type selectionFunc func(Peer) bool

func (sf selectionFunc) Exclude(p Peer) bool {
return sf(p)
}

// PrioritySelector guides the selection of peers via
// giving peers a relative priority to their selection
type PrioritySelector interface {
// Compare compares between 2 peers and returns
// their relative scores
Compare(Peer, Peer) Priority
}

// Priority defines how likely a peer is to be selected
// over another peer.
// Positive priority means the left peer is selected
// Negative priority means the right peer is selected
// Zero priority means their priorities are the same
type Priority int

var (
// PrioritiesByHeight selects peers by descending height
PrioritiesByHeight = &byHeight{}
// NoExclusion accepts all peers and rejects no peers
NoExclusion = selectionFunc(noExclusion)
// NoPriorities is indifferent to how it selects peers
NoPriorities = &noPriorities{}
)

type noPriorities struct{}

func (nc noPriorities) Compare(_ Peer, _ Peer) Priority {
return 0
}

type byHeight struct{}

func (*byHeight) Compare(left Peer, right Peer) Priority {
leftHeight := left.StateInfoMessage.GetStateInfo().Properties.LedgerHeight
rightHeight := right.StateInfoMessage.GetStateInfo().Properties.LedgerHeight

if leftHeight > rightHeight {
return 1
}
if rightHeight > leftHeight {
return -1
}
return 0
}

func noExclusion(_ Peer) bool {
return false
}

// ExcludeHosts returns a ExclusionFilter that excludes the given endpoints
func ExcludeHosts(endpoints ...string) ExclusionFilter {
m := make(map[string]struct{})
for _, endpoint := range endpoints {
m[endpoint] = struct{}{}
}
return ExcludeByHost(func(host string) bool {
_, excluded := m[host]
return excluded
})
}

// ExcludeByHost creates a ExclusionFilter out of the given exclusion predicate
func ExcludeByHost(reject func(host string) bool) ExclusionFilter {
return selectionFunc(func(p Peer) bool {
endpoint := p.AliveMessage.GetAliveMsg().Membership.Endpoint
var internalEndpoint string
se := p.AliveMessage.GetSecretEnvelope()
if se != nil {
internalEndpoint = se.InternalEndpoint()
}
return reject(endpoint) || reject(internalEndpoint)
})
}

// Filter filters the endorsers according to the given ExclusionFilter
func (endorsers Endorsers) Filter(f ExclusionFilter) Endorsers {
var res Endorsers
for _, e := range endorsers {
if !f.Exclude(*e) {
res = append(res, e)
}
}
return res
}

// Shuffle sorts the endorsers in random order
func (endorsers Endorsers) Shuffle() Endorsers {
res := make(Endorsers, len(endorsers))
rand.Seed(time.Now().UnixNano())
for i, index := range rand.Perm(len(endorsers)) {
res[i] = endorsers[index]
}
return res
}

type endorserSort struct {
Endorsers
PrioritySelector
}

// Sort sorts the endorsers according to the given PrioritySelector
func (endorsers Endorsers) Sort(ps PrioritySelector) Endorsers {
sort.Sort(&endorserSort{
Endorsers: endorsers,
PrioritySelector: ps,
})
return endorsers
}

func (es *endorserSort) Len() int {
return len(es.Endorsers)
}

func (es *endorserSort) Less(i, j int) bool {
e1 := es.Endorsers[i]
e2 := es.Endorsers[j]
less := es.Compare(*e1, *e2)
return less > Priority(0)
}

func (es *endorserSort) Swap(i, j int) {
es.Endorsers[i], es.Endorsers[j] = es.Endorsers[j], es.Endorsers[i]
}
11 changes: 11 additions & 0 deletions internal/github.com/hyperledger/fabric/protos/gossip/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ func (e *Envelope) ToGossipMessage() (*SignedGossipMessage, error) {
}, nil
}

// InternalEndpoint returns the internal endpoint
// in the secret envelope, or an empty string
// if a failure occurs.
func (s *SecretEnvelope) InternalEndpoint() string {
secret := &Secret{}
if err := proto.Unmarshal(s.Payload, secret); err != nil {
return ""
}
return secret.GetInternalEndpoint()
}

// SignedGossipMessage contains a GossipMessage
// and the Envelope from which it came from
type SignedGossipMessage struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/providers/fab/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
)

// ClientContext contains the client context
// TODO: This is a duplicate of context.Client since importing context.Client causes
// a circular import error. This problem should be addressed in a future patch.
type ClientContext interface {
core.Providers
msp.Providers
Expand Down Expand Up @@ -134,6 +132,8 @@ const (
ChannelConfigRefresh
// ChannelMembershipRefresh channel membership refresh interval
ChannelMembershipRefresh
// DiscoveryConnection discovery connection timeout
DiscoveryConnection
)

// EventServiceType specifies the type of event service to use
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/config/testdata/template/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ client:
# timeout:
# connection: 15s
# response: 15s
# discovery:
# timeout:
# connection: 15s
# global:
# timeout:
# query: 180s
Expand Down
9 changes: 1 addition & 8 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ const (
// GRPCConnection manages the GRPC connection and client stream
type GRPCConnection struct {
context fabcontext.Client
chConfig fab.ChannelCfg
conn *grpc.ClientConn
commManager fab.CommManager
tlsCertHash []byte
done int32
}

// NewConnection creates a new connection
func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, url string, opts ...options.Opt) (*GRPCConnection, error) {
func NewConnection(ctx fabcontext.Client, url string, opts ...options.Opt) (*GRPCConnection, error) {
if url == "" {
return nil, errors.New("server URL not specified")
}
Expand Down Expand Up @@ -71,18 +70,12 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, url string, o

return &GRPCConnection{
context: ctx,
chConfig: chConfig,
commManager: commManager,
conn: grpcconn,
tlsCertHash: comm.TLSCertHash(ctx.EndpointConfig()),
}, nil
}

// ChannelConfig returns the channel configuration
func (c *GRPCConnection) ChannelConfig() fab.ChannelCfg {
return c.chConfig
}

// ClientConn returns the underlying GRPC connection
func (c *GRPCConnection) ClientConn() *grpc.ClientConn {
return c.conn
Expand Down
7 changes: 2 additions & 5 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ import (
)

func TestConnection(t *testing.T) {
channelID := "testchannel"

context := newMockContext()
chConfig := fabmocks.NewMockChannelCfg(channelID)

_, err := NewConnection(context, chConfig, "")
_, err := NewConnection(context, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
conn, err := NewConnection(context, chConfig, peerURL)
conn, err := NewConnection(context, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/fab/comm/connectionopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"crypto/x509"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/status"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/spf13/cast"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -142,3 +145,63 @@ type insecureSetter interface {
type connectTimeoutSetter interface {
SetConnectTimeout(value time.Duration)
}

// OptsFromPeerConfig returns a set of connection options from the given peer config
func OptsFromPeerConfig(peerCfg *fab.PeerConfig) ([]options.Opt, error) {
certificate, err := peerCfg.TLSCACerts.TLSCert()
if err != nil {
//Ignore empty cert errors,
errStatus, ok := err.(*status.Status)
if !ok || errStatus.Code != status.EmptyCert.ToInt32() {
return nil, err
}
}

opts := []options.Opt{
WithHostOverride(getServerNameOverride(peerCfg)),
WithFailFast(getFailFast(peerCfg)),
WithKeepAliveParams(getKeepAliveOptions(peerCfg)),
WithCertificate(certificate),
}
if isInsecureAllowed(peerCfg) {
opts = append(opts, WithInsecure())
}

return opts, nil
}

func getServerNameOverride(peerCfg *fab.PeerConfig) string {
if str, ok := peerCfg.GRPCOptions["ssl-target-name-override"].(string); ok {
return str
}
return ""
}

func getFailFast(peerCfg *fab.PeerConfig) bool {
if ff, ok := peerCfg.GRPCOptions["fail-fast"].(bool); ok {
return cast.ToBool(ff)
}
return false
}

func getKeepAliveOptions(peerCfg *fab.PeerConfig) keepalive.ClientParameters {
var kap keepalive.ClientParameters
if kaTime, ok := peerCfg.GRPCOptions["keep-alive-time"]; ok {
kap.Time = cast.ToDuration(kaTime)
}
if kaTimeout, ok := peerCfg.GRPCOptions["keep-alive-timeout"]; ok {
kap.Timeout = cast.ToDuration(kaTimeout)
}
if kaPermit, ok := peerCfg.GRPCOptions["keep-alive-permit"]; ok {
kap.PermitWithoutStream = cast.ToBool(kaPermit)
}
return kap
}

func isInsecureAllowed(peerCfg *fab.PeerConfig) bool {
allowInsecure, ok := peerCfg.GRPCOptions["allow-insecure"].(bool)
if ok {
return allowInsecure
}
return false
}
13 changes: 10 additions & 3 deletions pkg/fab/comm/streamconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)
// StreamConnection manages the GRPC connection and client stream
type StreamConnection struct {
*GRPCConnection
stream grpc.ClientStream
lock sync.Mutex
chConfig fab.ChannelCfg
stream grpc.ClientStream
lock sync.Mutex
}

// NewStreamConnection creates a new connection with stream
func NewStreamConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*StreamConnection, error) {
conn, err := NewConnection(ctx, chConfig, url, opts...)
conn, err := NewConnection(ctx, url, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -67,10 +68,16 @@ func NewStreamConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamP

return &StreamConnection{
GRPCConnection: conn,
chConfig: chConfig,
stream: stream,
}, nil
}

// ChannelConfig returns the channel configuration
func (c *StreamConnection) ChannelConfig() fab.ChannelCfg {
return c.chConfig
}

// Close closes the connection
func (c *StreamConnection) Close() {
c.lock.Lock()
Expand Down
Loading

0 comments on commit 8b4777e

Please sign in to comment.