Skip to content

Commit

Permalink
[FAB-1738] Event registration requiring signature
Browse files Browse the repository at this point in the history
Use SignedEvent instead of Event for registration requests in
order to apply access control based on Creator identity against
target peer's local MSP. Note that at this point event
stream is still channel-agnostic so local MSP is checked.

Strictly speaking the registered listener is supposed to be
checked against the "Readers" policy of all channels, since
the registration will result in the listener receiving blocks
from all channels. However this is determined to be out of scope
for v1.0 due to the following reasons:
- special Readers policies that exclude normal "members" of an
  organization is considered edge cases
- rather complicated code refactoring would be needed to implement
  policy checking against channels in order to avoid a circular
  dependency from peer -> committer -> event producer -> peer

Change-Id: Ic941566c5299b4600b291137bc4724a02b5263d7
Signed-off-by: Jim Zhang <jzhang@us.ibm.com>
  • Loading branch information
jimthematrix committed Apr 19, 2017
1 parent eba4a20 commit 3870bcf
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 79 deletions.
10 changes: 5 additions & 5 deletions events/consumer/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ limitations under the License.
package consumer

import (
ehpb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/peer"
)

//EventAdapter is the interface by which a openchain event client registers interested events and
//receives messages from the openchain event Server
//EventAdapter is the interface by which a fabric event client registers interested events and
//receives messages from the fabric event Server
type EventAdapter interface {
GetInterestedEvents() ([]*ehpb.Interest, error)
Recv(msg *ehpb.Event) (bool, error)
GetInterestedEvents() ([]*peer.Interest, error)
Recv(msg *peer.Event) (bool, error)
Disconnected(err error)
}
29 changes: 26 additions & 3 deletions events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@ limitations under the License.
package consumer

import (
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/op/go-logging"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/hyperledger/fabric/core/comm"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
ehpb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
)

var consumerLogger = logging.MustGetLogger("eventhub_consumer")

//EventsClient holds the stream and adapter for consumer to work with
type EventsClient struct {
sync.RWMutex
Expand Down Expand Up @@ -62,7 +68,24 @@ func newEventsClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn,
func (ec *EventsClient) send(emsg *ehpb.Event) error {
ec.Lock()
defer ec.Unlock()
return ec.stream.Send(emsg)

// obtain the default signing identity for this peer; it will be used to sign the event
localMsp := mspmgmt.GetLocalMSP()
if localMsp == nil {
return errors.New("nil local MSP manager")
}

signer, err := localMsp.GetDefaultSigningIdentity()
if err != nil {
return fmt.Errorf("could not obtain the default signing identity, err %s", err)
}

signedEvt, err := utils.GetSignedEvent(emsg, signer)
if err != nil {
return fmt.Errorf("could not sign outgoing event, err %s", err)
}

return ec.stream.Send(signedEvt)
}

// RegisterAsync - registers interest in a event and doesn't wait for a response
Expand Down Expand Up @@ -196,7 +219,7 @@ func (ec *EventsClient) processEvents() error {
func (ec *EventsClient) Start() error {
conn, err := newEventsClientConnectionWithAddress(ec.peerAddress)
if err != nil {
return fmt.Errorf("Could not create client conn to %s:%s", ec.peerAddress, err)
return fmt.Errorf("could not create client conn to %s:%s", ec.peerAddress, err)
}

ies, err := ec.adapter.GetInterestedEvents()
Expand All @@ -211,7 +234,7 @@ func (ec *EventsClient) Start() error {
serverClient := ehpb.NewEventsClient(conn)
ec.stream, err = serverClient.Chat(context.Background())
if err != nil {
return fmt.Errorf("Could not create client conn to %s:%s", ec.peerAddress, err)
return fmt.Errorf("could not create client conn to %s:%s", ec.peerAddress, err)
}

if err = ec.register(ies); err != nil {
Expand Down
22 changes: 11 additions & 11 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ func SendProducerBlockEvent(block *common.Block) error {
ebytes := d
if ebytes != nil {
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
logger.Errorf("Error getting tx from block(%s)\n", err)
logger.Errorf("error getting tx from block(%s)\n", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return fmt.Errorf("Could not extract payload from envelope, err %s", err)
return fmt.Errorf("could not extract payload from envelope, err %s", err)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
Expand All @@ -57,20 +57,20 @@ func SendProducerBlockEvent(block *common.Block) error {
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
return fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
}
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
return fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
return fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
}
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
return fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
return fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
}
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
return fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
return fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
}
// Drop read write set from transaction before sending block event
// Performance issue with chaincode deploy txs and causes nodejs grpc
Expand All @@ -80,23 +80,23 @@ func SendProducerBlockEvent(block *common.Block) error {
caPayload.Results = nil
chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Response, caPayload.Results, caPayload.Events)
if err != nil {
return fmt.Errorf("Error marshalling tx proposal payload for block event: %s", err)
return fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
}
tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload)
if err != nil {
return fmt.Errorf("Error marshalling tx action payload for block event: %s", err)
return fmt.Errorf("error marshalling tx action payload for block event: %s", err)
}
payload.Data, err = utils.GetBytesTransaction(tx)
if err != nil {
return fmt.Errorf("Error marshalling payload for block event: %s", err)
return fmt.Errorf("error marshalling payload for block event: %s", err)
}
env.Payload, err = utils.GetBytesPayload(payload)
if err != nil {
return fmt.Errorf("Error marshalling tx envelope for block event: %s", err)
return fmt.Errorf("error marshalling tx envelope for block event: %s", err)
}
ebytes, err = utils.GetBytesEnvelope(env)
if err != nil {
return fmt.Errorf("Cannot marshal transaction %s", err)
return fmt.Errorf("cannot marshal transaction %s", err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ type eventProcessor struct {
var gEventProcessor *eventProcessor

func (ep *eventProcessor) start() {
producerLogger.Info("event processor started")
producerLogger.Info("Event processor started")
for {
//wait for event
e := <-ep.eventChannel
Expand All @@ -219,7 +219,7 @@ func (ep *eventProcessor) start() {
eType := getMessageType(e)
ep.Lock()
if hl, _ = ep.eventConsumers[eType]; hl == nil {
producerLogger.Errorf("Event of type %s does not exist", eType)
producerLogger.Errorf("event of type %s does not exist", eType)
ep.Unlock()
continue
}
Expand Down
82 changes: 71 additions & 11 deletions events/producer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package producer

import (
"errors"
"fmt"
"strconv"

"github.com/golang/protobuf/proto"

"github.com/hyperledger/fabric/msp/mgmt"
pb "github.com/hyperledger/fabric/protos/peer"
)

Expand Down Expand Up @@ -55,6 +59,7 @@ func getInterestKey(interest pb.Interest) string {
default:
producerLogger.Errorf("unknown interest type %s", interest.EventType)
}

return key
}

Expand Down Expand Up @@ -94,26 +99,30 @@ func (d *handler) deregisterAll() {
}

// HandleMessage handles the Openchain messages for the Peer.
func (d *handler) HandleMessage(msg *pb.Event) error {
//producerLogger.Debug("Handling Event")
switch msg.Event.(type) {
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
evt, err := validateEventMessage(msg)
if err != nil {
return errors.New("event message must be properly signed by an identitiy from a participating organization in any of the channels existing in the peer")
}

switch evt.Event.(type) {
case *pb.Event_Register:
eventsObj := msg.GetRegister()
eventsObj := evt.GetRegister()
if err := d.register(eventsObj.Events); err != nil {
return fmt.Errorf("Could not register events %s", err)
return fmt.Errorf("could not register events %s", err)
}
case *pb.Event_Unregister:
eventsObj := msg.GetUnregister()
eventsObj := evt.GetUnregister()
if err := d.deregister(eventsObj.Events); err != nil {
return fmt.Errorf("Could not unregister events %s", err)
return fmt.Errorf("could not unregister events %s", err)
}
case nil:
default:
return fmt.Errorf("Invalide type from client %T", msg.Event)
return fmt.Errorf("invalide type from client %T", evt.Event)
}
//TODO return supported events.. for now just return the received msg
if err := d.ChatStream.Send(msg); err != nil {
return fmt.Errorf("Error sending response to %v: %s", msg, err)
if err := d.ChatStream.Send(evt); err != nil {
return fmt.Errorf("error sending response to %v: %s", msg, err)
}

return nil
Expand All @@ -123,7 +132,58 @@ func (d *handler) HandleMessage(msg *pb.Event) error {
func (d *handler) SendMessage(msg *pb.Event) error {
err := d.ChatStream.Send(msg)
if err != nil {
return fmt.Errorf("Error Sending message through ChatStream: %s", err)
return fmt.Errorf("error Sending message through ChatStream: %s", err)
}
return nil
}

// Validates event messages by validating the Creator and verifying
// the signature. Returns the unmarshaled Event object
// Validation of the creator identity's validity is done by checking with local MSP to ensure the
// submitter is a member in the same organization as the peer
//
// TODO: ideally this should also check each channel's "Readers" policy to ensure the identity satisfies
// each channel's access control policy. This step is necessary because the registered listener is going
// to get read access to all channels by receiving Block events from all channels.
// However, this is not being done for v1.0 due to complexity concerns and the need to complex a stable,
// minimally viable release. Eventually events will be made channel-specific, at which point this method
// should be revisited
func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
producerLogger.Debugf("ValidateEventMessage starts for signed event %p", signedEvt)

// messages from the client for registering and unregistering must be signed
// and accompanied by the signing certificate in the "Creator" field
evt := &pb.Event{}
err := proto.Unmarshal(signedEvt.EventBytes, evt)
if err != nil {
return nil, fmt.Errorf("error unmarshaling the event bytes in the SignedEvent: %s", err)
}

localMSP := mgmt.GetLocalMSP()
principalGetter := mgmt.NewLocalMSPPrincipalGetter()

// Load MSPPrincipal for policy
principal, err := principalGetter.Get("member")
if err != nil {
return nil, fmt.Errorf("failed getting local MSP principal [member]: [%s]", err)
}

id, err := localMSP.DeserializeIdentity(evt.Creator)
if err != nil {
return nil, fmt.Errorf("failed deserializing event creator: [%s]", err)
}

// Verify that event's creator satisfies the principal
err = id.SatisfiesPrincipal(principal)
if err != nil {
return nil, fmt.Errorf("failed verifying the creator satisfies local MSP's [member] principal: [%s]", err)
}

// Verify the signature
err = id.Verify(signedEvt.EventBytes, signedEvt.Signature)
if err != nil {
return nil, fmt.Errorf("failed verifying the event signature: %s", err)
}

return evt, nil
}
6 changes: 3 additions & 3 deletions events/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewEventsServer(bufferSize uint, timeout int) *EventsServer {
func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {
handler, err := newEventHandler(stream)
if err != nil {
return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)
return fmt.Errorf("error creating handler during handleChat initiation: %s", err)
}
defer handler.Stop()
for {
Expand All @@ -61,13 +61,13 @@ func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {
return nil
}
if err != nil {
e := fmt.Errorf("Error during Chat, stopping handler: %s", err)
e := fmt.Errorf("error during Chat, stopping handler: %s", err)
producerLogger.Error(e.Error())
return e
}
err = handler.HandleMessage(in)
if err != nil {
producerLogger.Errorf("Error handling message: %s", err)
producerLogger.Errorf("error handling message: %s", err)
return err
}

Expand Down
Loading

0 comments on commit 3870bcf

Please sign in to comment.