From 875d18e85b65451c66c8eaefda0f527c618ec0da Mon Sep 17 00:00:00 2001 From: Tangui Clairet <181825613+Tangui-Bitfly@users.noreply.github.com> Date: Thu, 21 Nov 2024 19:32:52 +0100 Subject: [PATCH] Bigtable with :muscle: --- backend/internal/e2e/data_test.go | 94 +++ backend/pkg/commons/db2/data/data.go | 185 ++---- .../commons/db2/data/data_external_test.go | 8 +- backend/pkg/commons/db2/data/data_test.go | 32 +- backend/pkg/commons/db2/data/filter.go | 255 ++------ backend/pkg/commons/db2/data/filter_test.go | 152 ++--- backend/pkg/commons/db2/data/keys.go | 298 ++++----- backend/pkg/commons/db2/data/option.go | 108 ++-- backend/pkg/commons/db2/data/parser.go | 58 ++ backend/pkg/commons/db2/database/bigtable.go | 102 ++- .../pkg/commons/db2/database/bigtable_test.go | 38 +- backend/pkg/commons/db2/database/option.go | 36 +- backend/pkg/commons/db2/database/remote.go | 2 +- backend/pkg/commons/db2/database/store.go | 2 +- .../pkg/commons/db2/databasetest/bigtable.go | 1 + backend/pkg/commons/db2/metadata/key.go | 54 ++ backend/pkg/commons/indexer/cache.go | 15 + backend/pkg/commons/indexer/indexer.go | 67 ++ backend/pkg/commons/indexer/indexer_test.go | 71 +++ backend/pkg/commons/indexer/tranformer.go | 163 +++++ backend/pkg/commons/rpc/erigon.go | 12 +- backend/pkg/commons/types/eth1.pb.go | 582 ++++++++++-------- backend/pkg/commons/types/eth1.proto | 17 + 23 files changed, 1394 insertions(+), 958 deletions(-) create mode 100644 backend/internal/e2e/data_test.go create mode 100644 backend/pkg/commons/db2/data/parser.go create mode 100644 backend/pkg/commons/db2/metadata/key.go create mode 100644 backend/pkg/commons/indexer/cache.go create mode 100644 backend/pkg/commons/indexer/indexer.go create mode 100644 backend/pkg/commons/indexer/indexer_test.go create mode 100644 backend/pkg/commons/indexer/tranformer.go diff --git a/backend/internal/e2e/data_test.go b/backend/internal/e2e/data_test.go new file mode 100644 index 000000000..c75f039e1 --- /dev/null +++ b/backend/internal/e2e/data_test.go @@ -0,0 +1,94 @@ +package e2e + +import ( + "context" + "encoding/hex" + "fmt" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + + "github.com/gobitfly/beaconchain/internal/th" + "github.com/gobitfly/beaconchain/pkg/commons/db2/data" + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" + "github.com/gobitfly/beaconchain/pkg/commons/db2/databasetest" + "github.com/gobitfly/beaconchain/pkg/commons/indexer" + "github.com/gobitfly/beaconchain/pkg/commons/rpc" + "github.com/gobitfly/beaconchain/pkg/commons/types" +) + +func TestStoreWithBackend(t *testing.T) { + clientBT, adminBT := databasetest.NewBigTable(t) + bigtable, err := database.NewBigTableWithClient(context.Background(), clientBT, adminBT, data.Schema) + if err != nil { + t.Fatal(err) + } + + store := data.NewStore(database.Wrap(bigtable, data.Table)) + backend := th.NewBackend(t) + _, usdt := backend.DeployToken(t, "usdt", "usdt", backend.BankAccount.From) + + transform := indexer.NewTransformer(indexer.NoopCache{}) + indexer := indexer.New(store, transform.Tx, transform.ERC20) + + client, err := rpc.NewErigonClient(backend.Endpoint) + if err != nil { + t.Fatal(err) + } + + var addresses []common.Address + for i := 0; i < 10; i++ { + temp := th.CreateEOA(t) + addresses = append(addresses, temp.From) + for j := 0; j < 25; j++ { + if err := backend.Client().SendTransaction(context.Background(), backend.MakeTx(t, backend.BankAccount, &temp.From, big.NewInt(1), nil)); err != nil { + t.Fatal(err) + } + if _, err := usdt.Mint(backend.BankAccount.TransactOpts, temp.From, big.NewInt(1)); err != nil { + t.Fatal(i, j, err) + } + backend.Commit() + lastBlock, err := backend.Client().BlockNumber(context.Background()) + if err != nil { + t.Fatal(err) + } + block, _, err := client.GetBlock(int64(lastBlock), "geth") + if err != nil { + t.Fatal(err) + } + if err := indexer.IndexBlocksWithTransformers(fmt.Sprintf("%d", backend.ChainID), []*types.Eth1Block{block}); err != nil { + t.Fatal(err) + } + } + } + + efficiencies := make(map[string]int64) + interactions, _, err := store.Get(addresses, nil, 25, data.WithDatabaseStats(func(msg string, args ...any) { + var efficiency int64 + var rowRange string + for i := 0; i < len(args); i = i + 2 { + if args[i].(string) == database.KeyStatEfficiency { + efficiency = args[i+1].(int64) + } + if args[i].(string) == database.KeyStatRange { + rowRange = args[i+1].(string) + } + } + efficiencies[rowRange] = efficiency + })) + if err != nil { + t.Fatal(err) + } + for _, interaction := range interactions { + t.Log(interaction.Type, interaction.ChainID, "0x"+interaction.From, "0x"+interaction.To, "0x"+hex.EncodeToString(interaction.Hash), interaction.Time) + } + if got, want := len(efficiencies), len(addresses); got != want { + t.Errorf("got %d want %d", got, want) + } + for rowRange, efficiency := range efficiencies { + if got, want := efficiency, int64(1); got != want { + t.Errorf("efficiency for %s: got %d, want %d", rowRange, got, want) + } + } +} diff --git a/backend/pkg/commons/db2/data/data.go b/backend/pkg/commons/db2/data/data.go index 24879dc01..82690245e 100644 --- a/backend/pkg/commons/db2/data/data.go +++ b/backend/pkg/commons/db2/data/data.go @@ -26,112 +26,45 @@ func NewStore(store database.Database) Store { } } -type TransferWithIndexes struct { - Indexed *types.Eth1ERC20Indexed - TxIndex int - LogIndex int -} - -func (store Store) BlockERC20TransfersToItems(chainID string, transfers []TransferWithIndexes) (map[string][]database.Item, error) { - items := make(map[string][]database.Item) - for _, transfer := range transfers { - b, err := proto.Marshal(transfer.Indexed) - if err != nil { - return nil, err - } - key := keyERC20(chainID, transfer.Indexed.ParentHash, transfer.LogIndex) - item := []database.Item{{Family: defaultFamily, Column: key}} - items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} - - items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item - - items[keyERC20ContractAllTime(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item - - items[keyERC20To(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20From(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Sent(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Received(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - } - return items, nil +func (store Store) AddItems(items map[string][]database.Item) error { + return store.db.BulkAdd(items) } func (store Store) AddBlockERC20Transfers(chainID string, transactions []TransferWithIndexes) error { - items, err := store.BlockERC20TransfersToItems(chainID, transactions) + items, err := BlockERC20TransfersToItemsV2(chainID, transactions) if err != nil { return err } return store.db.BulkAdd(items) } -func (store Store) BlockTransactionsToItems(chainID string, transactions []*types.Eth1TransactionIndexed) (map[string][]database.Item, error) { - items := make(map[string][]database.Item) - for i, transaction := range transactions { - b, err := proto.Marshal(transaction) - if err != nil { - return nil, err - } - key := keyTx(chainID, transaction.GetHash()) - item := []database.Item{{Family: defaultFamily, Column: key}} - items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} - items[keyTxSent(chainID, transaction, i)] = item - items[keyTxReceived(chainID, transaction, i)] = item - - items[keyTxTime(chainID, transaction, transaction.To, i)] = item - items[keyTxBlock(chainID, transaction, transaction.To, i)] = item - items[keyTxMethod(chainID, transaction, transaction.To, i)] = item - - items[keyTxTime(chainID, transaction, transaction.From, i)] = item - items[keyTxBlock(chainID, transaction, transaction.From, i)] = item - items[keyTxMethod(chainID, transaction, transaction.From, i)] = item - - if transaction.ErrorMsg != "" { - items[keyTxError(chainID, transaction, transaction.To, i)] = item - items[keyTxError(chainID, transaction, transaction.From, i)] = item - } - - if transaction.IsContractCreation { - items[keyTxContractCreation(chainID, transaction, transaction.To, i)] = item - items[keyTxContractCreation(chainID, transaction, transaction.From, i)] = item - } - } - return items, nil -} - func (store Store) AddBlockTransactions(chainID string, transactions []*types.Eth1TransactionIndexed) error { - items, err := store.BlockTransactionsToItems(chainID, transactions) + items, err := BlockTransactionsToItemsV2(chainID, transactions) if err != nil { return err } return store.db.BulkAdd(items) } -func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]map[string]string, error) { - sources := map[formatType]unMarshalInteraction{ - typeTx: unMarshalTx, - typeTransfer: unMarshalTransfer, - } +func (store Store) Get(addresses []common.Address, prefixes map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]string, error) { options := apply(opts) - if options.ignoreTxs { - delete(sources, typeTx) + + filter, err := newQueryFilter(options) + if err != nil { + return nil, nil, err } - if options.ignoreTransfers { - delete(sources, typeTransfer) + databaseOptions := []database.Option{ + database.WithLimit(limit), + database.WithOpenRange(true), } - var interactions []*interactionWithInfo - for interactionType, unMarshalFunc := range sources { - filter, err := makeFilters(options, interactionType) - if err != nil { - return nil, nil, err - } - temp, err := store.getBy(unMarshalFunc, chainIDs, addresses, prefixes, limit, filter) - if err != nil { - return nil, nil, err - } - interactions = append(interactions, temp...) + if options.statsReporter != nil { + databaseOptions = append(databaseOptions, database.WithStats(options.statsReporter)) + } + interactions, err := store.getBy(addresses, prefixes, filter, databaseOptions) + if err != nil { + return nil, nil, err } + sort.Sort(byTimeDesc(interactions)) if int64(len(interactions)) > limit { interactions = interactions[:limit] @@ -139,59 +72,59 @@ func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes m var res []*Interaction if prefixes == nil { - prefixes = make(map[string]map[string]string) + prefixes = make(map[string]string) } for i := 0; i < len(interactions); i++ { - if prefixes[interactions[i].chainID] == nil { - prefixes[interactions[i].chainID] = make(map[string]string) - } - prefixes[interactions[i].chainID][interactions[i].root] = interactions[i].key + prefixes[interactions[i].root] = interactions[i].key res = append(res, interactions[i].Interaction) } return res, prefixes, nil } -func (store Store) getBy(unMarshal unMarshalInteraction, chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, condition filter) ([]*interactionWithInfo, error) { +func (store Store) getBy(addresses []common.Address, prefixes map[string]string, condition filter, databaseOptions []database.Option) ([]*interactionWithInfo, error) { var interactions []*interactionWithInfo - for _, chainID := range chainIDs { - for _, address := range addresses { - root := condition.get(chainID, address) - prefix := root - if prefixes != nil && prefixes[chainID] != nil && prefixes[chainID][root] != "" { - prefix = prefixes[chainID][root] + for _, address := range addresses { + root := condition.get(address) + prefix := root + if prefixes != nil && prefixes[root] != "" { + prefix = prefixes[root] + } + upper := condition.limit(root) + indexRows, err := store.db.GetRowsRange(upper, prefix, databaseOptions...) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + continue } - upper := condition.limit(root) - indexRows, err := store.db.GetRowsRange(upper, prefix, database.WithLimit(limit), database.WithOpenRange(true)) - if err != nil { - if errors.Is(err, database.ErrNotFound) { - continue - } - return nil, err + return nil, err + } + txKeys := make(map[string]string) + for _, row := range indexRows { + for key := range row.Values { + txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily)) + txKeys[txKey] = row.Key } - txKeys := make(map[string]string) - for _, row := range indexRows { - for key := range row.Values { - txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily)) - txKeys[txKey] = row.Key - } + } + txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys)) + if err != nil { + return nil, err + } + for _, row := range txRows { + parts := strings.Split(row.Key, ":") + unMarshal := unMarshalTx + if parts[0] == "ERC20" { + unMarshal = unMarshalTransfer } - txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys)) + interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)]) if err != nil { return nil, err } - for _, row := range txRows { - interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)]) - if err != nil { - return nil, err - } - interaction.ChainID = chainID - interactions = append(interactions, &interactionWithInfo{ - Interaction: interaction, - chainID: chainID, - root: root, - key: txKeys[row.Key], - }) - } + interaction.ChainID = parts[1] + interactions = append(interactions, &interactionWithInfo{ + Interaction: interaction, + chainID: parts[1], + root: root, + key: txKeys[row.Key], + }) } } return interactions, nil @@ -231,8 +164,6 @@ type Interaction struct { var erc20Transfer, _ = hex.DecodeString("a9059cbb") -type unMarshalInteraction func(b []byte) (*Interaction, error) - func unMarshalTx(b []byte) (*Interaction, error) { tx := &types.Eth1TransactionIndexed{} if err := proto.Unmarshal(b, tx); err != nil { diff --git a/backend/pkg/commons/db2/data/data_external_test.go b/backend/pkg/commons/db2/data/data_external_test.go index 04afd2bd3..efdaa9907 100644 --- a/backend/pkg/commons/db2/data/data_external_test.go +++ b/backend/pkg/commons/db2/data/data_external_test.go @@ -77,7 +77,7 @@ func TestStoreExternal(t *testing.T) { chainIDs: chainIDs, addresses: addresses, opts: []data.Option{ - data.IgnoreTransfers(), + data.OnlyTransactions(), data.ByMethod("a9059cbb"), }, }, @@ -87,15 +87,15 @@ func TestStoreExternal(t *testing.T) { chainIDs: chainIDs, addresses: addresses, opts: []data.Option{ - data.IgnoreTransactions(), + data.OnlyTransactions(), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var lastPrefixes map[string]map[string]string + var lastPrefixes map[string]string for i := 0; i < tt.scroll+1; i++ { - interactions, prefixes, err := store.Get(chainIDs, addresses, lastPrefixes, tt.limit, tt.opts...) + interactions, prefixes, err := store.Get(addresses, lastPrefixes, tt.limit, tt.opts...) if err != nil { t.Fatal(err) } diff --git a/backend/pkg/commons/db2/data/data_test.go b/backend/pkg/commons/db2/data/data_test.go index 0942331a6..18f477f9f 100644 --- a/backend/pkg/commons/db2/data/data_test.go +++ b/backend/pkg/commons/db2/data/data_test.go @@ -3,13 +3,10 @@ package data import ( "context" "encoding/hex" - "slices" - "sort" "testing" "time" "github.com/ethereum/go-ethereum/common" - "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/timestamppb" "github.com/gobitfly/beaconchain/pkg/commons/db2/database" @@ -125,7 +122,7 @@ func TestStore(t *testing.T) { }, }, limit: 1, - opts: []Option{IgnoreTransfers(), ByMethod(hex.EncodeToString([]byte("foo")))}, + opts: []Option{OnlyTransactions(), ByMethod(hex.EncodeToString([]byte("foo")))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash1"}, }, @@ -139,7 +136,7 @@ func TestStore(t *testing.T) { }, }, limit: 1, - opts: []Option{WithTimeRange(timestamppb.New(t0), timestamppb.New(t0.Add(1*time.Second)))}, + opts: []Option{WithTimeRange(timestamppb.New(t0), timestamppb.New(t1))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash2", "hash1"}, }, @@ -170,7 +167,8 @@ func TestStore(t *testing.T) { opts: []Option{OnlyReceived()}, addresses: []common.Address{bob}, expectedHashes: []string{"hash3", "hash1"}, - }, { + }, + { name: "only transfers", txs: map[string][][]*types.Eth1TransactionIndexed{ "1": { @@ -184,7 +182,7 @@ func TestStore(t *testing.T) { }, }, limit: 1, - opts: []Option{IgnoreTransactions()}, + opts: []Option{OnlyTransfers()}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash1"}, }, @@ -202,7 +200,7 @@ func TestStore(t *testing.T) { }, }, limit: 1, - opts: []Option{IgnoreTransfers()}, + opts: []Option{OnlyTransactions()}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash1"}, }, @@ -242,7 +240,7 @@ func TestStore(t *testing.T) { }, }, limit: 2, - opts: []Option{IgnoreTransactions(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), ByAsset(usdc), WithTimeRange(timestamppb.New(t1), timestamppb.New(t0.Add(3*time.Second)))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash4", "hash3", "hash2"}, }, @@ -258,7 +256,7 @@ func TestStore(t *testing.T) { }, }, limit: 2, - opts: []Option{IgnoreTransactions(), OnlySent(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), OnlySent(), ByAsset(usdc), WithTimeRange(timestamppb.New(t1), timestamppb.New(t0.Add(3*time.Second)))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3"}, }, @@ -274,7 +272,7 @@ func TestStore(t *testing.T) { }, }, limit: 2, - opts: []Option{IgnoreTransactions(), OnlyReceived(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), OnlyReceived(), ByAsset(usdc), WithTimeRange(timestamppb.New(t1), timestamppb.New(t0.Add(3*time.Second)))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3"}, }, @@ -296,12 +294,9 @@ func TestStore(t *testing.T) { } } } - chainIDs := append(maps.Keys(tt.txs), maps.Keys(tt.transfers)...) - sort.Strings(chainIDs) - chainIDs = slices.Compact(chainIDs) - var suffix map[string]map[string]string + var suffix map[string]string for i := int64(0); i < int64(len(tt.expectedHashes))/tt.limit; i++ { - txs, newSuffix, err := store.Get(chainIDs, tt.addresses, suffix, tt.limit, tt.opts...) + txs, newSuffix, err := store.Get(tt.addresses, suffix, tt.limit, tt.opts...) if err != nil { t.Fatalf("tx %d: %v", i, err) } @@ -322,7 +317,10 @@ func TestStore(t *testing.T) { } } -var t0 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) +var ( + t0 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + t1 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(1 * time.Second) +) func newTx(hash string, from, to common.Address, method string, delta int64) *types.Eth1TransactionIndexed { return &types.Eth1TransactionIndexed{ diff --git a/backend/pkg/commons/db2/data/filter.go b/backend/pkg/commons/db2/data/filter.go index aea5b19f6..ce3ba9f55 100644 --- a/backend/pkg/commons/db2/data/filter.go +++ b/backend/pkg/commons/db2/data/filter.go @@ -8,236 +8,87 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" ) -type formatType string - -const ( - typeTx = formatType("tx") - typeTransfer = formatType("transfer") -) - -type filterType string - -const ( - byMethod = filterType("byMethod") - bySent = filterType("bySent") - byReceived = filterType("byReceived") - byAsset = filterType("byAsset") - byAssetSent = filterType("byAssetSent") - byAssetReceived = filterType("byAssetReceived") -) - type filter interface { - get(chainID string, address common.Address) string + get(address common.Address) string limit(prefix string) string } -type chainFilter interface { - addByMethod(method string) error - addBySent() error - addByReceived() error - addByAsset(asset common.Address) error - addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error - valid() error - filterType() filterType - filter -} - -type chainFilterTx struct { - base string - filtered filterType - - method *string - from *timestamp.Timestamp - to *timestamp.Timestamp -} - -func newChainFilterTx() *chainFilterTx { - return &chainFilterTx{ - base: ":I:TX:
:TIME", - } -} - -func (c *chainFilterTx) addByMethod(method string) error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) - } - c.base = ":I:TX:
:METHOD:" - c.method = &method - c.filtered = byMethod - return nil -} - -func (c *chainFilterTx) addBySent() error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) - } - c.base = ":I:TX:
:TO" - c.filtered = bySent - return nil -} - -func (c *chainFilterTx) addByReceived() error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) - } - c.base = ":I:TX:
:FROM" - c.filtered = byReceived - return nil +func toHex(b []byte) string { + return fmt.Sprintf("%x", b) } -func (c *chainFilterTx) addByAsset(common.Address) error { - return fmt.Errorf("cannot filter tx by asset") +type queryFilter struct { + query string + timeFrom *timestamp.Timestamp + timeTo *timestamp.Timestamp } -func (c *chainFilterTx) addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error { - if from == nil || to == nil { - return fmt.Errorf("invalid time range: empty border") +func newQueryFilter(options options) (*queryFilter, error) { + query := []string{"all"} + params := []string{"
"} + if options.onlySent && options.onlyReceived { + options.onlySent = false + options.onlyReceived = false } - c.from = from - c.to = to - return nil -} - -func (c *chainFilterTx) filterType() filterType { - return c.filtered -} - -func (c *chainFilterTx) valid() error { - return nil -} - -func (c *chainFilterTx) get(chainID string, address common.Address) string { - query := strings.Replace(c.base, "", chainID, 1) - query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) - if c.method != nil { - query = strings.Replace(query, "", *c.method, 1) + if options.asset != nil && options.method != nil { + return nil, fmt.Errorf("cannot filter by method and by asset together") } - if c.to != nil { - query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) + if options.method != nil { + options.onlyTxs = true } - return query -} - -func (c *chainFilterTx) limit(prefix string) string { - if c.from != nil { - index := strings.LastIndex(prefix, ":") - return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + if options.asset != nil { + options.onlyTransfers = true } - return toSuccessor(prefix) -} - -type chainFilterTransfer struct { - base string - filtered filterType - - asset *common.Address - from *timestamp.Timestamp - to *timestamp.Timestamp -} - -func newChainFilterTransfer() *chainFilterTransfer { - return &chainFilterTransfer{ - base: ":I:ERC20:
:TIME", + if options.onlyTxs && options.onlyTransfers { + options.onlyTxs = false + options.onlyTransfers = false } -} - -func (c *chainFilterTransfer) addByMethod(string) error { - return fmt.Errorf("cannot filter transfer by method") -} - -func (c *chainFilterTransfer) addBySent() error { - if c.filtered != "" { - if c.filtered != byAsset { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } - return c.addByAssetSent() + if options.onlyReceived { + query[0] = "in" + params[0] = "
" } - c.base = ":I:ERC20:
:TOKEN_SENT" - c.filtered = bySent - return nil -} - -func (c *chainFilterTransfer) addByReceived() error { - if c.filtered != "" { - if c.filtered != byAsset { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } - return c.addByAssetReceived() + if options.onlySent { + query[0] = "out" + params[0] = "
" } - c.base = ":I:ERC20:
:TOKEN_RECEIVED" - c.filtered = byReceived - return nil -} - -func (c *chainFilterTransfer) addByAssetReceived() error { - c.base = ":I:ERC20:
:TOKEN_RECEIVED:" - c.filtered = byAssetReceived - return nil -} - -func (c *chainFilterTransfer) addByAssetSent() error { - c.base = ":I:ERC20:
:TOKEN_SENT:" - c.filtered = byAssetSent - return nil -} - -func (c *chainFilterTransfer) addByAsset(asset common.Address) error { - if c.filtered != "" { - if c.filtered != byReceived && c.filtered != bySent { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } + if options.chainID != nil { + query = append(query, "chainID") + params = append(params, *options.chainID) } - c.asset = &asset - if c.filtered == byReceived { - return c.addByAssetReceived() + if options.onlyTxs { + query = append(query, "TX") } - if c.filtered == bySent { - return c.addByAssetSent() + if options.onlyTransfers { + query = append(query, "ERC20") } - c.base = ":I:ERC20::
:TIME" - c.filtered = byAsset - return nil -} - -func (c *chainFilterTransfer) addTimeRange(from, to *timestamp.Timestamp) error { - if from == nil || to == nil { - return fmt.Errorf("invalid time range: empty border") + if options.method != nil { + query = append(query, "method") + params = append(params, *options.method) } - if c.filtered == byReceived || c.filtered == bySent { - return fmt.Errorf("cannot apply range over filter by %s", c.filtered) + if options.from == nil || options.to == nil { + options.from = nil + options.to = nil } - c.from = from - c.to = to - return nil -} -func (c *chainFilterTransfer) filterType() filterType { - return c.filtered + return &queryFilter{ + query: strings.Join(append(query, params...), ":"), + timeFrom: options.from, + timeTo: options.to, + }, nil } -func (c *chainFilterTransfer) valid() error { - if (c.from != nil || c.to != nil) && (c.filtered == bySent || c.filtered == byReceived) { - return fmt.Errorf("cannot apply range over filter by %s", c.filtered) - } - return nil -} - -func (c *chainFilterTransfer) get(chainID string, address common.Address) string { - query := strings.Replace(c.base, "", chainID, 1) - query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) - if c.asset != nil { - query = strings.Replace(query, "", fmt.Sprintf("%x", c.asset.Bytes()), 1) - } - if c.to != nil { - query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) +func (f queryFilter) get(address common.Address) string { + query := strings.Replace(f.query, "
", toHex(address.Bytes()), 1) + if f.timeTo != nil { + query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(f.timeTo)) } return query } -func (c *chainFilterTransfer) limit(prefix string) string { - if c.from != nil { +func (f queryFilter) limit(prefix string) string { + if f.timeFrom != nil { index := strings.LastIndex(prefix, ":") - return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(f.timeFrom))) } return toSuccessor(prefix) } diff --git a/backend/pkg/commons/db2/data/filter_test.go b/backend/pkg/commons/db2/data/filter_test.go index 8bf258cd4..a31358bfe 100644 --- a/backend/pkg/commons/db2/data/filter_test.go +++ b/backend/pkg/commons/db2/data/filter_test.go @@ -1,139 +1,115 @@ package data import ( + "strings" "testing" "github.com/ethereum/go-ethereum/common" "google.golang.org/protobuf/types/known/timestamppb" ) -func TestFilter(t *testing.T) { +func TestQueryFilter(t *testing.T) { tests := []struct { - name string - filter chainFilter - add func(chainFilter) error - expectErr bool - expectType filterType + name string + options []Option + want string + err string }{ { - name: "tx by asset should err", - filter: newChainFilterTx(), - add: func(c chainFilter) error { - return c.addByAsset(common.Address{}) + name: "all", + want: "all:
", + }, + { + name: "err for ByMethod and ByAsset", + options: []Option{ + ByMethod(""), + ByAsset(common.Address{}), }, - expectErr: true, + err: "cannot filter by method and by asset together", }, { - name: "tx invalid time range", - filter: newChainFilterTx(), - add: func(c chainFilter) error { - return c.addTimeRange(nil, nil) + name: "only sent", + options: []Option{ + OnlySent(), }, - expectErr: true, + want: "out:
", }, { - name: "transfer by method should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - return c.addByMethod("") + name: "received", + options: []Option{ + OnlyReceived(), }, - expectErr: true, + want: "in:
", }, { - name: "transfer by asset sent", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByAsset(common.Address{}); err != nil { - return err - } - return c.addBySent() + name: "sent", + options: []Option{ + OnlySent(), }, - expectType: byAssetSent, + want: "out:
", }, { - name: "transfer by asset received", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByAsset(common.Address{}); err != nil { - return err - } - return c.addByReceived() + name: "on a chain ID", + options: []Option{ + ByChainID("1234"), }, - expectType: byAssetReceived, + want: "all:chainID:
:1234", }, { - name: "transfer by sent asset", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addBySent(); err != nil { - return err - } - return c.addByAsset(common.Address{}) + name: "sent on a chain ID", + options: []Option{ + OnlySent(), + ByChainID("1234"), }, - expectType: byAssetSent, + want: "out:chainID:
:1234", }, { - name: "transfer by received asset", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByReceived(); err != nil { - return err - } - return c.addByAsset(common.Address{}) + name: "received on a chain ID", + options: []Option{ + OnlyReceived(), + ByChainID("1234"), }, - expectType: byAssetReceived, + want: "in:chainID:
:1234", }, { - name: "transfer invalid time range", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - return c.addTimeRange(nil, nil) + name: "received TX on a chain ID", + options: []Option{ + OnlyTransactions(), + OnlyReceived(), + ByChainID("1234"), }, - expectErr: true, + want: "in:chainID:TX:
:1234", }, { - name: "transfer time range over bySent should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { - return err - } - if err := c.addBySent(); err != nil { - return err - } - return c.valid() + name: "received method on a chain ID", + options: []Option{ + ByMethod("bar"), + OnlyReceived(), + ByChainID("1234"), }, - expectErr: true, + want: "in:chainID:TX:method:
:1234:bar", }, { - name: "transfer time range over byReceived should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { - return err - } - if err := c.addByReceived(); err != nil { - return err - } - return c.valid() + name: "with time range", + options: []Option{ + WithTimeRange(timestamppb.New(t0), timestamppb.New(t1)), }, - expectErr: true, + want: "all:
:" + reversePaddedTimestamp(timestamppb.New(t1)), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.add(tt.filter) + filter, err := newQueryFilter(apply(tt.options)) if err != nil { - if !tt.expectErr { - t.Errorf("unexpected err: %s", err) + if got, want := err.Error(), tt.err; got != want { + t.Errorf("got error %v, want %v", got, want) } return } - if tt.expectErr { - t.Error("expected err but got nil") - } - if got, want := tt.filter.filterType(), tt.expectType; got != want { - t.Errorf("got %v, want %v", got, want) + addr := common.Address{} + query := filter.get(addr) + if got, want := query, strings.ReplaceAll(tt.want, "
", toHex(addr.Bytes())); got != want { + t.Errorf("get() = %v, want %v", got, want) } }) } diff --git a/backend/pkg/commons/db2/data/keys.go b/backend/pkg/commons/db2/data/keys.go index a039db47c..f6be0a643 100644 --- a/backend/pkg/commons/db2/data/keys.go +++ b/backend/pkg/commons/db2/data/keys.go @@ -11,8 +11,8 @@ import ( ) const ( - maxInt = 9223372036854775807 - maxExecutionLayerBlockNumber = 1000000000 + maxInt = 9223372036854775807 + // maxExecutionLayerBlockNumber = 1000000000 txPerBlockLimit = 10_000 logPerTxLimit = 100_000 @@ -38,191 +38,139 @@ func reversePaddedTimestamp(timestamp *timestamppb.Timestamp) string { return fmt.Sprintf("%019d", maxInt-timestamp.Seconds) } -func reversedPaddedBlockNumber(blockNumber uint64) string { - return fmt.Sprintf("%09d", maxExecutionLayerBlockNumber-blockNumber) -} - -func keyTx(chainID string, hash []byte) string { - format := ":TX:" - replacer := strings.NewReplacer("", chainID, "", fmt.Sprintf("%x", hash)) - return replacer.Replace(format) -} - -func keyTxSent(chainID string, tx *types.Eth1TransactionIndexed, index int) string { - format := ":I:TX::TO::