Skip to content

Commit

Permalink
Fixes #5001 and gossip panic when high concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: David VIEJO <dviejo@kungfusoftware.es>
  • Loading branch information
dviejokfs committed Dec 17, 2024
1 parent 0501b0d commit e8139fc
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 49 deletions.
71 changes: 35 additions & 36 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (h *Handler) notifyRegistry(err error) {
h.Registry.Ready(h.chaincodeID)
}

// HandleRegister is invoked when chaincode tries to register.
// handleRegister is invoked when chaincode tries to register.
func (h *Handler) HandleRegister(msg *pb.ChaincodeMessage) {
h.stateLock.RLock()
state := h.state
Expand Down Expand Up @@ -746,55 +746,54 @@ func (h *Handler) HandleGetStateByRange(msg *pb.ChaincodeMessage, txContext *Tra

totalReturnLimit := h.calculateTotalReturnLimit(metadata)
iterID := h.UUIDGenerator.New()

var rangeIter commonledger.ResultsIterator
isPaginated := false
namespaceID := txContext.NamespaceID
collection := getStateByRange.Collection
if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(namespaceID, collection,
getStateByRange.StartKey, getStateByRange.EndKey)
} else if isMetadataSetForPagination(metadata) {
isPaginated = true
startKey := getStateByRange.StartKey
if isMetadataSetForPagination(metadata) {

// Wrap the iterator creation in a recover block
func() {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in range scan: %v", r)
}
}()

if isCollectionSet(collection) {
if txContext.IsInitTransaction {
err = errors.New("private data APIs are not allowed in chaincode Init()")
return
}
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return
}
rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(namespaceID, collection,
getStateByRange.StartKey, getStateByRange.EndKey)
} else if isMetadataSetForPagination(metadata) {
isPaginated = true
startKey := getStateByRange.StartKey
if metadata.Bookmark != "" {
startKey = metadata.Bookmark
}
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIteratorWithPagination(namespaceID,
startKey, getStateByRange.EndKey, metadata.PageSize)
} else {
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIterator(namespaceID, getStateByRange.StartKey, getStateByRange.EndKey)
}
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIteratorWithPagination(namespaceID,
startKey, getStateByRange.EndKey, metadata.PageSize)
} else {
rangeIter, err = txContext.TXSimulator.GetStateRangeScanIterator(namespaceID, getStateByRange.StartKey, getStateByRange.EndKey)
}
if err != nil {
return nil, errors.WithStack(err)
}
txContext.InitializeQueryContext(iterID, rangeIter)
}()

payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID, isPaginated, totalReturnLimit)
if err != nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.WithStack(err)
}
if payload == nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.New("marshal failed: proto: Marshal called with nil")
return nil, errors.WithMessage(err, "error getting range scan iterator")
}

payloadBytes, err := proto.Marshal(payload)
if err != nil {
txContext.CleanupQueryContext(iterID)
return nil, errors.Wrap(err, "marshal failed")
if rangeIter == nil {
return nil, errors.New("nil iterator returned")
}

chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
txContext.InitializeQueryContext(iterID, rangeIter)

return createResponse(txContext, rangeIter, isPaginated, totalReturnLimit)

Check failure on line 796 in core/chaincode/handler.go

View workflow job for this annotation

GitHub Actions / Basic Checks

undefined: createResponse

Check failure on line 796 in core/chaincode/handler.go

View workflow job for this annotation

GitHub Actions / Basic Checks

undefined: createResponse
}

// Handles query to ledger for query state next
Expand Down
23 changes: 21 additions & 2 deletions core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package stateleveldb

import (
"bytes"
"fmt"

"github.com/hyperledger/fabric-lib-go/common/flogging"
"github.com/hyperledger/fabric/common/ledger/dataformat"
Expand Down Expand Up @@ -164,10 +165,28 @@ func (vdb *versionedDB) GetStateRangeScanIteratorWithPagination(namespace string
if endKey == "" {
dataEndKey[len(dataEndKey)-1] = lastKeyIndicator
}
dbItr, err := vdb.db.GetIterator(dataStartKey, dataEndKey)

// Wrap the iterator creation in a recover block
var dbItr iterator.Iterator
var err error

func() {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic while creating iterator: %v", r)
}
}()
dbItr, err = vdb.db.GetIterator(dataStartKey, dataEndKey)
}()

if err != nil {
return nil, err
return nil, errors.WithMessage(err, "error getting iterator from db")
}

if dbItr == nil {
return nil, errors.New("nil iterator returned from db")
}

return newKVScanner(namespace, dbItr, pageSize), nil
}

Expand Down
15 changes: 11 additions & 4 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,18 @@ func (engine *PullEngine) OnRes(items []string, nonce uint64) {
}

func (engine *PullEngine) newNONCE() uint64 {
n := uint64(0)
for {
n = util.RandomUInt64()
if !engine.outgoingNONCES.Exists(n) {
engine.lock.Lock()
defer engine.lock.Unlock()

maxAttempts := 100
for i := 0; i < maxAttempts; i++ {
n := util.RandomUInt64()
if n != 0 && !engine.outgoingNONCES.Exists(n) {
return n
}
}

// If we couldn't generate a unique NONCE after max attempts,
// use time-based fallback
return uint64(time.Now().UnixNano())
}
16 changes: 9 additions & 7 deletions gossip/util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package util

import (
crand "crypto/rand"
"fmt"
"math/rand/v2"
"math/rand"
"reflect"
"runtime"
"sync"
Expand All @@ -18,12 +17,13 @@ import (
"github.com/spf13/viper"
)

var r *rand.Rand
var (
r *rand.Rand
rMutex sync.Mutex
)

func init() { // do we really need this?
var seed [32]byte
_, _ = crand.Read(seed[:])
r = rand.New(rand.NewChaCha8(seed))
r = rand.New(rand.NewSource(time.Now().UnixNano()))
}

// Equals returns whether a and b are the same
Expand Down Expand Up @@ -179,14 +179,16 @@ func SetVal(key string, val interface{}) {
// RandomInt returns, as an int, a non-negative pseudo-random integer in [0,n)
// It panics if n <= 0
func RandomInt(n int) int {
return r.IntN(n)
return r.Intn(n)
}

// RandomUInt64 returns a random uint64
//
// If we want a rand that's non-global and specific to gossip, we can
// establish one. Otherwise this uses the process-global locking RNG.
func RandomUInt64() uint64 {
rMutex.Lock()
defer rMutex.Unlock()
return r.Uint64()
}

Expand Down

0 comments on commit e8139fc

Please sign in to comment.