Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(lib/stream): remove cache #2362

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 18 additions & 39 deletions lib/cchain/provider/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/omni-network/omni/lib/expbackoff"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/stream"
"github.com/omni-network/omni/lib/tracer"
"github.com/omni-network/omni/lib/umath"
"github.com/omni-network/omni/lib/xchain"
Expand Down Expand Up @@ -73,25 +72,6 @@ func NewGRPC(target string, network netconf.ID, opts ...func(*Provider)) (Provid
return newProvider(grpcClient, network, opts...), nil
}

// attestCacheProvider is a function that returns a cache for attestations.
type attestCacheProvider func(xchain.ChainVersion) stream.Cache[xchain.Attestation]

// nopCacheProvider returns a cache provider that doesn't cache anything.
var nopCacheProvider = func(xchain.ChainVersion) stream.Cache[xchain.Attestation] {
return stream.NewNopCache[xchain.Attestation]()
}

// WithAttestCache returns an option that enables caching of attest streams.
func WithAttestCache(limit int) func(*Provider) {
var caches sync.Map
return func(p *Provider) {
p.cacheProvider = func(chainVer xchain.ChainVersion) stream.Cache[xchain.Attestation] {
resp, _ := caches.LoadOrStore(chainVer, stream.NewCache[xchain.Attestation](limit))
return resp.(stream.Cache[xchain.Attestation]) //nolint:forcetypeassert,revive // Known type
}
}
}

func newProvider(cc gogogrpc.ClientConn, network netconf.ID, opts ...func(*Provider)) Provider {
// Stream backoff for 1s, querying new attestations after 1 consensus block
backoffFunc := func(ctx context.Context) func() {
Expand All @@ -112,25 +92,24 @@ func newProvider(cc gogogrpc.ClientConn, network netconf.ID, opts ...func(*Provi
cmtcl := cmtservice.NewServiceClient(cc)

p := Provider{
fetch: newABCIFetchFunc(acl, cmtcl, namer),
allAtts: newABCIAllAttsFunc(acl),
latest: newABCILatestFunc(acl),
window: newABCIWindowFunc(acl),
valset: newABCIValsetFunc(vcl),
val: newABCIValFunc(scl),
vals: newABCIValsFunc(scl),
signing: newABCISigningFunc(slcl),
rewards: newABCIRewards(dcl),
portalBlock: newABCIPortalBlockFunc(pcl),
networkFunc: newABCINetworkFunc(rcl),
genesisFunc: newABCIGenesisFunc(gcl),
plannedFunc: newABCIPlannedUpgradeFunc(ucl),
appliedFunc: newABCIAppliedUpgradeFunc(ucl),
chainID: newChainIDFunc(cmtcl),
backoffFunc: backoffFunc,
chainNamer: namer,
network: network,
cacheProvider: nopCacheProvider,
fetch: newABCIFetchFunc(acl, cmtcl, namer),
allAtts: newABCIAllAttsFunc(acl),
latest: newABCILatestFunc(acl),
window: newABCIWindowFunc(acl),
valset: newABCIValsetFunc(vcl),
val: newABCIValFunc(scl),
vals: newABCIValsFunc(scl),
signing: newABCISigningFunc(slcl),
rewards: newABCIRewards(dcl),
portalBlock: newABCIPortalBlockFunc(pcl),
networkFunc: newABCINetworkFunc(rcl),
genesisFunc: newABCIGenesisFunc(gcl),
plannedFunc: newABCIPlannedUpgradeFunc(ucl),
appliedFunc: newABCIAppliedUpgradeFunc(ucl),
chainID: newChainIDFunc(cmtcl),
backoffFunc: backoffFunc,
chainNamer: namer,
network: network,
}

for _, opt := range opts {
Expand Down
14 changes: 0 additions & 14 deletions lib/cchain/provider/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@ var (
Buckets: []float64{0, 1, 2, 4, 8, 16, 32, 64, 128, 256},
Help: "Number of steps in the binary search process to find the right height",
}, []string{"chain_version"})

cacheHits = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "lib",
Subsystem: "cprovider",
Name: "cache_hits_total",
Help: "Total number of cache hits per source chain version.",
}, []string{"chain_version"})

cacheMisses = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "lib",
Subsystem: "cprovider",
Name: "cache_misses_total",
Help: "Total number of cache misses per source chain version.",
}, []string{"chain_version"})
)

func fetchStepsMetrics(chainName string, lookbackSteps, binarySearchSteps uint64) {
Expand Down
55 changes: 23 additions & 32 deletions lib/cchain/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,36 @@ type valSetResponse struct {

// Provider implements cchain.Provider.
type Provider struct {
fetch fetchFunc
allAtts allAttsFunc
latest latestFunc
window windowFunc
valset valsetFunc
val valFunc
signing signingFunc
vals valsFunc
rewards rewardsFunc
chainID chainIDFunc
portalBlock portalBlockFunc
networkFunc networkFunc
genesisFunc genesisFunc
plannedFunc planedUpgradeFunc
appliedFunc appliedUpgradeFunc
backoffFunc func(context.Context) func()
chainNamer func(xchain.ChainVersion) string
network netconf.ID
cacheProvider attestCacheProvider
fetch fetchFunc
allAtts allAttsFunc
latest latestFunc
window windowFunc
valset valsetFunc
val valFunc
signing signingFunc
vals valsFunc
rewards rewardsFunc
chainID chainIDFunc
portalBlock portalBlockFunc
networkFunc networkFunc
genesisFunc genesisFunc
plannedFunc planedUpgradeFunc
appliedFunc appliedUpgradeFunc
backoffFunc func(context.Context) func()
chainNamer func(xchain.ChainVersion) string
network netconf.ID
}

// NewProviderForT creates a new provider for testing.
func NewProviderForT(_ *testing.T, fetch fetchFunc, latest latestFunc, window windowFunc,
backoffFunc func(context.Context) func(),
) Provider {
return Provider{
latest: latest,
fetch: fetch,
window: window,
backoffFunc: backoffFunc,
chainNamer: func(xchain.ChainVersion) string { return "" },
cacheProvider: nopCacheProvider,
latest: latest,
fetch: fetch,
window: window,
backoffFunc: backoffFunc,
chainNamer: func(xchain.ChainVersion) string { return "" },
}
}

Expand Down Expand Up @@ -185,7 +183,6 @@ func (p Provider) stream(
FetchBatch: func(ctx context.Context, _ uint64, offset uint64) ([]xchain.Attestation, error) {
return p.fetch(ctx, chainVer, offset)
},
Cache: p.cacheProvider(chainVer),
Backoff: p.backoffFunc,
ElemLabel: "attestation",
HeightLabel: "offset",
Expand All @@ -212,12 +209,6 @@ func (p Provider) stream(
IncCallbackErr: func() {
callbackErrTotal.WithLabelValues(workerName, srcChain).Inc()
},
IncCacheHit: func() {
cacheHits.WithLabelValues(srcChain).Inc()
},
IncCacheMiss: func() {
cacheMisses.WithLabelValues(srcChain).Inc()
},
SetStreamHeight: func(h uint64) {
streamHeight.WithLabelValues(workerName, srcChain).Set(float64(h))
},
Expand Down
82 changes: 0 additions & 82 deletions lib/stream/cache.go

This file was deleted.

21 changes: 0 additions & 21 deletions lib/stream/cache_internal_test.go

This file was deleted.

Loading
Loading