Revert "eth/filters: retrieve logs in async (ethereum#27135)"

This reverts commit f875ab9.

devopsbo3 authored Nov 10, 2023
1 parent 38a3335 commit d6f0b01
Showing 3 changed files with 153 additions and 320 deletions.
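
For orientation before the diff: the change being reverted is essentially an API-shape change inside eth/filters. The asynchronous implementation that gets removed streams matches over a log channel plus an error channel (rangeLogsAsync below returns chan *types.Log and chan error), while the restored implementation collects everything and returns a plain slice (indexedLogs and unindexedLogs go back to returning ([]*types.Log, error)). The following is a minimal, self-contained sketch of the two shapes, not code from this commit; the types and the block iteration are stand-ins.

package main

import (
	"context"
	"fmt"
)

type Log struct{ BlockNumber uint64 }

// asyncShape mirrors the removed rangeLogsAsync contract: matches are
// delivered on a log channel, completion (or failure) on an error channel,
// and both channels are closed by the producer when it is done.
func asyncShape(ctx context.Context) (chan *Log, chan error) {
	logCh, errCh := make(chan *Log), make(chan error)
	go func() {
		defer close(errCh)
		defer close(logCh)
		for i := uint64(0); i < 3; i++ { // stand-in for real block iteration
			select {
			case logCh <- &Log{BlockNumber: i}:
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			}
		}
		errCh <- nil
	}()
	return logCh, errCh
}

// syncShape mirrors the restored contract: the whole range is collected up
// front and handed back as a slice, partial results plus an error on failure.
func syncShape(ctx context.Context) ([]*Log, error) {
	var logs []*Log
	for i := uint64(0); i < 3; i++ {
		if ctx.Err() != nil {
			return logs, ctx.Err()
		}
		logs = append(logs, &Log{BlockNumber: i})
	}
	return logs, nil
}

func main() {
	ctx := context.Background()

	// Consuming the async shape is the select loop the revert deletes from Logs.
	logCh, errCh := asyncShape(ctx)
	var streamed []*Log
	for done := false; !done; {
		select {
		case l := <-logCh:
			streamed = append(streamed, l)
		case err := <-errCh:
			if err != nil {
				fmt.Println("partial result, error:", err)
			}
			done = true
		}
	}

	collected, _ := syncShape(ctx)
	fmt.Println(len(streamed), len(collected)) // 3 3
}
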
eth/filters/filter.go (65 additions, 103 deletions)
@@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}

var (
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)

// special case for pending logs
if beginPending && !endPending {
return nil, errors.New("invalid block range")
}

// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}

// Figure out the limits of the filter range
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
var (
err error
head = header.Number.Int64()
pending = f.end == rpc.PendingBlockNumber.Int64()
)
resolveSpecial := func(number int64) (int64, error) {
var hdr *types.Header
switch number {
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
case rpc.LatestBlockNumber.Int64():
return head, nil
case rpc.PendingBlockNumber.Int64():
// we should return head here since we've already captured
// that we need to get the pending logs in the pending boolean above
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if hdr == nil {
return 0, errors.New("latest header not found")
}
return head, nil
case rpc.FinalizedBlockNumber.Int64():
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
if hdr == nil {
@@ -147,92 +147,57 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return hdr.Number.Int64(), nil
}

var err error
// range query need to resolve the special begin/end block number
if f.begin, err = resolveSpecial(f.begin); err != nil {
return nil, err
}
if f.end, err = resolveSpecial(f.end); err != nil {
return nil, err
}

logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
}
}
}

// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
// Gather all indexed logs, and finish with non indexed ones
var (
logChan = make(chan *types.Log)
errChan = make(chan error)
logs []*types.Log
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
)

go func() {
defer func() {
close(errChan)
close(logChan)
}()

// Gather all indexed logs, and finish with non indexed ones
var (
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
err error
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
indexed = end + 1
}
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
errChan <- err
return
}
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
}

if err := f.unindexedLogs(ctx, end, logChan); err != nil {
errChan <- err
return
if err != nil {
return logs, err
}

errChan <- nil
}()

return logChan, errChan
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err
}
logs = append(logs, pendingLogs...)
}
return logs, err
}
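
Both the removed and the restored code split the requested block range the same way: the prefix already covered by the bloom index is served by indexedLogs, the tail by unindexedLogs. Below is a small self-contained sketch of that boundary computation, assuming, as the calls above suggest, that BloomStatus returns the section size in blocks and the number of completed sections; splitRange is an illustrative name, not a function in this codebase.

package main

import "fmt"

// splitRange mirrors the boundary logic in the function above: blocks below
// sections*size are served from the bloom index, the rest by raw header
// iteration. indexedEnd is the last block to query via the index (inclusive),
// or -1 when the indexed path is skipped; unindexedBegin is where the linear
// scan picks up.
func splitRange(begin, end, size, sections uint64) (indexedEnd int64, unindexedBegin uint64) {
	indexed := sections * size // first block number NOT covered by the index
	if indexed <= begin {
		return -1, begin // nothing in the range is indexed
	}
	if indexed > end {
		return int64(end), end + 1 // the whole range is indexed
	}
	return int64(indexed - 1), indexed
}

func main() {
	// e.g. 3 completed sections of 4096 blocks => blocks 0..12287 indexed
	fmt.Println(splitRange(10_000, 20_000, 4096, 3)) // 12287 12288
	fmt.Println(splitRange(15_000, 20_000, 4096, 3)) // -1 15000
	fmt.Println(splitRange(10_000, 12_000, 4096, 3)) // 12000 12001
}
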

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
// Create a matcher session and request servicing from the backend
matches := make(chan uint64, 64)

session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return err
return nil, err
}
defer session.Close()

f.sys.backend.ServiceFilter(ctx, session)

// Iterate over the matches until exhausted or context closed
var logs []*types.Log

for {
select {
case number, ok := <-matches:
@@ -242,50 +207,47 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *type
if err == nil {
f.begin = int64(end) + 1
}
return err
return logs, err
}
f.begin = int64(number) + 1

// Retrieve the suggested block and pull any truly matching logs
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return err
return logs, err
}
found, err := f.checkMatches(ctx, header)
if err != nil {
return err
}
for _, log := range found {
logChan <- log
return logs, err
}
logs = append(logs, found...)

case <-ctx.Done():
return ctx.Err()
return logs, ctx.Err()
}
}
}
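
As the comment above says, the indexed path asks the bloom-bits matcher for candidate block numbers and then re-checks each candidate against the block's actual logs (checkMatches), since bloom matches can be false positives. A self-contained sketch of that consume-and-verify loop follows, with the matcher replaced by a plain channel of candidate numbers and the verification rule made trivial.

package main

import "fmt"

// collectVerified mirrors the loop in indexedLogs: read candidate block
// numbers from the matcher until the channel is closed, verify each candidate,
// and keep only the confirmed matches. Candidates are a superset of the real
// matches because bloom bits can produce false positives.
func collectVerified(candidates <-chan uint64, verify func(uint64) bool) []uint64 {
	var confirmed []uint64
	for {
		number, ok := <-candidates
		if !ok { // matcher finished: no more candidates
			return confirmed
		}
		if verify(number) { // stand-in for checkMatches against actual logs
			confirmed = append(confirmed, number)
		}
	}
}

func main() {
	candidates := make(chan uint64, 4)
	for _, n := range []uint64{10, 11, 12, 13} {
		candidates <- n
	}
	close(candidates)
	even := func(n uint64) bool { return n%2 == 0 } // toy verification rule
	fmt.Println(collectVerified(candidates, even))  // [10 12]
}
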

// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log

for ; f.begin <= int64(end); f.begin++ {
if f.begin%10 == 0 && ctx.Err() != nil {
return logs, ctx.Err()
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return err
return logs, err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return err
}
for _, log := range found {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
return logs, err
}
logs = append(logs, found...)
}
return nil
return logs, nil
}
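
The linear path above walks the range block by block and leans on the per-block header bloom as a cheap pre-filter: blockLogs (not touched by the hunks shown here) only fetches and filters the actual logs when the bloom reports a possible match. A toy, self-contained sketch of that idea follows; the map-based bloom and string logs are stand-ins for the real types.Bloom machinery.

package main

import "fmt"

// Stand-ins for the real types; go-ethereum uses types.Bloom, common.Address
// and common.Hash here.
type toyBloom map[string]bool

type toyHeader struct {
	number uint64
	bloom  toyBloom
	logs   []string // stand-in for the logs reachable via the block's receipts
}

// scanUnindexed mirrors the shape of unindexedLogs/blockLogs: walk headers one
// by one and only touch the (expensive) receipt data when the header bloom
// says a queried address may be present. Blooms can give false positives but
// never false negatives, so skipping on a negative is safe.
func scanUnindexed(headers []toyHeader, addresses []string) []string {
	var out []string
	for _, h := range headers {
		maybe := len(addresses) == 0 // no address filter means match everything
		for _, addr := range addresses {
			if h.bloom[addr] {
				maybe = true
				break
			}
		}
		if !maybe {
			continue // bloom says no: skip receipt retrieval entirely
		}
		out = append(out, h.logs...) // the real code re-filters the actual logs here
	}
	return out
}

func main() {
	headers := []toyHeader{
		{number: 1, bloom: toyBloom{"0xa": true}, logs: []string{"log@1"}},
		{number: 2, bloom: toyBloom{}, logs: []string{"log@2"}},
	}
	fmt.Println(scanUnindexed(headers, []string{"0xa"})) // [log@1]
}
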

// blockLogs returns the logs matching the filter criteria within a single block.
@@ -332,19 +294,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() []*types.Log {
func (f *Filter) pendingLogs() ([]*types.Log, error) {
block, receipts := f.sys.backend.PendingBlockAndReceipts()
if block == nil {
return nil
return nil, errors.New("pending state not available")
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
}
return nil
return nil, nil
}
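
One behavioural detail in the hunk above: the removed pendingLogs returned a bare slice, so a missing pending block and a pending block with no matching logs both came back as nil, while the restored version reports the missing pending state as an explicit error that the restored Logs path checks before appending. A compact sketch of the two contracts (stand-in Log type, illustrative function names):

package main

import (
	"errors"
	"fmt"
)

type Log struct{}

// Removed contract: absence of a pending block is indistinguishable from a
// pending block that simply has no matching logs.
func pendingLogsRemoved(havePending bool) []*Log {
	if !havePending {
		return nil
	}
	return nil // or the filtered logs
}

// Restored contract: the missing-pending-state case is surfaced as an error,
// which the caller checks before appending.
func pendingLogsRestored(havePending bool) ([]*Log, error) {
	if !havePending {
		return nil, errors.New("pending state not available")
	}
	return nil, nil // or the filtered logs
}

func main() {
	fmt.Println(pendingLogsRemoved(false) == nil) // true, but the caller cannot tell why
	_, err := pendingLogsRestored(false)
	fmt.Println(err) // pending state not available
}
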

func includes(addresses []common.Address, a common.Address) bool {
eth/filters/filter_system_test.go (1 addition, 3 deletions)
@@ -50,8 +50,6 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
pendingBlock *types.Block
pendingReceipts types.Receipts
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
@@ -126,7 +124,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return b.pendingBlock, b.pendingReceipts
return nil, nil
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {