Skip to content

Commit

Permalink
[FAB-8702] Peer dialer methods context parameter
Browse files Browse the repository at this point in the history
Change-Id: I8b6394587bee3bec3c8e28710e414c41d7d635ce
Signed-off-by: Troy Ronda <troy@troyronda.com>
  • Loading branch information
troyronda committed Mar 7, 2018
1 parent b3bab38 commit aae553d
Show file tree
Hide file tree
Showing 15 changed files with 50 additions and 41 deletions.
4 changes: 2 additions & 2 deletions pkg/context/api/core/mocks/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func DefaultMockConfig(mockCtrl *gomock.Controller) *MockConfig {
config.EXPECT().TLSCACertPool(GoodCert).Return(CertPool, nil).AnyTimes()
config.EXPECT().TLSCACertPool(BadCert).Return(CertPool, errors.New(ErrorMessage)).AnyTimes()
config.EXPECT().TLSCACertPool().Return(CertPool, nil).AnyTimes()
config.EXPECT().TimeoutOrDefault(core.Endorser).Return(time.Second * 5).AnyTimes()
config.EXPECT().TimeoutOrDefault(core.EndorserConnection).Return(time.Second * 5).AnyTimes()
config.EXPECT().TLSClientCerts().Return([]tls.Certificate{TLSCert}, nil).AnyTimes()

return config
Expand All @@ -51,7 +51,7 @@ func BadTLSClientMockConfig(mockCtrl *gomock.Controller) *MockConfig {
config.EXPECT().TLSCACertPool(GoodCert).Return(CertPool, nil).AnyTimes()
config.EXPECT().TLSCACertPool(BadCert).Return(CertPool, errors.New(ErrorMessage)).AnyTimes()
config.EXPECT().TLSCACertPool().Return(CertPool, nil).AnyTimes()
config.EXPECT().TimeoutOrDefault(core.Endorser).Return(time.Second * 5).AnyTimes()
config.EXPECT().TimeoutOrDefault(core.EndorserConnection).Return(time.Second * 5).AnyTimes()
config.EXPECT().TLSClientCerts().Return(nil, errors.Errorf(ErrorMessage)).AnyTimes()

return config
Expand Down
4 changes: 2 additions & 2 deletions pkg/context/api/core/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type ConfigProvider func() (Config, error)
type TimeoutType int

const (
// Endorser connection timeout
Endorser TimeoutType = iota
// EndorserConnection connection timeout
EndorserConnection TimeoutType = iota
// EventHubConnection connection timeout
EventHubConnection
// EventReg connection timeout
Expand Down
9 changes: 5 additions & 4 deletions pkg/context/api/fab/mocks/mockfabapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/context/api/fab/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ SPDX-License-Identifier: Apache-2.0
package fab

import (
reqContext "context"

pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)

// ProposalProcessor simulates transaction proposal, so that a client can submit the result for ordering.
type ProposalProcessor interface {
ProcessTransactionProposal(ProcessProposalRequest) (*TransactionProposalResponse, error)
ProcessTransactionProposal(reqContext.Context, ProcessProposalRequest) (*TransactionProposalResponse, error)
}

// ProposalSender provides the ability for a transaction proposal to be created and sent.
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (c *Config) Timeout(conn core.TimeoutType) time.Duration {
func (c *Config) getTimeout(conn core.TimeoutType) time.Duration {
var timeout time.Duration
switch conn {
case core.Endorser:
case core.EndorserConnection:
timeout = c.configViper.GetDuration("client.peer.timeout.connection")
case core.Query:
timeout = c.configViper.GetDuration("client.peer.timeout.queryResponse")
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func TestTimeouts(t *testing.T) {
configImpl.configViper.Set("client.peer.timeout.executeTxResponse", "8h")
configImpl.configViper.Set("client.orderer.timeout.response", "6s")

t1 := configImpl.TimeoutOrDefault(api.Endorser)
t1 := configImpl.TimeoutOrDefault(api.EndorserConnection)
if t1 != time.Second*2 {
t.Fatalf("Timeout not read correctly. Got: %s", t1)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/fab/channel/deprecated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ func TestSendInstantiateProposal(t *testing.T) {

tpr := fab.TransactionProposalResponse{Endorser: "example.com", Status: 99}

proc.EXPECT().ProcessTransactionProposal(gomock.Any()).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any()).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), gomock.Any()).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), gomock.Any()).Return(&tpr, nil)
targets := []fab.ProposalProcessor{proc}

//Add a Peer
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestSendUpgradeProposal(t *testing.T) {

tpr := fab.TransactionProposalResponse{Endorser: "example.com", Status: 99, ProposalResponse: nil}

proc.EXPECT().ProcessTransactionProposal(gomock.Any()).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), gomock.Any()).Return(&tpr, nil)
targets := []fab.ProposalProcessor{proc}

//Add a Peer
Expand Down
3 changes: 2 additions & 1 deletion pkg/fab/mocks/mockpeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mocks

// TODO: Move protos to this library
import (
reqContext "context"
"encoding/pem"
"sync"

Expand Down Expand Up @@ -83,7 +84,7 @@ func (p *MockPeer) URL() string {
}

// ProcessTransactionProposal does not send anything anywhere but returns an empty mock ProposalResponse
func (p *MockPeer) ProcessTransactionProposal(tp fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
func (p *MockPeer) ProcessTransactionProposal(ctx reqContext.Context, tp fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
if p.RWLock != nil {
p.RWLock.Lock()
defer p.RWLock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/fab/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ func (p *Peer) URL() string {
}

// ProcessTransactionProposal sends the created proposal to peer for endorsement.
func (p *Peer) ProcessTransactionProposal(proposal fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
return p.processor.ProcessTransactionProposal(proposal)
func (p *Peer) ProcessTransactionProposal(ctx reqContext.Context, proposal fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
return p.processor.ProcessTransactionProposal(ctx, proposal)
}

func (p *Peer) String() string {
Expand Down
14 changes: 10 additions & 4 deletions pkg/fab/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package peer

import (
reqContext "context"
"encoding/pem"
"fmt"
"io/ioutil"
Expand All @@ -22,11 +23,14 @@ import (
"github.com/pkg/errors"
)

const (
normalTimeout = 5 * time.Second
)

// TestNewPeerWithCertNoTLS tests that a peer can be constructed without using a cert
func TestNewPeerWithCertNoTLS(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

config := mock_core.DefaultMockConfig(mockCtrl)

url := "http://example.com"
Expand Down Expand Up @@ -80,7 +84,7 @@ func TestNewPeerTLSFromCertBad(t *testing.T) {

//apiconfig := mock_core.DefaultMockConfig(mockCtrl)
config := mock_core.NewMockConfig(mockCtrl)
config.EXPECT().TimeoutOrDefault(core.Endorser).Return(time.Second * 5)
config.EXPECT().TimeoutOrDefault(core.EndorserConnection).Return(time.Second * 5)
config.EXPECT().TLSCACertPool(gomock.Any()).Return(nil, errors.New("failed to get certpool")).AnyTimes()

url := "grpcs://0.0.0.0:1234"
Expand Down Expand Up @@ -199,10 +203,12 @@ func TestProposalProcessorSendProposal(t *testing.T) {
tp := mockProcessProposalRequest()
tpr := fab.TransactionProposalResponse{Endorser: "example.com", Status: 99, ProposalResponse: nil}

proc.EXPECT().ProcessTransactionProposal(tp).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), tp).Return(&tpr, nil)

p := Peer{processor: proc, name: "", roles: nil}
tpr1, err := p.ProcessTransactionProposal(tp)
ctx, cancel := reqContext.WithTimeout(reqContext.Background(), normalTimeout)
defer cancel()
tpr1, err := p.ProcessTransactionProposal(ctx, tp)

if err != nil || !reflect.DeepEqual(&tpr, tpr1) {
t.Fatalf("Peer didn't proxy proposal processing")
Expand Down
22 changes: 8 additions & 14 deletions pkg/fab/peer/peerendorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newPeerEndorser(endorseReq *peerEndorserRequest) (*peerEndorser, error) {
}
opts = append(opts, grpc.WithDefaultCallOptions(grpc.FailFast(endorseReq.failFast)))

timeout := endorseReq.config.TimeoutOrDefault(core.Endorser)
timeout := endorseReq.config.TimeoutOrDefault(core.EndorserConnection)

tlsConfig, err := comm.TLSConfig(endorseReq.certificate, endorseReq.serverHostOverride, endorseReq.config)
if err != nil {
Expand All @@ -75,10 +75,10 @@ func newPeerEndorser(endorseReq *peerEndorserRequest) (*peerEndorser, error) {
}

// ProcessTransactionProposal sends the transaction proposal to a peer and returns the response.
func (p *peerEndorser) ProcessTransactionProposal(request fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
func (p *peerEndorser) ProcessTransactionProposal(ctx reqContext.Context, request fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) {
logger.Debugf("Processing proposal using endorser: %s", p.target)

proposalResponse, err := p.sendProposal(request, p.secured)
proposalResponse, err := p.sendProposal(ctx, request, p.secured)
if err != nil {
tpr := fab.TransactionProposalResponse{Endorser: p.target}
return &tpr, errors.Wrapf(err, "Transaction processing for endorser [%s]", p.target)
Expand Down Expand Up @@ -107,13 +107,13 @@ func (p *peerEndorser) conn(ctx reqContext.Context, secured bool) (*grpc.ClientC
return p.connector.DialContext(ctx, p.target, grpcOpts...)
}

func (p *peerEndorser) sendProposal(proposal fab.ProcessProposalRequest, secured bool) (*pb.ProposalResponse, error) {
conn, err := p.conn(reqContext.Background(), secured)
func (p *peerEndorser) sendProposal(ctx reqContext.Context, proposal fab.ProcessProposalRequest, secured bool) (*pb.ProposalResponse, error) {
conn, err := p.conn(ctx, secured)
if err != nil {
if secured && p.allowInsecure {
//If secured mode failed and allow insecure is enabled then retry in insecure mode
logger.Debug("Secured NewEndorserClient failed, attempting insecured")
return p.sendProposal(proposal, false)
return p.sendProposal(ctx, proposal, false)
}
rpcStatus, ok := grpcstatus.FromError(err)
if ok {
Expand All @@ -124,15 +124,9 @@ func (p *peerEndorser) sendProposal(proposal fab.ProcessProposalRequest, secured
defer p.connector.ReleaseConn(conn)

endorserClient := pb.NewEndorserClient(conn)
resp, err := endorserClient.ProcessProposal(reqContext.Background(), proposal.SignedProposal)
resp, err := endorserClient.ProcessProposal(ctx, proposal.SignedProposal)
if err != nil {
logger.Error("NewEndorserClient failed, cause : ", err)
if secured && p.allowInsecure {
//If secured mode failed and allow insecure is enabled then retry in insecure mode
logger.Debug("Secured NewEndorserClient failed, attempting insecured")
return p.sendProposal(proposal, false)
}

logger.Errorf("process proposal failed [%s]", err)
rpcStatus, ok := grpcstatus.FromError(err)
if ok {
err = status.NewFromGRPCStatus(rpcStatus)
Expand Down
5 changes: 4 additions & 1 deletion pkg/fab/peer/peerendorser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package peer

import (
reqContext "context"
"crypto/x509"
"fmt"
"net"
Expand Down Expand Up @@ -236,7 +237,9 @@ func testProcessProposal(t *testing.T, url string) (*fab.TransactionProposalResp
t.Fatalf("Peer conn construction error (%v)", err)
}

return conn.ProcessTransactionProposal(mockProcessProposalRequest())
ctx, cancel := reqContext.WithTimeout(reqContext.Background(), normalTimeout)
defer cancel()
return conn.ProcessTransactionProposal(ctx, mockProcessProposalRequest())
}

func getPeerEndorserRequest(url string, cert *x509.Certificate, serverHostOverride string,
Expand Down
2 changes: 1 addition & 1 deletion pkg/fab/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestJoinChannel(t *testing.T) {
defer grpcServer.Stop()

endorserServer, addr := startEndorserServer(t, grpcServer)
peer, _ := peer.New(mocks.NewMockConfig(), peer.WithURL(addr), peer.WithInsecure())
peer, _ := peer.New(mocks.NewMockConfig(), peer.WithURL("grpc://"+addr), peer.WithInsecure())
peers = append(peers, peer)
orderer := mocks.NewMockOrderer("", nil)
orderer.(mocks.MockOrderer).EnqueueForSendDeliver(mocks.NewSimpleMockBlock())
Expand Down
4 changes: 3 additions & 1 deletion pkg/fab/txn/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package txn

import (
reqContext "context"
"sync"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -102,7 +103,8 @@ func SendProposal(ctx context, proposal *fab.TransactionProposal, targets []fab.
go func(processor fab.ProposalProcessor) {
defer wg.Done()

resp, err := processor.ProcessTransactionProposal(request)
// TODO: The RPC should be timed-out.
resp, err := processor.ProcessTransactionProposal(reqContext.Background(), request)
if err != nil {
logger.Debugf("Received error response from txn proposal processing: %v", err)
responseMtx.Lock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/fab/txn/proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestSendTransactionProposalToProcessors(t *testing.T) {
}

tpr := fab.TransactionProposalResponse{Endorser: "example.com", Status: 99}
proc.EXPECT().ProcessTransactionProposal(tp).Return(&tpr, nil)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), tp).Return(&tpr, nil)
targets := []fab.ProposalProcessor{proc}

result, err := SendProposal(ctx, &fab.TransactionProposal{
Expand Down Expand Up @@ -230,8 +230,8 @@ func TestProposalResponseError(t *testing.T) {

// Test with error from lower layer
tpr := fab.TransactionProposalResponse{Endorser: "example.com", Status: 200}
proc.EXPECT().ProcessTransactionProposal(tp).Return(&tpr, testError)
proc2.EXPECT().ProcessTransactionProposal(tp).Return(&tpr, testError)
proc.EXPECT().ProcessTransactionProposal(gomock.Any(), tp).Return(&tpr, testError)
proc2.EXPECT().ProcessTransactionProposal(gomock.Any(), tp).Return(&tpr, testError)

targets := []fab.ProposalProcessor{proc, proc2}
_, err = SendProposal(ctx, &fab.TransactionProposal{
Expand Down

0 comments on commit aae553d

Please sign in to comment.