Skip to content

Commit

Permalink
Merge "[FAB-2223] GetHistoryForKey Chaincode API"
Browse files Browse the repository at this point in the history
  • Loading branch information
binhn authored and Gerrit Code Review committed Feb 14, 2017
2 parents ad986bb + e2bcb17 commit bb18dbe
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 129 deletions.
14 changes: 13 additions & 1 deletion core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (

//TXSimulatorKey is used to attach ledger simulation context
TXSimulatorKey string = "txsimulatorkey"

//HistoryQueryExecutorKey is used to attach ledger history query executor context
HistoryQueryExecutorKey string = "historyqueryexecutorkey"
)

//this is basically the singleton that supports the
Expand All @@ -69,6 +72,15 @@ func getTxSimulator(context context.Context) ledger.TxSimulator {
return nil
}

//use this for ledger access and make sure HistoryQueryExecutor is being used
func getHistoryQueryExecutor(context context.Context) ledger.HistoryQueryExecutor {
if historyQueryExecutor, ok := context.Value(HistoryQueryExecutorKey).(ledger.HistoryQueryExecutor); ok {
return historyQueryExecutor
}
//chaincode will not allow state operations
return nil
}

//
//chaincode runtime environment encapsulates handler and container environment
//This is where the VM that's running the chaincode would hook in
Expand Down Expand Up @@ -248,7 +260,7 @@ func (chaincodeSupport *ChaincodeSupport) registerHandler(chaincodehandler *Hand

func (chaincodeSupport *ChaincodeSupport) deregisterHandler(chaincodehandler *Handler) error {

// clean up rangeQueryIteratorMap
// clean up queryIteratorMap
for _, context := range chaincodehandler.txCtxs {
for _, v := range context.queryIteratorMap {
v.Close()
Expand Down
148 changes: 133 additions & 15 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type transactionContext struct {
// tracks open iterators used for range queries
queryIteratorMap map[string]commonledger.ResultsIterator

txsimulator ledger.TxSimulator
txsimulator ledger.TxSimulator
historyQueryExecutor ledger.HistoryQueryExecutor
}

type nextStateInfo struct {
Expand Down Expand Up @@ -194,6 +195,7 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
queryIteratorMap: make(map[string]commonledger.ResultsIterator)}
handler.txCtxs[txid] = txctx
txctx.txsimulator = getTxSimulator(ctxt)
txctx.historyQueryExecutor = getHistoryQueryExecutor(ctxt)

return txctx, nil
}
Expand Down Expand Up @@ -417,6 +419,7 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_QUERY_RESULT.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_HISTORY_FOR_KEY.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_QUERY_STATE_NEXT.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{readystate}, Dst: readystate},
Expand All @@ -425,19 +428,20 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
{Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: readystate},
},
fsm.Callbacks{
"before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_COMPLETED.String(): func(e *fsm.Event) { v.beforeCompletedEvent(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE.String(): func(e *fsm.Event) { v.afterGetState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(): func(e *fsm.Event) { v.afterGetStateByRange(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_QUERY_RESULT.String(): func(e *fsm.Event) { v.afterGetQueryResult(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterQueryStateNext(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterQueryStateClose(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) },
"enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) },
"enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_COMPLETED.String(): func(e *fsm.Event) { v.beforeCompletedEvent(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE.String(): func(e *fsm.Event) { v.afterGetState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(): func(e *fsm.Event) { v.afterGetStateByRange(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_QUERY_RESULT.String(): func(e *fsm.Event) { v.afterGetQueryResult(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_HISTORY_FOR_KEY.String(): func(e *fsm.Event) { v.afterGetHistoryForKey(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterQueryStateNext(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterQueryStateClose(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) },
"enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) },
"enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) },
},
)

Expand Down Expand Up @@ -968,7 +972,7 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
if err != nil {
// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get ledger scan iterator. Sending %s", pb.ChaincodeMessage_ERROR)
chaincodeLogger.Errorf("Failed to get ledger query iterator. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}
Expand Down Expand Up @@ -1017,6 +1021,118 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
}()
}

const maxGetHistoryForKeyLimit = 100

// afterGetHistoryForKey handles a GET_HISTORY_FOR_KEY request from the chaincode.
func (handler *Handler) afterGetHistoryForKey(e *fsm.Event, state string) {
msg, ok := e.Args[0].(*pb.ChaincodeMessage)
if !ok {
e.Cancel(fmt.Errorf("Received unexpected message type"))
return
}
chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)

// Query ledger history db
handler.handleGetHistoryForKey(msg)
chaincodeLogger.Debug("Exiting GET_HISTORY_FOR_KEY")
}

// Handles query to ledger history db
func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
// is completed before the next one is triggered. The previous state transition is deemed complete only when
// the afterQueryState function is exited. Interesting bug fix!!
go func() {
// Check if this is the unique state request from this chaincode txid
uniqueReq := handler.createTXIDEntry(msg.Txid)
if !uniqueReq {
// Drop this request
chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
return
}

var serialSendMsg *pb.ChaincodeMessage

defer func() {
handler.deleteTXIDEntry(msg.Txid)
chaincodeLogger.Debugf("[%s]handleGetHistoryForKey serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
handler.serialSendAsync(serialSendMsg, nil)
}()

getHistoryForKey := &pb.GetHistoryForKey{}
unmarshalErr := proto.Unmarshal(msg.Payload, getHistoryForKey)
if unmarshalErr != nil {
payload := []byte(unmarshalErr.Error())
chaincodeLogger.Errorf("Failed to unmarshall query request. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

iterID := util.GenerateUUID()

var txContext *transactionContext

txContext, serialSendMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for GetHistoryForKey. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
if txContext == nil {
return
}
chaincodeID := handler.getCCRootName()

historyIter, err := txContext.historyQueryExecutor.GetHistoryForKey(chaincodeID, getHistoryForKey.Key)
if err != nil {
// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get ledger history iterator. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

handler.putQueryIterator(txContext, iterID, historyIter)

// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
// and value (value). But we'll need to use another structure if we add other fields like timestamp.
var keysAndValues []*pb.QueryStateKeyValue
var i = uint32(0)
var qresult commonledger.QueryResult
for ; i < maxGetHistoryForKeyLimit; i++ {
qresult, err = historyIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.KeyModification)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}

if qresult != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)
}

var payloadBytes []byte
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)

// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}

}()
}

// afterPutState handles a PUT_STATE request from the chaincode.
func (handler *Handler) afterPutState(e *fsm.Event, state string) {
_, ok := e.Args[0].(*pb.ChaincodeMessage)
Expand Down Expand Up @@ -1141,6 +1257,7 @@ func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
// We grab the called channel's ledger simulator to hold the new state
ctxt := context.Background()
txsim := txContext.txsimulator
historyQueryExecutor := txContext.historyQueryExecutor
if calledCcParts.suffix != txContext.chainID {
lgr := peer.GetLedger(calledCcParts.suffix)
if lgr == nil {
Expand All @@ -1159,6 +1276,7 @@ func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
txsim = txsim2
}
ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)

if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
chaincodeLogger.Debugf("[%s] calling lccc to get chaincode data for %s on channel %s",
Expand Down
9 changes: 9 additions & 0 deletions core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,16 @@ func (stub *ChaincodeStub) GetQueryResult(query string) (StateQueryIteratorInter
return nil, err
}
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil
}

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
func (stub *ChaincodeStub) GetHistoryForKey(key string) (StateQueryIteratorInterface, error) {
response, err := stub.handler.handleGetHistoryForKey(key, stub.TxID)
if err != nil {
return nil, err
}
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil
}

//CreateCompositeKey combines the given attributes to form a composite key.
Expand Down
48 changes: 48 additions & 0 deletions core/chaincode/shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,54 @@ func (handler *Handler) handleGetQueryResult(query string, txid string) (*pb.Que
return nil, errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleGetHistoryForKey(key string, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
chaincodeLogger.Debugf("[%s]Another state request pending for this Txid. Cannot process.", shorttxid(txid))
return nil, uniqueReqErr
}

defer handler.deleteChannel(txid)

// Send GET_HISTORY_FOR_KEY message to validator chaincode support
payload := &pb.GetHistoryForKey{Key: key}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return nil, errors.New("Failed to process query state request")
}
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_GET_HISTORY_FOR_KEY, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)
responseMsg, err := handler.sendReceive(msg, respChan)
if err != nil {
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)
return nil, errors.New("could not send msg")
}

if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

getHistoryForKeyResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, getHistoryForKeyResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
return nil, errors.New("Error unmarshalling QueryStateResponse.")
}

return getHistoryForKeyResponse, nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
chaincodeLogger.Errorf("[%s]Received %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_ERROR)
return nil, errors.New(string(responseMsg.Payload[:]))
}

// Incorrect chaincode message received
chaincodeLogger.Errorf("Incorrect chaincode message %s recieved. Expecting %s or %s", responseMsg.Type, pb.ChaincodeMessage_RESPONSE, pb.ChaincodeMessage_ERROR)
return nil, errors.New("Incorrect chaincode message received")
}

// handleInvokeChaincode communicates with the validator to invoke another chaincode.
func (handler *Handler) handleInvokeChaincode(chaincodeName string, args [][]byte, txid string) pb.Response {
chaincodeID := &pb.ChaincodeID{Name: chaincodeName}
Expand Down
4 changes: 4 additions & 0 deletions core/chaincode/shim/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type ChaincodeStubInterface interface {
// the query result set
GetQueryResult(query string) (StateQueryIteratorInterface, error)

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
GetHistoryForKey(key string) (StateQueryIteratorInterface, error)

// GetCallerCertificate returns caller certificate
GetCallerCertificate() ([]byte, error)

Expand Down
6 changes: 6 additions & 0 deletions core/chaincode/shim/mockstub.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func (stub *MockStub) GetQueryResult(query string) (StateQueryIteratorInterface,
return nil, errors.New("Not Implemented")
}

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
func (stub *MockStub) GetHistoryForKey(key string) (StateQueryIteratorInterface, error) {
return nil, errors.New("Not Implemented")
}

//GetStateByPartialCompositeKey function can be invoked by a chaincode to query the
//state based on a given partial composite key. This function returns an
//iterator which can be used to iterate over all composite keys whose prefix
Expand Down
18 changes: 18 additions & 0 deletions core/endorser/endorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (*Endorser) getTxSimulator(ledgername string) (ledger.TxSimulator, error) {
return lgr.NewTxSimulator()
}

func (*Endorser) getHistoryQueryExecutor(ledgername string) (ledger.HistoryQueryExecutor, error) {
lgr := peer.GetLedger(ledgername)
if lgr == nil {
return nil, fmt.Errorf("chain does not exist(%s)", ledgername)
}
return lgr.NewHistoryQueryExecutor()
}

//call specified chaincode (system or user)
func (e *Endorser) callChaincode(ctxt context.Context, chainID string, version string, txid string, prop *pb.Proposal, cis *pb.ChaincodeInvocationSpec, cid *pb.ChaincodeID, txsim ledger.TxSimulator) (*pb.Response, *pb.ChaincodeEvent, error) {
var err error
Expand Down Expand Up @@ -292,11 +300,21 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro

// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if chainID != "" {
if txsim, err = e.getTxSimulator(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
if historyQueryExecutor, err = e.getHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
// Add the historyQueryExecutor to context
// TODO shouldn't we also add txsim to context here as well? Rather than passing txsim parameter
// around separately, since eventually it gets added to context anyways
ctx = context.WithValue(ctx, chaincode.HistoryQueryExecutorKey, historyQueryExecutor)

defer txsim.Done()
}
//this could be a request to a chainless SysCC
Expand Down
Loading

0 comments on commit bb18dbe

Please sign in to comment.