Skip to content

Commit

Permalink
FAB-11455 peer side changes to support pagination
Browse files Browse the repository at this point in the history
We need to make necessary changes in the core/chaincode/handler.go for
pagination of results. Currently, the handler.go in the peer calls the ledger
to execute queries (range query, rich query) on behalf of the chaincode.
On an execution of a query, the ledger returns an iterator. Using the
iterator, peer constructs the query response with records and send the
response to chaincode.

This CR makes the following changes to support pagination of results.

When a pagination of results is enabled for a query (i.e., when the query
metadata is passed with either pageSize or a bookmark or both for range/rich
queries), the peer would call the new pagination enabled range/rich query
APIs at ledger which would return a response metadata in addition to an
iterator. The response metadata would contain the next bookmark and the
number of records fetched for the query. Peer retrieves all fetched records
from ledger, constructs a query response including both records and
response metadata. The chaincode can pass on the bookmark in the response
metadata to the application/sdk.

Change-Id: Id770dd184369d1f7eaa30f046e1923c8fde564a5
Signed-off-by: senthil <cendhu@gmail.com>
Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
  • Loading branch information
cendhu authored and Chris Elder committed Sep 7, 2018
1 parent 35b3e2f commit 0ee45ac
Show file tree
Hide file tree
Showing 14 changed files with 618 additions and 108 deletions.
3 changes: 2 additions & 1 deletion core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hyperledger/fabric/core/common/sysccprovider"
"github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/peer"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (cs *ChaincodeSupport) HandleChaincodeStream(stream ccintf.ChaincodeStream)
SystemCCProvider: cs.SystemCCProvider,
SystemCCVersion: util.GetSysCCVersion(),
InstantiationPolicyChecker: CheckInstantiationPolicyFunc(ccprovider.CheckInstantiationPolicy),
QueryResponseBuilder: &QueryResponseGenerator{MaxResultLimit: 100},
QueryResponseBuilder: &QueryResponseGenerator{MaxResultLimit: 100, TotalQueryLimit: ledgerconfig.GetTotalQueryLimit()},
UUIDGenerator: UUIDGeneratorFunc(util.GenerateUUID),
LedgerGetter: peer.Default,
AppConfig: cs.appConfig,
Expand Down
100 changes: 100 additions & 0 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net"
"os"
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -1129,6 +1130,56 @@ func TestQueries(t *testing.T) {
return
}

type PageResponse struct {
Bookmark string `json:"bookmark"`
Keys []string `json:"keys"`
}

//The following range query for "marble001" to "marble011" should return 10 marbles, in pages of 2
f = "keysByPage"
args = util.ToChaincodeArgs(f, "marble001", "marble011", "2", "")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
return
}
queryPage := &PageResponse{}

json.Unmarshal(retval, &queryPage)

expectedResult := []string{"marble001", "marble002"}

if !reflect.DeepEqual(expectedResult, queryPage.Keys) {
t.Fail()
t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult)
return
}

// query for the next page
args = util.ToChaincodeArgs(f, "marble001", "marble011", "2", queryPage.Bookmark)
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
return
}

json.Unmarshal(retval, &queryPage)

expectedResult = []string{"marble003", "marble004"}

if !reflect.DeepEqual(expectedResult, queryPage.Keys) {
t.Fail()
t.Logf("Error detected with the paginated range query second page. Returned: %v should have returned: %v %v", queryPage.Keys, expectedResult, queryPage.Bookmark)
return
}

// ExecuteQuery supported only for CouchDB and
// query limits apply for CouchDB range and rich queries only
if ledgerconfig.IsCouchDBEnabled() == true {
Expand Down Expand Up @@ -1236,6 +1287,55 @@ func TestQueries(t *testing.T) {
return
}

//The following rich query should return 5 marbles due to the queryLimit
f = "queryByPage"
args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}", "2", "")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
return
}

queryPage := &PageResponse{}

json.Unmarshal(retval, &queryPage)

expectedResult := []string{"marble001", "marble003"}

if !reflect.DeepEqual(expectedResult, queryPage.Keys) {
t.Fail()
t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult)
return
}

// set args for the next page
args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}", "2", queryPage.Bookmark)

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err = invoke(chainID, spec, nextBlockNumber, nil, chaincodeSupport)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
return
}

queryPage = &PageResponse{}

json.Unmarshal(retval, &queryPage)

expectedResult = []string{"marble005", "marble007"}

if !reflect.DeepEqual(expectedResult, queryPage.Keys) {
t.Fail()
t.Logf("Error detected with the paginated range query. Returned: %v should have returned: %v", queryPage.Keys, expectedResult)
return
}

}

// modifications for history query
Expand Down
32 changes: 18 additions & 14 deletions core/chaincode/fake/query_response_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

118 changes: 111 additions & 7 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/fabric/core/common/sysccprovider"
"github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -82,7 +83,8 @@ func (c CheckInstantiationPolicyFunc) CheckInstantiationPolicy(name, version str
// QueryResponseBuilder is responsible for building QueryResponse messages for query
// transactions initiated by chaincode.
type QueryResponseBuilder interface {
BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator, iterID string) (*pb.QueryResponse, error)
BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator,
iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error)
}

// ChaincodeDefinitionGetter is responsible for retrieving a chaincode definition
Expand Down Expand Up @@ -643,21 +645,49 @@ func (h *Handler) HandleGetStateByRange(msg *pb.ChaincodeMessage, txContext *Tra
return nil, errors.Wrap(err, "unmarshal failed")
}

metadata, err := getQueryMetadataFromBytes(getStateByRange.Metadata)
if err != nil {
return nil, err
}

totalReturnLimit := calculateTotalReturnLimit(metadata)

iterID := h.UUIDGenerator.New()
chaincodeName := h.ChaincodeName()

var rangeIter commonledger.ResultsIterator
var paginationInfo map[string]interface{}

isPaginated := false

if isCollectionSet(getStateByRange.Collection) {
rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(chaincodeName, getStateByRange.Collection, getStateByRange.StartKey, getStateByRange.EndKey)
rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(chaincodeName, getStateByRange.Collection,
getStateByRange.StartKey, getStateByRange.EndKey)
} else if isMetadataSetForPagination(metadata) {
paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_STATE_BY_RANGE)
if err != nil {
return nil, err
}
isPaginated = true

startKey := getStateByRange.StartKey

if isMetadataSetForPagination(metadata) {
if metadata.Bookmark != "" {
startKey = metadata.Bookmark
}
}
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIteratorWithMetadata(chaincodeName,
startKey, getStateByRange.EndKey, paginationInfo)
} else {
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIterator(chaincodeName, getStateByRange.StartKey, getStateByRange.EndKey)
}
if err != nil {
return nil, errors.WithStack(err)
}

txContext.InitializeQueryContext(iterID, rangeIter)
payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID)

payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID, isPaginated, totalReturnLimit)
if err != nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -686,7 +716,9 @@ func (h *Handler) HandleQueryStateNext(msg *pb.ChaincodeMessage, txContext *Tran
return nil, errors.New("query iterator not found")
}

payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, queryIter, queryStateNext.Id)
totalReturnLimit := calculateTotalReturnLimit(nil)

payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, queryIter, queryStateNext.Id, false, totalReturnLimit)
if err != nil {
txContext.CleanupQueryContext(queryStateNext.Id)
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -734,9 +766,28 @@ func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *Tran
return nil, errors.Wrap(err, "unmarshal failed")
}

metadata, err := getQueryMetadataFromBytes(getQueryResult.Metadata)
if err != nil {
return nil, err
}

totalReturnLimit := calculateTotalReturnLimit(metadata)
isPaginated := false

var executeIter commonledger.ResultsIterator
var paginationInfo map[string]interface{}

if isCollectionSet(getQueryResult.Collection) {
executeIter, err = txContext.TXSimulator.ExecuteQueryOnPrivateData(chaincodeName, getQueryResult.Collection, getQueryResult.Query)
} else if isMetadataSetForPagination(metadata) {
paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_QUERY_RESULT)
if err != nil {
return nil, err
}
isPaginated = true
executeIter, err = txContext.TXSimulator.ExecuteQueryWithMetadata(chaincodeName,
getQueryResult.Query, paginationInfo)

} else {
executeIter, err = txContext.TXSimulator.ExecuteQuery(chaincodeName, getQueryResult.Query)
}
Expand All @@ -746,7 +797,7 @@ func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *Tran

txContext.InitializeQueryContext(iterID, executeIter)

payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, executeIter, iterID)
payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, executeIter, iterID, isPaginated, totalReturnLimit)
if err != nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -778,8 +829,10 @@ func (h *Handler) HandleGetHistoryForKey(msg *pb.ChaincodeMessage, txContext *Tr
return nil, errors.WithStack(err)
}

totalReturnLimit := calculateTotalReturnLimit(nil)

txContext.InitializeQueryContext(iterID, historyIter)
payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, historyIter, iterID)
payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, historyIter, iterID, false, totalReturnLimit)
if err != nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.WithStack(err)
Expand All @@ -799,6 +852,57 @@ func isCollectionSet(collection string) bool {
return collection != ""
}

func isMetadataSetForPagination(metadata *pb.QueryMetadata) bool {
if metadata == nil {
return false
}

if metadata.PageSize == 0 && metadata.Bookmark == "" {
return false
}

return true
}

func getQueryMetadataFromBytes(metadataBytes []byte) (*pb.QueryMetadata, error) {
if metadataBytes != nil {
metadata := &pb.QueryMetadata{}
err := proto.Unmarshal(metadataBytes, metadata)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}
return metadata, nil
}
return nil, nil
}

func createPaginationInfoFromMetadata(metadata *pb.QueryMetadata, totalReturnLimit int32, queryType pb.ChaincodeMessage_Type) (map[string]interface{}, error) {
paginationInfoMap := make(map[string]interface{})

switch queryType {
case pb.ChaincodeMessage_GET_QUERY_RESULT:
paginationInfoMap["bookmark"] = metadata.Bookmark
case pb.ChaincodeMessage_GET_STATE_BY_RANGE:
// this is a no-op for range query
default:
return nil, errors.New("query type must be either GetQueryResult or GetStateByRange")
}

paginationInfoMap["limit"] = totalReturnLimit
return paginationInfoMap, nil
}

func calculateTotalReturnLimit(metadata *pb.QueryMetadata) int32 {
totalReturnLimit := int32(ledgerconfig.GetTotalQueryLimit())
if metadata != nil {
pageSize := int32(metadata.PageSize)
if pageSize > 0 && pageSize < totalReturnLimit {
totalReturnLimit = pageSize
}
}
return totalReturnLimit
}

func (h *Handler) getTxContextForInvoke(channelID string, txid string, payload []byte, format string, args ...interface{}) (*TransactionContext, error) {
// if we have a channelID, just get the txsim from isValidTxSim
if channelID != "" {
Expand Down
Loading

0 comments on commit 0ee45ac

Please sign in to comment.