diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index e1e99827d6f..94e933543ab 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -32,11 +32,16 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g return nil, status.Error(codes.InvalidArgument, "an evaluate request is required") } signedProposal := request.GetProposedTransaction() - channel, chaincodeID, err := getChannelAndChaincodeFromSignedProposal(signedProposal) + channel, chaincodeID, _, err := getChannelAndChaincodeFromSignedProposal(signedProposal) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err) } + err = gs.registry.registerChannel(channel) + if err != nil { + return nil, status.Errorf(codes.Unavailable, "%s", err) + } + endorser, err := gs.registry.evaluator(channel, chaincodeID, request.GetTargetOrganizations()) if err != nil { return nil, status.Errorf(codes.Unavailable, "%s", err) @@ -68,7 +73,7 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g Result: retVal, } - logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", retVal.Status, "message", retVal.Message) + logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", retVal.GetStatus(), "message", retVal.GetMessage()) return evaluateResponse, nil } @@ -86,17 +91,12 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err) } - channel, chaincodeID, err := getChannelAndChaincodeFromSignedProposal(signedProposal) + channel, chaincodeID, hasTransientData, err := getChannelAndChaincodeFromSignedProposal(signedProposal) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err) } - var endorsers []*endorser - if len(request.EndorsingOrganizations) > 0 { - endorsers, err = gs.registry.endorsersForOrgs(channel, chaincodeID, request.EndorsingOrganizations) - } else { - endorsers, err = gs.registry.endorsers(channel, chaincodeID) - } + err = gs.registry.registerChannel(channel) if err != nil { return nil, status.Errorf(codes.Unavailable, "%s", err) } @@ -104,6 +104,84 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) defer cancel() + defaultInterest := &peer.ChaincodeInterest{ + Chaincodes: []*peer.ChaincodeCall{{ + Name: chaincodeID, + }}, + } + + var endorsers []*endorser + var responses []*peer.ProposalResponse + if len(request.EndorsingOrganizations) > 0 { + // The client is specifying the endorsing orgs and taking responsibility for ensuring it meets the signature policy + endorsers, err = gs.registry.endorsersForOrgs(channel, chaincodeID, request.EndorsingOrganizations) + if err != nil { + return nil, status.Errorf(codes.Unavailable, "%s", err) + } + } else { + // The client is delegating choice of endorsers to the gateway. + + // 1. Choose an endorser from the gateway's organization + var firstEndorser *endorser + es, ok := gs.registry.endorsersByOrg(channel, chaincodeID)[gs.registry.localEndorser.mspid] + if !ok { + // No local org endorsers for this channel/chaincode. If transient data is involved, return error + if hasTransientData { + return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data") + } + // Otherwise, just let discovery pick one. + endorsers, err = gs.registry.endorsers(channel, defaultInterest, "") + if err != nil { + return nil, status.Errorf(codes.Unavailable, "%s", err) + } + firstEndorser = endorsers[0] + } else { + firstEndorser = es[0].endorser + } + + gs.logger.Debugw("Sending to first endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address) + + // 2. Process the proposal on this endorser + firstResponse, err := firstEndorser.client.ProcessProposal(ctx, signedProposal) + if err != nil { + return nil, rpcError(codes.Aborted, "failed to endorse transaction", endpointError(firstEndorser, err)) + } + if firstResponse.Response.Status < 200 || firstResponse.Response.Status >= 400 { + return nil, rpcError(codes.Aborted, "failed to endorse transaction", endpointError(firstEndorser, fmt.Errorf("error %d, %s", firstResponse.Response.Status, firstResponse.Response.Message))) + } + + // 3. Extract ChaincodeInterest and SBE policies + // The chaincode interest could be nil for legacy peers and for chaincode functions that don't produce a read-write set + interest := firstResponse.Interest + if len(interest.GetChaincodes()) == 0 { + interest = defaultInterest + } + + // 4. If transient data is involved, then we need to ensure that discovery only returns orgs which own the collections involved. + // Do this by setting NoPrivateReads to false on each collection + if hasTransientData { + for _, call := range interest.GetChaincodes() { + call.NoPrivateReads = false + } + } + + // 5. Get a set of endorsers from discovery via the registry + // The preferred discovery layout will contain the firstEndorser's Org. + endorsers, err = gs.registry.endorsers(channel, interest, firstEndorser.mspid) + if err != nil { + return nil, status.Errorf(codes.Unavailable, "%s", err) + } + + // 6. Remove the gateway org's endorser, since we've already done that + for i, e := range endorsers { + if e.mspid == firstEndorser.mspid { + endorsers = append(endorsers[:i], endorsers[i+1:]...) + responses = append(responses, firstResponse) + break + } + } + } + var wg sync.WaitGroup responseCh := make(chan *endorserResponse, len(endorsers)) // send to all the endorsers @@ -111,6 +189,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. wg.Add(1) go func(e *endorser) { defer wg.Done() + gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", e.mspid, "endpoint", e.address) response, err := e.client.ProcessProposal(ctx, signedProposal) switch { case err != nil: @@ -129,7 +208,6 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp. wg.Wait() close(responseCh) - var responses []*peer.ProposalResponse var errorDetails []proto.Message for response := range responseCh { if response.err != nil { diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index eeb0a3cca41..ce030f8a98e 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -132,6 +132,8 @@ type testDef struct { policyErr error expectedResponse proto.Message expectedResponses []proto.Message + transientData map[string][]byte + interest *peer.ChaincodeInterest } type preparedTest struct { @@ -150,14 +152,14 @@ type preparedTest struct { type contextKey string var ( - localhostMock = &endorser{endpointConfig: &endpointConfig{address: "localhost:7051"}} - peer1Mock = &endorser{endpointConfig: &endpointConfig{address: "peer1:8051"}} - peer2Mock = &endorser{endpointConfig: &endpointConfig{address: "peer2:9051"}} - peer3Mock = &endorser{endpointConfig: &endpointConfig{address: "peer3:10051"}} - peer4Mock = &endorser{endpointConfig: &endpointConfig{address: "peer4:11051"}} - unavailable1Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:12051"}} - unavailable2Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:13051"}} - unavailable3Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:14051"}} + localhostMock = &endorser{endpointConfig: &endpointConfig{address: "localhost:7051", mspid: "msp1"}} + peer1Mock = &endorser{endpointConfig: &endpointConfig{address: "peer1:8051", mspid: "msp1"}} + peer2Mock = &endorser{endpointConfig: &endpointConfig{address: "peer2:9051", mspid: "msp2"}} + peer3Mock = &endorser{endpointConfig: &endpointConfig{address: "peer3:10051", mspid: "msp2"}} + peer4Mock = &endorser{endpointConfig: &endpointConfig{address: "peer4:11051", mspid: "msp3"}} + unavailable1Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:12051", mspid: "msp1"}} + unavailable2Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:13051", mspid: "msp1"}} + unavailable3Mock = &endorser{endpointConfig: &endpointConfig{address: "unavailable1:14051", mspid: "msp1"}} ) func TestEvaluate(t *testing.T) { @@ -342,24 +344,24 @@ func TestEndorse(t *testing.T) { { name: "two endorsers", plan: endorsementPlan{ - "g1": {{endorser: localhostMock, height: 3}}, - "g2": {{endorser: peer1Mock, height: 3}}, + "g1": {{endorser: localhostMock, height: 3}}, // msp1 + "g2": {{endorser: peer2Mock, height: 3}}, // msp2 }, - expectedEndorsers: []string{"localhost:7051", "peer1:8051"}, + expectedEndorsers: []string{"localhost:7051", "peer2:9051"}, }, { name: "three endorsers, two groups", plan: endorsementPlan{ - "g1": {{endorser: localhostMock, height: 4}}, - "g2": {{endorser: peer1Mock, height: 4}, {endorser: peer2Mock, height: 5}}, + "g1": {{endorser: localhostMock, height: 4}}, // msp1 + "g2": {{endorser: peer3Mock, height: 4}, {endorser: peer2Mock, height: 5}}, // msp2 }, expectedEndorsers: []string{"localhost:7051", "peer2:9051"}, }, { name: "multiple endorsers, two groups, prefer host peer", plan: endorsementPlan{ - "g1": {{endorser: peer3Mock, height: 4}, {endorser: localhostMock, height: 4}, {endorser: unavailable1Mock, height: 4}}, - "g2": {{endorser: peer1Mock, height: 4}, {endorser: peer2Mock, height: 5}}, + "g1": {{endorser: peer2Mock, height: 4}, {endorser: localhostMock, height: 4}, {endorser: unavailable1Mock, height: 4}}, // msp1 + "g2": {{endorser: peer3Mock, height: 4}, {endorser: peer2Mock, height: 5}}, // msp2 }, expectedEndorsers: []string{"localhost:7051", "peer2:9051"}, }, @@ -411,6 +413,77 @@ func TestEndorse(t *testing.T) { }, expectedEndorsers: []string{"localhost:7051", "peer4:11051"}, }, + { + name: "non-local endorsers", + plan: endorsementPlan{ + "g1": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2 + "g2": {{endorser: peer4Mock, height: 5}}, // msp3 + }, + layouts: []endorsementLayout{ + {"g1": 1, "g2": 1}, + }, + members: []networkMember{ + {"id2", "peer2:9051", "msp2", 3}, + {"id3", "peer3:10051", "msp2", 4}, + {"id4", "peer4:11051", "msp3", 5}, + }, + expectedEndorsers: []string{"peer3:10051", "peer4:11051"}, + }, + { + name: "local endorser is not in the endorsement plan", + plan: endorsementPlan{ + "g1": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2 + "g2": {{endorser: peer4Mock, height: 5}}, // msp3 + }, + layouts: []endorsementLayout{ + {"g1": 1, "g2": 1}, + }, + members: []networkMember{ + {"id1", "localhost:7051", "msp1", 3}, + {"id2", "peer2:9051", "msp2", 3}, + {"id3", "peer3:10051", "msp2", 4}, + {"id4", "peer4:11051", "msp3", 5}, + }, + expectedEndorsers: []string{"peer3:10051", "peer4:11051"}, + }, + { + name: "non-local endorsers with transient data will fail", + plan: endorsementPlan{ + "g1": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2 + "g2": {{endorser: peer4Mock, height: 5}}, // msp3 + }, + members: []networkMember{ + {"id2", "peer2:9051", "msp2", 3}, + {"id3", "peer3:10051", "msp2", 4}, + {"id4", "peer4:11051", "msp3", 5}, + }, + transientData: map[string][]byte{"transient-key": []byte("transient-value")}, + errString: "rpc error: code = FailedPrecondition desc = no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data", + }, + { + name: "extra endorsers with transient data", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock, height: 4}, {endorser: peer1Mock, height: 4}}, // msp1 + "g2": {{endorser: peer4Mock, height: 5}}, // msp3 + }, + transientData: map[string][]byte{"transient-key": []byte("transient-value")}, + expectedEndorsers: []string{"localhost:7051", "peer4:11051"}, + }, + { + name: "non-local endorsers with transient data and set endorsing orgs", + plan: endorsementPlan{ + "g1": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2 + "g2": {{endorser: peer4Mock, height: 5}}, // msp3 + }, + members: []networkMember{ + {"id2", "peer2:9051", "msp2", 3}, + {"id3", "peer3:10051", "msp2", 4}, + {"id4", "peer4:11051", "msp3", 5}, + }, + endorsingOrgs: []string{"msp2", "msp3"}, + transientData: map[string][]byte{"transient-key": []byte("transient-value")}, + expectedEndorsers: []string{"peer3:10051", "peer4:11051"}, + }, { name: "endorse with multiple layouts - non-availability of peers fails on all layouts", plan: endorsementPlan{ @@ -425,16 +498,11 @@ func TestEndorse(t *testing.T) { }, errString: "failed to select a set of endorsers that satisfy the endorsement policy", }, - { - name: "no endorsers", - plan: endorsementPlan{}, - errString: "failed to assemble transaction: at least one proposal response is required", - }, { name: "non-matching responses", plan: endorsementPlan{ - "g1": {{endorser: localhostMock, height: 4}}, - "g2": {{endorser: peer1Mock, height: 5}}, + "g1": {{endorser: localhostMock, height: 4}}, // msp1 + "g2": {{endorser: peer2Mock, height: 5}}, // msp2 }, localResponse: "different_response", errString: "failed to assemble transaction: ProposalResponsePayloads do not match", @@ -461,7 +529,7 @@ func TestEndorse(t *testing.T) { } def.discovery.PeersForEndorsementReturns(ed, nil) }, - errString: "failed to assemble transaction", + errString: "failed to select a set of endorsers that satisfy the endorsement policy", }, { name: "discovery returns incomplete protos - nil state info", @@ -493,10 +561,29 @@ func TestEndorse(t *testing.T) { Message: "rpc error: code = Aborted desc = wibble", }}, }, + { + name: "local endorser succeeds, remote endorser fails", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock, height: 1}}, + "g2": {{endorser: peer4Mock, height: 1}}, + }, + endpointDefinition: &endpointDef{ + proposalError: status.Error(codes.Aborted, "remote-wobble"), + }, + postSetup: func(t *testing.T, def *preparedTest) { + def.localEndorser.ProcessProposalReturns(createProposalResponse(t, localhostMock.address, "all_good", 200, ""), nil) + }, + errString: "failed to endorse transaction", + errDetails: []*pb.EndpointError{{ + Address: "peer4:11051", + MspId: "msp3", + Message: "rpc error: code = Aborted desc = remote-wobble", + }}, + }, { name: "process proposal chaincode error", plan: endorsementPlan{ - "g1": {{endorser: peer1Mock, height: 2}}, + "g1": {{endorser: localhostMock, height: 2}}, }, endpointDefinition: &endpointDef{ proposalResponseStatus: 400, @@ -504,11 +591,46 @@ func TestEndorse(t *testing.T) { }, errString: "rpc error: code = Aborted desc = failed to endorse transaction", errDetails: []*pb.EndpointError{{ - Address: "peer1:8051", + Address: "localhost:7051", MspId: "msp1", Message: "error 400, Mock chaincode error", }}, }, + { + name: "local endorser succeeds, remote endorser chaincode error", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock, height: 1}}, + "g2": {{endorser: peer4Mock, height: 1}}, + }, + endpointDefinition: &endpointDef{ + proposalResponseStatus: 400, + proposalResponseMessage: "Mock chaincode error", + }, + postSetup: func(t *testing.T, def *preparedTest) { + def.localEndorser.ProcessProposalReturns(createProposalResponse(t, localhostMock.address, "all_good", 200, ""), nil) + }, + errString: "failed to endorse transaction", + errDetails: []*pb.EndpointError{{ + Address: "peer4:11051", + MspId: "msp3", + Message: "error 400, Mock chaincode error", + }}, + }, + { + name: "first endorser returns chaincode interest", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock, height: 3}}, + "g2": {{endorser: peer2Mock, height: 3}}, + }, + interest: &peer.ChaincodeInterest{ + Chaincodes: []*peer.ChaincodeCall{{ + Name: testChaincode, + CollectionNames: []string{"mycollection1", "mycollection2"}, + NoPrivateReads: true, + }}, + }, + expectedEndorsers: []string{"localhost:7051", "peer2:9051"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -527,45 +649,11 @@ func TestEndorse(t *testing.T) { // assert the preparedTxn is the payload from the proposal response require.Equal(t, []byte("mock_response"), response.Result.Payload, "Incorrect response") - // check the correct endorsers (mock) were called with the right parameters - checkEndorsers(t, tt.expectedEndorsers, test) - - // check the prepare transaction (Envelope) contains the right number of endorsements - payload, err := protoutil.UnmarshalPayload(response.PreparedTransaction.Payload) - require.NoError(t, err) - txn, err := protoutil.UnmarshalTransaction(payload.Data) - require.NoError(t, err) - cap, err := protoutil.UnmarshalChaincodeActionPayload(txn.Actions[0].Payload) - require.NoError(t, err) - endorsements := cap.Action.Endorsements - expectedLen := len(tt.expectedEndorsers) - require.Len(t, endorsements, expectedLen) - - // check the discovery service (mock) was invoked as expected - expectedChannel := common.ChannelID(testChannel) - expectedInterest := &peer.ChaincodeInterest{ - Chaincodes: []*peer.ChaincodeCall{{ - Name: testChaincode, - }}, - } - if tt.endorsingOrgs != nil { - require.Equal(t, 2, test.discovery.PeersOfChannelCallCount()) - channel := test.discovery.PeersOfChannelArgsForCall(0) - require.Equal(t, expectedChannel, channel) - channel = test.discovery.PeersOfChannelArgsForCall(1) - require.Equal(t, expectedChannel, channel) - } else { - require.Equal(t, 1, test.discovery.PeersForEndorsementCallCount()) - channel, interest := test.discovery.PeersForEndorsementArgsForCall(0) - require.Equal(t, expectedChannel, channel) - require.Equal(t, expectedInterest, interest) - - require.Equal(t, 1, test.discovery.PeersOfChannelCallCount()) - channel = test.discovery.PeersOfChannelArgsForCall(0) - require.Equal(t, expectedChannel, channel) - } + // check the generated transaction envelope contains the correct endorsements + checkTransaction(t, tt.expectedEndorsers, response.PreparedTransaction) - require.Equal(t, 1, test.discovery.IdentityInfoCallCount()) + // check the correct endorsers (mocks) were called with the right parameters + checkEndorsers(t, tt.expectedEndorsers, test) }) } } @@ -1005,7 +1093,7 @@ func TestNilArgs(t *testing.T) { require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "the proposed transaction must contain a signed proposal")) _, err = server.Endorse(ctx, &pb.EndorseRequest{ProposedTransaction: &peer.SignedProposal{ProposalBytes: []byte("jibberish")}}) - require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "failed to unpack transaction proposal: error unmarshalling Proposal: unexpected EOF")) + require.ErrorContains(t, err, "rpc error: code = InvalidArgument desc = failed to unpack transaction proposal: error unmarshalling Proposal") _, err = server.Submit(ctx, nil) require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "a submit request is required")) @@ -1032,7 +1120,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { if epDef.proposalError != nil { localEndorser.ProcessProposalReturns(nil, epDef.proposalError) } else { - localEndorser.ProcessProposalReturns(createProposalResponse(t, localResponse, 200, ""), nil) + localEndorser.ProcessProposalReturns(createProposalResponseWithInterest(t, localhostMock.address, localResponse, epDef.proposalResponseStatus, epDef.proposalResponseMessage, tt.interest), nil) } mockSigner := &idmocks.SignerSerializer{} @@ -1052,7 +1140,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { mockPolicy := &mocks.ACLChecker{} mockPolicy.CheckACLReturns(tt.policyErr) - validProposal := createProposal(t, testChannel, testChaincode) + validProposal := createProposal(t, testChannel, testChaincode, tt.transientData) validSignedProposal, err := protoutil.GetSignedProposal(validProposal, mockSigner) require.NoError(t, err) @@ -1154,6 +1242,25 @@ func checkEndorsers(t *testing.T, endorsers []string, test *preparedTest) { } } +func checkTransaction(t *testing.T, expectedEndorsers []string, transaction *cp.Envelope) { + // check the prepared transaction contains the correct endorsements + var actualEndorsers []string + + payload, err := protoutil.UnmarshalPayload(transaction.GetPayload()) + require.NoError(t, err) + txn, err := protoutil.UnmarshalTransaction(payload.GetData()) + require.NoError(t, err) + for _, action := range txn.GetActions() { + cap, err := protoutil.UnmarshalChaincodeActionPayload(action.GetPayload()) + require.NoError(t, err) + for _, endorsement := range cap.GetAction().GetEndorsements() { + actualEndorsers = append(actualEndorsers, string(endorsement.GetEndorser())) + } + } + + require.ElementsMatch(t, expectedEndorsers, actualEndorsers) +} + func mockDiscovery(t *testing.T, plan endorsementPlan, layouts []endorsementLayout, members []networkMember, config *dp.ConfigResult) *mocks.Discovery { discovery := &mocks.Discovery{} @@ -1182,7 +1289,7 @@ func createMockEndorsementDescriptor(t *testing.T, plan endorsementPlan, layouts quantitiesByGroup[group] = 1 // for now var peers []*dp.Peer for _, endorser := range endorsers { - peers = append(peers, createMockPeer(t, endorser.endorser.address, endorser.height)) + peers = append(peers, createMockPeer(t, &endorser)) } endorsersByGroups[group] = &dp.Peers{Peers: peers} } @@ -1203,12 +1310,12 @@ func createMockEndorsementDescriptor(t *testing.T, plan endorsementPlan, layouts return descriptor } -func createMockPeer(t *testing.T, name string, ledgerHeight uint64) *dp.Peer { +func createMockPeer(t *testing.T, endorser *endorserState) *dp.Peer { aliveMsgBytes, err := proto.Marshal( &gossip.GossipMessage{ Content: &gossip.GossipMessage_AliveMsg{ AliveMsg: &gossip.AliveMessage{ - Membership: &gossip.Member{Endpoint: name}, + Membership: &gossip.Member{Endpoint: endorser.endorser.address}, }, }, }) @@ -1220,7 +1327,7 @@ func createMockPeer(t *testing.T, name string, ledgerHeight uint64) *dp.Peer { Content: &gossip.GossipMessage_StateInfo{ StateInfo: &gossip.StateInfo{ Properties: &gossip.Properties{ - LedgerHeight: ledgerHeight, + LedgerHeight: endorser.height, }, }, }, @@ -1235,19 +1342,23 @@ func createMockPeer(t *testing.T, name string, ledgerHeight uint64) *dp.Peer { MembershipInfo: &gossip.Envelope{ Payload: aliveMsgBytes, }, - Identity: []byte(name), + Identity: marshal(&msp.SerializedIdentity{ + IdBytes: []byte(endorser.endorser.address), + Mspid: endorser.endorser.mspid, + }, t), } } func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer) *endpointFactory { + var endpoint string return &endpointFactory{ timeout: 5 * time.Second, - connectEndorser: func(_ *grpc.ClientConn) peer.EndorserClient { + connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient { e := &mocks.EndorserClient{} if definition.proposalError != nil { e.ProcessProposalReturns(nil, definition.proposalError) } else { - e.ProcessProposalReturns(createProposalResponse(t, definition.proposalResponseValue, definition.proposalResponseStatus, definition.proposalResponseMessage), nil) + e.ProcessProposalReturns(createProposalResponse(t, endpoint, definition.proposalResponseValue, definition.proposalResponseStatus, definition.proposalResponseMessage), nil) } return e }, @@ -1266,11 +1377,14 @@ func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer) abc.BroadcastReturns(abbc, nil) return abc }, - dialer: dialer, + dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + endpoint = target + return dialer(ctx, target, opts...) + }, } } -func createProposal(t *testing.T, channel string, chaincode string, args ...[]byte) *peer.Proposal { +func createProposal(t *testing.T, channel string, chaincode string, transient map[string][]byte, args ...[]byte) *peer.Proposal { invocationSpec := &peer.ChaincodeInvocationSpec{ ChaincodeSpec: &peer.ChaincodeSpec{ Type: peer.ChaincodeSpec_NODE, @@ -1279,11 +1393,12 @@ func createProposal(t *testing.T, channel string, chaincode string, args ...[]by }, } - proposal, _, err := protoutil.CreateChaincodeProposal( + proposal, _, err := protoutil.CreateChaincodeProposalWithTransient( cp.HeaderType_ENDORSER_TRANSACTION, channel, invocationSpec, []byte{}, + transient, ) require.NoError(t, err, "Failed to create the proposal") @@ -1291,7 +1406,7 @@ func createProposal(t *testing.T, channel string, chaincode string, args ...[]by return proposal } -func createProposalResponse(t *testing.T, value string, status int32, errMessage string) *peer.ProposalResponse { +func createProposalResponse(t *testing.T, endorser, value string, status int32, errMessage string) *peer.ProposalResponse { response := &peer.Response{ Status: status, Payload: []byte(value), @@ -1304,7 +1419,9 @@ func createProposalResponse(t *testing.T, value string, status int32, errMessage ProposalHash: []byte{}, Extension: marshal(action, t), } - endorsement := &peer.Endorsement{} + endorsement := &peer.Endorsement{ + Endorser: []byte(endorser), + } return &peer.ProposalResponse{ Payload: marshal(payload, t), @@ -1313,6 +1430,14 @@ func createProposalResponse(t *testing.T, value string, status int32, errMessage } } +func createProposalResponseWithInterest(t *testing.T, endorser, value string, status int32, errMessage string, interest *peer.ChaincodeInterest) *peer.ProposalResponse { + response := createProposalResponse(t, endorser, value, status, errMessage) + if interest != nil { + response.Interest = interest + } + return response +} + func marshal(msg proto.Message, t *testing.T) []byte { buf, err := proto.Marshal(msg) require.NoError(t, err, "Failed to marshal message") diff --git a/internal/pkg/gateway/apiutils.go b/internal/pkg/gateway/apiutils.go index a8c60487504..e37a40fc1f9 100644 --- a/internal/pkg/gateway/apiutils.go +++ b/internal/pkg/gateway/apiutils.go @@ -19,7 +19,7 @@ import ( func getTransactionResponse(response *peer.ProposalResponse) (*peer.Response, error) { var retVal *peer.Response - if response != nil && response.Payload != nil { + if response.GetPayload() != nil { payload, err := protoutil.UnmarshalProposalResponsePayload(response.Payload) if err != nil { return nil, err @@ -30,7 +30,7 @@ func getTransactionResponse(response *peer.ProposalResponse) (*peer.Response, er return nil, err } - if extension != nil && extension.Response != nil { + if extension.GetResponse() != nil { if extension.Response.Status < 200 || extension.Response.Status >= 400 { return nil, fmt.Errorf("error %d, %s", extension.Response.Status, extension.Response.Message) } @@ -41,32 +41,32 @@ func getTransactionResponse(response *peer.ProposalResponse) (*peer.Response, er return retVal, nil } -func getChannelAndChaincodeFromSignedProposal(signedProposal *peer.SignedProposal) (string, string, error) { +func getChannelAndChaincodeFromSignedProposal(signedProposal *peer.SignedProposal) (string, string, bool, error) { if signedProposal == nil { - return "", "", fmt.Errorf("a signed proposal is required") + return "", "", false, fmt.Errorf("a signed proposal is required") } proposal, err := protoutil.UnmarshalProposal(signedProposal.ProposalBytes) if err != nil { - return "", "", err + return "", "", false, err } header, err := protoutil.UnmarshalHeader(proposal.Header) if err != nil { - return "", "", err + return "", "", false, err } channelHeader, err := protoutil.UnmarshalChannelHeader(header.ChannelHeader) if err != nil { - return "", "", err + return "", "", false, err } payload, err := protoutil.UnmarshalChaincodeProposalPayload(proposal.Payload) if err != nil { - return "", "", err + return "", "", false, err } spec, err := protoutil.UnmarshalChaincodeInvocationSpec(payload.Input) if err != nil { - return "", "", err + return "", "", false, err } - return channelHeader.ChannelId, spec.ChaincodeSpec.ChaincodeId.Name, nil + return channelHeader.ChannelId, spec.ChaincodeSpec.ChaincodeId.Name, len(payload.TransientMap) > 0, nil } func rpcError(code codes.Code, message string, details ...proto.Message) error { diff --git a/internal/pkg/gateway/gateway.go b/internal/pkg/gateway/gateway.go index aee8bf58a9e..be957321241 100644 --- a/internal/pkg/gateway/gateway.go +++ b/internal/pkg/gateway/gateway.go @@ -26,6 +26,7 @@ type Server struct { eventer Eventer policy ACLChecker options config.Options + logger *flogging.FabricLogger } type EndorserServerAdapter struct { @@ -86,6 +87,7 @@ func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, find eventer: eventer, policy: policy, options: options, + logger: logger, } return gwServer diff --git a/internal/pkg/gateway/registry.go b/internal/pkg/gateway/registry.go index 63082565f5c..ea359761320 100644 --- a/internal/pkg/gateway/registry.go +++ b/internal/pkg/gateway/registry.go @@ -13,21 +13,22 @@ import ( "strings" "sync" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/golang/protobuf/proto" dp "github.com/hyperledger/fabric-protos-go/discovery" "github.com/hyperledger/fabric-protos-go/gossip" - "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" gossipapi "github.com/hyperledger/fabric/gossip/api" - "github.com/hyperledger/fabric/gossip/common" + gossipcommon "github.com/hyperledger/fabric/gossip/common" gossipdiscovery "github.com/hyperledger/fabric/gossip/discovery" ) type Discovery interface { Config(channel string) (*dp.ConfigResult, error) IdentityInfo() gossipapi.PeerIdentitySet - PeersForEndorsement(channel common.ChannelID, interest *peer.ChaincodeInterest) (*dp.EndorsementDescriptor, error) - PeersOfChannel(common.ChannelID) gossipdiscovery.Members + PeersForEndorsement(channel gossipcommon.ChannelID, interest *peer.ChaincodeInterest) (*dp.EndorsementDescriptor, error) + PeersOfChannel(gossipcommon.ChannelID) gossipdiscovery.Members } type registry struct { @@ -49,44 +50,38 @@ type endorserState struct { } // Returns a set of endorsers that satisfies the endorsement plan for the given chaincode on a channel. -func (reg *registry) endorsers(channel string, chaincode string) ([]*endorser, error) { - err := reg.registerChannel(channel) - if err != nil { - return nil, err - } - +func (reg *registry) endorsers(channel string, interest *peer.ChaincodeInterest, preferOrg string) ([]*endorser, error) { var endorsers []*endorser + var reserveEndorsers []*endorser - interest := &peer.ChaincodeInterest{ - Chaincodes: []*peer.ChaincodeCall{{ - Name: chaincode, - }}, - } - - descriptor, err := reg.discovery.PeersForEndorsement(common.ChannelID(channel), interest) + descriptor, err := reg.discovery.PeersForEndorsement(gossipcommon.ChannelID(channel), interest) if err != nil { - return nil, err + logger.Errorw("PeersForEndorsement failed.", "error", err, "channel", channel, "ChaincodeInterest", proto.MarshalTextString(interest)) + return nil, fmt.Errorf("discovery service failed to build endorsement plan: %s", err) } + layouts := descriptor.GetLayouts() + reg.configLock.RLock() defer reg.configLock.RUnlock() - for _, layout := range descriptor.GetLayouts() { + for _, layout := range layouts { var receivers []*endorserState // The set of peers the client needs to request endorsements from abandonLayout := false + hasPreferredOrg := false for group, quantity := range layout.GetQuantitiesByGroup() { // Select n remoteEndorsers from each group sorted by block height - - // block heights var groupPeers []*endorserState for _, peer := range descriptor.GetEndorsersByGroups()[group].GetPeers() { + // extract block height msg := &gossip.GossipMessage{} - err := proto.Unmarshal(peer.GetStateInfo().GetPayload(), msg) + err = proto.Unmarshal(peer.GetStateInfo().GetPayload(), msg) if err != nil { return nil, err } - height := msg.GetStateInfo().GetProperties().GetLedgerHeight() + + // extract endpoint err = proto.Unmarshal(peer.GetMembershipInfo().GetPayload(), msg) if err != nil { return nil, err @@ -104,6 +99,10 @@ func (reg *registry) endorsers(channel string, chaincode string) ([]*endorser, e continue } + if endorser.mspid == preferOrg { + hasPreferredOrg = true + } + groupPeers = append(groupPeers, &endorserState{peer: peer, endorser: endorser, height: height}) } @@ -116,6 +115,7 @@ func (reg *registry) endorsers(channel string, chaincode string) ([]*endorser, e // sort by decreasing height sort.Slice(groupPeers, sorter(groupPeers, reg.localEndorser.address)) + // put the local org peers at the head of the slice receivers = append(receivers, groupPeers[0:quantity]...) } @@ -127,19 +127,29 @@ func (reg *registry) endorsers(channel string, chaincode string) ([]*endorser, e for _, peer := range receivers { endorsers = append(endorsers, peer.endorser) } + + // if this plan doesn't contain the `preferOrg` org, abandon it in favour of one that does, since we already have a local endorsement + // but save it in reserve in case there are no layouts with the local org + if preferOrg != "" && !hasPreferredOrg { + if reserveEndorsers == nil { + reserveEndorsers = endorsers + } + // try the next layout + continue + } + return endorsers, nil } + if reserveEndorsers != nil { + return reserveEndorsers, nil + } + return nil, fmt.Errorf("failed to select a set of endorsers that satisfy the endorsement policy") } // endorsersForOrgs returns a set of endorsers owned by the given orgs for the given chaincode on a channel. func (reg *registry) endorsersForOrgs(channel string, chaincode string, endorsingOrgs []string) ([]*endorser, error) { - err := reg.registerChannel(channel) - if err != nil { - return nil, err - } - endorsersByOrg := reg.endorsersByOrg(channel, chaincode) var endorsers []*endorser @@ -161,7 +171,7 @@ func (reg *registry) endorsersForOrgs(channel string, chaincode string, endorsin func (reg *registry) endorsersByOrg(channel string, chaincode string) map[string][]*endorserState { endorsersByOrg := make(map[string][]*endorserState) - members := reg.discovery.PeersOfChannel(common.ChannelID(channel)) + members := reg.discovery.PeersOfChannel(gossipcommon.ChannelID(channel)) reg.configLock.RLock() defer reg.configLock.RUnlock() @@ -202,12 +212,8 @@ func (reg *registry) endorsersByOrg(channel string, chaincode string) map[string // evaluator returns a single endorser, preferably from local org, if available // targetOrgs specifies the orgs that are allowed receive the request, due to private data restrictions func (reg *registry) evaluator(channel string, chaincode string, targetOrgs []string) (*endorser, error) { - err := reg.registerChannel(channel) - if err != nil { - return nil, err - } - endorsersByOrg := reg.endorsersByOrg(channel, chaincode) + // If no targetOrgs are specified (i.e. no restrictions), then populate with all available orgs if len(targetOrgs) == 0 { for org := range endorsersByOrg { @@ -319,9 +325,9 @@ func (reg *registry) registerChannel(channel string) error { // get the remoteEndorsers for the channel peers := map[string]string{} - members := reg.discovery.PeersOfChannel(common.ChannelID(channel)) + members := reg.discovery.PeersOfChannel(gossipcommon.ChannelID(channel)) for _, member := range members { - id := member.PKIid.String() // TODO this is fragile + id := member.PKIid.String() peers[id] = member.PreferredEndpoint() } for mspid, infoset := range reg.discovery.IdentityInfo().ByOrg() { diff --git a/protoutil/unmarshalers.go b/protoutil/unmarshalers.go index cc634af60dc..c72f64891f8 100644 --- a/protoutil/unmarshalers.go +++ b/protoutil/unmarshalers.go @@ -9,6 +9,8 @@ package protoutil import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/ledger/rwset" + "github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset" "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric-protos-go/peer" "github.com/pkg/errors" @@ -156,6 +158,27 @@ func UnmarshalChaincodeProposalPayload(bytes []byte) (*peer.ChaincodeProposalPay return cpp, errors.Wrap(err, "error unmarshalling ChaincodeProposalPayload") } +// UnmarshalTxReadWriteSet unmarshals bytes to a TxReadWriteSet +func UnmarshalTxReadWriteSet(bytes []byte) (*rwset.TxReadWriteSet, error) { + rws := &rwset.TxReadWriteSet{} + err := proto.Unmarshal(bytes, rws) + return rws, errors.Wrap(err, "error unmarshalling TxReadWriteSet") +} + +// UnmarshalKVRWSet unmarshals bytes to a KVRWSet +func UnmarshalKVRWSet(bytes []byte) (*kvrwset.KVRWSet, error) { + rws := &kvrwset.KVRWSet{} + err := proto.Unmarshal(bytes, rws) + return rws, errors.Wrap(err, "error unmarshalling KVRWSet") +} + +// UnmarshalHashedRWSet unmarshals bytes to a HashedRWSet +func UnmarshalHashedRWSet(bytes []byte) (*kvrwset.HashedRWSet, error) { + hrws := &kvrwset.HashedRWSet{} + err := proto.Unmarshal(bytes, hrws) + return hrws, errors.Wrap(err, "error unmarshalling HashedRWSet") +} + // UnmarshalSignaturePolicy unmarshals bytes to a SignaturePolicyEnvelope func UnmarshalSignaturePolicy(bytes []byte) (*common.SignaturePolicyEnvelope, error) { sp := &common.SignaturePolicyEnvelope{}