Skip to content

Commit

Permalink
added latest snapshot height querying (#970)
Browse files Browse the repository at this point in the history
* added latest snapshot height querying

* added errors to abci query
  • Loading branch information
brennanjl authored Sep 11, 2024
1 parent cc87c18 commit d155a3f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
12 changes: 11 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,17 @@ func buildStatesyncer(d *coreDependencies) *statesync.StateSyncer {
// arbitrary, 10s is a reasonable time out for a http server regardless
// of the location and network routes.
ctx, cancel := context.WithTimeout(d.ctx, 10*time.Second)
res, err := clt.Header(ctx, nil)

// we will first get the latest snapshot height that the trusted node has
latestSnapshot, err := statesync.GetLatestSnapshotInfo(ctx, clt)
if err != nil {
cancel()
d.log.Warnf("failed to get latest snapshot from snap provider: %v", err)
continue
}

latestHeight := int64(latestSnapshot.Height)
res, err := clt.Header(ctx, &latestHeight)
if err != nil {
cancel()
d.log.Warnf("failed to get header from snap provider: %v", err)
Expand Down
45 changes: 33 additions & 12 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,10 +1594,10 @@ func (a *AbciApp) ProcessProposal(ctx context.Context, req *abciTypes.RequestPro

func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abciTypes.ResponseQuery, error) {
a.log.Debug("ABCI Query", zap.String("path", req.Path), zap.String("data", string(req.Data)))

if req.Path == statesync.ABCISnapshotQueryPath { // "/snapshot/height"
switch {
case req.Path == statesync.ABCISnapshotQueryPath:
if a.snapshotter == nil {
return &abciTypes.ResponseQuery{}, nil
return nil, fmt.Errorf("this node is not configured to serve snapshots")
}

var snapshot *statesync.Snapshot
Expand All @@ -1614,15 +1614,36 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
}

if !exists {
return &abciTypes.ResponseQuery{}, nil
return nil, fmt.Errorf("snapshot not found for height %s", height)
}

bts, err := json.Marshal(snapshot)
bts, err := snapshot.MarshalBinary()
if err != nil {
return nil, err
}
return &abciTypes.ResponseQuery{Value: bts}, nil
} else if strings.HasPrefix(req.Path, ABCIPeerFilterPath) {
case req.Path == statesync.ABCILatestSnapshotHeightPath:
if a.snapshotter == nil {
return nil, fmt.Errorf("this node is not configured to serve snapshots")
}
snaps := a.snapshotter.ListSnapshots()
if len(snaps) == 0 {
return nil, fmt.Errorf("no snapshots available")
}
latest := snaps[len(snaps)-1]
for _, snap := range snaps {
if snap.Height > latest.Height {
latest = snap
}
}

bts, err := latest.MarshalBinary()
if err != nil {
return nil, err
}

return &abciTypes.ResponseQuery{Value: bts}, nil
case strings.HasPrefix(req.Path, ABCIPeerFilterPath):
// When CometBFT connects to a peer, it sends two queries to the ABCI application
// using the following paths, with no additional data:
// - `/p2p/filter/addr/<IP:PORT>`
Expand All @@ -1633,7 +1654,7 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci

paths := strings.Split(req.Path[ABCIPeerFilterPathLen:], "/")
if len(paths) != 2 {
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("invalid path: %s", req.Path)
return nil, fmt.Errorf("invalid path: %s", req.Path)
}

switch paths[0] {
Expand All @@ -1644,16 +1665,16 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
}
// ID is not in the allowed list of peers, so reject the connection
a.log.Warn("Connection attempt rejected, peer is not allowed to connect", zap.String("peerID", paths[1]))
return &abciTypes.ResponseQuery{Code: 1}, nil
return nil, fmt.Errorf("node rejected connection attempt from peer %s", paths[1])
case "addr":
return &abciTypes.ResponseQuery{Code: abciTypes.CodeTypeOK}, nil
default:
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("invalid path: %s", req.Path)
return nil, fmt.Errorf("invalid path: %s", req.Path)
}

default:
// If the query path is not recognized, return an error.
return nil, fmt.Errorf("unknown query path: %s", req.Path)
}

return &abciTypes.ResponseQuery{}, nil
}

type EventBroadcaster func(ctx context.Context, db sql.DB, block *common.BlockContext) error
Expand Down
8 changes: 8 additions & 0 deletions internal/statesync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func loadSnapshot(headerFile string) (*Snapshot, error) {
}
return &snapshot, nil
}

func (s *Snapshot) MarshalBinary() ([]byte, error) {
return json.Marshal(s)
}

func (s *Snapshot) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
24 changes: 23 additions & 1 deletion internal/statesync/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (

"github.com/kwilteam/kwil-db/core/log"

cometClient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
)

const (
ABCISnapshotQueryPath = "/snapshot/height"
ABCISnapshotQueryPath = "/snapshot/height"
ABCILatestSnapshotHeightPath = "/snapshot/latest"
)

// StateSyncer is responsible for initializing the database state from the
Expand Down Expand Up @@ -304,3 +306,23 @@ func ChainRPCClient(server string) (*rpchttp.HTTP, error) {
}
return c, nil
}

// GetLatestSnapshotHeight queries the trusted snapshot providers to get the latest snapshot height.
func GetLatestSnapshotInfo(ctx context.Context, client cometClient.ABCIClient) (*Snapshot, error) {
res, err := client.ABCIQuery(ctx, ABCILatestSnapshotHeightPath, nil)
if err != nil {
return nil, err
}

if len(res.Response.Value) == 0 {
return nil, errors.New("no snapshot found")
}

var snap Snapshot
err = json.Unmarshal(res.Response.Value, &snap)
if err != nil {
return nil, err
}

return &snap, nil
}

0 comments on commit d155a3f

Please sign in to comment.