Skip to content

Commit

Permalink
Merge "FAB-2055 GetHistoryForKey() returns timestamp"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Apr 3, 2017
2 parents 835712f + b2f9d56 commit 2418dc5
Show file tree
Hide file tree
Showing 22 changed files with 530 additions and 347 deletions.
58 changes: 58 additions & 0 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ func startTxSimulation(ctxt context.Context, chainID string) (context.Context, l
if err != nil {
return nil, nil, err
}
historyQueryExecutor, err := lgr.NewHistoryQueryExecutor()
if err != nil {
return nil, nil, err
}

ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)
return ctxt, txsim, nil
}

Expand Down Expand Up @@ -1137,6 +1142,59 @@ func TestQueries(t *testing.T) {
//Reset the query limit to default
viper.Set("ledger.state.queryLimit", 10000)

if ledgerconfig.IsHistoryDBEnabled() == true {

f = "put"
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

f = "put"
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

//The following history query for "marble12" should return 3 records
f = "history"
args = util.ToChaincodeArgs(f, "marble12")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err := invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

var history []interface{}
err = json.Unmarshal(retval, &history)

//default query limit of 10000 is used, query should return all records that meet the criteria
if len(history) != 3 {
t.Fail()
t.Logf("Error detected with the history query, should have returned 3 but returned %v", len(keys))
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

}

if ledgerconfig.IsCouchDBEnabled() == true {

//The following rich query for should return 9 marbles
Expand Down
203 changes: 88 additions & 115 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package chaincode

import (
"bytes"
"encoding/gob"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -193,7 +194,8 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
if handler.txCtxs[txid] != nil {
return nil, fmt.Errorf("txid:%s exists", txid)
}
txctx := &transactionContext{chainID: chainID, signedProp: signedProp, proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
txctx := &transactionContext{chainID: chainID, signedProp: signedProp,
proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
queryIteratorMap: make(map[string]commonledger.ResultsIterator)}
handler.txCtxs[txid] = txctx
txctx.txsimulator = getTxSimulator(ctxt)
Expand Down Expand Up @@ -720,38 +722,20 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
}

handler.putQueryIterator(txContext, iterID, rangeIter)
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, rangeIter, iterID)

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = rangeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
kv := qresult.(*ledger.KV)
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
if qresult != nil {
if err != nil {
rangeIter.Close()
handler.deleteQueryIterator(txContext, iterID)
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}

payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err := proto.Marshal(payload)
var payloadBytes []byte
payloadBytes, err = proto.Marshal(payload)
if err != nil {
rangeIter.Close()
handler.deleteQueryIterator(txContext, iterID)
Expand All @@ -769,6 +753,60 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
}()
}

func getBytes(qresult interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(qresult)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

//getQueryResponse takes an iterator and fetch state to construct QueryResponse
func getQueryResponse(handler *Handler, txContext *transactionContext, iter commonledger.ResultsIterator,
iterID string) (*pb.QueryResponse, error) {

var i = 0
var err error
var queryLimit = ledgerconfig.GetQueryLimit()
var queryResult commonledger.QueryResult
var queryResultsBytes []*pb.QueryResultBytes

for ; i < queryLimit; i++ {
queryResult, err = iter.Next()
if err != nil {
return nil, err
}
if queryResult == nil {
break
}
var resultBytes []byte
resultBytes, err = getBytes(queryResult)
if err != nil {
return nil, err
}

qresultBytes := pb.QueryResultBytes{ResultBytes: resultBytes}
queryResultsBytes = append(queryResultsBytes, &qresultBytes)
}

if queryResult == nil {
iter.Close()
handler.deleteQueryIterator(txContext, iterID)
} else {
//TODO: remove this else part completely when paging design is implemented
//FAB-2462 - Re-introduce paging for range queries and rich queries
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
iter.Close()
handler.deleteQueryIterator(txContext, iterID)
}
//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryResponse{Data: data, HasMore: qresult != nil, Id: iterID}
return &pb.QueryResponse{Results: queryResultsBytes, HasMore: false, Id: iterID}, nil
}

// afterQueryStateNext handles a QUERY_STATE_NEXT request from the chaincode.
func (handler *Handler) afterQueryStateNext(e *fsm.Event, state string) {
msg, ok := e.Args[0].(*pb.ChaincodeMessage)
Expand Down Expand Up @@ -824,39 +862,17 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
return
}

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()

var qresult commonledger.QueryResult
var err error
for ; i < queryLimit; i++ {
qresult, err = queryIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult != nil {
break
}
kv := qresult.(*ledger.KV)
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
payload, err := getQueryResponse(handler, txContext, queryIter, queryStateNext.Id)

if qresult != nil {
if err != nil {
queryIter.Close()
handler.deleteQueryIterator(txContext, queryStateNext.Id)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: queryStateNext.Id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
queryIter.Close()
Expand Down Expand Up @@ -927,7 +943,7 @@ func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) {
handler.deleteQueryIterator(txContext, queryStateClose.Id)
}

payload := &pb.QueryStateResponse{HasMore: false, Id: queryStateClose.Id}
payload := &pb.QueryResponse{HasMore: false, Id: queryStateClose.Id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {

Expand Down Expand Up @@ -1010,48 +1026,27 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
}

handler.putQueryIterator(txContext, iterID, executeIter)
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, executeIter, iterID)

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = executeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.QueryRecord)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.Key, Value: queryRecord.Record}
keysAndValues = append(keysAndValues, &keyAndValue)
}

if qresult != nil {
if err != nil {
executeIter.Close()
handler.deleteQueryIterator(txContext, iterID)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

var payloadBytes []byte

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
executeIter.Close()
handler.deleteQueryIterator(txContext, iterID)

// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
chaincodeLogger.Errorf("Failed marshall response. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}
Expand Down Expand Up @@ -1128,41 +1123,19 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {

handler.putQueryIterator(txContext, iterID, historyIter)

// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
// and value (value). But we'll need to use another structure if we add other fields like timestamp.
var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = historyIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.KeyModification)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, historyIter, iterID)

if qresult != nil {
if err != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

var payloadBytes []byte

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
historyIter.Close()
Expand Down
Loading

0 comments on commit 2418dc5

Please sign in to comment.