Skip to content

Commit

Permalink
limit range get log
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro authored and vietthang207 committed May 11, 2021
1 parent f5ccb5b commit 27f9654
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 36 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics, false)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var (
utils.ExternalSignerFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.RangeLimitFlag,
utils.SmartCardDaemonPathFlag,
utils.OverrideIstanbulFlag,
utils.OverrideMuirGlacierFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.KeyStoreDirFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.RangeLimitFlag,
utils.SmartCardDaemonPathFlag,
utils.NetworkIdFlag,
utils.GoerliFlag,
Expand Down
10 changes: 10 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ var (
Name: "directbroadcast",
Usage: "Enable directly broadcast mined block to all peers",
}
RangeLimitFlag = cli.BoolFlag{
Name: "rangelimit",
Usage: "Enable 5000 blocks limit for range query",
}
AncientFlag = DirectoryFlag{
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
Expand Down Expand Up @@ -1250,6 +1254,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(InsecureUnlockAllowedFlag.Name) {
cfg.InsecureUnlockAllowed = ctx.GlobalBool(InsecureUnlockAllowedFlag.Name)
}
Expand Down Expand Up @@ -1529,6 +1536,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(CacheNoPrefetchFlag.Name) {
cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.APIBackend, false),
Service: filters.NewPublicFilterAPI(s.APIBackend, false, s.config.RangeLimit),
Public: true,
}, {
Namespace: "admin",
Expand Down
1 change: 1 addition & 0 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type Config struct {
NoPruning bool // Whether to disable pruning and flush everything to disk
NoPrefetch bool // Whether to disable prefetching and only load state on demand
DirectBroadcast bool
RangeLimit bool

// Whitelist of required block number -> hash values to accept
Whitelist map[uint64]common.Hash `toml:"-"`
Expand Down
30 changes: 16 additions & 14 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,24 @@ type filter struct {
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
backend Backend
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
rangeLimit bool
}

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
func NewPublicFilterAPI(backend Backend, lightMode, rangeLimit bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
chainDb: backend.ChainDb(),
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
backend: backend,
chainDb: backend.ChainDb(),
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
rangeLimit: rangeLimit,
}
go api.timeoutLoop()

Expand Down Expand Up @@ -335,7 +337,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics, api.rangeLimit)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -390,7 +392,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics, api.rangeLimit)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
4 changes: 2 additions & 2 deletions eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
var addr common.Address
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil, false)
if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
b.Log("Running filter benchmarks...")
start := time.Now()
backend := &testBackend{db: db}
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil, false)
filter.Logs(context.Background())
d := time.Since(start)
b.Log("Finished running filter benchmarks")
Expand Down
11 changes: 10 additions & 1 deletion eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package filters
import (
"context"
"errors"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

const maxFilterBlockRange = 5000

type Backend interface {
ChainDb() ethdb.Database
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
Expand Down Expand Up @@ -59,11 +62,13 @@ type Filter struct {
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher

rangeLimit bool
}

// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash, rangeLimit bool) *Filter {
// Flatten the address and topic filter clauses into a single bloombits filter
// system. Since the bloombits are not positional, nil topics are permitted,
// which get flattened into a nil byte slice.
Expand All @@ -90,6 +95,7 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
filter.matcher = bloombits.NewMatcher(size, filters)
filter.begin = begin
filter.end = end
filter.rangeLimit = rangeLimit

return filter
}
Expand Down Expand Up @@ -142,6 +148,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
if f.end == -1 {
end = head
}
if f.rangeLimit && (int64(end)-f.begin) > maxFilterBlockRange {
return nil, fmt.Errorf("exceed maximum block range: %d", maxFilterBlockRange)
}
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
Expand Down
14 changes: 7 additions & 7 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestBlockSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestPendingTxFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)

transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)

testCases = []struct {
crit FilterCriteria
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)
)

// different situations where log filter creation should fail.
Expand All @@ -336,7 +336,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)

Expand All @@ -361,7 +361,7 @@ func TestLogFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestPendingLogsSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false, false)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
Expand Down
16 changes: 8 additions & 8 deletions eth/filters/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func BenchmarkFilters(b *testing.B) {
}
b.ResetTimer()

filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil, false)

for i := 0; i < b.N; i++ {
logs, _ := filter.Logs(context.Background())
Expand Down Expand Up @@ -168,14 +168,14 @@ func TestFilters(t *testing.T) {
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
}

filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}, false)

logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}

filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}, false)
logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
Expand All @@ -184,7 +184,7 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}

filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}, false)
logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
Expand All @@ -193,30 +193,30 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}

filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}, false)

logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}

failHash := common.BytesToHash([]byte("fail"))
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}})
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}, false)

logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}

failAddr := common.BytesToAddress([]byte("failmenow"))
filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil)
filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil, false)

logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}

filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}, false)

logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
Expand Down
2 changes: 1 addition & 1 deletion graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func (r *Resolver) Logs(ctx context.Context, args struct{ Filter FilterCriteria
topics = *args.Filter.Topics
}
// Construct the range filter
filter := filters.NewRangeFilter(filters.Backend(r.backend), begin, end, addresses, topics)
filter := filters.NewRangeFilter(filters.Backend(r.backend), begin, end, addresses, topics, false)
return runFilter(ctx, r.backend, filter)
}

Expand Down
2 changes: 1 addition & 1 deletion les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *LightEthereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.ApiBackend, true),
Service: filters.NewPublicFilterAPI(s.ApiBackend, true, s.config.RangeLimit),
Public: true,
}, {
Namespace: "net",
Expand Down
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type Config struct {
// DirectBroadcast enable directly broadcast mined block to all peers
DirectBroadcast bool `toml:",omitempty"`

// RangeLimit enable 5000 blocks limit when handle range query
RangeLimit bool `toml:",omitempty"`

// SmartCardDaemonPath is the path to the smartcard daemon's socket
SmartCardDaemonPath string `toml:",omitempty"`

Expand Down

0 comments on commit 27f9654

Please sign in to comment.