Skip to content

Commit

Permalink
[FAB-8944] Channel configuration cache refresh
Browse files Browse the repository at this point in the history
Change-Id: I652baea135602648059fdb61be45239306beac5c
Signed-off-by: Divyank Katira <Divyank.Katira@securekey.com>
  • Loading branch information
d1vyank committed Mar 17, 2018
1 parent ff9763c commit ecb7b03
Show file tree
Hide file tree
Showing 26 changed files with 531 additions and 115 deletions.
5 changes: 4 additions & 1 deletion pkg/client/channel/chclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Requ
return nil, nil, errors.New("ChaincodeID and Fcn are required")
}

chConfig := cc.context.ChannelService().ChannelConfig()
chConfig, err := cc.context.ChannelService().ChannelConfig()
if err != nil {
return nil, nil, errors.WithMessage(err, "failed to retrieve channel config")
}
transactor, err := cc.context.InfraProvider().CreateChannelTransactor(reqCtx, chConfig)
if err != nil {
return nil, nil, errors.WithMessage(err, "failed to create transactor")
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/resmgmt/resmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,10 @@ func (rc *Client) sendCCProposal(reqCtx reqContext.Context, ccProposalType chain
return errors.WithMessage(err, "Unable to get channel service")
}

chConfig := channelService.ChannelConfig()
chConfig, err := channelService.ChannelConfig()
if err != nil {
return errors.WithMessage(err, "get channel config failed")
}
transactor, err := rc.ctx.InfraProvider().CreateChannelTransactor(reqCtx, chConfig)
if err != nil {
return errors.WithMessage(err, "get channel transactor failed")
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/providers/core/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ const (
PeerResponse
// ResMgmt timeout is default overall timeout for all resource management operations
ResMgmt
// ChannelConfigRefresh channel configuration refresh interval
ChannelConfigRefresh
// ChannelMembershipRefresh channel membership refresh interval
ChannelMembershipRefresh
)

// EventServiceType specifies the type of event service to use
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/providers/fab/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type ChannelService interface {
Config() (ChannelConfig, error)
EventService() (EventService, error)
Membership() (ChannelMembership, error)
ChannelConfig() ChannelCfg
ChannelConfig() (ChannelCfg, error)
}

// Transactor supplies methods for sending transaction proposals and transactions.
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/providers/fab/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type ClientContext interface {
// InfraProvider enables access to fabric objects such as peer and user based on config or
type InfraProvider interface {
CreateChannelConfig(name string) (ChannelConfig, error)
CreateChannelCfg(ctx ClientContext, channelID string) (ChannelCfg, error)
CreateChannelTransactor(reqCtx reqContext.Context, cfg ChannelCfg) (Transactor, error)
CreateChannelMembership(cfg ChannelCfg) (ChannelMembership, error)
CreateEventService(ctx ClientContext, chConfig ChannelCfg) (EventService, error)
CreateChannelMembership(ctx ClientContext, channelID string) (ChannelMembership, error)
CreateEventService(ctx ClientContext, channelID string) (EventService, error)
CreatePeerFromConfig(peerCfg *core.NetworkPeer) (Peer, error)
CreateOrdererFromConfig(cfg *core.OrdererConfig) (Orderer, error)
CommManager() CommManager
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ func (c *Config) getTimeout(tType core.TimeoutType) time.Duration {
timeout = c.configViper.GetDuration("client.orderer.timeout.connection")
case core.OrdererResponse:
timeout = c.configViper.GetDuration("client.orderer.timeout.response")
case core.ChannelConfigRefresh:
timeout = c.configViper.GetDuration("client.global.timeout.cache.channelConfig")
case core.ChannelMembershipRefresh:
timeout = c.configViper.GetDuration("client.global.timeout.cache.channelMembership")
case core.CacheSweepInterval: // EXPERIMENTAL - do we need this to be configurable?
timeout = c.configViper.GetDuration("client.cache.interval.sweep")
if timeout == 0 {
Expand Down
80 changes: 80 additions & 0 deletions pkg/fab/channel/membership/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package membership

import (
"crypto/sha256"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazycache"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref"

"github.com/pkg/errors"
)

// CacheKey membership reference cache key
type CacheKey interface {
lazycache.Key
Context() Context
ChannelID() string
ChConfigRef() *lazyref.Reference
}

// CacheKey holds a key for the cache
type cacheKey struct {
key string
context Context
channelID string
chConfigRef *lazyref.Reference
}

// NewCacheKey returns a new CacheKey
func NewCacheKey(context Context, chConfigRef *lazyref.Reference, channelID string) (CacheKey, error) {
h := sha256.New()
hash := h.Sum([]byte(channelID))

return &cacheKey{
key: string(hash),
context: context,
chConfigRef: chConfigRef,
channelID: channelID,
}, nil
}

// NewRefCache a cache of membership references that refreshed with the
// given interval
func NewRefCache(refresh time.Duration) *lazycache.Cache {
initializer := func(key lazycache.Key) (interface{}, error) {
ck, ok := key.(CacheKey)
if !ok {
return nil, errors.New("Unexpected cache key")
}
return NewRef(refresh, ck.Context(), ck.ChConfigRef()), nil
}

return lazycache.New("Membership_Cache", initializer)
}

// String returns the key as a string
func (k *cacheKey) String() string {
return k.key
}

// Context returns the context
func (k *cacheKey) Context() Context {
return k.context
}

// ChannelID returns the channelID
func (k *cacheKey) ChannelID() string {
return k.channelID
}

// ChConfigRef returns the channel config reference
func (k *cacheKey) ChConfigRef() *lazyref.Reference {
return k.chConfigRef
}
86 changes: 86 additions & 0 deletions pkg/fab/channel/membership/reference.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package membership

import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref"
"github.com/pkg/errors"
)

// Ref membership reference that refreshes to load the given channel config reference
type Ref struct {
*lazyref.Reference
chConfigRef *lazyref.Reference
context Context
}

// NewRef returns a new membership reference
func NewRef(refresh time.Duration, context Context, chConfigRef *lazyref.Reference) *Ref {
ref := &Ref{
chConfigRef: chConfigRef,
context: context,
}

ref.Reference = lazyref.New(
ref.initializer(),
lazyref.WithRefreshInterval(lazyref.InitImmediately, refresh),
)

return ref
}

// Validate calls validate on the underlying reference
func (ref *Ref) Validate(serializedID []byte) error {
membership, err := ref.get()
if err != nil {
return err
}
return membership.Validate(serializedID)
}

// Verify calls validate on the underlying reference
func (ref *Ref) Verify(serializedID []byte, msg []byte, sig []byte) error {
membership, err := ref.get()
if err != nil {
return err
}
return membership.Verify(serializedID, msg, sig)
}

func (ref *Ref) get() (fab.ChannelMembership, error) {
m, err := ref.Get()
if err != nil {
return nil, err
}
return m.(fab.ChannelMembership), nil
}

func (ref *Ref) initializer() lazyref.Initializer {
return func() (interface{}, error) {
logger.Debugf("Creating membership...")

channelCfg, err := ref.chConfigRef.Get()
if err != nil {
return nil, errors.WithMessage(err, "could not get channel config from reference")
}
cfg, ok := channelCfg.(fab.ChannelCfg)
if !ok {
return nil, errors.New("chConfigRef.Get() returned unexpected value ")
}

//TODO: create new membership only if config block number has changed
membership, err := New(ref.context, cfg)
if err != nil {
return nil, err
}

return membership, nil
}
}
89 changes: 89 additions & 0 deletions pkg/fab/chconfig/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package chconfig

import (
"crypto/sha256"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazycache"

"github.com/pkg/errors"
)

// Provider provides ChannelConfig
type Provider func(channelID string) (fab.ChannelConfig, error)

// CacheKey channel config reference cache key
type CacheKey interface {
lazycache.Key
Context() fab.ClientContext
ChannelID() string
Provider() Provider
}

// CacheKey holds a key for the provider cache
type cacheKey struct {
key string
channelID string
context fab.ClientContext
pvdr Provider
}

// NewCacheKey returns a new CacheKey
func NewCacheKey(ctx fab.ClientContext, pvdr Provider, channelID string) (CacheKey, error) {
identity, err := ctx.Serialize()
if err != nil {
return nil, err
}

h := sha256.New()
h.Write(identity)
hash := h.Sum([]byte(channelID))

return &cacheKey{
key: string(hash),
channelID: channelID,
context: ctx,
pvdr: pvdr,
}, nil
}

// NewRefCache a cache of channel config references that refreshed with the
// given interval
func NewRefCache(refresh time.Duration) *lazycache.Cache {
initializer := func(key lazycache.Key) (interface{}, error) {
ck, ok := key.(CacheKey)
if !ok {
return nil, errors.New("Unexpected cache key")
}
return NewRef(refresh, ck.Provider(), ck.ChannelID(), ck.Context()), nil
}

return lazycache.New("Channel_Cfg_Cache", initializer)
}

// String returns the key as a string
func (k *cacheKey) String() string {
return k.key
}

// Context returns the Context
func (k *cacheKey) Context() fab.ClientContext {
return k.context
}

// ChannelID returns the channelID
func (k *cacheKey) ChannelID() string {
return k.channelID
}

// Provider channel configuration provider
func (k *cacheKey) Provider() Provider {
return k.pvdr
}
61 changes: 61 additions & 0 deletions pkg/fab/chconfig/reference.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package chconfig

import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref"
"github.com/pkg/errors"
)

// Ref channel configuration lazy reference
type Ref struct {
*lazyref.Reference
pvdr Provider
ctx fab.ClientContext
channelID string
closed int32
}

// NewRef returns a new channel config reference
func NewRef(refresh time.Duration, pvdr Provider, channel string, ctx fab.ClientContext) *Ref {
cfgRef := &Ref{
pvdr: pvdr,
ctx: ctx,
channelID: channel,
}

cfgRef.Reference = lazyref.New(
cfgRef.initializer(),
lazyref.WithRefreshInterval(lazyref.InitImmediately, refresh),
)

return cfgRef
}

func (ref *Ref) initializer() lazyref.Initializer {
return func() (interface{}, error) {
chConfigProvider, err := ref.pvdr(ref.channelID)
if err != nil {
return nil, errors.WithMessage(err, "error creating channel config provider")
}

reqCtx, cancel := contextImpl.NewRequest(ref.ctx, contextImpl.WithTimeoutType(core.PeerResponse))
defer cancel()

chConfig, err := chConfigProvider.Query(reqCtx)
if err != nil {
return nil, err
}

return chConfig, nil
}
}
4 changes: 2 additions & 2 deletions pkg/fab/mocks/mockchprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ func (cs *MockChannelService) Membership() (fab.ChannelMembership, error) {
}

//ChannelConfig returns channel config
func (cs *MockChannelService) ChannelConfig() fab.ChannelCfg {
return &MockChannelCfg{MockID: cs.channelID, MockOrderers: cs.mockOrderers}
func (cs *MockChannelService) ChannelConfig() (fab.ChannelCfg, error) {
return &MockChannelCfg{MockID: cs.channelID, MockOrderers: cs.mockOrderers}, nil
}
Loading

0 comments on commit ecb7b03

Please sign in to comment.