Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added latest snapshot height querying #970

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,17 @@ func buildStatesyncer(d *coreDependencies) *statesync.StateSyncer {
// arbitrary, 10s is a reasonable time out for a http server regardless
// of the location and network routes.
ctx, cancel := context.WithTimeout(d.ctx, 10*time.Second)
res, err := clt.Header(ctx, nil)

// we will first get the latest snapshot height that the trusted node has
latestSnapshot, err := statesync.GetLatestSnapshotInfo(ctx, clt)
if err != nil {
cancel()
d.log.Warnf("failed to get latest snapshot from snap provider: %v", err)
continue
}

latestHeight := int64(latestSnapshot.Height)
res, err := clt.Header(ctx, &latestHeight)
if err != nil {
cancel()
d.log.Warnf("failed to get header from snap provider: %v", err)
Expand Down
33 changes: 28 additions & 5 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,9 @@ func (a *AbciApp) ProcessProposal(ctx context.Context, req *abciTypes.RequestPro
func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abciTypes.ResponseQuery, error) {
a.log.Debug("ABCI Query", zap.String("path", req.Path), zap.String("data", string(req.Data)))

if req.Path == statesync.ABCISnapshotQueryPath { // "/snapshot/height"
switch {
// TODO: why do we not return errors in case of the snapshotter not being configured?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm, I don't see a major issue with erroring out here. Let's add that.

case req.Path == statesync.ABCISnapshotQueryPath:
if a.snapshotter == nil {
return &abciTypes.ResponseQuery{}, nil
}
Expand All @@ -1622,7 +1624,28 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
return nil, err
}
return &abciTypes.ResponseQuery{Value: bts}, nil
} else if strings.HasPrefix(req.Path, ABCIPeerFilterPath) {
case req.Path == statesync.ABCILatestSnapshotHeightPath:
if a.snapshotter == nil {
return &abciTypes.ResponseQuery{}, nil
}
snaps := a.snapshotter.ListSnapshots()
if len(snaps) == 0 {
return &abciTypes.ResponseQuery{}, nil
}
latest := snaps[len(snaps)-1]
for _, snap := range snaps {
if snap.Height > latest.Height {
latest = snap
}
}

bts, err := latest.MarshalBinary()
if err != nil {
return nil, err
}

return &abciTypes.ResponseQuery{Value: bts}, nil
case strings.HasPrefix(req.Path, ABCIPeerFilterPath):
// When CometBFT connects to a peer, it sends two queries to the ABCI application
// using the following paths, with no additional data:
// - `/p2p/filter/addr/<IP:PORT>`
Expand Down Expand Up @@ -1650,10 +1673,10 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci
default:
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("invalid path: %s", req.Path)
}

default:
// If the query path is not recognized, return an error.
return &abciTypes.ResponseQuery{Code: 1}, fmt.Errorf("unknown query path: %s", req.Path)
}

return &abciTypes.ResponseQuery{}, nil
}

type EventBroadcaster func(ctx context.Context, db sql.DB, block *common.BlockContext) error
Expand Down
8 changes: 8 additions & 0 deletions internal/statesync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func loadSnapshot(headerFile string) (*Snapshot, error) {
}
return &snapshot, nil
}

func (s *Snapshot) MarshalBinary() ([]byte, error) {
return json.Marshal(s)
}

func (s *Snapshot) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
24 changes: 23 additions & 1 deletion internal/statesync/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (

"github.com/kwilteam/kwil-db/core/log"

cometClient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
)

const (
ABCISnapshotQueryPath = "/snapshot/height"
ABCISnapshotQueryPath = "/snapshot/height"
ABCILatestSnapshotHeightPath = "/snapshot/latest"
)

// StateSyncer is responsible for initializing the database state from the
Expand Down Expand Up @@ -304,3 +306,23 @@ func ChainRPCClient(server string) (*rpchttp.HTTP, error) {
}
return c, nil
}

// GetLatestSnapshotHeight queries the trusted snapshot providers to get the latest snapshot height.
func GetLatestSnapshotInfo(ctx context.Context, client cometClient.ABCIClient) (*Snapshot, error) {
res, err := client.ABCIQuery(ctx, ABCILatestSnapshotHeightPath, nil)
if err != nil {
return nil, err
}

if len(res.Response.Value) == 0 {
return nil, errors.New("no snapshot found")
}

var snap Snapshot
err = json.Unmarshal(res.Response.Value, &snap)
if err != nil {
return nil, err
}

return &snap, nil
}
Loading