Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added latest snapshot height querying #970

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,17 @@ func buildStatesyncer(d *coreDependencies) *statesync.StateSyncer {
// arbitrary, 10s is a reasonable time out for a http server regardless
// of the location and network routes.
ctx, cancel := context.WithTimeout(d.ctx, 10*time.Second)
res, err := clt.Header(ctx, nil)

// we will first get the latest snapshot height that the trusted node has
latestSnapshot, err := statesync.GetLatestSnapshotInfo(ctx, clt)
if err != nil {
cancel()
d.log.Warnf("failed to get latest snapshot from snap provider: %v", err)
continue
}

latestHeight := int64(latestSnapshot.Height)
res, err := clt.Header(ctx, &latestHeight)
if err != nil {
cancel()
d.log.Warnf("failed to get header from snap provider: %v", err)
Expand Down
45 changes: 33 additions & 12 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,10 +1594,10 @@ func (a *AbciApp) ProcessProposal(ctx context.Context, req *abciTypes.RequestPro

func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abciTypes.ResponseQuery, error) {
a.log.Debug("ABCI Query", zap.String("path", req.Path), zap.String("data", string(req.Data)))

if req.Path == statesync.ABCISnapshotQueryPath { // "/snapshot/height"
switch {
case req.Path == statesync.ABCISnapshotQueryPath:
if a.snapshotter == nil {
return &abciTypes.ResponseQuery{}, nil
return nil, fmt.Errorf("this node is not configured to serve snapshots")
}

var snapshot *statesync.Snapshot
Expand All @@ -1614,15 +1614,36 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
}

if !exists {
return &abciTypes.ResponseQuery{}, nil
return nil, fmt.Errorf("snapshot not found for height %s", height)
}

bts, err := json.Marshal(snapshot)
bts, err := snapshot.MarshalBinary()
if err != nil {
return nil, err
}
return &abciTypes.ResponseQuery{Value: bts}, nil
} else if strings.HasPrefix(req.Path, ABCIPeerFilterPath) {
case req.Path == statesync.ABCILatestSnapshotHeightPath:
if a.snapshotter == nil {
return nil, fmt.Errorf("this node is not configured to serve snapshots")
}
snaps := a.snapshotter.ListSnapshots()
if len(snaps) == 0 {
return nil, fmt.Errorf("no snapshots available")
}
latest := snaps[len(snaps)-1]
for _, snap := range snaps {
if snap.Height > latest.Height {
latest = snap
}
}

bts, err := latest.MarshalBinary()
if err != nil {
return nil, err
}

return &abciTypes.ResponseQuery{Value: bts}, nil
case strings.HasPrefix(req.Path, ABCIPeerFilterPath):
// When CometBFT connects to a peer, it sends two queries to the ABCI application
// using the following paths, with no additional data:
// - `/p2p/filter/addr/<IP:PORT>`
Expand All @@ -1633,7 +1654,7 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci

paths := strings.Split(req.Path[ABCIPeerFilterPathLen:], "/")
if len(paths) != 2 {
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("invalid path: %s", req.Path)
return nil, fmt.Errorf("invalid path: %s", req.Path)
}

switch paths[0] {
Expand All @@ -1644,16 +1665,16 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
}
// ID is not in the allowed list of peers, so reject the connection
a.log.Warn("Connection attempt rejected, peer is not allowed to connect", zap.String("peerID", paths[1]))
return &abciTypes.ResponseQuery{Code: 1}, nil
return nil, fmt.Errorf("node rejected connection attempt from peer %s", paths[1])
case "addr":
return &abciTypes.ResponseQuery{Code: abciTypes.CodeTypeOK}, nil
default:
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("invalid path: %s", req.Path)
return nil, fmt.Errorf("invalid path: %s", req.Path)
}

default:
// If the query path is not recognized, return an error.
return nil, fmt.Errorf("unknown query path: %s", req.Path)
}

return &abciTypes.ResponseQuery{}, nil
}

type EventBroadcaster func(ctx context.Context, db sql.DB, block *common.BlockContext) error
Expand Down
8 changes: 8 additions & 0 deletions internal/statesync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func loadSnapshot(headerFile string) (*Snapshot, error) {
}
return &snapshot, nil
}

func (s *Snapshot) MarshalBinary() ([]byte, error) {
return json.Marshal(s)
}

func (s *Snapshot) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
24 changes: 23 additions & 1 deletion internal/statesync/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (

"github.com/kwilteam/kwil-db/core/log"

cometClient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
)

const (
ABCISnapshotQueryPath = "/snapshot/height"
ABCISnapshotQueryPath = "/snapshot/height"
ABCILatestSnapshotHeightPath = "/snapshot/latest"
)

// StateSyncer is responsible for initializing the database state from the
Expand Down Expand Up @@ -304,3 +306,23 @@ func ChainRPCClient(server string) (*rpchttp.HTTP, error) {
}
return c, nil
}

// GetLatestSnapshotHeight queries the trusted snapshot providers to get the latest snapshot height.
func GetLatestSnapshotInfo(ctx context.Context, client cometClient.ABCIClient) (*Snapshot, error) {
res, err := client.ABCIQuery(ctx, ABCILatestSnapshotHeightPath, nil)
if err != nil {
return nil, err
}

if len(res.Response.Value) == 0 {
return nil, errors.New("no snapshot found")
}

var snap Snapshot
err = json.Unmarshal(res.Response.Value, &snap)
if err != nil {
return nil, err
}

return &snap, nil
}
Loading