Skip to content

Commit

Permalink
Merge pull request #1784 from lavanet/CNS-1008-score-store-refactor
Browse files Browse the repository at this point in the history
refactor: CNS-1008 - Optimizer Refactor Part 1: provider optimizer refactor
  • Loading branch information
ranlavanet authored Jan 21, 2025
2 parents 8943739 + 562bb29 commit e7ae90e
Show file tree
Hide file tree
Showing 23 changed files with 2,456 additions and 932 deletions.
3 changes: 1 addition & 2 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,10 +722,9 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress string) *lavasession.ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return lavasession.NewConsumerSessionManager(
&lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0},
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"),
provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, 0, 1, nil, "dontcare"),
nil, nil, consumerPublicAddress,
lavasession.NewActiveSubscriptionProvidersStorage(),
)
Expand Down
3 changes: 1 addition & 2 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc
consumerStateTracker := &mockConsumerStateTracker{}
finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare")
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, averageBlockTime, 2, nil, "dontcare")
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage())
consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList)

Expand Down
3 changes: 0 additions & 3 deletions protocol/lavaprotocol/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re
copiedQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastQoSReport)
copiedExcellenceQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastExcellenceQoSReportRaw) // copy raw report for the node

// validate and fix QoS excellence report before sending it to the node
copiedExcellenceQOS.ValidateAndFixQoSExcellence()

return &pairingtypes.RelaySession{
SpecId: chainID,
ContentHash: sigs.HashMsg(relayRequestData.GetContentHashData()),
Expand Down
14 changes: 9 additions & 5 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,15 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessionInfo.QoSSummeryResult = consumerSession.getQosComputedResultOrZero()
sessions[providerAddress] = sessionInfo

qosReport, rawQosReport := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress)
qosReport, _ := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress)
if csm.rpcEndpoint.Geolocation != uint64(endpoint.endpoint.Geolocation) {
// rawQosReport is used only when building the relay payment message to be used to update
// the provider's reputation on-chain. If the consumer and provider don't share geolocation
// (consumer geo: csm.rpcEndpoint.Geolocation, provider geo: endpoint.endpoint.Geolocation)
// we don't want to update the reputation by it, so we null the rawQosReport
rawQosReport = nil
qosReport = nil
}
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, rawQosReport, usedProviders, routerKey)
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, usedProviders, routerKey)
// We successfully added provider, we should ignore it if we need to fetch new
tempIgnoredProviders.providers[providerAddress] = struct{}{}
if len(sessions) == wantedSession {
Expand Down Expand Up @@ -641,7 +641,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
}
}
var providers []string
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.StrategyCost {
providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList)
} else {
providers, _ = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock)
Expand Down Expand Up @@ -1048,7 +1048,11 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
if !isHangingApi {
// append relay data only for non hanging apis
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, specComputeUnits, uint64(latestServicedBlock))
}

csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ func TestEndpointSortingFlow(t *testing.T) {

func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, 0, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
}

func TestMain(m *testing.M) {
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type ConsumerSessionsMap map[string]*SessionInfo
type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
AppendRelayData(providerAddress string, latency time.Duration, cu, syncBlock uint64)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, time.Time)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64, uint64)
}
Expand Down
3 changes: 1 addition & 2 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,12 @@ func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Dura
}
}

func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, rawQoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error {
func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error {
scs.LatestRelayCu = cuNeededForSession // set latestRelayCu
scs.RelayNum += RelayNumberIncrement // increase relayNum
if scs.RelayNum > 1 {
// we only set excellence for sessions with more than one successful relays, this guarantees data within the epoch exists
scs.QoSInfo.LastExcellenceQoSReport = qoSExcellenceReport
scs.QoSInfo.LastExcellenceQoSReportRaw = rawQoSExcellenceReport
}
scs.usedProviders = usedProviders
scs.routerKey = routerKey
Expand Down
Loading

0 comments on commit e7ae90e

Please sign in to comment.