Skip to content

Commit

Permalink
[FAB-5599] Blocks events client example
Browse files Browse the repository at this point in the history
Since there was a new API introduced to deliver both filtered blocks
with minimal information per requirements from FAB-5481 and normal
blocks, which is going to replace events producer functionaly. This
commit add an example of how clients could leverage new service APIs to
receive an updates of new blocks committed.

Change-Id: I94984eab966365272706520615ebab5514e44f38
Signed-off-by: Artem Barger <bartem@il.ibm.com>
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
C0rWin authored and wlahti committed Jan 26, 2018
1 parent e821875 commit e693389
Show file tree
Hide file tree
Showing 3 changed files with 388 additions and 0 deletions.
19 changes: 19 additions & 0 deletions examples/events/block-listener/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
# Deprecation
Please note that events API available in Hyperledger Fabric v1.0.0 will be deprecated in favour of new
events delivery API

```proto
service Deliver {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received.
rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
}
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received.
rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
}
}
```

Please explore `eventsclient` example for demonstration of using new APIs.

# What is block-listener
block-listener.go connects to a peer in order to receive block and chaincode
events (if there are chaincode events being sent).
Expand Down
92 changes: 92 additions & 0 deletions examples/events/eventsclient/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Events client
This sample client demonstrates how to connect to a peer to receive block
events. Block events come in the form of either full blocks as they have been
committed to the ledger or filtered blocks (a minimal set of information about
the block) which includes the transaction ids, transaction statuses, and any
chaincode events associated with the transaction.

# Events service interface
Starting with v1.1, two new event services are available:

```proto
service Deliver {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received.
rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
}
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received.
rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
}
}
```

This sample demonstrates connecting to both of these services.

# General use
```sh
cd fabric/examples/events/eventsclient
go build
```
You will see the executable **eventsclient** if there are no compilation errors.

Next, to start receiving block events from a peer with TLS enabled, run the
following command:

```sh
CORE_PEER_LOCALMSPID=<msp-id> CORE_PEER_MSPCONFIGPATH=<path to MSP folder> ./eventsclient -channelID=<channel-id> -filtered=<true or false> -tls=true -clientKey=<path to the client key> -clientCert=<path to the client TLS certificate> -rootCert=<path to the server root CA certificate>
```

If the peer is not using TLS you can run:

```bash
CORE_PEER_LOCALMSPID=<msp-id> CORE_PEER_MSPCONFIGPATH=<path to MSP folder> ./eventsclient -channelID=<channel-id> -filtered=<true or false> -tls=false
```

The peer will begin delivering block events and print the output to the console.

# Example with the e2e_cli example
The events client sample can be used with TLS enabled or disabled. By default,
the e2e_cli example will have TLS enabled. In order to allow the events client
to connect to peers created by the e2e_cli example with TLS enabled, the easiest
way would be to map `127.0.0.1` to the hostname of the peer that you are
connecting to, such as `peer0.org1.example.com`. For example on \*nix based
systems this would be an entry in `/etc/hosts` file.

If you would prefer to disable TLS, you may do so by setting
CORE_PEER_TLS_ENABLED=***false*** in ``docker-compose-cli.yaml`` and
``base/peer-base.yaml`` as well as
ORDERER_GENERAL_TLS_ENABLED=***false*** in``base/docker-compose-base.yaml``.

Next, run the [e2e_cli example](https://github.com/hyperledger/fabric/tree/master/examples/e2e_cli).

Once the "All in one" command:
```sh
./network_setup.sh up
```
has completed, attach the event client to peer peer0.org1.example.com by doing
the following:

* If TLS is enabled:
* to receive full blocks:
```sh
CORE_PEER_LOCALMSPID=Org1MSP CORE_PEER_MSPCONFIGPATH=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/peers/peer0.Org1.example.com/msp ./eventsclient -server=peer0.org1.example.com:7051 -channelID=mychannel -filtered=false -tls=true -clientKey=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/client.key -clientCert=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/client.crt -rootCert=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/ca.crt
```

* to receive filtered blocks:
```sh
CORE_PEER_LOCALMSPID=Org1MSP CORE_PEER_MSPCONFIGPATH=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/peers/peer0.Org1.example.com/msp ./eventsclient -server=peer0.org1.example.com:7051 -channelID=mychannel -filtered=true -tls=true -clientKey=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/client.key -clientCert=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/client.crt -rootCert=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/users/Admin@Org1.example.com/tls/ca.crt
```

* If TLS is disabled:
* to receive full blocks:
```sh
CORE_PEER_LOCALMSPID=Org1MSP CORE_PEER_MSPCONFIGPATH=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/peers/peer0.Org1.example.com/msp ./eventsclient -server=peer0.org1.example.com:7051 -channelID=mychannel -filtered=false -tls=false
```

* to receive filtered blocks:
```sh
CORE_PEER_LOCALMSPID=Org1MSP CORE_PEER_MSPCONFIGPATH=$GOPATH/src/github.com/hyperledger/fabric/examples/e2e_cli/crypto-config/peerOrganizations/org1.example.com/peers/peer0.Org1.example.com/msp ./eventsclient -server=peer0.org1.example.com:7051 -channelID=mychannel -filtered=true -tls=false
```

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>.
277 changes: 277 additions & 0 deletions examples/events/eventsclient/eventsclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"flag"
"fmt"
"io/ioutil"
"math"
"os"
"strings"
"time"

"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/localmsp"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/common/tools/protolator"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
config2 "github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/msp"
common2 "github.com/hyperledger/fabric/peer/common"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
)

var (
channelID string
serverAddr string
clientKeyPath string
clientCertPath string
serverRootCAPath string
seek int
quiet bool
filtered bool
tlsEnabled bool
mTlsEnabled bool

oldest = &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}
newest = &orderer.SeekPosition{Type: &orderer.SeekPosition_Newest{Newest: &orderer.SeekNewest{}}}
maxStop = &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}
)

const (
OLDEST = -2
NEWEST = -1

ROOT = "core"
)

var logger = flogging.MustGetLogger("eventsclient")

// deliverClient abstracts common interface
// for deliver and deliverfiltered services
type deliverClient interface {
Send(*common.Envelope) error
Recv() (*peer.DeliverResponse, error)
}

// eventsClient client to get connected to the
// events peer delivery system
type eventsClient struct {
client deliverClient
signer crypto.LocalSigner
tlsCertHash []byte
}

func (r *eventsClient) seekOldest() error {
return r.client.Send(r.seekHelper(oldest, maxStop))
}

func (r *eventsClient) seekNewest() error {
return r.client.Send(r.seekHelper(newest, maxStop))
}

func (r *eventsClient) seekSingle(blockNumber uint64) error {
specific := &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: blockNumber}}}
return r.client.Send(r.seekHelper(specific, specific))
}

func (r *eventsClient) seekHelper(start *orderer.SeekPosition, stop *orderer.SeekPosition) *common.Envelope {
env, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, channelID, r.signer, &orderer.SeekInfo{
Start: start,
Stop: stop,
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}, 0, 0, r.tlsCertHash)
if err != nil {
panic(err)
}
return env
}

func (r *eventsClient) readEventsStream() {
for {
msg, err := r.client.Recv()
if err != nil {
logger.Info("Error receiving:", err)
return
}

switch t := msg.Type.(type) {
case *peer.DeliverResponse_Status:
logger.Info("Got status ", t)
return
case *peer.DeliverResponse_Block:
if !quiet {
logger.Info("Received block: ")
err := protolator.DeepMarshalJSON(os.Stdout, t.Block)
if err != nil {
fmt.Printf(" Error pretty printing block: %s", err)
}
} else {
logger.Info("Received block: ", t.Block.Header.Number)
}
case *peer.DeliverResponse_FilteredBlock:
if !quiet {
logger.Info("Received filtered block: ")
err := protolator.DeepMarshalJSON(os.Stdout, t.FilteredBlock)
if err != nil {
fmt.Printf(" Error pretty printing filtered block: %s", err)
}
} else {
logger.Info("Received filtered block: ", t.FilteredBlock.Number)
}
}
}
}

func (r *eventsClient) seek(s int) error {
var err error
switch seek {
case OLDEST:
err = r.seekOldest()
case NEWEST:
err = r.seekNewest()
default:
err = r.seekSingle(uint64(seek))
}
return err
}

func main() {
initConfig()
initMSP()
readCLInputs()

if seek < OLDEST {
logger.Info("Invalid seek value")
flag.PrintDefaults()
return
}

clientConfig := comm.ClientConfig{
KaOpts: comm.DefaultKeepaliveOptions(),
SecOpts: &comm.SecureOptions{},
Timeout: 5 * time.Minute,
}

if tlsEnabled {
clientConfig.SecOpts.UseTLS = true
rootCert, err := ioutil.ReadFile(serverRootCAPath)
if err != nil {
logger.Info("error loading TLS root certificate", err)
return
}
clientConfig.SecOpts.ServerRootCAs = [][]byte{rootCert}
if mTlsEnabled {
clientConfig.SecOpts.RequireClientCert = true
clientKey, err := ioutil.ReadFile(clientKeyPath)
if err != nil {
logger.Info("error loading client TLS key from", clientKeyPath)
return
}
clientConfig.SecOpts.Key = clientKey

clientCert, err := ioutil.ReadFile(clientCertPath)
if err != nil {
logger.Info("error loading client TLS certificate from path", clientCertPath)
return
}
clientConfig.SecOpts.Certificate = clientCert
}
}

grpcClient, err := comm.NewGRPCClient(clientConfig)
if err != nil {
logger.Info("Error creating grpc client:", err)
return
}
conn, err := grpcClient.NewConnection(serverAddr, "")
if err != nil {
logger.Info("Error connecting:", err)
return
}

var client deliverClient
if filtered {
client, err = peer.NewDeliverClient(conn).DeliverFiltered(context.Background())
} else {
client, err = peer.NewDeliverClient(conn).Deliver(context.Background())
}

if err != nil {
logger.Info("Error connecting:", err)
return
}

events := &eventsClient{
client: client,
signer: localmsp.NewSigner(),
}

if mTlsEnabled {
events.tlsCertHash = util.ComputeSHA256(grpcClient.Certificate().Certificate[0])
}

events.seek(seek)
if err != nil {
logger.Info("Received error:", err)
return
}

events.readEventsStream()
}

func readCLInputs() {
flag.StringVar(&serverAddr, "server", "localhost:7051", "The RPC server to connect to.")
flag.StringVar(&channelID, "channelID", genesisconfig.TestChainID, "The channel ID to deliver from.")
flag.BoolVar(&quiet, "quiet", false, "Only print the block number, will not attempt to print its block contents.")
flag.BoolVar(&filtered, "filtered", true, "Whenever to read filtered events from the peer delivery service or get regular blocks.")
flag.BoolVar(&tlsEnabled, "tls", false, "TLS enabled/disabled")
flag.BoolVar(&mTlsEnabled, "mTls", false, "Mutual TLS enabled/disabled (whenever server side validates clients TLS certificate)")
flag.StringVar(&clientKeyPath, "clientKey", "", "Specify path to the client TLS key")
flag.StringVar(&clientCertPath, "clientCert", "", "Specify path to the client TLS certificate")
flag.StringVar(&serverRootCAPath, "rootCert", "", "Specify path to the server root CA certificate")
flag.IntVar(&seek, "seek", OLDEST, "Specify the range of requested blocks."+
"Acceptable values:"+
"-2 (or -1) to start from oldest (or newest) and keep at it indefinitely."+
"N >= 0 to fetch block N only.")
flag.Parse()
}

func initMSP() {
// Init the MSP
var mspMgrConfigDir = config2.GetPath("peer.mspConfigPath")
var mspID = viper.GetString("peer.localMspId")
var mspType = viper.GetString("peer.localMspType")
if mspType == "" {
mspType = msp.ProviderTypeToString(msp.FABRIC)
}
err := common2.InitCrypto(mspMgrConfigDir, mspID, mspType)
if err != nil { // Handle errors reading the config file
panic(fmt.Sprintf("Cannot run client because %s", err.Error()))
}
}

func initConfig() {
// For environment variables.
viper.SetEnvPrefix(ROOT)
viper.AutomaticEnv()
replacer := strings.NewReplacer(".", "_")
viper.SetEnvKeyReplacer(replacer)

err := common2.InitConfig(ROOT)
if err != nil { // Handle errors reading the config file
panic(fmt.Errorf("fatal error when initializing %s config : %s", ROOT, err))
}
}

0 comments on commit e693389

Please sign in to comment.