
Commit

saving what I have
kcalvinalvin committed Jan 15, 2024
1 parent 91cdf0d commit 8c43a87
Showing 3 changed files with 307 additions and 23 deletions.
3 changes: 3 additions & 0 deletions netsync/interface.go
@@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/mempool"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"

Check failure on line 15 in netsync/interface.go (GitHub Actions: Unit race, Unit coverage, Build):

no required module provides package github.com/lightninglabs/neutrino/query; to add it:
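(The toolchain's suggested command is truncated above; the usual fix would be adding the neutrino module to go.mod, e.g. via go get github.com/lightninglabs/neutrino.)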
)

// PeerNotifier exposes methods to notify peers of status changes to
@@ -38,4 +39,6 @@ type Config struct {
MaxPeers int

FeeEstimator *mempool.FeeEstimator

	// ConnectedPeers returns a channel that delivers the currently
	// connected peers, along with a cancel function for the subscription.
	// It feeds the neutrino query.WorkManager used for block fetching.
	ConnectedPeers func() (<-chan query.Peer, func(), error)
}
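
A minimal sketch of how server code might supply ConnectedPeers, assuming a hypothetical peerSubscription helper that emits newly connected peers and a hypothetical asQueryPeer adapter that wraps a server peer to satisfy neutrino's query.Peer interface:

	func connectedPeers(s *server) func() (<-chan query.Peer, func(), error) {
		return func() (<-chan query.Peer, func(), error) {
			peerChan := make(chan query.Peer)
			done := make(chan struct{})
			sub := s.peerSubscription() // hypothetical: emits connected peers
			go func() {
				for {
					select {
					case sp := <-sub:
						select {
						// asQueryPeer is a hypothetical adapter.
						case peerChan <- asQueryPeer(sp):
						case <-done:
							return
						}
					case <-done:
						return
					}
				}
			}()
			return peerChan, func() { close(done) }, nil
		}
	}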
177 changes: 161 additions & 16 deletions netsync/manager.go
@@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
)

const (
@@ -173,6 +174,57 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
m[hash] = struct{}{}
}

// checkpointedBlocksQuery fetches the blocks covered by the current list of
// headers during headers-first sync, using the neutrino work manager.
// Received blocks are delivered on msgBlockChan.
type checkpointedBlocksQuery struct {
	syncMgr      *SyncManager
	msgs         []wire.Message
	msgBlockChan chan *wire.MsgBlock
}

// handleResponse is invoked by the work manager for every message received
// from a peer while the query is active. It filters for block messages and
// forwards them to msgBlockChan.
func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message,
	peerAddr string) query.Progress {

r, ok := resp.(*wire.MsgBlock)
if !ok {
// We are only looking for block messages.
return query.Progress{
Finished: false,
Progressed: false,
}
}

//log.Infof("received block %s", r.BlockHash().String())

select {
case c.msgBlockChan <- r:
log.Infof("got block %v from peer %v", r.BlockHash().String(), peerAddr)
case <-c.syncMgr.quit:
return query.Progress{
Finished: false,
Progressed: false,
}
}

//log.Infof("send query finish")
return query.Progress{
Finished: true,
Progressed: true,
}
}
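
// Note on the query.Progress values above (as used by neutrino's work
// manager; see the query package docs for the authoritative semantics):
// Finished marks the request complete so it won't be redispatched, while
// Progressed indicates the response moved the request forward. A response
// that is neither leaves the request outstanding for this or another peer.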

// requests returns a query.Request per queued message, each wired to
// handleResponse, ready to be handed off to the work manager.
func (c *checkpointedBlocksQuery) requests() []*query.Request {
reqs := make([]*query.Request, len(c.msgs))
for idx, m := range c.msgs {
reqs[idx] = &query.Request{
Req: m,
HandleResp: c.handleResponse,
}
}

return reqs
}

// SyncManager is used to communicate block related messages with peers. The
// SyncManager is started by executing Start() in a goroutine. Once started,
// it selects peers to sync from and starts the initial block download. Once the
@@ -190,6 +242,10 @@ type SyncManager struct {
wg sync.WaitGroup
quit chan struct{}

	// fetchManager distributes block download requests among the
	// connected peers.
	fetchManager query.WorkManager

	// queuedBlocks and startBlock are for tracking blocks downloaded by
	// the fetch manager (still unused as of this commit).
	queuedBlocks *list.List
	startBlock   *list.Element

// These fields should only be accessed from the blockHandler thread
rejectedTxns map[chainhash.Hash]struct{}
requestedTxns map[chainhash.Hash]struct{}
@@ -376,7 +432,7 @@ func (sm *SyncManager) startSync() {
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
} else {
log.Warnf("No sync peer candidates available")
log.Warnf("No sync peer candidates available. Chain tip at %s(%d)")
}
}

@@ -688,15 +744,16 @@ func (sm *SyncManager) current() bool {
// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
	_, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
	if _, exists = sm.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
@@ -737,14 +794,13 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(sm.requestedBlocks, *blockHash)

// Process the block to include validation, best chain selection, orphan
// handling, etc.
	_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
	if err != nil {
		// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
// it as such. Otherwise, something really did go wrong, so log
// it as an actual error.
Expand Down Expand Up @@ -855,7 +911,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// getting short.
if !isCheckpointBlock {
if sm.startHeader != nil &&
			len(sm.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
}
return
@@ -900,12 +956,29 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// fetchHeaderBlocks builds getdata requests for the next list of blocks to
// be downloaded based on the current list of headers and dispatches them to
// the fetch manager.
func (sm *SyncManager) fetchHeaderBlocks() {
//log.Infof("fetchHeaderBlocks called")
// Nothing to do if there is no start header.
if sm.startHeader == nil {
log.Warnf("fetchHeaderBlocks called with no start header")
return
}

node, ok := sm.startHeader.Value.(*headerNode)
if !ok {
log.Warn("Header list node type is not a headerNode")
return
}

	// Don't run too far ahead of the chain tip: if the start header is
	// more than 1024 blocks past the best height, let block processing
	// catch up before fetching more.
	best := sm.chain.BestSnapshot()
	if node.height-1024 > best.Height {
		log.Infof("Holding off the block fetch as the current best "+
			"chain is at %d but the start header is at %d",
			best.Height, node.height)
		return
	}

queryMessages := make([]wire.Message, 0, sm.headerList.Len())

// Build up a getdata request for the list of blocks the headers
// describe. The size hint will be limited to wire.MaxInvPerMsg by
// the function, so no need to double check it here.
@@ -918,19 +991,15 @@ func (sm *SyncManager) fetchHeaderBlocks() {
continue
}

		iv := wire.NewInvVect(wire.InvTypeWitnessBlock, node.hash)
haveInv, err := sm.haveInventory(iv)
if err != nil {
log.Warnf("Unexpected failure when checking for "+
"existing inventory during header block "+
"fetch: %v", err)
}
if !haveInv {
			sm.requestedBlocks[*node.hash] = struct{}{}

// If we're fetching from a witness enabled peer
// post-fork, then ensure that we receive all the
// witness data in the blocks.
@@ -940,15 +1009,61 @@

gdmsg.AddInvVect(iv)
numRequested++
			// Queue each getdata message with just the single block
			// so that the work manager is able to assign each block
			// to a different peer.
			queryMessages = append(queryMessages, gdmsg)
			gdmsg = wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
}

sm.startHeader = e.Next()
		// Cap each call at 1024 requested blocks.
		if numRequested > 1024 {
break
}

}

	if len(queryMessages) == 0 {
		return
	}

	blockChan := make(chan *wire.MsgBlock, numRequested)
	q := checkpointedBlocksQuery{
		syncMgr:      sm,
		msgs:         queryMessages,
		msgBlockChan: blockChan,
	}
	log.Infof("Dispatching %d block queries to the fetch manager",
		len(queryMessages))

	// TODO: It may be fine to ignore the error channel entirely, since
	// any errors in the delivered blocks will be caught by block
	// validation anyway. For now, drain it and log what comes out.
go func() {
errChan := sm.fetchManager.Query(
q.requests(),
query.Cancel(sm.quit),
query.NoRetryMax(),
)

		select {
		case err := <-errChan:
			if err != nil {
				log.Errorf("Block query error: %v", err)
			}
		case <-sm.quit:
			return
		}
}()
}
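
Nothing drains blockChan yet beyond its numRequested-slot buffer; the consumer side is still to be written. Purely as a hypothetical sketch of the shape it could take (handleFetchedBlock does not exist in this commit):

	go func() {
		for {
			select {
			case msgBlock := <-blockChan:
				// Hypothetical: fetched blocks would need to be
				// ordered and handed to the blockHandler thread
				// before being connected to the chain.
				sm.handleFetchedBlock(msgBlock)
			case <-sm.quit:
				return
			}
		}
	}()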

// handleHeadersMsg handles block header messages from all peers. Headers are
@@ -1551,6 +1666,24 @@ func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done
}

sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}

}

// QueueInv adds the passed inv message and peer to the block handling queue.
@@ -1605,6 +1738,10 @@ func (sm *SyncManager) Start() {
return
}

	if err := sm.fetchManager.Start(); err != nil {
		log.Errorf("Failed to start the fetch manager: %v", err)
	}

log.Trace("Starting sync manager")
sm.wg.Add(1)
go sm.blockHandler()
@@ -1675,7 +1812,15 @@ func New(config *Config) (*SyncManager, error) {
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
queuedBlocks: list.New(),
fetchManager: query.NewWorkManager(
&query.Config{
ConnectedPeers: config.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
},
),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()