Skip to content

Commit

Permalink
FABGW-25 Endorse using generated ChaincodeInterest
Browse files Browse the repository at this point in the history
Change the Endorse() method logic as described in the design document attached to the Jira.
- Process proposal on local org peer.
- Using the ChaincodeInterest in the propsal response, invoke discovery to obtain a layout that contains the local org
- Obtain the remaining endorsements
- Build the transaction to send back to the client.

Extra unit tests added to test the new logic.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman committed Jul 23, 2021
1 parent fa3960b commit e61bff5
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 134 deletions.
98 changes: 88 additions & 10 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
return nil, status.Error(codes.InvalidArgument, "an evaluate request is required")
}
signedProposal := request.GetProposedTransaction()
channel, chaincodeID, err := getChannelAndChaincodeFromSignedProposal(signedProposal)
channel, chaincodeID, _, err := getChannelAndChaincodeFromSignedProposal(signedProposal)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}

err = gs.registry.registerChannel(channel)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

endorser, err := gs.registry.evaluator(channel, chaincodeID, request.GetTargetOrganizations())
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
Expand Down Expand Up @@ -68,7 +73,7 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
Result: retVal,
}

logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", retVal.Status, "message", retVal.Message)
logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", retVal.GetStatus(), "message", retVal.GetMessage())
return evaluateResponse, nil
}

Expand All @@ -86,31 +91,105 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}
channel, chaincodeID, err := getChannelAndChaincodeFromSignedProposal(signedProposal)
channel, chaincodeID, hasTransientData, err := getChannelAndChaincodeFromSignedProposal(signedProposal)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}

var endorsers []*endorser
if len(request.EndorsingOrganizations) > 0 {
endorsers, err = gs.registry.endorsersForOrgs(channel, chaincodeID, request.EndorsingOrganizations)
} else {
endorsers, err = gs.registry.endorsers(channel, chaincodeID)
}
err = gs.registry.registerChannel(channel)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

defaultInterest := &peer.ChaincodeInterest{
Chaincodes: []*peer.ChaincodeCall{{
Name: chaincodeID,
}},
}

var endorsers []*endorser
var responses []*peer.ProposalResponse
if len(request.EndorsingOrganizations) > 0 {
// The client is specifying the endorsing orgs and taking responsibility for ensuring it meets the signature policy
endorsers, err = gs.registry.endorsersForOrgs(channel, chaincodeID, request.EndorsingOrganizations)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}
} else {
// The client is delegating choice of endorsers to the gateway.

// 1. Choose an endorser from the gateway's organization
var firstEndorser *endorser
es, ok := gs.registry.endorsersByOrg(channel, chaincodeID)[gs.registry.localEndorser.mspid]
if !ok {
// No local org endorsers for this channel/chaincode. If transient data is involved, return error
if hasTransientData {
return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data")
}
// Otherwise, just let discovery pick one.
endorsers, err = gs.registry.endorsers(channel, defaultInterest, "")
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}
firstEndorser = endorsers[0]
} else {
firstEndorser = es[0].endorser
}

gs.logger.Debugw("Sending to first endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address)

// 2. Process the proposal on this endorser
firstResponse, err := firstEndorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
return nil, rpcError(codes.Aborted, "failed to endorse transaction", endpointError(firstEndorser, err))
}
if firstResponse.Response.Status < 200 || firstResponse.Response.Status >= 400 {
return nil, rpcError(codes.Aborted, "failed to endorse transaction", endpointError(firstEndorser, fmt.Errorf("error %d, %s", firstResponse.Response.Status, firstResponse.Response.Message)))
}

// 3. Extract ChaincodeInterest and SBE policies
// The chaincode interest could be nil for legacy peers and for chaincode functions that don't produce a read-write set
interest := firstResponse.Interest
if len(interest.GetChaincodes()) == 0 {
interest = defaultInterest
}

// 4. If transient data is involved, then we need to ensure that discovery only returns orgs which own the collections involved.
// Do this by setting NoPrivateReads to false on each collection
if hasTransientData {
for _, call := range interest.GetChaincodes() {
call.NoPrivateReads = false
}
}

// 5. Get a set of endorsers from discovery via the registry
// The preferred discovery layout will contain the firstEndorser's Org.
endorsers, err = gs.registry.endorsers(channel, interest, firstEndorser.mspid)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

// 6. Remove the gateway org's endorser, since we've already done that
for i, e := range endorsers {
if e.mspid == firstEndorser.mspid {
endorsers = append(endorsers[:i], endorsers[i+1:]...)
responses = append(responses, firstResponse)
break
}
}
}

var wg sync.WaitGroup
responseCh := make(chan *endorserResponse, len(endorsers))
// send to all the endorsers
for _, e := range endorsers {
wg.Add(1)
go func(e *endorser) {
defer wg.Done()
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", e.mspid, "endpoint", e.address)
response, err := e.client.ProcessProposal(ctx, signedProposal)
switch {
case err != nil:
Expand All @@ -129,7 +208,6 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
wg.Wait()
close(responseCh)

var responses []*peer.ProposalResponse
var errorDetails []proto.Message
for response := range responseCh {
if response.err != nil {
Expand Down
Loading

0 comments on commit e61bff5

Please sign in to comment.