Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, eth, les: fix messy code #15367

Merged
merged 3 commits into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 24 additions & 43 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ type partialMatches struct {
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The contest and error fields are used by the light client to terminate matching
// early if an error is enountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Error error
Context context.Context

Context context.Context
Error error
}

// Matcher is a pipelined system of schedulers and logic matchers which perform
Expand Down Expand Up @@ -506,54 +510,31 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher

quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
ctx context.Context
err error
stopping bool
lock sync.Mutex
pend sync.WaitGroup
closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown

ctx context.Context // Context used by the light client to abort filtering
err atomic.Value // Global error to track retrieval failures deep in the chain

pend sync.WaitGroup
}

// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
s.lock.Lock()
stopping := s.stopping
s.stopping = true
s.lock.Unlock()
// ensure that we only close the session once
if stopping {
return
}

// Bail out if the matcher is not running
select {
case <-s.quit:
return
default:
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
})
}

// setError sets an error and stops the session
func (s *MatcherSession) setError(err error) {
s.lock.Lock()
s.err = err
s.lock.Unlock()
s.Close()
}

// Error returns an error if one has happened during the session
// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
s.lock.Lock()
defer s.lock.Unlock()

return s.err
return s.err.Load().(error)
}

// AllocateRetrieval assigns a bloom bit index to a client process that can either
Expand Down Expand Up @@ -655,9 +636,9 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan

result := <-request
if result.Error != nil {
s.setError(result.Error)
s.err.Store(result.Error)
s.Close()
}

s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
Expand Down
51 changes: 24 additions & 27 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,25 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(section uint64, lastSectionHead common.Hash) error
Reset(section uint64, prevHead common.Hash) error

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)

// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
// Commit finalizes the section metadata and stores it into the database.
Commit() error
}

// ChainIndexerChain interface is used for connecting the indexer to a blockchain
type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header

// SubscribeChainEvent subscribes to new head header notifications.
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
Expand Down Expand Up @@ -114,21 +122,14 @@ func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.setValidSections(section + 1)
}

// IndexerChain interface is used for connecting the indexer to a blockchain
type IndexerChain interface {
CurrentHeader() *types.Header
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain IndexerChain) {
ch := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(ch)
currentHeader := chain.CurrentHeader()
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(events)

go c.eventLoop(currentHeader, ch, sub)
go c.eventLoop(chain.CurrentHeader(), events, sub)
}

// Close tears down all goroutines belonging to the indexer and returns any error
Expand All @@ -149,14 +150,12 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}

// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}

// Return any failures
switch {
case len(errs) == 0:
Expand All @@ -173,7 +172,7 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

Expand All @@ -193,7 +192,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
errc <- nil
return

case ev, ok := <-ch:
case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
Expand All @@ -202,9 +201,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, it will panic after light client checkpoint syncing. I think that is the only case where you don't have a common ancestor in the header chain. We can think about a nicer workaround for this corner case but this one will definitely break it.

}
c.newHead(header.Number.Uint64(), false)

Expand Down Expand Up @@ -259,8 +256,8 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updated time.Time
updateMsg bool
updating bool
updated time.Time
)

for {
Expand All @@ -277,7 +274,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updateMsg = true
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
Expand All @@ -300,8 +297,8 @@ func (c *ChainIndexer) updateLoop() {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updateMsg {
updateMsg = false
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
}

Expand Down Expand Up @@ -412,7 +409,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
c.storedSections = sections // needed if new > old
}

// sectionHead retrieves the last block hash of a processed section from the
// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
Expand Down
2 changes: 1 addition & 1 deletion core/chain_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}

func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
Expand Down
10 changes: 5 additions & 5 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
all *map[common.Hash]txLookupRec // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
}

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *map[common.Hash]txLookupRec) *txPricedList {
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
Expand All @@ -416,7 +416,7 @@ func (l *txPricedList) Removed() {

l.stales, l.items = 0, &reheap
for _, tx := range *l.all {
*l.items = append(*l.items, tx.tx)
*l.items = append(*l.items, tx)
}
heap.Init(l.items)
}
Expand Down
Loading