@@ -47,12 +47,18 @@ type transactionContext struct {
47
47
responseNotifier chan * pb.ChaincodeMessage
48
48
49
49
// tracks open iterators used for range queries
50
- queryIteratorMap map [string ]commonledger.ResultsIterator
50
+ queryIteratorMap map [string ]commonledger.ResultsIterator
51
+ pendingQueryResults map [string ]* pendingQueryResult
51
52
52
53
txsimulator ledger.TxSimulator
53
54
historyQueryExecutor ledger.HistoryQueryExecutor
54
55
}
55
56
57
+ type pendingQueryResult struct {
58
+ batch []* pb.QueryResultBytes
59
+ count int
60
+ }
61
+
56
62
type nextStateInfo struct {
57
63
msg * pb.ChaincodeMessage
58
64
sendToCC bool
@@ -181,7 +187,8 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
181
187
}
182
188
txctx := & transactionContext {chainID : chainID , signedProp : signedProp ,
183
189
proposal : prop , responseNotifier : make (chan * pb.ChaincodeMessage , 1 ),
184
- queryIteratorMap : make (map [string ]commonledger.ResultsIterator )}
190
+ queryIteratorMap : make (map [string ]commonledger.ResultsIterator ),
191
+ pendingQueryResults : make (map [string ]* pendingQueryResult )}
185
192
handler .txCtxs [txCtxID ] = txctx
186
193
txctx .txsimulator = getTxSimulator (ctxt )
187
194
txctx .historyQueryExecutor = getHistoryQueryExecutor (ctxt )
@@ -205,11 +212,12 @@ func (handler *Handler) deleteTxContext(chainID, txid string) {
205
212
}
206
213
}
207
214
208
- func (handler * Handler ) putQueryIterator (txContext * transactionContext , queryID string ,
215
+ func (handler * Handler ) initializeQueryContext (txContext * transactionContext , queryID string ,
209
216
queryIterator commonledger.ResultsIterator ) {
210
217
handler .Lock ()
211
218
defer handler .Unlock ()
212
219
txContext .queryIteratorMap [queryID ] = queryIterator
220
+ txContext .pendingQueryResults [queryID ] = & pendingQueryResult {batch : make ([]* pb.QueryResultBytes , 0 )}
213
221
}
214
222
215
223
func (handler * Handler ) getQueryIterator (txContext * transactionContext , queryID string ) commonledger.ResultsIterator {
@@ -218,10 +226,12 @@ func (handler *Handler) getQueryIterator(txContext *transactionContext, queryID
218
226
return txContext .queryIteratorMap [queryID ]
219
227
}
220
228
221
- func (handler * Handler ) deleteQueryIterator (txContext * transactionContext , queryID string ) {
229
+ func (handler * Handler ) cleanupQueryContext (txContext * transactionContext , queryID string ) {
222
230
handler .Lock ()
223
231
defer handler .Unlock ()
232
+ txContext .queryIteratorMap [queryID ].Close ()
224
233
delete (txContext .queryIteratorMap , queryID )
234
+ delete (txContext .pendingQueryResults , queryID )
225
235
}
226
236
227
237
// Check if the transactor is allow to call this chaincode on this channel
@@ -710,8 +720,7 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
710
720
711
721
errHandler := func (err error , iter commonledger.ResultsIterator , errFmt string , errArgs ... interface {}) {
712
722
if iter != nil {
713
- iter .Close ()
714
- handler .deleteQueryIterator (txContext , iterID )
723
+ handler .cleanupQueryContext (txContext , iterID )
715
724
}
716
725
payload := []byte (err .Error ())
717
726
chaincodeLogger .Errorf (errFmt , errArgs ... )
@@ -730,7 +739,8 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
730
739
return
731
740
}
732
741
733
- handler .putQueryIterator (txContext , iterID , rangeIter )
742
+ handler .initializeQueryContext (txContext , iterID , rangeIter )
743
+
734
744
var payload * pb.QueryResponse
735
745
payload , err = getQueryResponse (handler , txContext , rangeIter , iterID )
736
746
if err != nil {
@@ -755,39 +765,52 @@ const maxResultLimit = 100
755
765
//getQueryResponse takes an iterator and fetch state to construct QueryResponse
756
766
func getQueryResponse (handler * Handler , txContext * transactionContext , iter commonledger.ResultsIterator ,
757
767
iterID string ) (* pb.QueryResponse , error ) {
758
-
759
- var err error
760
- var queryResult commonledger.QueryResult
761
- var queryResultsBytes []* pb.QueryResultBytes
762
-
763
- for i := 0 ; i < maxResultLimit ; i ++ {
764
- queryResult , err = iter .Next ()
765
- if err != nil {
768
+ pendingQueryResults := txContext .pendingQueryResults [iterID ]
769
+ for {
770
+ queryResult , err := iter .Next ()
771
+ switch {
772
+ case err != nil :
766
773
chaincodeLogger .Errorf ("Failed to get query result from iterator" )
767
- break
768
- }
769
- if queryResult == nil {
770
- break
771
- }
772
- var resultBytes []byte
773
- resultBytes , err = proto .Marshal (queryResult .(proto.Message ))
774
- if err != nil {
775
- chaincodeLogger .Errorf ("Failed to get encode query result as bytes" )
776
- break
774
+ handler .cleanupQueryContext (txContext , iterID )
775
+ return nil , err
776
+ case queryResult == nil :
777
+ // nil response from iterator indicates end of query results
778
+ batch := pendingQueryResults .cut ()
779
+ handler .cleanupQueryContext (txContext , iterID )
780
+ return & pb.QueryResponse {Results : batch , HasMore : false , Id : iterID }, nil
781
+ case pendingQueryResults .count == maxResultLimit :
782
+ // max number of results queued up, cut batch, then add current result to pending batch
783
+ batch := pendingQueryResults .cut ()
784
+ if err := pendingQueryResults .add (queryResult ); err != nil {
785
+ handler .cleanupQueryContext (txContext , iterID )
786
+ return nil , err
787
+ }
788
+ return & pb.QueryResponse {Results : batch , HasMore : true , Id : iterID }, nil
789
+ default :
790
+ if err := pendingQueryResults .add (queryResult ); err != nil {
791
+ handler .cleanupQueryContext (txContext , iterID )
792
+ return nil , err
793
+ }
777
794
}
778
-
779
- qresultBytes := pb.QueryResultBytes {ResultBytes : resultBytes }
780
- queryResultsBytes = append (queryResultsBytes , & qresultBytes )
781
795
}
796
+ }
782
797
783
- if queryResult == nil || err != nil {
784
- iter .Close ()
785
- handler .deleteQueryIterator (txContext , iterID )
786
- if err != nil {
787
- return nil , err
788
- }
798
+ func (p * pendingQueryResult ) cut () []* pb.QueryResultBytes {
799
+ batch := p .batch
800
+ p .batch = nil
801
+ p .count = 0
802
+ return batch
803
+ }
804
+
805
+ func (p * pendingQueryResult ) add (queryResult commonledger.QueryResult ) error {
806
+ queryResultBytes , err := proto .Marshal (queryResult .(proto.Message ))
807
+ if err != nil {
808
+ chaincodeLogger .Errorf ("Failed to get encode query result as bytes" )
809
+ return err
789
810
}
790
- return & pb.QueryResponse {Results : queryResultsBytes , HasMore : queryResult != nil , Id : iterID }, nil
811
+ p .batch = append (p .batch , & pb.QueryResultBytes {ResultBytes : queryResultBytes })
812
+ p .count = len (p .batch )
813
+ return nil
791
814
}
792
815
793
816
// afterQueryStateNext handles a QUERY_STATE_NEXT request from the chaincode.
@@ -831,8 +854,7 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
831
854
832
855
errHandler := func (payload []byte , iter commonledger.ResultsIterator , errFmt string , errArgs ... interface {}) {
833
856
if iter != nil {
834
- iter .Close ()
835
- handler .deleteQueryIterator (txContext , queryStateNext .Id )
857
+ handler .cleanupQueryContext (txContext , queryStateNext .Id )
836
858
}
837
859
chaincodeLogger .Errorf (errFmt , errArgs ... )
838
860
serialSendMsg = & pb.ChaincodeMessage {Type : pb .ChaincodeMessage_ERROR , Payload : payload , Txid : msg .Txid , ChannelId : msg .ChannelId }
@@ -931,8 +953,7 @@ func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) {
931
953
932
954
iter := handler .getQueryIterator (txContext , queryStateClose .Id )
933
955
if iter != nil {
934
- iter .Close ()
935
- handler .deleteQueryIterator (txContext , queryStateClose .Id )
956
+ handler .cleanupQueryContext (txContext , queryStateClose .Id )
936
957
}
937
958
938
959
payload := & pb.QueryResponse {HasMore : false , Id : queryStateClose .Id }
@@ -989,8 +1010,7 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
989
1010
990
1011
errHandler := func (payload []byte , iter commonledger.ResultsIterator , errFmt string , errArgs ... interface {}) {
991
1012
if iter != nil {
992
- iter .Close ()
993
- handler .deleteQueryIterator (txContext , iterID )
1013
+ handler .cleanupQueryContext (txContext , iterID )
994
1014
}
995
1015
chaincodeLogger .Errorf (errFmt , errArgs ... )
996
1016
serialSendMsg = & pb.ChaincodeMessage {Type : pb .ChaincodeMessage_ERROR , Payload : payload , Txid : msg .Txid , ChannelId : msg .ChannelId }
@@ -1025,7 +1045,8 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
1025
1045
return
1026
1046
}
1027
1047
1028
- handler .putQueryIterator (txContext , iterID , executeIter )
1048
+ handler .initializeQueryContext (txContext , iterID , executeIter )
1049
+
1029
1050
var payload * pb.QueryResponse
1030
1051
payload , err = getQueryResponse (handler , txContext , executeIter , iterID )
1031
1052
if err != nil {
@@ -1087,8 +1108,7 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
1087
1108
1088
1109
errHandler := func (payload []byte , iter commonledger.ResultsIterator , errFmt string , errArgs ... interface {}) {
1089
1110
if iter != nil {
1090
- iter .Close ()
1091
- handler .deleteQueryIterator (txContext , iterID )
1111
+ handler .cleanupQueryContext (txContext , iterID )
1092
1112
}
1093
1113
chaincodeLogger .Errorf (errFmt , errArgs ... )
1094
1114
serialSendMsg = & pb.ChaincodeMessage {Type : pb .ChaincodeMessage_ERROR , Payload : payload , Txid : msg .Txid , ChannelId : msg .ChannelId }
@@ -1115,7 +1135,7 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
1115
1135
return
1116
1136
}
1117
1137
1118
- handler .putQueryIterator (txContext , iterID , historyIter )
1138
+ handler .initializeQueryContext (txContext , iterID , historyIter )
1119
1139
1120
1140
var payload * pb.QueryResponse
1121
1141
payload , err = getQueryResponse (handler , txContext , historyIter , iterID )
0 commit comments