Skip to content

Commit

Permalink
[FAB-2223] GetHistoryForKey Chaincode API
Browse files Browse the repository at this point in the history
Expose GetHistoryForKey ledger API to chaincode.

HistoryQueryExecutor utilizes a history database separate from
state database, therefore it is made available outside the normal
TxSimulator to minimize impact to state transactions (e.g. reduce
lock times).

HistoryQueryExecutor is made available to chaincode context, supporting
GetHistoryForKey function calls from chaincode. Similar to
GetQueryResult, existing chaincode iterator and Next support are re-used.

marbles02 getHistoryForKey example is delivered. Call chaincode function
as follows:
peer chaincode invoke -C myc1 -n marbles -c
       '{"Args":["getHistoryForMarble","marble1"]}'

Additional unit tests will be delivered next.

Change-Id: Iee4dd92b27deb7c42cb83991fb6a3b924edd1e63
Signed-off-by: denyeart <enyeart@us.ibm.com>
  • Loading branch information
denyeart committed Feb 14, 2017
1 parent 5090331 commit e2bcb17
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 129 deletions.
14 changes: 13 additions & 1 deletion core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (

//TXSimulatorKey is used to attach ledger simulation context
TXSimulatorKey string = "txsimulatorkey"

//HistoryQueryExecutorKey is used to attach ledger history query executor context
HistoryQueryExecutorKey string = "historyqueryexecutorkey"
)

//this is basically the singleton that supports the
Expand All @@ -69,6 +72,15 @@ func getTxSimulator(context context.Context) ledger.TxSimulator {
return nil
}

//use this for ledger access and make sure HistoryQueryExecutor is being used
func getHistoryQueryExecutor(context context.Context) ledger.HistoryQueryExecutor {
if historyQueryExecutor, ok := context.Value(HistoryQueryExecutorKey).(ledger.HistoryQueryExecutor); ok {
return historyQueryExecutor
}
//chaincode will not allow state operations
return nil
}

//
//chaincode runtime environment encapsulates handler and container environment
//This is where the VM that's running the chaincode would hook in
Expand Down Expand Up @@ -248,7 +260,7 @@ func (chaincodeSupport *ChaincodeSupport) registerHandler(chaincodehandler *Hand

func (chaincodeSupport *ChaincodeSupport) deregisterHandler(chaincodehandler *Handler) error {

// clean up rangeQueryIteratorMap
// clean up queryIteratorMap
for _, context := range chaincodehandler.txCtxs {
for _, v := range context.queryIteratorMap {
v.Close()
Expand Down
148 changes: 133 additions & 15 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type transactionContext struct {
// tracks open iterators used for range queries
queryIteratorMap map[string]commonledger.ResultsIterator

txsimulator ledger.TxSimulator
txsimulator ledger.TxSimulator
historyQueryExecutor ledger.HistoryQueryExecutor
}

type nextStateInfo struct {
Expand Down Expand Up @@ -194,6 +195,7 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
queryIteratorMap: make(map[string]commonledger.ResultsIterator)}
handler.txCtxs[txid] = txctx
txctx.txsimulator = getTxSimulator(ctxt)
txctx.historyQueryExecutor = getHistoryQueryExecutor(ctxt)

return txctx, nil
}
Expand Down Expand Up @@ -417,6 +419,7 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_QUERY_RESULT.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_GET_HISTORY_FOR_KEY.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_QUERY_STATE_NEXT.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(), Src: []string{readystate}, Dst: readystate},
{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{readystate}, Dst: readystate},
Expand All @@ -425,19 +428,20 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
{Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: readystate},
},
fsm.Callbacks{
"before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_COMPLETED.String(): func(e *fsm.Event) { v.beforeCompletedEvent(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE.String(): func(e *fsm.Event) { v.afterGetState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(): func(e *fsm.Event) { v.afterGetStateByRange(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_QUERY_RESULT.String(): func(e *fsm.Event) { v.afterGetQueryResult(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterQueryStateNext(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterQueryStateClose(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) },
"enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) },
"enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) },
"before_" + pb.ChaincodeMessage_COMPLETED.String(): func(e *fsm.Event) { v.beforeCompletedEvent(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE.String(): func(e *fsm.Event) { v.afterGetState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_STATE_BY_RANGE.String(): func(e *fsm.Event) { v.afterGetStateByRange(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_QUERY_RESULT.String(): func(e *fsm.Event) { v.afterGetQueryResult(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_GET_HISTORY_FOR_KEY.String(): func(e *fsm.Event) { v.afterGetHistoryForKey(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterQueryStateNext(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterQueryStateClose(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
"enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) },
"enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) },
"enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) },
},
)

Expand Down Expand Up @@ -968,7 +972,7 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
if err != nil {
// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get ledger scan iterator. Sending %s", pb.ChaincodeMessage_ERROR)
chaincodeLogger.Errorf("Failed to get ledger query iterator. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}
Expand Down Expand Up @@ -1017,6 +1021,118 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
}()
}

const maxGetHistoryForKeyLimit = 100

// afterGetHistoryForKey handles a GET_HISTORY_FOR_KEY request from the chaincode.
func (handler *Handler) afterGetHistoryForKey(e *fsm.Event, state string) {
msg, ok := e.Args[0].(*pb.ChaincodeMessage)
if !ok {
e.Cancel(fmt.Errorf("Received unexpected message type"))
return
}
chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)

// Query ledger history db
handler.handleGetHistoryForKey(msg)
chaincodeLogger.Debug("Exiting GET_HISTORY_FOR_KEY")
}

// Handles query to ledger history db
func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
// is completed before the next one is triggered. The previous state transition is deemed complete only when
// the afterQueryState function is exited. Interesting bug fix!!
go func() {
// Check if this is the unique state request from this chaincode txid
uniqueReq := handler.createTXIDEntry(msg.Txid)
if !uniqueReq {
// Drop this request
chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
return
}

var serialSendMsg *pb.ChaincodeMessage

defer func() {
handler.deleteTXIDEntry(msg.Txid)
chaincodeLogger.Debugf("[%s]handleGetHistoryForKey serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
handler.serialSendAsync(serialSendMsg, nil)
}()

getHistoryForKey := &pb.GetHistoryForKey{}
unmarshalErr := proto.Unmarshal(msg.Payload, getHistoryForKey)
if unmarshalErr != nil {
payload := []byte(unmarshalErr.Error())
chaincodeLogger.Errorf("Failed to unmarshall query request. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

iterID := util.GenerateUUID()

var txContext *transactionContext

txContext, serialSendMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for GetHistoryForKey. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
if txContext == nil {
return
}
chaincodeID := handler.getCCRootName()

historyIter, err := txContext.historyQueryExecutor.GetHistoryForKey(chaincodeID, getHistoryForKey.Key)
if err != nil {
// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get ledger history iterator. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

handler.putQueryIterator(txContext, iterID, historyIter)

// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
// and value (value). But we'll need to use another structure if we add other fields like timestamp.
var keysAndValues []*pb.QueryStateKeyValue
var i = uint32(0)
var qresult commonledger.QueryResult
for ; i < maxGetHistoryForKeyLimit; i++ {
qresult, err = historyIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.KeyModification)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}

if qresult != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)
}

var payloadBytes []byte
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)

// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}

}()
}

// afterPutState handles a PUT_STATE request from the chaincode.
func (handler *Handler) afterPutState(e *fsm.Event, state string) {
_, ok := e.Args[0].(*pb.ChaincodeMessage)
Expand Down Expand Up @@ -1141,6 +1257,7 @@ func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
// We grab the called channel's ledger simulator to hold the new state
ctxt := context.Background()
txsim := txContext.txsimulator
historyQueryExecutor := txContext.historyQueryExecutor
if calledCcParts.suffix != txContext.chainID {
lgr := peer.GetLedger(calledCcParts.suffix)
if lgr == nil {
Expand All @@ -1159,6 +1276,7 @@ func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
txsim = txsim2
}
ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)

if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
chaincodeLogger.Debugf("[%s] calling lccc to get chaincode data for %s on channel %s",
Expand Down
9 changes: 9 additions & 0 deletions core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,16 @@ func (stub *ChaincodeStub) GetQueryResult(query string) (StateQueryIteratorInter
return nil, err
}
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil
}

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
func (stub *ChaincodeStub) GetHistoryForKey(key string) (StateQueryIteratorInterface, error) {
response, err := stub.handler.handleGetHistoryForKey(key, stub.TxID)
if err != nil {
return nil, err
}
return &StateQueryIterator{stub.handler, stub.TxID, response, 0}, nil
}

//CreateCompositeKey combines the given attributes to form a composite key.
Expand Down
48 changes: 48 additions & 0 deletions core/chaincode/shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,54 @@ func (handler *Handler) handleGetQueryResult(query string, txid string) (*pb.Que
return nil, errors.New("Incorrect chaincode message received")
}

func (handler *Handler) handleGetHistoryForKey(key string, txid string) (*pb.QueryStateResponse, error) {
// Create the channel on which to communicate the response from validating peer
respChan, uniqueReqErr := handler.createChannel(txid)
if uniqueReqErr != nil {
chaincodeLogger.Debugf("[%s]Another state request pending for this Txid. Cannot process.", shorttxid(txid))
return nil, uniqueReqErr
}

defer handler.deleteChannel(txid)

// Send GET_HISTORY_FOR_KEY message to validator chaincode support
payload := &pb.GetHistoryForKey{Key: key}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return nil, errors.New("Failed to process query state request")
}
msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_GET_HISTORY_FOR_KEY, Payload: payloadBytes, Txid: txid}
chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)
responseMsg, err := handler.sendReceive(msg, respChan)
if err != nil {
chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)
return nil, errors.New("could not send msg")
}

if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() {
// Success response
chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE)

getHistoryForKeyResponse := &pb.QueryStateResponse{}
unmarshalErr := proto.Unmarshal(responseMsg.Payload, getHistoryForKeyResponse)
if unmarshalErr != nil {
chaincodeLogger.Errorf("[%s]unmarshall error", shorttxid(responseMsg.Txid))
return nil, errors.New("Error unmarshalling QueryStateResponse.")
}

return getHistoryForKeyResponse, nil
}
if responseMsg.Type.String() == pb.ChaincodeMessage_ERROR.String() {
// Error response
chaincodeLogger.Errorf("[%s]Received %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_ERROR)
return nil, errors.New(string(responseMsg.Payload[:]))
}

// Incorrect chaincode message received
chaincodeLogger.Errorf("Incorrect chaincode message %s recieved. Expecting %s or %s", responseMsg.Type, pb.ChaincodeMessage_RESPONSE, pb.ChaincodeMessage_ERROR)
return nil, errors.New("Incorrect chaincode message received")
}

// handleInvokeChaincode communicates with the validator to invoke another chaincode.
func (handler *Handler) handleInvokeChaincode(chaincodeName string, args [][]byte, txid string) pb.Response {
chaincodeID := &pb.ChaincodeID{Name: chaincodeName}
Expand Down
4 changes: 4 additions & 0 deletions core/chaincode/shim/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type ChaincodeStubInterface interface {
// the query result set
GetQueryResult(query string) (StateQueryIteratorInterface, error)

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
GetHistoryForKey(key string) (StateQueryIteratorInterface, error)

// GetCallerCertificate returns caller certificate
GetCallerCertificate() ([]byte, error)

Expand Down
6 changes: 6 additions & 0 deletions core/chaincode/shim/mockstub.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func (stub *MockStub) GetQueryResult(query string) (StateQueryIteratorInterface,
return nil, errors.New("Not Implemented")
}

// GetHistoryForKey function can be invoked by a chaincode to return a history of
// key values across time. GetHistoryForKey is intended to be used for read-only queries.
func (stub *MockStub) GetHistoryForKey(key string) (StateQueryIteratorInterface, error) {
return nil, errors.New("Not Implemented")
}

//GetStateByPartialCompositeKey function can be invoked by a chaincode to query the
//state based on a given partial composite key. This function returns an
//iterator which can be used to iterate over all composite keys whose prefix
Expand Down
18 changes: 18 additions & 0 deletions core/endorser/endorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (*Endorser) getTxSimulator(ledgername string) (ledger.TxSimulator, error) {
return lgr.NewTxSimulator()
}

func (*Endorser) getHistoryQueryExecutor(ledgername string) (ledger.HistoryQueryExecutor, error) {
lgr := peer.GetLedger(ledgername)
if lgr == nil {
return nil, fmt.Errorf("chain does not exist(%s)", ledgername)
}
return lgr.NewHistoryQueryExecutor()
}

//call specified chaincode (system or user)
func (e *Endorser) callChaincode(ctxt context.Context, chainID string, version string, txid string, prop *pb.Proposal, cis *pb.ChaincodeInvocationSpec, cid *pb.ChaincodeID, txsim ledger.TxSimulator) (*pb.Response, *pb.ChaincodeEvent, error) {
var err error
Expand Down Expand Up @@ -298,11 +306,21 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro

// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if chainID != "" {
if txsim, err = e.getTxSimulator(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
if historyQueryExecutor, err = e.getHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
// Add the historyQueryExecutor to context
// TODO shouldn't we also add txsim to context here as well? Rather than passing txsim parameter
// around separately, since eventually it gets added to context anyways
ctx = context.WithValue(ctx, chaincode.HistoryQueryExecutorKey, historyQueryExecutor)

defer txsim.Done()
}
//this could be a request to a chainless SysCC
Expand Down
Loading

0 comments on commit e2bcb17

Please sign in to comment.