Skip to content

Commit

Permalink
Adding Extension interfaces to Chain and EventHub.
Browse files Browse the repository at this point in the history
Also, EventHub had another exposed propbuf left.

Change-Id: I141996ba22ea85af281cafedfbb5d53c074fc5a6
Signed-off-by: Karl Einholz <karl.einholz@securekey.com>
  • Loading branch information
Karl Einholz committed Mar 30, 2017
1 parent dacc9ca commit fe98dcb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 21 deletions.
37 changes: 32 additions & 5 deletions fabric-client/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ type Chain interface {
SendTransaction(tx *Transaction) ([]*TransactionResponse, error)
SendInstallProposal(chaincodeName string, chaincodePath string, chaincodeVersion string, chaincodePackage []byte, targets []Peer) ([]*TransactionProposalResponse, string, error)
SendInstantiateProposal(chaincodeName string, chainID string, args []string, chaincodePath string, chaincodeVersion string, targets []Peer) ([]*TransactionProposalResponse, string, error)

QueryExtensionInterface() ChainExtension
}

// The ChainExtension interface allows extensions of the SDK to add functionality to Chain overloads.
type ChainExtension interface {
GetClientContext() Client

SignPayload(payload []byte) (*SignedEnvelope, error)
BroadcastEnvelope(envelope *SignedEnvelope) ([]*TransactionResponse, error)

// TODO: This should go somewhere else?
GetProposalBytes(tp *TransactionProposal) ([]byte, error)
}

type chain struct {
Expand Down Expand Up @@ -167,6 +180,20 @@ func NewChain(name string, client Client) (Chain, error) {
return c, nil
}

func (c *chain) QueryExtensionInterface() ChainExtension {
return c
}

// GetClientContext returns the Client that was passed in to NewChain
func (c *chain) GetClientContext() Client {
return c.clientContext
}

// GetProposalBytes returns the serialized transaction.
func (c *chain) GetProposalBytes(tp *TransactionProposal) ([]byte, error) {
return proto.Marshal(tp.signedProposal)
}

// GetName ...
/**
* Get the chain name.
Expand Down Expand Up @@ -358,7 +385,7 @@ func (c *chain) CreateChannel(request CreateChannelRequest) error {
err.Error())
}
// Send request
responseMap, err := c.broadcastEnvelope(&SignedEnvelope{
responseMap, err := c.BroadcastEnvelope(&SignedEnvelope{
signature: signedEnvelope.Signature,
Payload: signedEnvelope.Payload,
})
Expand Down Expand Up @@ -788,12 +815,12 @@ func (c *chain) SendTransaction(tx *Transaction) ([]*TransactionResponse, error)
}

// here's the envelope
envelope, err := c.signPayload(paylBytes)
envelope, err := c.SignPayload(paylBytes)
if err != nil {
return nil, err
}

transactionResponses, err := c.broadcastEnvelope(envelope)
transactionResponses, err := c.BroadcastEnvelope(envelope)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -944,7 +971,7 @@ func (c *chain) SendInstantiateProposal(chaincodeName string, chainID string,
return transactionProposalResponse, txID, err
}

func (c *chain) signPayload(payload []byte) (*SignedEnvelope, error) {
func (c *chain) SignPayload(payload []byte) (*SignedEnvelope, error) {
//Get user info
user, err := c.clientContext.GetUserContext("")
if err != nil {
Expand All @@ -961,7 +988,7 @@ func (c *chain) signPayload(payload []byte) (*SignedEnvelope, error) {
}

//broadcastEnvelope will send the given envelope to each orderer
func (c *chain) broadcastEnvelope(envelope *SignedEnvelope) ([]*TransactionResponse, error) {
func (c *chain) BroadcastEnvelope(envelope *SignedEnvelope) ([]*TransactionResponse, error) {
// Check if orderers are defined
if c.orderers == nil || len(c.orderers) == 0 {
return nil, fmt.Errorf("orderers not set")
Expand Down
75 changes: 59 additions & 16 deletions fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ type EventHub interface {
SetPeerAddr(peerURL string, certificate string, serverHostOverride string)
IsConnected() bool
Connect() error
RegisterChaincodeEvent(ccid string, eventname string, callback func(*pb.ChaincodeEvent)) *ChainCodeCBE
RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE
UnregisterChaincodeEvent(cbe *ChainCodeCBE)
RegisterTxEvent(txID string, callback func(string, error))
UnregisterTxEvent(txID string)
}

// The EventHubExt interface allows extensions of the SDK to add functionality to EventHub overloads.
type EventHubExt interface {
SetInterests(block bool, rejection bool)
AddChaincodeInterest(ChaincodeID string, EventName string)
}

type eventHub struct {
// Protects chaincodeRegistrants, blockRegistrants and txRegistrants
mtx sync.RWMutex
Expand All @@ -68,6 +74,14 @@ type eventHub struct {
interestedEvents []*pb.Interest
}

// ChaincodeEvent contains the current event data for the event handler
type ChaincodeEvent struct {
ChaincodeId string
TxId string
EventName string
Payload []byte
}

// ChainCodeCBE ...
/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode
Expand All @@ -79,7 +93,7 @@ type ChainCodeCBE struct {
// event name regex filter
EventNameFilter string
// callback function to invoke on successful filter match
CallbackFunc func(*pb.ChaincodeEvent)
CallbackFunc func(*ChaincodeEvent)
}

// NewEventHub ...
Expand All @@ -88,15 +102,40 @@ func NewEventHub() EventHub {
blockRegistrants := make([]func(*common.Block), 0)
txRegistrants := make(map[string]func(string, error))

eventHub := &eventHub{chaincodeRegistrants: chaincodeRegistrants, blockRegistrants: blockRegistrants, txRegistrants: txRegistrants, interestedEvents: nil}
// default interested events
// TODO: set interestedEvents based on handler registration
interestedEvents := []*pb.Interest{{EventType: pb.EventType_BLOCK}}

eventHub := &eventHub{chaincodeRegistrants: chaincodeRegistrants, blockRegistrants: blockRegistrants, txRegistrants: txRegistrants, interestedEvents: interestedEvents}
eventHub.SetInterests(true, true)

return eventHub
}

// SetInterests clears all interests and sets the interests for BLOCK and REJECTION type of events.
func (eventHub *eventHub) SetInterests(block bool, rejection bool) {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.interestedEvents = nil
if block {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK})
}
if rejection {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_REJECTION})
}
}

// AddChaincodeInterest adds interest for specific CHAINCODE events.
func (eventHub *eventHub) AddChaincodeInterest(ChaincodeID string, EventName string) {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{
EventType: pb.EventType_CHAINCODE,
RegInfo: &pb.Interest_ChaincodeRegInfo{
ChaincodeRegInfo: &pb.ChaincodeReg{
ChaincodeId: ChaincodeID,
EventName: EventName,
},
},
})
}

// SetPeerAddr ...
/**
* Set peer url for event source<p>
Expand Down Expand Up @@ -149,6 +188,11 @@ func (eventHub *eventHub) Connect() error {
return nil
}

//SetInterestedEvents set events that client is interested in
func (eventHub *eventHub) SetInterestedEvents(events []*pb.Interest) {
eventHub.interestedEvents = events
}

//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
func (eventHub *eventHub) GetInterestedEvents() ([]*pb.Interest, error) {
return eventHub.interestedEvents, nil
Expand Down Expand Up @@ -180,7 +224,12 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) {
if v.EventNameFilter == ccEvent.ChaincodeEvent.EventName {
callback := v.CallbackFunc
if callback != nil {
callback(ccEvent.ChaincodeEvent)
callback(&ChaincodeEvent{
ChaincodeId: ccEvent.ChaincodeEvent.ChaincodeId,
TxId: ccEvent.ChaincodeEvent.TxId,
EventName: ccEvent.ChaincodeEvent.EventName,
Payload: ccEvent.ChaincodeEvent.Payload,
})
}
}
}
Expand Down Expand Up @@ -217,14 +266,12 @@ func (eventHub *eventHub) Disconnected(err error) {
* @returns {object} ChainCodeCBE object that should be treated as an opaque
* handle used to unregister (see unregisterChaincodeEvent)
*/
func (eventHub *eventHub) RegisterChaincodeEvent(ccid string, eventname string, callback func(*pb.ChaincodeEvent)) *ChainCodeCBE {
if !eventHub.connected {
return nil
}

func (eventHub *eventHub) RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.AddChaincodeInterest(ccid, eventname)

cbe := ChainCodeCBE{CCID: ccid, EventNameFilter: eventname, CallbackFunc: callback}
cbeArray := eventHub.chaincodeRegistrants[ccid]
if cbeArray == nil && len(cbeArray) <= 0 {
Expand All @@ -245,10 +292,6 @@ func (eventHub *eventHub) RegisterChaincodeEvent(ccid string, eventname string,
* registerChaincodeEvent.
*/
func (eventHub *eventHub) UnregisterChaincodeEvent(cbe *ChainCodeCBE) {
if !eventHub.connected {
return
}

eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

Expand Down

0 comments on commit fe98dcb

Please sign in to comment.