diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index a75320864fa..4858337bfb5 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -17,6 +17,7 @@ import ( "github.com/hyperledger/fabric/core/common/sysccprovider" "github.com/hyperledger/fabric/core/container/ccintf" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/peer" pb "github.com/hyperledger/fabric/protos/peer" "github.com/pkg/errors" @@ -178,7 +179,7 @@ func (cs *ChaincodeSupport) HandleChaincodeStream(stream ccintf.ChaincodeStream) SystemCCProvider: cs.SystemCCProvider, SystemCCVersion: util.GetSysCCVersion(), InstantiationPolicyChecker: CheckInstantiationPolicyFunc(ccprovider.CheckInstantiationPolicy), - QueryResponseBuilder: &QueryResponseGenerator{MaxResultLimit: 100}, + QueryResponseBuilder: &QueryResponseGenerator{MaxResultLimit: 100, TotalQueryLimit: ledgerconfig.GetTotalQueryLimit()}, UUIDGenerator: UUIDGeneratorFunc(util.GenerateUUID), LedgerGetter: peer.Default, AppConfig: cs.appConfig, diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index 00028e011b2..7088de4d583 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -16,6 +16,7 @@ import ( "net" "os" "path/filepath" + "reflect" "runtime" "strconv" "strings" @@ -1129,6 +1130,56 @@ func TestQueries(t *testing.T) { return } + type PageResponse struct { + Bookmark string `json:"bookmark"` + Keys []string `json:"keys"` + } + + //The following range query for "marble001" to "marble011" should return 10 marbles, in pages of 2 + f = "keysByPage" + args = util.ToChaincodeArgs(f, "marble001", "marble011", "2", "") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + return + } + queryPage := &PageResponse{} + + json.Unmarshal(retval, &queryPage) + + expectedResult := []string{"marble001", "marble002"} + + if !reflect.DeepEqual(expectedResult, queryPage.Keys) { + t.Fail() + t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult) + return + } + + // query for the next page + args = util.ToChaincodeArgs(f, "marble001", "marble011", "2", queryPage.Bookmark) + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + return + } + + json.Unmarshal(retval, &queryPage) + + expectedResult = []string{"marble003", "marble004"} + + if !reflect.DeepEqual(expectedResult, queryPage.Keys) { + t.Fail() + t.Logf("Error detected with the paginated range query second page. Returned: %v should have returned: %v %v", queryPage.Keys, expectedResult, queryPage.Bookmark) + return + } + // ExecuteQuery supported only for CouchDB and // query limits apply for CouchDB range and rich queries only if ledgerconfig.IsCouchDBEnabled() == true { @@ -1236,6 +1287,55 @@ func TestQueries(t *testing.T) { return } + //The following rich query should return 5 marbles due to the queryLimit + f = "queryByPage" + args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}", "2", "") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + return + } + + queryPage := &PageResponse{} + + json.Unmarshal(retval, &queryPage) + + expectedResult := []string{"marble001", "marble003"} + + if !reflect.DeepEqual(expectedResult, queryPage.Keys) { + t.Fail() + t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult) + return + } + + // set args for the next page + args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}", "2", queryPage.Bookmark) + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + return + } + + queryPage = &PageResponse{} + + json.Unmarshal(retval, &queryPage) + + expectedResult = []string{"marble005", "marble007"} + + if !reflect.DeepEqual(expectedResult, queryPage.Keys) { + t.Fail() + t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult) + return + } + } // modifications for history query diff --git a/core/chaincode/fake/query_response_builder.go b/core/chaincode/fake/query_response_builder.go index aa9ee9b9f96..e5969acf4a5 100644 --- a/core/chaincode/fake/query_response_builder.go +++ b/core/chaincode/fake/query_response_builder.go @@ -5,17 +5,19 @@ import ( "sync" commonledger "github.com/hyperledger/fabric/common/ledger" - chaincode_test "github.com/hyperledger/fabric/core/chaincode" + "github.com/hyperledger/fabric/core/chaincode" pb "github.com/hyperledger/fabric/protos/peer" ) type QueryResponseBuilder struct { - BuildQueryResponseStub func(txContext *chaincode_test.TransactionContext, iter commonledger.ResultsIterator, iterID string) (*pb.QueryResponse, error) + BuildQueryResponseStub func(txContext *chaincode.TransactionContext, iter commonledger.ResultsIterator, iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error) buildQueryResponseMutex sync.RWMutex buildQueryResponseArgsForCall []struct { - txContext *chaincode_test.TransactionContext - iter commonledger.ResultsIterator - iterID string + txContext *chaincode.TransactionContext + iter commonledger.ResultsIterator + iterID string + isPaginated bool + totalReturnLimit int32 } buildQueryResponseReturns struct { result1 *pb.QueryResponse @@ -29,18 +31,20 @@ type QueryResponseBuilder struct { invocationsMutex sync.RWMutex } -func (fake *QueryResponseBuilder) BuildQueryResponse(txContext *chaincode_test.TransactionContext, iter commonledger.ResultsIterator, iterID string) (*pb.QueryResponse, error) { +func (fake *QueryResponseBuilder) BuildQueryResponse(txContext *chaincode.TransactionContext, iter commonledger.ResultsIterator, iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error) { fake.buildQueryResponseMutex.Lock() ret, specificReturn := fake.buildQueryResponseReturnsOnCall[len(fake.buildQueryResponseArgsForCall)] fake.buildQueryResponseArgsForCall = append(fake.buildQueryResponseArgsForCall, struct { - txContext *chaincode_test.TransactionContext - iter commonledger.ResultsIterator - iterID string - }{txContext, iter, iterID}) - fake.recordInvocation("BuildQueryResponse", []interface{}{txContext, iter, iterID}) + txContext *chaincode.TransactionContext + iter commonledger.ResultsIterator + iterID string + isPaginated bool + totalReturnLimit int32 + }{txContext, iter, iterID, isPaginated, totalReturnLimit}) + fake.recordInvocation("BuildQueryResponse", []interface{}{txContext, iter, iterID, isPaginated, totalReturnLimit}) fake.buildQueryResponseMutex.Unlock() if fake.BuildQueryResponseStub != nil { - return fake.BuildQueryResponseStub(txContext, iter, iterID) + return fake.BuildQueryResponseStub(txContext, iter, iterID, isPaginated, totalReturnLimit) } if specificReturn { return ret.result1, ret.result2 @@ -54,10 +58,10 @@ func (fake *QueryResponseBuilder) BuildQueryResponseCallCount() int { return len(fake.buildQueryResponseArgsForCall) } -func (fake *QueryResponseBuilder) BuildQueryResponseArgsForCall(i int) (*chaincode_test.TransactionContext, commonledger.ResultsIterator, string) { +func (fake *QueryResponseBuilder) BuildQueryResponseArgsForCall(i int) (*chaincode.TransactionContext, commonledger.ResultsIterator, string, bool, int32) { fake.buildQueryResponseMutex.RLock() defer fake.buildQueryResponseMutex.RUnlock() - return fake.buildQueryResponseArgsForCall[i].txContext, fake.buildQueryResponseArgsForCall[i].iter, fake.buildQueryResponseArgsForCall[i].iterID + return fake.buildQueryResponseArgsForCall[i].txContext, fake.buildQueryResponseArgsForCall[i].iter, fake.buildQueryResponseArgsForCall[i].iterID, fake.buildQueryResponseArgsForCall[i].isPaginated, fake.buildQueryResponseArgsForCall[i].totalReturnLimit } func (fake *QueryResponseBuilder) BuildQueryResponseReturns(result1 *pb.QueryResponse, result2 error) { diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index 2f512b146f8..1fdb00e7985 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/core/common/sysccprovider" "github.com/hyperledger/fabric/core/container/ccintf" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" pb "github.com/hyperledger/fabric/protos/peer" "github.com/pkg/errors" ) @@ -82,7 +83,8 @@ func (c CheckInstantiationPolicyFunc) CheckInstantiationPolicy(name, version str // QueryResponseBuilder is responsible for building QueryResponse messages for query // transactions initiated by chaincode. type QueryResponseBuilder interface { - BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator, iterID string) (*pb.QueryResponse, error) + BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator, + iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error) } // ChaincodeDefinitionGetter is responsible for retrieving a chaincode definition @@ -643,21 +645,49 @@ func (h *Handler) HandleGetStateByRange(msg *pb.ChaincodeMessage, txContext *Tra return nil, errors.Wrap(err, "unmarshal failed") } + metadata, err := getQueryMetadataFromBytes(getStateByRange.Metadata) + if err != nil { + return nil, err + } + + totalReturnLimit := calculateTotalReturnLimit(metadata) + iterID := h.UUIDGenerator.New() chaincodeName := h.ChaincodeName() var rangeIter commonledger.ResultsIterator + var paginationInfo map[string]interface{} + + isPaginated := false + if isCollectionSet(getStateByRange.Collection) { - rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(chaincodeName, getStateByRange.Collection, getStateByRange.StartKey, getStateByRange.EndKey) + rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(chaincodeName, getStateByRange.Collection, + getStateByRange.StartKey, getStateByRange.EndKey) + } else if isMetadataSetForPagination(metadata) { + paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_STATE_BY_RANGE) + if err != nil { + return nil, err + } + isPaginated = true + + startKey := getStateByRange.StartKey + + if isMetadataSetForPagination(metadata) { + if metadata.Bookmark != "" { + startKey = metadata.Bookmark + } + } + rangeIter, err = txContext.TXSimulator.GetStateRangeScanIteratorWithMetadata(chaincodeName, + startKey, getStateByRange.EndKey, paginationInfo) } else { rangeIter, err = txContext.TXSimulator.GetStateRangeScanIterator(chaincodeName, getStateByRange.StartKey, getStateByRange.EndKey) } if err != nil { return nil, errors.WithStack(err) } - txContext.InitializeQueryContext(iterID, rangeIter) - payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID) + + payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID, isPaginated, totalReturnLimit) if err != nil { txContext.CleanupQueryContext(iterID) return nil, errors.WithStack(err) @@ -686,7 +716,9 @@ func (h *Handler) HandleQueryStateNext(msg *pb.ChaincodeMessage, txContext *Tran return nil, errors.New("query iterator not found") } - payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, queryIter, queryStateNext.Id) + totalReturnLimit := calculateTotalReturnLimit(nil) + + payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, queryIter, queryStateNext.Id, false, totalReturnLimit) if err != nil { txContext.CleanupQueryContext(queryStateNext.Id) return nil, errors.WithStack(err) @@ -734,9 +766,28 @@ func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *Tran return nil, errors.Wrap(err, "unmarshal failed") } + metadata, err := getQueryMetadataFromBytes(getQueryResult.Metadata) + if err != nil { + return nil, err + } + + totalReturnLimit := calculateTotalReturnLimit(metadata) + isPaginated := false + var executeIter commonledger.ResultsIterator + var paginationInfo map[string]interface{} + if isCollectionSet(getQueryResult.Collection) { executeIter, err = txContext.TXSimulator.ExecuteQueryOnPrivateData(chaincodeName, getQueryResult.Collection, getQueryResult.Query) + } else if isMetadataSetForPagination(metadata) { + paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_QUERY_RESULT) + if err != nil { + return nil, err + } + isPaginated = true + executeIter, err = txContext.TXSimulator.ExecuteQueryWithMetadata(chaincodeName, + getQueryResult.Query, paginationInfo) + } else { executeIter, err = txContext.TXSimulator.ExecuteQuery(chaincodeName, getQueryResult.Query) } @@ -746,7 +797,7 @@ func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *Tran txContext.InitializeQueryContext(iterID, executeIter) - payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, executeIter, iterID) + payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, executeIter, iterID, isPaginated, totalReturnLimit) if err != nil { txContext.CleanupQueryContext(iterID) return nil, errors.WithStack(err) @@ -778,8 +829,10 @@ func (h *Handler) HandleGetHistoryForKey(msg *pb.ChaincodeMessage, txContext *Tr return nil, errors.WithStack(err) } + totalReturnLimit := calculateTotalReturnLimit(nil) + txContext.InitializeQueryContext(iterID, historyIter) - payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, historyIter, iterID) + payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, historyIter, iterID, false, totalReturnLimit) if err != nil { txContext.CleanupQueryContext(iterID) return nil, errors.WithStack(err) @@ -799,6 +852,57 @@ func isCollectionSet(collection string) bool { return collection != "" } +func isMetadataSetForPagination(metadata *pb.QueryMetadata) bool { + if metadata == nil { + return false + } + + if metadata.PageSize == 0 && metadata.Bookmark == "" { + return false + } + + return true +} + +func getQueryMetadataFromBytes(metadataBytes []byte) (*pb.QueryMetadata, error) { + if metadataBytes != nil { + metadata := &pb.QueryMetadata{} + err := proto.Unmarshal(metadataBytes, metadata) + if err != nil { + return nil, errors.Wrap(err, "unmarshal failed") + } + return metadata, nil + } + return nil, nil +} + +func createPaginationInfoFromMetadata(metadata *pb.QueryMetadata, totalReturnLimit int32, queryType pb.ChaincodeMessage_Type) (map[string]interface{}, error) { + paginationInfoMap := make(map[string]interface{}) + + switch queryType { + case pb.ChaincodeMessage_GET_QUERY_RESULT: + paginationInfoMap["bookmark"] = metadata.Bookmark + case pb.ChaincodeMessage_GET_STATE_BY_RANGE: + // this is a no-op for range query + default: + return nil, errors.New("query type must be either GetQueryResult or GetStateByRange") + } + + paginationInfoMap["limit"] = totalReturnLimit + return paginationInfoMap, nil +} + +func calculateTotalReturnLimit(metadata *pb.QueryMetadata) int32 { + totalReturnLimit := int32(ledgerconfig.GetTotalQueryLimit()) + if metadata != nil { + pageSize := int32(metadata.PageSize) + if pageSize > 0 && pageSize < totalReturnLimit { + totalReturnLimit = pageSize + } + } + return totalReturnLimit +} + func (h *Handler) getTxContextForInvoke(channelID string, txid string, payload []byte, format string, args ...interface{}) (*TransactionContext, error) { // if we have a channelID, just get the txsim from isValidTxSim if channelID != "" { diff --git a/core/chaincode/handler_registry_test.go b/core/chaincode/handler_registry_test.go index fe63061c9b0..51a918ad671 100644 --- a/core/chaincode/handler_registry_test.go +++ b/core/chaincode/handler_registry_test.go @@ -212,10 +212,10 @@ var _ = Describe("HandlerRegistry", func() { }) Describe("Deregister", func() { - var fakeResultsIterator *mock.ResultsIterator + var fakeResultsIterator *mock.QueryResultsIterator BeforeEach(func() { - fakeResultsIterator = &mock.ResultsIterator{} + fakeResultsIterator = &mock.QueryResultsIterator{} transactionContexts := chaincode.NewTransactionContexts() txContext, err := transactionContexts.Create(&ccprovider.TransactionParams{ diff --git a/core/chaincode/handler_test.go b/core/chaincode/handler_test.go index 057ec657d09..698d412d99d 100644 --- a/core/chaincode/handler_test.go +++ b/core/chaincode/handler_test.go @@ -942,7 +942,7 @@ var _ = Describe("Handler", func() { incomingMessage *pb.ChaincodeMessage request *pb.GetStateByRange expectedResponse *pb.ChaincodeMessage - fakeIterator *mock.ResultsIterator + fakeIterator *mock.QueryResultsIterator expectedQueryResponse *pb.QueryResponse expectedPayload []byte ) @@ -962,7 +962,7 @@ var _ = Describe("Handler", func() { ChannelId: "channel-id", } - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} fakeTxSimulator.GetStateRangeScanIteratorReturns(fakeIterator, nil) expectedQueryResponse = &pb.QueryResponse{ @@ -991,6 +991,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(Equal(&chaincode.PendingQueryResult{})) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(Equal(fakeIterator)) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(*retCount).To(Equal(int32(0))) }) It("returns the response message", func() { @@ -1111,13 +1113,15 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) }) Describe("HandleQueryStateNext", func() { var ( - fakeIterator *mock.ResultsIterator + fakeIterator *mock.QueryResultsIterator expectedQueryResponse *pb.QueryResponse request *pb.QueryStateNext incomingMessage *pb.ChaincodeMessage @@ -1130,7 +1134,7 @@ var _ = Describe("Handler", func() { payload, err := proto.Marshal(request) Expect(err).NotTo(HaveOccurred()) - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} txContext.InitializeQueryContext("query-state-next-id", fakeIterator) incomingMessage = &pb.ChaincodeMessage{ @@ -1152,7 +1156,7 @@ var _ = Describe("Handler", func() { Expect(err).NotTo(HaveOccurred()) Expect(fakeQueryResponseBuilder.BuildQueryResponseCallCount()).To(Equal(1)) - tctx, iter, id := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) + tctx, iter, id, _, _ := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) Expect(tctx).To(Equal(txContext)) Expect(iter).To(Equal(fakeIterator)) Expect(id).To(Equal("query-state-next-id")) @@ -1212,6 +1216,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) @@ -1232,13 +1238,15 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) }) Describe("HandleQueryStateClose", func() { var ( - fakeIterator *mock.ResultsIterator + fakeIterator *mock.QueryResultsIterator expectedQueryResponse *pb.QueryResponse request *pb.QueryStateClose incomingMessage *pb.ChaincodeMessage @@ -1251,7 +1259,7 @@ var _ = Describe("Handler", func() { payload, err := proto.Marshal(request) Expect(err).NotTo(HaveOccurred()) - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} txContext.InitializeQueryContext("query-state-close-id", fakeIterator) incomingMessage = &pb.ChaincodeMessage{ @@ -1308,7 +1316,7 @@ var _ = Describe("Handler", func() { request *pb.GetQueryResult incomingMessage *pb.ChaincodeMessage expectedQueryResponse *pb.QueryResponse - fakeIterator *mock.ResultsIterator + fakeIterator *mock.QueryResultsIterator ) BeforeEach(func() { @@ -1330,7 +1338,7 @@ var _ = Describe("Handler", func() { } fakeQueryResponseBuilder.BuildQueryResponseReturns(expectedQueryResponse, nil) - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} fakeTxSimulator.ExecuteQueryReturns(fakeIterator, nil) }) @@ -1386,6 +1394,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(Equal(&chaincode.PendingQueryResult{})) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(Equal(fakeIterator)) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(*retCount).To(Equal(int32(0))) }) Context("and ExecuteQueryOnPrivateData fails", func() { @@ -1405,7 +1415,7 @@ var _ = Describe("Handler", func() { Expect(err).NotTo(HaveOccurred()) Expect(fakeQueryResponseBuilder.BuildQueryResponseCallCount()).To(Equal(1)) - tctx, iter, iterID := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) + tctx, iter, iterID, _, _ := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) Expect(tctx).To(Equal(txContext)) Expect(iter).To(Equal(fakeIterator)) Expect(iterID).To(Equal("generated-query-id")) @@ -1428,6 +1438,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) @@ -1459,6 +1471,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) }) @@ -1468,7 +1482,7 @@ var _ = Describe("Handler", func() { request *pb.GetHistoryForKey incomingMessage *pb.ChaincodeMessage expectedQueryResponse *pb.QueryResponse - fakeIterator *mock.ResultsIterator + fakeIterator *mock.QueryResultsIterator ) BeforeEach(func() { @@ -1490,7 +1504,7 @@ var _ = Describe("Handler", func() { } fakeQueryResponseBuilder.BuildQueryResponseReturns(expectedQueryResponse, nil) - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} fakeHistoryQueryExecutor.GetHistoryForKeyReturns(fakeIterator, nil) }) @@ -1512,6 +1526,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(Equal(&chaincode.PendingQueryResult{})) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(Equal(fakeIterator)) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(*retCount).To(Equal(int32(0))) }) It("builds a query response", func() { @@ -1519,7 +1535,7 @@ var _ = Describe("Handler", func() { Expect(err).NotTo(HaveOccurred()) Expect(fakeQueryResponseBuilder.BuildQueryResponseCallCount()).To(Equal(1)) - tctx, iter, iterID := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) + tctx, iter, iterID, _, _ := fakeQueryResponseBuilder.BuildQueryResponseArgsForCall(0) Expect(tctx).To(Equal(txContext)) Expect(iter).To(Equal(fakeIterator)) Expect(iterID).To(Equal("generated-query-id")) @@ -1564,6 +1580,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) @@ -1584,6 +1602,8 @@ var _ = Describe("Handler", func() { Expect(pqr).To(BeNil()) iter := txContext.GetQueryIterator("generated-query-id") Expect(iter).To(BeNil()) + retCount := txContext.GetTotalReturnCount("generated-query-id") + Expect(retCount).To(BeNil()) }) }) }) @@ -2432,11 +2452,11 @@ var _ = Describe("Handler", func() { }) Describe("Notify", func() { - var fakeIterator *mock.ResultsIterator + var fakeIterator *mock.QueryResultsIterator var incomingMessage *pb.ChaincodeMessage BeforeEach(func() { - fakeIterator = &mock.ResultsIterator{} + fakeIterator = &mock.QueryResultsIterator{} incomingMessage = &pb.ChaincodeMessage{ Txid: "tx-id", ChannelId: "channel-id", diff --git a/core/chaincode/mock/results_iterator.go b/core/chaincode/mock/results_iterator.go index 598d9a86192..2fd03afcda5 100644 --- a/core/chaincode/mock/results_iterator.go +++ b/core/chaincode/mock/results_iterator.go @@ -4,29 +4,38 @@ package mock import ( "sync" - "github.com/hyperledger/fabric/common/ledger" + commonledger "github.com/hyperledger/fabric/common/ledger" ) -type ResultsIterator struct { - NextStub func() (ledger.QueryResult, error) +type QueryResultsIterator struct { + NextStub func() (commonledger.QueryResult, error) nextMutex sync.RWMutex nextArgsForCall []struct{} nextReturns struct { - result1 ledger.QueryResult + result1 commonledger.QueryResult result2 error } nextReturnsOnCall map[int]struct { - result1 ledger.QueryResult + result1 commonledger.QueryResult result2 error } - CloseStub func() - closeMutex sync.RWMutex - closeArgsForCall []struct{} + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct{} + GetBookmarkAndCloseStub func() string + getBookmarkAndCloseMutex sync.RWMutex + getBookmarkAndCloseArgsForCall []struct{} + getBookmarkAndCloseReturns struct { + result1 string + } + getBookmarkAndCloseReturnsOnCall map[int]struct { + result1 string + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { +func (fake *QueryResultsIterator) Next() (commonledger.QueryResult, error) { fake.nextMutex.Lock() ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] fake.nextArgsForCall = append(fake.nextArgsForCall, struct{}{}) @@ -41,35 +50,35 @@ func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { return fake.nextReturns.result1, fake.nextReturns.result2 } -func (fake *ResultsIterator) NextCallCount() int { +func (fake *QueryResultsIterator) NextCallCount() int { fake.nextMutex.RLock() defer fake.nextMutex.RUnlock() return len(fake.nextArgsForCall) } -func (fake *ResultsIterator) NextReturns(result1 ledger.QueryResult, result2 error) { +func (fake *QueryResultsIterator) NextReturns(result1 commonledger.QueryResult, result2 error) { fake.NextStub = nil fake.nextReturns = struct { - result1 ledger.QueryResult + result1 commonledger.QueryResult result2 error }{result1, result2} } -func (fake *ResultsIterator) NextReturnsOnCall(i int, result1 ledger.QueryResult, result2 error) { +func (fake *QueryResultsIterator) NextReturnsOnCall(i int, result1 commonledger.QueryResult, result2 error) { fake.NextStub = nil if fake.nextReturnsOnCall == nil { fake.nextReturnsOnCall = make(map[int]struct { - result1 ledger.QueryResult + result1 commonledger.QueryResult result2 error }) } fake.nextReturnsOnCall[i] = struct { - result1 ledger.QueryResult + result1 commonledger.QueryResult result2 error }{result1, result2} } -func (fake *ResultsIterator) Close() { +func (fake *QueryResultsIterator) Close() { fake.closeMutex.Lock() fake.closeArgsForCall = append(fake.closeArgsForCall, struct{}{}) fake.recordInvocation("Close", []interface{}{}) @@ -79,19 +88,61 @@ func (fake *ResultsIterator) Close() { } } -func (fake *ResultsIterator) CloseCallCount() int { +func (fake *QueryResultsIterator) CloseCallCount() int { fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() return len(fake.closeArgsForCall) } -func (fake *ResultsIterator) Invocations() map[string][][]interface{} { +func (fake *QueryResultsIterator) GetBookmarkAndClose() string { + fake.getBookmarkAndCloseMutex.Lock() + ret, specificReturn := fake.getBookmarkAndCloseReturnsOnCall[len(fake.getBookmarkAndCloseArgsForCall)] + fake.getBookmarkAndCloseArgsForCall = append(fake.getBookmarkAndCloseArgsForCall, struct{}{}) + fake.recordInvocation("GetBookmarkAndClose", []interface{}{}) + fake.getBookmarkAndCloseMutex.Unlock() + if fake.GetBookmarkAndCloseStub != nil { + return fake.GetBookmarkAndCloseStub() + } + if specificReturn { + return ret.result1 + } + return fake.getBookmarkAndCloseReturns.result1 +} + +func (fake *QueryResultsIterator) GetBookmarkAndCloseCallCount() int { + fake.getBookmarkAndCloseMutex.RLock() + defer fake.getBookmarkAndCloseMutex.RUnlock() + return len(fake.getBookmarkAndCloseArgsForCall) +} + +func (fake *QueryResultsIterator) GetBookmarkAndCloseReturns(result1 string) { + fake.GetBookmarkAndCloseStub = nil + fake.getBookmarkAndCloseReturns = struct { + result1 string + }{result1} +} + +func (fake *QueryResultsIterator) GetBookmarkAndCloseReturnsOnCall(i int, result1 string) { + fake.GetBookmarkAndCloseStub = nil + if fake.getBookmarkAndCloseReturnsOnCall == nil { + fake.getBookmarkAndCloseReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.getBookmarkAndCloseReturnsOnCall[i] = struct { + result1 string + }{result1} +} + +func (fake *QueryResultsIterator) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.nextMutex.RLock() defer fake.nextMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() + fake.getBookmarkAndCloseMutex.RLock() + defer fake.getBookmarkAndCloseMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value @@ -99,7 +150,7 @@ func (fake *ResultsIterator) Invocations() map[string][][]interface{} { return copiedInvocations } -func (fake *ResultsIterator) recordInvocation(key string, args []interface{}) { +func (fake *QueryResultsIterator) recordInvocation(key string, args []interface{}) { fake.invocationsMutex.Lock() defer fake.invocationsMutex.Unlock() if fake.invocations == nil { diff --git a/core/chaincode/mock/tx_simulator.go b/core/chaincode/mock/tx_simulator.go index 8b553573071..959d9ecf2d4 100644 --- a/core/chaincode/mock/tx_simulator.go +++ b/core/chaincode/mock/tx_simulator.go @@ -1676,5 +1676,3 @@ func (fake *TxSimulator) recordInvocation(key string, args []interface{}) { } fake.invocations[key] = append(fake.invocations[key], args) } - -var _ ledger.TxSimulator = new(TxSimulator) diff --git a/core/chaincode/query_response_generator.go b/core/chaincode/query_response_generator.go index a1bce1a6b7e..efe2b87062b 100644 --- a/core/chaincode/query_response_generator.go +++ b/core/chaincode/query_response_generator.go @@ -7,18 +7,29 @@ SPDX-License-Identifier: Apache-2.0 package chaincode import ( + "github.com/golang/protobuf/proto" commonledger "github.com/hyperledger/fabric/common/ledger" pb "github.com/hyperledger/fabric/protos/peer" ) type QueryResponseGenerator struct { - MaxResultLimit int + MaxResultLimit int + TotalQueryLimit int } -// NewQueryResponse takes an iterator and fetch state to construct QueryResponse -func (q *QueryResponseGenerator) BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator, iterID string) (*pb.QueryResponse, error) { +// BuildQueryResponse takes an iterator and fetch state to construct QueryResponse +func (q *QueryResponseGenerator) BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator, + iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error) { + pendingQueryResults := txContext.GetPendingQueryResult(iterID) + totalReturnCount := txContext.GetTotalReturnCount(iterID) + for { + // if the total count has been reached, return the result and prevent the Next() being called + if *totalReturnCount >= totalReturnLimit { + return createQueryResponse(txContext, iterID, isPaginated, pendingQueryResults, *totalReturnCount) + } + queryResult, err := iter.Next() switch { case err != nil: @@ -27,18 +38,20 @@ func (q *QueryResponseGenerator) BuildQueryResponse(txContext *TransactionContex return nil, err case queryResult == nil: - // nil response from iterator indicates end of query results - batch := pendingQueryResults.Cut() - txContext.CleanupQueryContext(iterID) - return &pb.QueryResponse{Results: batch, HasMore: false, Id: iterID}, nil - case pendingQueryResults.Size() == q.MaxResultLimit: - // max number of results queued up, cut batch, then add current result to pending batch + return createQueryResponse(txContext, iterID, isPaginated, pendingQueryResults, *totalReturnCount) + + case !isPaginated && pendingQueryResults.Size() == q.MaxResultLimit: + // if explicit pagination is not used + // if the max number of results is queued up, cut batch, then add current result to pending batch + // MaxResultLimit is for batching between chaincode shim and handler + // MaxResultLimit does not limit the records returned to the client batch := pendingQueryResults.Cut() if err := pendingQueryResults.Add(queryResult); err != nil { txContext.CleanupQueryContext(iterID) return nil, err } + *totalReturnCount++ return &pb.QueryResponse{Results: batch, HasMore: true, Id: iterID}, nil default: @@ -46,6 +59,35 @@ func (q *QueryResponseGenerator) BuildQueryResponse(txContext *TransactionContex txContext.CleanupQueryContext(iterID) return nil, err } + *totalReturnCount++ + } + } +} + +func createQueryResponse(txContext *TransactionContext, iterID string, isPaginated bool, pendingQueryResults *PendingQueryResult, totalReturnCount int32) (*pb.QueryResponse, error) { + + batch := pendingQueryResults.Cut() + + if isPaginated { + // when explicit pagination is enabled, return the batch with the responseMetadata + bookmark := txContext.CleanupQueryContextWithBookmark(iterID) + responseMetadata := createResponseMetadata(totalReturnCount, bookmark) + responseMetadataBytes, err := proto.Marshal(responseMetadata) + if err != nil { + return nil, err } + return &pb.QueryResponse{Results: batch, HasMore: false, Id: iterID, Metadata: responseMetadataBytes}, nil } + + // if explicit pagination is not used, then the end of the resultset has been reached, return the batch + txContext.CleanupQueryContext(iterID) + return &pb.QueryResponse{Results: batch, HasMore: false, Id: iterID}, nil + +} + +func createResponseMetadata(returnCount int32, bookmark string) *pb.QueryResponseMetadata { + responseMetadata := &pb.QueryResponseMetadata{} + responseMetadata.Bookmark = bookmark + responseMetadata.FetchedRecordsCount = int32(returnCount) + return responseMetadata } diff --git a/core/chaincode/query_response_generator_test.go b/core/chaincode/query_response_generator_test.go index 3a5b98d0e68..e8fbaf7eaf3 100644 --- a/core/chaincode/query_response_generator_test.go +++ b/core/chaincode/query_response_generator_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package chaincode_test import ( + "errors" "fmt" "math" "testing" @@ -14,10 +15,11 @@ import ( "github.com/hyperledger/fabric/core/chaincode" "github.com/hyperledger/fabric/core/chaincode/mock" "github.com/hyperledger/fabric/protos/ledger/queryresult" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) +const totalQueryLimit = 103 + func TestBuildQueryResponse(t *testing.T) { queryResult := &queryresult.KV{ Key: "key", @@ -28,24 +30,44 @@ func TestBuildQueryResponse(t *testing.T) { // test various boundry cases around maxResultLimit const maxResultLimit = 10 testCases := []struct { + recordCount int expectedResultCount int expectedHasMoreCount int + isPaginated bool + maxResultLimit int + totalQueryLimit int }{ - {0, 0}, - {1, 0}, - {10, 0}, - {maxResultLimit - 2, 0}, - {maxResultLimit - 1, 0}, - {maxResultLimit, 0}, - {maxResultLimit + 1, 1}, - {maxResultLimit + 2, 1}, - {int(math.Floor(maxResultLimit * 1.5)), 1}, - {maxResultLimit * 2, 1}, - {10*maxResultLimit - 2, 9}, - {10*maxResultLimit - 1, 9}, - {10 * maxResultLimit, 9}, - {10*maxResultLimit + 1, 10}, - {10*maxResultLimit + 2, 10}, + {0, 0, 0, false, maxResultLimit, totalQueryLimit}, + {1, 1, 0, false, maxResultLimit, totalQueryLimit}, + {10, 10, 0, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit - 2, maxResultLimit - 2, 0, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit - 1, maxResultLimit - 1, 0, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit, maxResultLimit, 0, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit + 1, maxResultLimit + 1, 1, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit + 2, maxResultLimit + 2, 1, false, maxResultLimit, totalQueryLimit}, + {int(math.Floor(maxResultLimit * 1.5)), int(math.Floor(maxResultLimit * 1.5)), 1, false, maxResultLimit, totalQueryLimit}, + {maxResultLimit * 2, maxResultLimit * 2, 1, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit - 2, 10*maxResultLimit - 2, 9, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit - 1, 10*maxResultLimit - 1, 9, false, maxResultLimit, totalQueryLimit}, + {10 * maxResultLimit, 10 * maxResultLimit, 9, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 1, 10*maxResultLimit + 1, 10, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 2, 10*maxResultLimit + 2, 10, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 3, 10*maxResultLimit + 3, 10, false, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 5, 10*maxResultLimit + 3, 10, false, maxResultLimit, totalQueryLimit}, + {10, 5, 1, false, 4, 5}, + {10, 5, 0, false, 5, 5}, + {10, 5, 0, false, 6, 5}, + {0, 0, 0, true, maxResultLimit, totalQueryLimit}, + {1, 1, 0, true, maxResultLimit, totalQueryLimit}, + {10, 10, 0, true, maxResultLimit, totalQueryLimit}, + {maxResultLimit, maxResultLimit, 0, true, maxResultLimit, totalQueryLimit}, + {maxResultLimit + 1, maxResultLimit + 1, 0, true, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 2, 10*maxResultLimit + 2, 0, true, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 3, totalQueryLimit, 0, true, maxResultLimit, totalQueryLimit}, + {10*maxResultLimit + 4, totalQueryLimit, 0, true, maxResultLimit, totalQueryLimit}, + {10, 5, 0, true, 4, 5}, + {10, 5, 0, false, 5, 5}, + {10, 5, 0, false, 6, 5}, } for _, tc := range testCases { @@ -55,25 +77,26 @@ func TestBuildQueryResponse(t *testing.T) { TXSimulator: txSimulator, } - resultsIterator := &mock.ResultsIterator{} + resultsIterator := &mock.QueryResultsIterator{} transactionContext.InitializeQueryContext("query-id", resultsIterator) - for i := 0; i < tc.expectedResultCount; i++ { + for i := 0; i < tc.recordCount; i++ { resultsIterator.NextReturnsOnCall(i, queryResult, nil) } - resultsIterator.NextReturnsOnCall(tc.expectedResultCount, nil, nil) + resultsIterator.NextReturnsOnCall(tc.recordCount, nil, nil) responseGenerator := &chaincode.QueryResponseGenerator{ - MaxResultLimit: maxResultLimit, + MaxResultLimit: tc.maxResultLimit, + TotalQueryLimit: tc.totalQueryLimit, } - - for totalResultCount, hasMoreCount := 0, 0; hasMoreCount <= tc.expectedHasMoreCount; hasMoreCount++ { - queryResponse, err := responseGenerator.BuildQueryResponse(transactionContext, resultsIterator, "query-id") + totalResultCount := 0 + for hasMoreCount := 0; hasMoreCount <= tc.expectedHasMoreCount; hasMoreCount++ { + queryResponse, err := responseGenerator.BuildQueryResponse(transactionContext, resultsIterator, "query-id", tc.isPaginated, int32(tc.totalQueryLimit)) assert.NoError(t, err) switch { case hasMoreCount < tc.expectedHasMoreCount: // max limit sized batch retrieved, more expected assert.True(t, queryResponse.GetHasMore()) - assert.Len(t, queryResponse.GetResults(), maxResultLimit) + assert.Len(t, queryResponse.GetResults(), tc.maxResultLimit) default: // remainder retrieved, no more expected assert.Len(t, queryResponse.GetResults(), tc.expectedResultCount-totalResultCount) @@ -82,8 +105,16 @@ func TestBuildQueryResponse(t *testing.T) { } totalResultCount += len(queryResponse.GetResults()) } - assert.Equal(t, tc.expectedResultCount+1, resultsIterator.NextCallCount()) - assert.Equal(t, 1, resultsIterator.CloseCallCount()) + + // assert the total number of records is correct + assert.Equal(t, tc.expectedResultCount, totalResultCount) + + if tc.isPaginated { + // this case checks if the expected method was called to close the recordset + assert.Equal(t, 1, resultsIterator.GetBookmarkAndCloseCallCount()) + } else { + assert.Equal(t, 1, resultsIterator.CloseCallCount()) + } }) } } @@ -106,7 +137,7 @@ func TestBuildQueryResponseErrors(t *testing.T) { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { txSimulator := &mock.TxSimulator{} transactionContext := &chaincode.TransactionContext{TXSimulator: txSimulator} - resultsIterator := &mock.ResultsIterator{} + resultsIterator := &mock.QueryResultsIterator{} resultsIterator.NextReturns(validResult, nil) if tc.errorOnNextCall >= 0 { resultsIterator.NextReturnsOnCall(tc.errorOnNextCall, nil, errors.New("next-failed")) @@ -117,10 +148,11 @@ func TestBuildQueryResponseErrors(t *testing.T) { transactionContext.InitializeQueryContext("query-id", resultsIterator) responseGenerator := &chaincode.QueryResponseGenerator{ - MaxResultLimit: 3, + MaxResultLimit: 3, + TotalQueryLimit: totalQueryLimit, } - resp, err := responseGenerator.BuildQueryResponse(transactionContext, resultsIterator, "query-id") + resp, err := responseGenerator.BuildQueryResponse(transactionContext, resultsIterator, "query-id", false, totalQueryLimit) if tc.expectedErrValue == "" { assert.NoError(t, err) } else { diff --git a/core/chaincode/transaction_context.go b/core/chaincode/transaction_context.go index 5b6bf6bfcb3..eec690c6a4b 100644 --- a/core/chaincode/transaction_context.go +++ b/core/chaincode/transaction_context.go @@ -26,6 +26,7 @@ type TransactionContext struct { queryMutex sync.Mutex queryIteratorMap map[string]commonledger.ResultsIterator pendingQueryResults map[string]*PendingQueryResult + totalReturnCount map[string]*int32 } func (t *TransactionContext) InitializeQueryContext(queryID string, iter commonledger.ResultsIterator) { @@ -36,8 +37,13 @@ func (t *TransactionContext) InitializeQueryContext(queryID string, iter commonl if t.pendingQueryResults == nil { t.pendingQueryResults = map[string]*PendingQueryResult{} } + if t.totalReturnCount == nil { + t.totalReturnCount = map[string]*int32{} + } t.queryIteratorMap[queryID] = iter t.pendingQueryResults[queryID] = &PendingQueryResult{} + zeroValue := int32(0) + t.totalReturnCount[queryID] = &zeroValue t.queryMutex.Unlock() } @@ -55,6 +61,13 @@ func (t *TransactionContext) GetPendingQueryResult(queryID string) *PendingQuery return result } +func (t *TransactionContext) GetTotalReturnCount(queryID string) *int32 { + t.queryMutex.Lock() + result := t.totalReturnCount[queryID] + t.queryMutex.Unlock() + return result +} + func (t *TransactionContext) CleanupQueryContext(queryID string) { t.queryMutex.Lock() defer t.queryMutex.Unlock() @@ -64,6 +77,22 @@ func (t *TransactionContext) CleanupQueryContext(queryID string) { } delete(t.queryIteratorMap, queryID) delete(t.pendingQueryResults, queryID) + delete(t.totalReturnCount, queryID) +} + +func (t *TransactionContext) CleanupQueryContextWithBookmark(queryID string) string { + t.queryMutex.Lock() + defer t.queryMutex.Unlock() + iter := t.queryIteratorMap[queryID] + bookmark := "" + if iter != nil { + if queryResultIterator, ok := iter.(commonledger.QueryResultsIterator); ok { + bookmark = queryResultIterator.GetBookmarkAndClose() + } + } + delete(t.queryIteratorMap, queryID) + delete(t.pendingQueryResults, queryID) + return bookmark } func (t *TransactionContext) CloseQueryIterators() { diff --git a/core/chaincode/transaction_context_test.go b/core/chaincode/transaction_context_test.go index 197546be22d..89047b4bd9d 100644 --- a/core/chaincode/transaction_context_test.go +++ b/core/chaincode/transaction_context_test.go @@ -17,21 +17,21 @@ import ( var _ = Describe("TransactionContext", func() { var ( - resultsIterator *mock.ResultsIterator + resultsIterator *mock.QueryResultsIterator transactionContext *chaincode.TransactionContext ) BeforeEach(func() { - resultsIterator = &mock.ResultsIterator{} + resultsIterator = &mock.QueryResultsIterator{} transactionContext = &chaincode.TransactionContext{} }) Describe("InitializeQueryContext", func() { - var iter1, iter2 *mock.ResultsIterator + var iter1, iter2 *mock.QueryResultsIterator BeforeEach(func() { - iter1 = &mock.ResultsIterator{} - iter2 = &mock.ResultsIterator{} + iter1 = &mock.QueryResultsIterator{} + iter2 = &mock.QueryResultsIterator{} }) It("stores a references to the results iterator", func() { @@ -50,6 +50,13 @@ var _ = Describe("TransactionContext", func() { Expect(pqr).To(Equal(&chaincode.PendingQueryResult{})) }) + + It("populates a total return count", func() { + transactionContext.InitializeQueryContext("query-id", iter1) + count := transactionContext.GetTotalReturnCount("query-id") + + Expect(*count).To(Equal(int32(0))) + }) }) Describe("GetQueryIterator", func() { @@ -91,6 +98,26 @@ var _ = Describe("TransactionContext", func() { }) }) + Describe("GetPendingTotalRecordCount", func() { + Context("when a query context has been initialized", func() { + BeforeEach(func() { + transactionContext.InitializeQueryContext("query-id", nil) + }) + + It("returns a non-nil total record count", func() { + retCount := transactionContext.GetTotalReturnCount("query-id") + Expect(*retCount).To(Equal(int32(0))) + }) + }) + + Context("when a query context has not been initialized", func() { + It("returns a nil total return count", func() { + retCount := transactionContext.GetTotalReturnCount("query-id") + Expect(retCount).To(BeNil()) + }) + }) + }) + Describe("CleanupQueryContext", func() { It("removes references to the the iterator and results", func() { transactionContext.InitializeQueryContext("query-id", resultsIterator) @@ -100,6 +127,8 @@ var _ = Describe("TransactionContext", func() { Expect(iter).To(BeNil()) pqr := transactionContext.GetPendingQueryResult("query-id") Expect(pqr).To(BeNil()) + retCount := transactionContext.GetTotalReturnCount("query-id") + Expect(retCount).To(BeNil()) }) It("closes the query iterator", func() { @@ -121,11 +150,11 @@ var _ = Describe("TransactionContext", func() { }) Describe("CloseQueryIterators", func() { - var resultsIterators []*mock.ResultsIterator + var resultsIterators []*mock.QueryResultsIterator BeforeEach(func() { for i := 0; i < 5; i++ { - resultsIterators = append(resultsIterators, &mock.ResultsIterator{}) + resultsIterators = append(resultsIterators, &mock.QueryResultsIterator{}) transactionContext.InitializeQueryContext(fmt.Sprintf("query-id-%d", i+1), resultsIterators[i]) } }) diff --git a/core/chaincode/transaction_contexts_test.go b/core/chaincode/transaction_contexts_test.go index 3a03e5dd19e..1706979093f 100644 --- a/core/chaincode/transaction_contexts_test.go +++ b/core/chaincode/transaction_contexts_test.go @@ -141,12 +141,12 @@ var _ = Describe("TransactionContexts", func() { }) Describe("Close", func() { - var fakeIterators []*mock.ResultsIterator + var fakeIterators []*mock.QueryResultsIterator BeforeEach(func() { - fakeIterators = make([]*mock.ResultsIterator, 6) + fakeIterators = make([]*mock.QueryResultsIterator, 6) for i := 0; i < len(fakeIterators); i++ { - fakeIterators[i] = &mock.ResultsIterator{} + fakeIterators[i] = &mock.QueryResultsIterator{} } txContext, err := txContexts.Create(&ccprovider.TransactionParams{ diff --git a/examples/chaincode/go/map/map.go b/examples/chaincode/go/map/map.go index cace8c14ede..dfa60e58b76 100644 --- a/examples/chaincode/go/map/map.go +++ b/examples/chaincode/go/map/map.go @@ -29,6 +29,11 @@ import ( type SimpleChaincode struct { } +type PageResponse struct { + Bookmark string `json:"bookmark"` + Keys []string `json:"keys"` +} + // Init is a no-op func (t *SimpleChaincode) Init(stub shim.ChaincodeStubInterface) pb.Response { return shim.Success(nil) @@ -245,6 +250,61 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response { } return shim.Success(jsonKeys) + + case "keysByPage": + if len(args) < 4 { + return shim.Error("put operation must include four arguments, a key and value") + } + startKey := args[0] + endKey := args[1] + pageSize, parserr := strconv.ParseInt(args[2], 10, 32) + if parserr != nil { + return shim.Error(fmt.Sprintf("error parsing range pagesize: %s", parserr)) + } + bookmark := args[3] + + //sleep needed to test peer's timeout behavior when using iterators + stime := 0 + if len(args) > 2 { + stime, _ = strconv.Atoi(args[2]) + } + + keysIter, resp, err := stub.GetStateByRangeWithPagination(startKey, endKey, int32(pageSize), bookmark) + if err != nil { + return shim.Error(fmt.Sprintf("keys operation failed. Error accessing state: %s", err)) + } + defer keysIter.Close() + + var keys []string + for keysIter.HasNext() { + //if sleeptime is specied, take a nap + if stime > 0 { + time.Sleep(time.Duration(stime) * time.Millisecond) + } + + response, iterErr := keysIter.Next() + if iterErr != nil { + return shim.Error(fmt.Sprintf("keys operation failed. Error accessing state: %s", err)) + } + keys = append(keys, response.Key) + } + + for key, value := range keys { + fmt.Printf("key %d contains %s\n", key, value) + } + + jsonResp := PageResponse{ + Bookmark: resp.Bookmark, + Keys: keys, + } + + queryResp, err := json.Marshal(jsonResp) + if err != nil { + return shim.Error(fmt.Sprintf("keys operation failed. Error marshaling JSON: %s", err)) + } + + return shim.Success(queryResp) + case "query": query := args[0] keysIter, err := stub.GetQueryResult(query) @@ -268,6 +328,46 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response { } return shim.Success(jsonKeys) + + case "queryByPage": + query := args[0] + pageSize, parserr := strconv.ParseInt(args[1], 10, 32) + if parserr != nil { + return shim.Error(fmt.Sprintf("error parsing range pagesize: %s", parserr)) + } + bookmark := args[2] + + keysIter, resp, err := stub.GetQueryResultWithPagination(query, int32(pageSize), bookmark) + if err != nil { + return shim.Error(fmt.Sprintf("query operation failed. Error accessing state: %s", err)) + } + defer keysIter.Close() + + var keys []string + for keysIter.HasNext() { + response, iterErr := keysIter.Next() + if iterErr != nil { + return shim.Error(fmt.Sprintf("query operation failed. Error accessing state: %s", err)) + } + keys = append(keys, response.Key) + } + + for key, value := range keys { + fmt.Printf("key %d contains %s\n", key, value) + } + + jsonResp := PageResponse{ + Bookmark: resp.Bookmark, + Keys: keys, + } + + queryResp, err := json.Marshal(jsonResp) + if err != nil { + return shim.Error(fmt.Sprintf("keys operation failed. Error marshaling JSON: %s", err)) + } + + return shim.Success(queryResp) + case "history": key := args[0] keysIter, err := stub.GetHistoryForKey(key)