Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PRT - Fix a fallback bug in Solana chain tracker #1902

Merged
merged 2 commits into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (cf *ChainFetcher) ChainFetcherMetadata() []pairingtypes.Metadata {
}

func (cf *ChainFetcher) CustomMessage(ctx context.Context, path string, data []byte, connectionType string, apiName string) ([]byte, error) {
utils.LavaFormatDebug("Sending CustomMessage", utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "data", Value: data}, utils.Attribute{Key: "connectionType", Value: connectionType}, utils.Attribute{Key: "apiName", Value: apiName})
utils.LavaFormatTrace("Sending CustomMessage", utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "data", Value: data}, utils.Attribute{Key: "connectionType", Value: connectionType}, utils.Attribute{Key: "apiName", Value: apiName})
craftData := &CraftData{Path: path, Data: data, ConnectionType: connectionType}
parsing := &spectypes.ParseDirective{
ApiName: apiName,
Expand All @@ -323,7 +323,7 @@ func (cf *ChainFetcher) CustomMessage(ctx context.Context, path string, data []b
return nil, err
}
reply, _, _, _, _, err := cf.chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil)
utils.LavaFormatDebug("CustomMessage", utils.Attribute{Key: "reply", Value: reply})
utils.LavaFormatTrace("CustomMessage", utils.Attribute{Key: "reply", Value: reply})
if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions protocol/chaintracker/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,12 +654,17 @@ func newCustomChainTracker(chainFetcher ChainFetcher, config ChainTrackerConfig)
// By applying a name SVM for example
case "SOLANA", "SOLANAT", "KOII", "KOIIT":
utils.LavaFormatInfo("using SVMChainTracker", utils.Attribute{Key: "chainID", Value: config.ChainId})
cache, err := ristretto.NewCache(&ristretto.Config[int64, int64]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
slotCache, err := ristretto.NewCache(&ristretto.Config[int64, int64]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
if err != nil {
utils.LavaFormatFatal("could not create cache", err)
}
hashCache, err := ristretto.NewCache(&ristretto.Config[int64, string]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
if err != nil {
utils.LavaFormatFatal("could not create cache", err)
}
chainTracker.iChainFetcherWrapper = &SVMChainTracker{
cache: cache,
slotCache: slotCache,
hashCache: hashCache,
dataFetcher: chainTracker,
chainFetcher: chainFetcher,
}
Expand Down
75 changes: 59 additions & 16 deletions protocol/chaintracker/svm_chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ import (
"context"
"encoding/json"
fmt "fmt"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto/v2"
"github.com/lavanet/lava/v4/utils"
)

const (
CacheMaxCost = 100000 // each item cost would be 1
CacheNumCounters = 100000 // expect 100000 items
latestBlockRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}"
CacheMaxCost = 100000 // each item cost would be 1
CacheNumCounters = 100000 // expect 100000 items
latestBlockRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}"
slotCacheTTL = time.Hour * 4
hashCacheTTL = time.Hour * 1
getSlotFromCacheMaxRetries = 5
getSlotFromCacheSleepDuration = time.Millisecond * 50
)

type IChainFetcherWrapper interface {
Expand All @@ -29,7 +34,9 @@ type IChainTrackerDataFetcher interface {
type SVMChainTracker struct {
dataFetcher IChainTrackerDataFetcher
chainFetcher ChainFetcher
cache *ristretto.Cache[int64, int64] // cache for block to slot. (a few slots can point the same block, but we don't really care about that so overwrite is ok)
slotCache *ristretto.Cache[int64, int64] // cache for block to slot. (a few slots can point the same block, but we don't really care about that so overwrite is ok)
hashCache *ristretto.Cache[int64, string] // cache for block to hash.
seenBlock int64
}

type SVMLatestBlockResponse struct {
Expand All @@ -38,7 +45,8 @@ type SVMLatestBlockResponse struct {
Slot int64 `json:"slot"`
} `json:"context"`
Value struct {
LastValidBlockHeight int64 `json:"lastValidBlockHeight"`
LastValidBlockHeight int64 `json:"lastValidBlockHeight"`
BlockHash string `json:"blockhash"`
} `json:"value"`
} `json:"result"`
}
Expand All @@ -48,42 +56,77 @@ func (cs *SVMChainTracker) fetchLatestBlockNumInner(ctx context.Context) (int64,
if err != nil {
return 0, err
}

var response SVMLatestBlockResponse
if err := json.Unmarshal(latestBlockResponse, &response); err != nil {
return 0, fmt.Errorf("failed to unmarshal response: %v", err)
}

blockNum := response.Result.Value.LastValidBlockHeight
slot := response.Result.Context.Slot
cs.cache.SetWithTTL(blockNum, slot, 1, time.Hour*24)
utils.LavaFormatDebug("[SVMChainTracker] fetching latest block num", utils.LogAttr("slot", slot), utils.LogAttr("block_num", blockNum))
blockHash := response.Result.Value.BlockHash

atomic.StoreInt64(&cs.seenBlock, blockNum)
cs.slotCache.SetWithTTL(blockNum, slot, 1, slotCacheTTL)
cs.hashCache.SetWithTTL(blockNum, blockHash, 1, hashCacheTTL)

utils.LavaFormatTrace("[SVMChainTracker] fetching latest block num",
utils.LogAttr("slot", slot),
utils.LogAttr("block_num", blockNum),
utils.LogAttr("block_hash", blockHash),
)

return blockNum, nil
}

func (cs *SVMChainTracker) FetchLatestBlockNum(ctx context.Context) (int64, error) {
latestBlockNum, err := cs.fetchLatestBlockNumInner(ctx)
if err != nil {
utils.LavaFormatWarning("[SVMChainTracker] failed to get latest block num, getting from chain fetcher", err)
return cs.chainFetcher.FetchLatestBlockNum(ctx)
return 0, utils.LavaFormatWarning("[SVMChainTracker] failed to get latest block num, getting from chain fetcher", err,
utils.LogAttr("block_num", latestBlockNum),
utils.LogAttr("latest_block", cs.dataFetcher.GetAtomicLatestBlockNum()),
utils.LogAttr("server_memory", cs.dataFetcher.GetServerBlockMemory()))
}
utils.LavaFormatDebug("[SVMChainTracker] fetched latest block num", utils.LogAttr("block_num", latestBlockNum))
utils.LavaFormatTrace("[SVMChainTracker] fetched latest block num", utils.LogAttr("block_num", latestBlockNum))
return latestBlockNum, nil
}

func (cs *SVMChainTracker) FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error) {
if blockNum < cs.dataFetcher.GetAtomicLatestBlockNum()-int64(cs.dataFetcher.GetServerBlockMemory()) {
return "", ErrorFailedToFetchTooEarlyBlock.Wrapf("requested Block: %d, latest block: %d, server memory %d", blockNum, cs.dataFetcher.GetAtomicLatestBlockNum(), cs.dataFetcher.GetServerBlockMemory())
}
blockHash, ok := cs.hashCache.Get(blockNum)
if ok {
utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum found block hash in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", blockHash))
return blockHash, nil
}

// In SVM, the block hash is fetched by slot instead of block.
// We need to get the slot which is related to this block number.
slot, ok := cs.cache.Get(blockNum)
if !ok {
utils.LavaFormatError("slot not found in cache, falling back to direct block fetch - This error can happen on bootstrap and should resolve by itself, if persists please let the dev team know", ErrorFailedToFetchTooEarlyBlock, utils.LogAttr("block", blockNum), utils.LogAttr("latest_block", cs.dataFetcher.GetAtomicLatestBlockNum()), utils.LogAttr("server_memory", cs.dataFetcher.GetServerBlockMemory()))
return cs.chainFetcher.FetchBlockHashByNum(ctx, blockNum)
slot, err := cs.tryGetSlotFromCache(blockNum)
if err != nil {
return "", err
}
utils.LavaFormatDebug("[SVMChainTracker] FetchBlockHashByNum found slot in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("slot", slot))

utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum found slot in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("slot", slot))
hash, err := cs.chainFetcher.FetchBlockHashByNum(ctx, slot)
if err == nil {
utils.LavaFormatDebug("[SVMChainTracker] FetchBlockHashByNum succeeded", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", hash), utils.LogAttr("slot", slot))
utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum succeeded", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", hash), utils.LogAttr("slot", slot))
}
return hash, err
}

func (cs *SVMChainTracker) tryGetSlotFromCache(blockNum int64) (int64, error) {
if blockNum <= atomic.LoadInt64(&cs.seenBlock) {
for i := 0; i < getSlotFromCacheMaxRetries; i++ {
slot, ok := cs.slotCache.Get(blockNum)
if ok {
return slot, nil
}
time.Sleep(getSlotFromCacheSleepDuration)
}
}

return 0, fmt.Errorf("slot not found in cache. This error can happen on bootstrap and should resolve by itself, if persists please let the dev team know. "+
"block: %d, latest_block: %d, server_memory: %d", blockNum, cs.dataFetcher.GetAtomicLatestBlockNum(), cs.dataFetcher.GetServerBlockMemory())
}
Loading