Skip to content

Commit

Permalink
feat(relayer): cached height
Browse files Browse the repository at this point in the history
  • Loading branch information
sideninja committed Nov 4, 2024
1 parent d00a986 commit 19c6b34
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 145 deletions.
299 changes: 168 additions & 131 deletions lib/cchain/provider/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/expbackoff"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/tracer"
"github.com/omni-network/omni/lib/umath"
Expand Down Expand Up @@ -352,7 +351,7 @@ func newABCIAllAttsFunc(cl atypes.QueryClient) allAttsFunc {
}

func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient, chainNamer func(xchain.ChainVersion) string) fetchFunc {
return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) {
return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, cursor uint64) ([]xchain.Attestation, uint64, error) {
const endpoint = "fetch_attestations"
defer latency(endpoint)()

Expand All @@ -365,40 +364,60 @@ func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient,
atts, ok, err := attsFromAtHeight(ctx, attCl, chainVer, fromOffset, 0)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query attestations-from")
return nil, 0, errors.Wrap(err, "abci query attestations-from")
} else if ok {
fetchStepsMetrics(chainName, 0, 0)
return atts, nil
binarySearchStepsMetric(chainName, 0)
lookbackStepsMetric(chainName, 0)

return atts, cursor, nil
}

earliestAttestationAtLatestHeight, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, 0)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query earliest-attestation-in-state")
return nil, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// Either no attestations have happened yet, or the queried fromOffset is in the "future"
// Caller has to wait and retry in both cases
if !ok || earliestAttestationAtLatestHeight.AttestOffset < fromOffset {
// First attestation hasn't happened yet, return empty
return []xchain.Attestation{}, nil
// First attestation hasn't happened yet, return empty and set cursor to latest height
return []xchain.Attestation{}, earliestAttestationAtLatestHeight.BlockHeight, nil
}

latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{})
if err != nil {
return []xchain.Attestation{}, 0, errors.Wrap(err, "query latest block")
}

latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height)

// Binary search range from cached to latest
searchStart, searchEnd := cursor, latestHeight
if cursor == 0 {
// Unless no cached height provided, then perform lookback search
searchStart, searchEnd, err = lookbackRange(ctx, attCl, chainVer, chainName, fromOffset, latestHeight)
if err != nil {
incQueryErr(endpoint)
return nil, 0, errors.Wrap(err, "lookback search")
}
}

offsetHeight, err := searchOffsetInHistory(ctx, cmtCl, attCl, chainVer, chainName, fromOffset)
offsetHeight, err := binarySearch(ctx, attCl, chainVer, chainName, fromOffset, searchStart, searchEnd)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "searching offset in history")
return nil, 0, errors.Wrap(err, "binary search")
}

atts, ok, err = attsFromAtHeight(ctx, attCl, chainVer, fromOffset, offsetHeight)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query attestations-from")
return nil, 0, errors.Wrap(err, "abci query attestations-from")
} else if !ok {
return nil, errors.New("expected to find attestations [BUG]")
return nil, 0, errors.New("expected to find attestations [BUG]")
}

return atts, nil
return atts, offsetHeight, nil
}
}

Expand Down Expand Up @@ -570,124 +589,6 @@ func spanName(endpoint string) string {
return "cprovider/" + endpoint
}

// searchOffsetInHistory searches the consensus state history and
// returns a historical consensus block height that contains an approved attestation
// for the provided chain version and fromOffset.
func searchOffsetInHistory(ctx context.Context, cmtCl cmtservice.ServiceClient, attCl atypes.QueryClient, chainVer xchain.ChainVersion, chainName string, fromOffset uint64) (uint64, error) {
const endpoint = "search_offset"
defer latency(endpoint)()

// Exponentially backoff to find a good start point for binary search, this prefers more recent queries

latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{})
if err != nil {
return 0, errors.Wrap(err, "query latest block")
}
latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height)

var startHeightIndex uint64
endHeightIndex := latestHeight
lookback := uint64(1)
var lookbackStepsCounter uint64 // For metrics only
queryHeight := endHeightIndex
for {
lookbackStepsCounter++
if queryHeight <= lookback {
// Query from the start, but don't break out yet -- we need to find the earliest height that we have state for
queryHeight = 1
} else {
queryHeight -= lookback
}

if queryHeight == 0 || queryHeight >= latestHeight {
return 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen
}
earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight)
if IsErrHistoryPruned(err) {
// We've jumped to before the prune height, but _might_ still have the requested offset
earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1)
if err != nil {
return 0, errors.Wrap(err, "failed to get earliest store height")
}
earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're so far back that no attestation is found, or that we're before fromOffset,
// that's a good breaking point for binary search
if !ok || earliestAtt.AttestOffset <= fromOffset {
startHeightIndex = earliestStoreHeight
break
}

// Otherwise, we just don't have the needed state, fail
return 0, ErrHistoryPruned
}
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're before the first attestation, or found an earlier attestation, it's a good start height
if !ok || earliestAtt.AttestOffset <= fromOffset {
startHeightIndex = queryHeight
break
}

// Otherwise, keep moving back
endHeightIndex = queryHeight
lookback *= 2
}

// We now have reasonable start and end indices for binary search
var binarySearchStepsCounter uint64 // For metrics only
for startHeightIndex <= endHeightIndex {
binarySearchStepsCounter++
midHeightIndex := startHeightIndex + umath.SubtractOrZero(endHeightIndex, startHeightIndex)/2

earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeightIndex)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

if !ok {
// If we're so far back that there's no attestation at all, move forward
startHeightIndex = midHeightIndex + 1
continue
}

latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeightIndex)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query latest-attestation")
}

if !ok {
return 0, errors.New("no latest attestation found despite earlier check [BUG]")
}

if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset {
log.Debug(ctx, "Fetching offset from history", "chain", chainName, "from", fromOffset, "latest", latestHeight, "found", midHeightIndex, "lookback", lookbackStepsCounter, "search", binarySearchStepsCounter)
fetchStepsMetrics(chainName, lookbackStepsCounter, binarySearchStepsCounter)

return midHeightIndex, nil
}

// Query at a lower or higher height depending on whether fromOffset
// is smaller or larger than the earliest offset we found
if fromOffset < earliestAtt.AttestOffset {
endHeightIndex = umath.SubtractOrZero(midHeightIndex, 1)
} else {
startHeightIndex = midHeightIndex + 1
}
}

return 0, errors.New("unexpectedly reach end of search method [BUG]")
}

// getEarliestStoreHeight walks forward from startPoint, and returns the first height for which we have the state in our Store.
func getEarliestStoreHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, startPoint uint64) (uint64, error) {
// Note: the correct thing to do here would be to query the node's Status, and look at its EarliestStoreHeight
Expand Down Expand Up @@ -771,3 +672,139 @@ func attsFromAtHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchai

return atts, true, nil
}

// binarySearch uses a binary search between defined start and end consensus block heights to
// find the attestations with the provided fromOffset. It returns the consensus block height
// that contains the attestation offset.
func binarySearch(
ctx context.Context,
attCl atypes.QueryClient,
chainVer xchain.ChainVersion,
chainName string,
fromOffset uint64,
startHeight uint64,
endHeight uint64,
) (uint64, error) {
const endpoint = "offset_binary_search"
defer latency(endpoint)()

for steps := 0; startHeight <= endHeight; steps++ {
midHeight := startHeight + umath.SubtractOrZero(endHeight, startHeight)/2

earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
} else if !ok {
// If we're so far back that there's no attestation at all, move forward
startHeight = midHeight + 1
continue
}

if fromOffset == earliestAtt.AttestOffset {
binarySearchStepsMetric(chainName, steps)
return midHeight, nil
}

latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query latest-attestation")
} else if !ok {
return 0, errors.New("no latest attestation found despite earlier check [BUG]")
}

if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset {
binarySearchStepsMetric(chainName, steps)

return midHeight, nil
}

// Query at a lower or higher height depending on whether fromOffset
// is smaller or larger than the earliest offset we found
if fromOffset < earliestAtt.AttestOffset {
endHeight = umath.SubtractOrZero(midHeight, 1)
} else {
startHeight = midHeight + 1
}
}

return 0, errors.New("unexpectedly reach end of search method [BUG]")
}

// lookbackRange does an exponential lookback from the latest block height provided
// until attestation found has lower or equal offset than the provided from offset, this
// guarantees the fromOffset attestation will be found in the range. Start and
// end height defining the range are returned.
//
//nolint:nonamedreturns // named returned for clarity
func lookbackRange(
ctx context.Context,
attCl atypes.QueryClient,
chainVer xchain.ChainVersion,
chainName string,
fromOffset uint64,
latestHeight uint64,
) (startHeight uint64, endHeight uint64, err error) {
const endpoint = "offset_lookback"
defer latency(endpoint)()

endHeight = latestHeight
lookback := uint64(1)
queryHeight := endHeight

for steps := 0; ; steps++ {
if queryHeight <= lookback {
// Query from the start, but don't break out yet -- we need to find the earliest height that we have state for
queryHeight = 1
} else {
queryHeight -= lookback
}

if queryHeight == 0 || queryHeight >= latestHeight {
return 0, 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen
}
earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight)
if IsErrHistoryPruned(err) {
// We've jumped to before the prune height, but _might_ still have the requested offset
earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1)
if err != nil {
return 0, 0, errors.Wrap(err, "failed to get earliest store height")
}

earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight)
if err != nil {
incQueryErr(endpoint)
return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're so far back that no attestation is found, or that we're before fromOffset,
// that's a good breaking point for binary search
if !ok || earliestAtt.AttestOffset <= fromOffset {
lookbackStepsMetric(chainName, steps)
startHeight = earliestStoreHeight

return startHeight, endHeight, nil
}

// Otherwise, we just don't have the needed state, fail
return 0, 0, ErrHistoryPruned
}
if err != nil {
incQueryErr(endpoint)
return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're before the first attestation, or found an earlier attestation, it's a good start height
if !ok || earliestAtt.AttestOffset <= fromOffset {
lookbackStepsMetric(chainName, steps)
startHeight = queryHeight

return startHeight, endHeight, nil
}

// Otherwise, keep moving back
endHeight = queryHeight
lookback *= 2
}
}
9 changes: 6 additions & 3 deletions lib/cchain/provider/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ var (
}, []string{"chain_version"})
)

func fetchStepsMetrics(chainName string, lookbackSteps, binarySearchSteps uint64) {
fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(lookbackSteps))
fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(binarySearchSteps))
func lookbackStepsMetric(chainName string, steps int) {
fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(steps))
}

func binarySearchStepsMetric(chainName string, steps int) {
fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(steps))
}

func latency(endpoint string) func() {
Expand Down
Loading

0 comments on commit 19c6b34

Please sign in to comment.