Skip to content

Commit

Permalink
[FAB-204] Expose ledger rich query API to chaincode
Browse files Browse the repository at this point in the history
Adding a new chaincode API called ExecuteQuery() that
executes a given rich query on the state DB
(if it is supported).

For example, if CouchDB is the state DB, chaincode can
utilize ExecuteQuery() API to execute declarative JSON
query to find documents which satisfy given criteria.
http://docs.couchdb.org/en/2.0.0/api/database/find.html

Change-Id: I296d261c247badc679a7d5bfa25172a9e19ccf34
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Jan 26, 2017
1 parent dffcaf4 commit d5467f3
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 242 deletions.
2 changes: 1 addition & 1 deletion core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (chaincodeSupport *ChaincodeSupport) deregisterHandler(chaincodehandler *Ha

// clean up rangeQueryIteratorMap
for _, context := range chaincodehandler.txCtxs {
for _, v := range context.rangeQueryIteratorMap {
for _, v := range context.queryIteratorMap {
v.Close()
}
}
Expand Down
58 changes: 54 additions & 4 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
putils "github.com/hyperledger/fabric/protos/utils"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
Expand Down Expand Up @@ -855,9 +856,7 @@ func TestChaincodeInvokeChaincodeErrorCase(t *testing.T) {
}

// Test the invocation of a transaction.
func TestRangeQuery(t *testing.T) {
//TODO enable after ledger enables RangeQuery
t.Skip()
func TestQueries(t *testing.T) {

chainID := util.GetTestChainID()

Expand Down Expand Up @@ -891,8 +890,44 @@ func TestRangeQuery(t *testing.T) {
}

// Invoke second chaincode, which will inturn invoke the first chaincode
f = "put"
args = util.ToChaincodeArgs(f, "key1", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec)

if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

f = "put"
args = util.ToChaincodeArgs(f, "key2", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec)

if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

f = "put"
args = util.ToChaincodeArgs(f, "key3", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec)

if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

f = "keys"
args = util.ToChaincodeArgs(f)
args = util.ToChaincodeArgs(f, "key0", "key3")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec)
Expand All @@ -903,6 +938,21 @@ func TestRangeQuery(t *testing.T) {
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

if ledgerconfig.IsCouchDBEnabled() == true {
f = "query"
args = util.ToChaincodeArgs(f, "{\"selector\":{\"currency\":\"USD\"}}")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec)

if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}
}
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
}

Expand Down
219 changes: 165 additions & 54 deletions core/chaincode/handler.go

Large diffs are not rendered by default.

33 changes: 21 additions & 12 deletions core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ func (stub *ChaincodeStub) DelState(key string) error {
return stub.handler.handleDelState(key, stub.TxID)
}

// StateRangeQueryIterator allows a chaincode to iterate over a range of
// StateQueryIterator allows a chaincode to iterate over a set of
// key/value pairs in the state.
type StateRangeQueryIterator struct {
type StateQueryIterator struct {
handler *Handler
uuid string
response *pb.RangeQueryStateResponse
response *pb.QueryStateResponse
currentLoc int
}

Expand All @@ -329,12 +329,21 @@ type StateRangeQueryIterator struct {
// an iterator will be returned that can be used to iterate over all keys
// between the startKey and endKey, inclusive. The order in which keys are
// returned by the iterator is random.
func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (StateRangeQueryIteratorInterface, error) {
func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (StateQueryIteratorInterface, error) {
response, err := stub.handler.handleRangeQueryState(startKey, endKey, stub.TxID)
if err != nil {
return nil, err
}
return &StateRangeQueryIterator{stub.handler, stub.TxID, response, 0}, nil
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil
}

func (stub *ChaincodeStub) ExecuteQuery(query string) (StateQueryIteratorInterface, error) {
response, err := stub.handler.handleExecuteQueryState(query, stub.TxID)
if err != nil {
return nil, err
}
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil

}

//Given a list of attributes, createCompositeKey function combines these attributes
Expand All @@ -359,11 +368,11 @@ func createCompositeKey(stub ChaincodeStubInterface, objectType string, attribut
//matches the given partial composite key. This function should be used only for
//a partial composite key. For a full composite key, an iter with empty response
//would be returned.
func (stub *ChaincodeStub) PartialCompositeKeyQuery(objectType string, attributes []string) (StateRangeQueryIteratorInterface, error) {
func (stub *ChaincodeStub) PartialCompositeKeyQuery(objectType string, attributes []string) (StateQueryIteratorInterface, error) {
return partialCompositeKeyQuery(stub, objectType, attributes)
}

func partialCompositeKeyQuery(stub ChaincodeStubInterface, objectType string, attributes []string) (StateRangeQueryIteratorInterface, error) {
func partialCompositeKeyQuery(stub ChaincodeStubInterface, objectType string, attributes []string) (StateQueryIteratorInterface, error) {
partialCompositeKey, _ := stub.CreateCompositeKey(objectType, attributes)
keysIter, err := stub.RangeQueryState(partialCompositeKey+"1", partialCompositeKey+":")
if err != nil {
Expand All @@ -374,23 +383,23 @@ func partialCompositeKeyQuery(stub ChaincodeStubInterface, objectType string, at

// HasNext returns true if the range query iterator contains additional keys
// and values.
func (iter *StateRangeQueryIterator) HasNext() bool {
func (iter *StateQueryIterator) HasNext() bool {
if iter.currentLoc < len(iter.response.KeysAndValues) || iter.response.HasMore {
return true
}
return false
}

// Next returns the next key and value in the range query iterator.
func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {
func (iter *StateQueryIterator) Next() (string, []byte, error) {
if iter.currentLoc < len(iter.response.KeysAndValues) {
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil
} else if !iter.response.HasMore {
return "", nil, errors.New("No such key")
} else {
response, err := iter.handler.handleRangeQueryStateNext(iter.response.ID, iter.uuid)
response, err := iter.handler.handleQueryStateNext(iter.response.ID, iter.uuid)

if err != nil {
return "", nil, err
Expand All @@ -407,8 +416,8 @@ func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {

// Close closes the range query iterator. This should be called when done
// reading from the iterator to free up resources.
func (iter *StateRangeQueryIterator) Close() error {
_, err := iter.handler.handleRangeQueryStateClose(iter.response.ID, iter.uuid)
func (iter *StateQueryIterator) Close() error {
_, err := iter.handler.handleQueryStateClose(iter.response.ID, iter.uuid)
return err
}

Expand Down
84 changes: 66 additions & 18 deletions core/chaincode/shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (handler *Handler) handleDelState(key string, txid string) error {
return errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid string) (*pb.RangeQueryStateResponse, error) {
func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
Expand Down Expand Up @@ -522,7 +522,7 @@ func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid stri
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

rangeQueryResponse := &pb.RangeQueryStateResponse{}
rangeQueryResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, rangeQueryResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
Expand All @@ -542,7 +542,7 @@ func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid stri
return nil, errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleRangeQueryStateNext(id, txid string) (*pb.RangeQueryStateResponse, error) {
func (handler *Handler) handleQueryStateNext(id, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
Expand All @@ -553,31 +553,31 @@ func (handler *Handler) handleRangeQueryStateNext(id, txid string) (*pb.RangeQue
defer handler.deleteChannel(txid)

// Send RANGE_QUERY_STATE_NEXT message to validator chaincode support
payload := &pb.RangeQueryStateNext{ID: id}
payload := &pb.QueryStateNext{ID: id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return nil, errors.New("Failed to process range query state next request")
}
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT)
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_QUERY_STATE_NEXT, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_QUERY_STATE_NEXT)
responseMsg, err := handler.sendReceive(msg, respChan)
if err != nil {
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT)
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_QUERY_STATE_NEXT)
return nil, errors.New("could not send msg")
}

if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

rangeQueryResponse := &pb.RangeQueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, rangeQueryResponse)
queryResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, queryResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
return nil, errors.New("Error unmarshalling RangeQueryStateResponse.")
}

return rangeQueryResponse, nil
return queryResponse, nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
Expand All @@ -590,7 +590,7 @@ func (handler *Handler) handleRangeQueryStateNext(id, txid string) (*pb.RangeQue
return nil, errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleRangeQueryStateClose(id, txid string) (*pb.RangeQueryStateResponse, error) {
func (handler *Handler) handleQueryStateClose(id, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
Expand All @@ -601,31 +601,79 @@ func (handler *Handler) handleRangeQueryStateClose(id, txid string) (*pb.RangeQu
defer handler.deleteChannel(txid)

// Send RANGE_QUERY_STATE_CLOSE message to validator chaincode support
payload := &pb.RangeQueryStateClose{ID: id}
payload := &pb.QueryStateClose{ID: id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return nil, errors.New("Failed to process range query state close request")
}
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE)
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_QUERY_STATE_CLOSE, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_QUERY_STATE_CLOSE)
responseMsg, err := handler.sendReceive(msg, respChan)
if err != nil {
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE)
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_QUERY_STATE_CLOSE)
return nil, errors.New("could not send msg")
}

if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

rangeQueryResponse := &pb.RangeQueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, rangeQueryResponse)
queryResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, queryResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
return nil, errors.New("Error unmarshalling RangeQueryStateResponse.")
}

return rangeQueryResponse, nil
return queryResponse, nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
chaincodeLogger.Errorf("[%s]Received %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_ERROR)
return nil, errors.New(string(responseMsg.Payload[:]))
}

// Incorrect chaincode message received
chaincodeLogger.Errorf("Incorrect chaincode message %s recieved. Expecting %s or %s", responseMsg.Type, pb.ChaincodeMessage_RESPONSE, pb.ChaincodeMessage_ERROR)
return nil, errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleExecuteQueryState(query string, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
chaincodeLogger.Debugf("[%s]Another state request pending for this Txid. Cannot process.", shorttxid(txid))
return nil, uniqueReqErr
}

defer handler.deleteChannel(txid)

// Send EXECUTE_QUERY_STATE message to validator chaincode support
payload := &pb.ExecuteQueryState{Query: query}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return nil, errors.New("Failed to process range query state request")
}
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_EXECUTE_QUERY_STATE, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_EXECUTE_QUERY_STATE)
responseMsg, err := handler.sendReceive(msg, respChan)
if err != nil {
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_EXECUTE_QUERY_STATE)
return nil, errors.New("could not send msg")
}

if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

executeQueryResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, executeQueryResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
return nil, errors.New("Error unmarshalling QueryStateResponse.")
}

return executeQueryResponse, nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
Expand Down
10 changes: 6 additions & 4 deletions core/chaincode/shim/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,22 @@ type ChaincodeStubInterface interface {
// an iterator will be returned that can be used to iterate over all keys
// between the startKey and endKey, inclusive. The order in which keys are
// returned by the iterator is random.
RangeQueryState(startKey, endKey string) (StateRangeQueryIteratorInterface, error)
RangeQueryState(startKey, endKey string) (StateQueryIteratorInterface, error)

//PartialCompositeKeyQuery function can be invoked by a chaincode to query the
//state based on a given partial composite key. This function returns an
//iterator which can be used to iterate over all composite keys whose prefix
//matches the given partial composite key. This function should be used only for
//a partial composite key. For a full composite key, an iter with empty response
//would be returned.
PartialCompositeKeyQuery(objectType string, keys []string) (StateRangeQueryIteratorInterface, error)
PartialCompositeKeyQuery(objectType string, keys []string) (StateQueryIteratorInterface, error)

//Given a list of attributes, createCompundKey function combines these attributes
//to form a composite key.
CreateCompositeKey(objectType string, attributes []string) (string, error)

ExecuteQuery(query string) (StateQueryIteratorInterface, error)

// GetCallerCertificate returns caller certificate
GetCallerCertificate() ([]byte, error)

Expand All @@ -104,9 +106,9 @@ type ChaincodeStubInterface interface {
SetEvent(name string, payload []byte) error
}

// StateRangeQueryIteratorInterface allows a chaincode to iterate over a range of
// StateQueryIteratorInterface allows a chaincode to iterate over a set of
// key/value pairs in the state.
type StateRangeQueryIteratorInterface interface {
type StateQueryIteratorInterface interface {

// HasNext returns true if the range query iterator contains additional keys
// and values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Map<String, String> rangeQueryState(String startKey, String endKey) {
*/
public Map<String, ByteString> rangeQueryRawState(String startKey, String endKey) {
Map<String, ByteString> map = new HashMap<>();
for (Chaincode.RangeQueryStateKeyValue mapping : handler.handleRangeQueryState(
for (Chaincode.QueryStateKeyValue mapping : handler.handleRangeQueryState(
startKey, endKey, uuid).getKeysAndValuesList()) {
map.put(mapping.getKey(), mapping.getValue());
}
Expand Down
Loading

0 comments on commit d5467f3

Please sign in to comment.