From 09af53bb94fa14da8e715b65241beaac3ce57f88 Mon Sep 17 00:00:00 2001 From: nabarun Date: Mon, 27 Jun 2022 10:33:06 +0530 Subject: [PATCH] Implement CSV file for watched addresses --- docker-compose.yml | 2 +- statediff/indexer/database/file/config.go | 2 +- .../database/file/csv_indexer_legacy_test.go | 11 +- .../indexer/database/file/csv_indexer_test.go | 349 ++++++++++++++++++ statediff/indexer/database/file/csv_writer.go | 161 +++++++- statediff/indexer/database/file/indexer.go | 178 +-------- .../database/file/indexer_shared_test.go | 11 - statediff/indexer/database/file/interfaces.go | 10 + .../database/file/sql_indexer_legacy_test.go | 10 + .../indexer/database/file/sql_indexer_test.go | 3 +- statediff/indexer/database/file/sql_writer.go | 183 ++++++++- statediff/types/schema.go | 10 + 12 files changed, 737 insertions(+), 193 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b5f297c8e9e7..f86641d65a38 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.2' +version: "3.2" services: migrations: diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index 2a69b4555cb8..6cd9d72880c2 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -64,7 +64,7 @@ var TestConfig = Config{ Mode: CSV, OutputDir: "./statediffing_test", FilePath: "./statediffing_test_file.sql", - WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql", + WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv", NodeInfo: node.Info{ GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", NetworkID: "1", diff --git a/statediff/indexer/database/file/csv_indexer_legacy_test.go b/statediff/indexer/database/file/csv_indexer_legacy_test.go index 98a8dc0078f3..6c82d5f2585b 100644 --- a/statediff/indexer/database/file/csv_indexer_legacy_test.go +++ b/statediff/indexer/database/file/csv_indexer_legacy_test.go @@ -33,9 +33,11 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/types" ) const dbDirectory = "/file" +const pgCopyStatement = `COPY %s FROM '%s' CSV` func setupCSVLegacy(t *testing.T) { mockLegacyBlock = legacyData.MockBlock @@ -81,7 +83,6 @@ func setupCSVLegacy(t *testing.T) { } func dumpCSVFileData(t *testing.T) { - pgCopyStatement := `COPY %s FROM '%s' CSV` outputDir := filepath.Join(dbDirectory, file.TestConfig.OutputDir) for _, tbl := range file.Tables { @@ -102,6 +103,14 @@ func dumpCSVFileData(t *testing.T) { } } +func dumpWatchedAddressesCSVFileData(t *testing.T) { + outputFilePath := filepath.Join(dbDirectory, file.TestConfig.WatchedAddressesFilePath) + stm := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath) + + _, err = sqlxdb.Exec(stm) + require.NoError(t, err) +} + func tearDownCSV(t *testing.T) { file.TearDownDB(t, sqlxdb) diff --git a/statediff/indexer/database/file/csv_indexer_test.go b/statediff/indexer/database/file/csv_indexer_test.go index 3145eac96969..1a17a726d26d 100644 --- a/statediff/indexer/database/file/csv_indexer_test.go +++ b/statediff/indexer/database/file/csv_indexer_test.go @@ -19,6 +19,7 @@ package file_test import ( "context" "errors" + "math/big" "os" "testing" @@ -26,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" 
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/shared" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -44,6 +46,7 @@ import ( func setupCSVIndexer(t *testing.T) { file.TestConfig.Mode = file.CSV file.TestConfig.OutputDir = "./statediffing_test" + file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.csv" if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) { err := os.RemoveAll(file.TestConfig.OutputDir) @@ -620,3 +623,349 @@ func TestCSVFileIndexer(t *testing.T) { } }) } + +func TestCSVFileWatchAddressMethods(t *testing.T) { + setupCSVIndexer(t) + defer tearDownCSV(t) + type res struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + } + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + t.Run("Load watched addresses (empty table)", func(t *testing.T) { + expectedData := []common.Address{} + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Insert watched addresses", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Insert watched addresses (some already watched)", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Remove watched addresses", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, 
+ CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{} + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Set watched addresses", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Set watched addresses (some already watched)", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + args := []sdtypes.WatchAddressArg{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Load watched addresses", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + expectedData := 
[]common.Address{ + common.HexToAddress(contract4Address), + common.HexToAddress(contract2Address), + common.HexToAddress(contract3Address), + } + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Clear watched addresses", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) + + t.Run("Clear watched addresses (empty table)", func(t *testing.T) { + defer file.TearDownDB(t, sqlxdb) + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + dumpWatchedAddressesCSVFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + require.Equal(t, expectedData[idx], row) + } + }) +} diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index ae4f68e1391d..53e7a5797b0d 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -18,7 +18,9 @@ package file import ( "encoding/csv" + "errors" "fmt" + "math/big" "os" "path/filepath" "strconv" @@ -26,7 +28,9 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" + "github.com/thoas/go-funk" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -55,8 +59,11 @@ type tableRow struct { } type CSVWriter struct { - dir string // dir containing output files - writers fileWriters + // dir containing output files + dir string + + writers fileWriters + watchedAddressesWriter fileWriter rows chan tableRow flushChan chan struct{} @@ -127,22 +134,30 @@ func (tx fileWriters) flush() error { return nil } -func NewCSVWriter(path string) (*CSVWriter, error) { +func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) { if err := os.MkdirAll(path, 0777); err != nil { return nil, fmt.Errorf("unable to make MkdirAll for path: %s err: %s", path, err) } + writers, err := makeFileWriters(path, Tables) if err != nil { return nil, err } + + watchedAddressesWriter, err := newFileWriter(watchedAddressesFilePath) + if err != nil { + return nil, err + } + csvWriter := &CSVWriter{ - writers: writers, - dir: path, - rows: make(chan tableRow), - flushChan: make(chan struct{}), - flushFinished: make(chan struct{}), - quitChan: make(chan struct{}), - doneChan: make(chan struct{}), + writers: writers, + watchedAddressesWriter: watchedAddressesWriter, + dir: path, + rows: make(chan tableRow), + flushChan: make(chan struct{}), + flushFinished: make(chan struct{}), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), } return csvWriter, nil } @@ -308,3 +323,129 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) csw.rows <- 
tableRow{types.TableStorageNode, values}
 }
+
+// loadWatchedAddresses loads watched addresses from a file
+func (csw *CSVWriter) loadWatchedAddresses() ([]common.Address, error) {
+	watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
+	// load csv rows from watched addresses file
+	rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// extract addresses from the csv rows
+	watchedAddresses := funk.Map(rows, func(row []string) common.Address {
+		// first column is for address in eth_meta.watched_addresses
+		addressString := row[0]
+
+		return common.HexToAddress(addressString)
+	}).([]common.Address)
+
+	return watchedAddresses, nil
+}
+
+// insertWatchedAddresses inserts the given addresses into a file
+func (csw *CSVWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
+	// load csv rows from watched addresses file
+	watchedAddresses, err := csw.loadWatchedAddresses()
+	if err != nil {
+		return err
+	}
+
+	// append rows for new addresses to existing csv file
+	for _, arg := range args {
+		// ignore if already watched
+		if funk.Contains(watchedAddresses, common.HexToAddress(arg.Address)) {
+			continue
+		}
+
+		var values []interface{}
+		values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
+		row := types.TableWatchedAddresses.ToCsvRow(values...)
+
+		// writing directly instead of using rows channel as it needs to be flushed immediately
+		err = csw.watchedAddressesWriter.Write(row)
+		if err != nil {
+			return err
+		}
+	}
+
+	// watched addresses need to be flushed immediately as they also need to be read from the file
+	csw.watchedAddressesWriter.Flush()
+	err = csw.watchedAddressesWriter.Error()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// removeWatchedAddresses removes the given watched addresses from a file
+func (csw *CSVWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
+	// load csv rows from watched addresses file
+	watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
+	rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
+	if err != nil {
+		return err
+	}
+
+	// get rid of rows having addresses to be removed
+	filteredRows := funk.Filter(rows, func(row []string) bool {
+		return !funk.Contains(args, func(arg types.WatchAddressArg) bool {
+			// Compare first column in table for address
+			return arg.Address == row[0]
+		})
+	}).([][]string)
+
+	return dumpWatchedAddressesRows(&csw.watchedAddressesWriter, filteredRows)
+}
+
+// setWatchedAddresses clears and inserts the given addresses into a file
+func (csw *CSVWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
+	var rows [][]string
+	for _, arg := range args {
+		row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
+		rows = append(rows, row)
+	}
+
+	return dumpWatchedAddressesRows(&csw.watchedAddressesWriter, rows)
+}
+
+// loadWatchedAddressesRows loads csv rows from the given file
+func loadWatchedAddressesRows(filePath string) ([][]string, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return [][]string{}, nil
+		}
+
+		return nil, fmt.Errorf("error opening watched addresses file: %v", err)
+	}
+
+	defer file.Close()
+	reader := csv.NewReader(file)
+
+	return reader.ReadAll()
+}
+
+// dumpWatchedAddressesRows replaces the given file's contents with the given csv rows
+func dumpWatchedAddressesRows(watchedAddressesWriter *fileWriter, filteredRows [][]string) error {
+	file := watchedAddressesWriter.file
+	file.Close()
+
+	file, err := os.Create(file.Name())
+	if err != nil {
+		return fmt.Errorf("error creating watched addresses file: %v", err)
+	}
+
+	watchedAddressesWriter.Writer = csv.NewWriter(file)
+	watchedAddressesWriter.file = file
+
+	for _, row := range filteredRows {
+		watchedAddressesWriter.Write(row)
+	}
+
+	watchedAddressesWriter.Flush()
+
+	return watchedAddressesWriter.Error()
+}
diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go
index 4e3dc62ff7dd..d66897e2c6f5 100644
--- a/statediff/indexer/database/file/indexer.go
+++ b/statediff/indexer/database/file/indexer.go
@@ -17,7 +17,6 @@
 package file
 
 import (
-	"bufio"
 	"context"
 	"errors"
 	"fmt"
@@ -30,8 +29,6 @@ import (
 	"github.com/ipfs/go-cid"
 	node "github.com/ipfs/go-ipld-format"
 	"github.com/multiformats/go-multihash"
-	pg_query "github.com/pganalyze/pg_query_go/v2"
-	"github.com/thoas/go-funk"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -49,7 +46,7 @@ import (
 
 const defaultOutputDir = "./statediff_output"
 const defaultFilePath = "./statediff.sql"
-const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
+const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.csv"
 
 const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
 
@@ -66,14 +63,19 @@ type StateDiffIndexer struct {
 	nodeID           string
 	wg               *sync.WaitGroup
 	removedCacheFlag *uint32
-
-	watchedAddressesFilePath string
 }
 
 // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
 func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
 	var err error
 	var writer FileWriter
+
+	watchedAddressesFilePath := config.WatchedAddressesFilePath
+	if watchedAddressesFilePath == "" {
+		watchedAddressesFilePath = defaultWatchedAddressesFilePath
+	}
+	log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
+
 	switch config.Mode {
 	case CSV:
 		outputDir := config.OutputDir
@@ -87,7 +89,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
 
 		log.Info("Writing statediff CSV files to directory", "file", outputDir)
 
-		writer, err = NewCSVWriter(outputDir)
+		writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
 		if err != nil {
 			return nil, err
 		}
@@ -105,27 +107,20 @@
 		}
 
 		log.Info("Writing statediff SQL statements to file", "file", filePath)
-		writer = NewSQLWriter(file)
+		writer = NewSQLWriter(file, watchedAddressesFilePath)
 	default:
 		return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
 	}
 
-	watchedAddressesFilePath := config.WatchedAddressesFilePath
-	if watchedAddressesFilePath == "" {
-		watchedAddressesFilePath = defaultWatchedAddressesFilePath
-	}
-	log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
-
 	wg := new(sync.WaitGroup)
 	writer.Loop()
 	writer.upsertNode(config.NodeInfo)
 
 	return &StateDiffIndexer{
-		fileWriter:               writer,
-		chainConfig:              chainConfig,
-		nodeID:                   config.NodeInfo.ID,
-		wg:                       wg,
-		watchedAddressesFilePath: watchedAddressesFilePath,
+		fileWriter:  writer,
+		chainConfig: chainConfig,
+		nodeID:      config.NodeInfo.ID,
+		wg:          wg,
 	}, nil
 }
 
@@ -550,162 +545,25 @@ func (sdi 
*StateDiffIndexer) Close() error { // LoadWatchedAddresses loads watched addresses from a file func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { - // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) - if err != nil { - return nil, err - } - - // extract addresses from the sql statements - watchedAddresses := []common.Address{} - for _, stmt := range stmts { - addressString, err := parseWatchedAddressStatement(stmt) - if err != nil { - return nil, err - } - watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString)) - } - - return watchedAddresses, nil + return sdi.fileWriter.loadWatchedAddresses() } // InsertWatchedAddresses inserts the given addresses in a file func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) - if err != nil { - return err - } - - // get already watched addresses - var watchedAddresses []string - for _, stmt := range stmts { - addressString, err := parseWatchedAddressStatement(stmt) - if err != nil { - return err - } - - watchedAddresses = append(watchedAddresses, addressString) - } - - // append statements for new addresses to existing statements - for _, arg := range args { - // ignore if already watched - if funk.Contains(watchedAddresses, arg.Address) { - continue - } - - stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) - stmts = append(stmts, stmt) - } - - return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts) + return sdi.fileWriter.insertWatchedAddresses(args, currentBlockNumber) } // RemoveWatchedAddresses removes the given watched addresses from a file func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { - // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) - if err != nil { - return err - } - - // get rid of statements having addresses to be removed - var filteredStmts []string - for _, stmt := range stmts { - addressString, err := parseWatchedAddressStatement(stmt) - if err != nil { - return err - } - - toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool { - return arg.Address == addressString - }) - - if !toRemove { - filteredStmts = append(filteredStmts, stmt) - } - } - - return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts) + return sdi.fileWriter.removeWatchedAddresses(args) } // SetWatchedAddresses clears and inserts the given addresses in a file func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - var stmts []string - for _, arg := range args { - stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) - stmts = append(stmts, stmt) - } - - return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts) + return sdi.fileWriter.setWatchedAddresses(args, currentBlockNumber) } // ClearWatchedAddresses clears all the watched addresses from a file func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0)) } - -// loadWatchedAddressesStatements loads sql statements from the given file in a string slice -func 
loadWatchedAddressesStatements(filePath string) ([]string, error) { - file, err := os.Open(filePath) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return []string{}, nil - } - - return nil, fmt.Errorf("error opening watched addresses file: %v", err) - } - defer file.Close() - - stmts := []string{} - scanner := bufio.NewScanner(file) - for scanner.Scan() { - stmts = append(stmts, scanner.Text()) - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error loading watched addresses: %v", err) - } - - return stmts, nil -} - -// dumpWatchedAddressesStatements dumps sql statements to the given file -func dumpWatchedAddressesStatements(filePath string, stmts []string) error { - file, err := os.Create(filePath) - if err != nil { - return fmt.Errorf("error creating watched addresses file: %v", err) - } - defer file.Close() - - for _, stmt := range stmts { - _, err := file.Write([]byte(stmt + "\n")) - if err != nil { - return fmt.Errorf("error inserting watched_addresses entry: %v", err) - } - } - - return nil -} - -// parseWatchedAddressStatement parses given sql insert statement to extract the address argument -func parseWatchedAddressStatement(stmt string) (string, error) { - parseResult, err := pg_query.Parse(stmt) - if err != nil { - return "", fmt.Errorf("error parsing sql stmt: %v", err) - } - - // extract address argument from parse output for a SQL statement of form - // "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) - // VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;" - addressString := parseResult.Stmts[0].Stmt.GetInsertStmt(). - SelectStmt.GetSelectStmt(). - ValuesLists[0].GetList(). - Items[0].GetAConst(). - GetVal(). - GetString_(). - Str - - return addressString, nil -} diff --git a/statediff/indexer/database/file/indexer_shared_test.go b/statediff/indexer/database/file/indexer_shared_test.go index 4c3f1d2882b8..c8e0196c4284 100644 --- a/statediff/indexer/database/file/indexer_shared_test.go +++ b/statediff/indexer/database/file/indexer_shared_test.go @@ -25,7 +25,6 @@ import ( "github.com/ipfs/go-cid" "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" @@ -190,13 +189,3 @@ func resetDB(t *testing.T) { t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) } } - -func resetAndDumpWatchedAddressesFileData(t *testing.T) { - resetDB(t) - - sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath) - require.NoError(t, err) - - _, err = sqlxdb.Exec(string(sqlFileBytes)) - require.NoError(t, err) -} diff --git a/statediff/indexer/database/file/interfaces.go b/statediff/indexer/database/file/interfaces.go index f751a2c196f8..5f99fc53ac6d 100644 --- a/statediff/indexer/database/file/interfaces.go +++ b/statediff/indexer/database/file/interfaces.go @@ -17,10 +17,14 @@ package file import ( + "math/big" + node "github.com/ipfs/go-ipld-format" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/types" ) // Writer interface required by the file indexer @@ -47,4 +51,10 @@ type FileWriter interface { upsertIPLDDirect(blockNumber, key string, value []byte) upsertIPLDNode(blockNumber string, i node.Node) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, 
error) + + // Methods to read and write watched addresses + loadWatchedAddresses() ([]common.Address, error) + insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error + removeWatchedAddresses(args []types.WatchAddressArg) error + setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error } diff --git a/statediff/indexer/database/file/sql_indexer_legacy_test.go b/statediff/indexer/database/file/sql_indexer_legacy_test.go index 6fed2a26c2ac..ffbadf6cf2fc 100644 --- a/statediff/indexer/database/file/sql_indexer_legacy_test.go +++ b/statediff/indexer/database/file/sql_indexer_legacy_test.go @@ -81,6 +81,16 @@ func dumpFileData(t *testing.T) { require.NoError(t, err) } +func resetAndDumpWatchedAddressesFileData(t *testing.T) { + resetDB(t) + + sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath) + require.NoError(t, err) + + _, err = sqlxdb.Exec(string(sqlFileBytes)) + require.NoError(t, err) +} + func tearDown(t *testing.T) { file.TearDownDB(t, sqlxdb) diff --git a/statediff/indexer/database/file/sql_indexer_test.go b/statediff/indexer/database/file/sql_indexer_test.go index 2f88c8750cdb..e8ddda403e39 100644 --- a/statediff/indexer/database/file/sql_indexer_test.go +++ b/statediff/indexer/database/file/sql_indexer_test.go @@ -45,6 +45,7 @@ import ( func setupIndexer(t *testing.T) { file.TestConfig.Mode = file.SQL + file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.sql" if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { err := os.Remove(file.TestConfig.FilePath) @@ -621,7 +622,7 @@ func TestSQLFileIndexer(t *testing.T) { }) } -func TestFileWatchAddressMethods(t *testing.T) { +func TestSQLFileWatchAddressMethods(t *testing.T) { setupIndexer(t) defer tearDown(t) diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index 3c11b5eea135..c9ea7469b183 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -17,17 +17,24 @@ package file import ( + "bufio" + "errors" "fmt" "io" + "math/big" + "os" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" + pg_query "github.com/pganalyze/pg_query_go/v2" + "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/types" ) var ( @@ -47,18 +54,21 @@ type SQLWriter struct { flushFinished chan struct{} quitChan chan struct{} doneChan chan struct{} + + watchedAddressesFilePath string } // NewSQLWriter creates a new pointer to a Writer -func NewSQLWriter(wc io.WriteCloser) *SQLWriter { +func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter { return &SQLWriter{ - wc: wc, - stmts: make(chan []byte), - collatedStmt: make([]byte, writeBufferSize), - flushChan: make(chan struct{}), - flushFinished: make(chan struct{}), - quitChan: make(chan struct{}), - doneChan: make(chan struct{}), + wc: wc, + stmts: make(chan []byte), + collatedStmt: make([]byte, writeBufferSize), + flushChan: make(chan struct{}), + flushFinished: make(chan struct{}), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), + watchedAddressesFilePath: watchedAddressesFilePath, } } @@ -257,3 +267,160 @@ 
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
 	sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
 		storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
 }
+
+// loadWatchedAddresses loads watched addresses from a file
+func (sqw *SQLWriter) loadWatchedAddresses() ([]common.Address, error) {
+	// load sql statements from watched addresses file
+	stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// extract addresses from the sql statements
+	watchedAddresses := []common.Address{}
+	for _, stmt := range stmts {
+		addressString, err := parseWatchedAddressStatement(stmt)
+		if err != nil {
+			return nil, err
+		}
+		watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
+	}
+
+	return watchedAddresses, nil
+}
+
+// insertWatchedAddresses inserts the given addresses into a file
+func (sqw *SQLWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
+	// load sql statements from watched addresses file
+	stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
+	if err != nil {
+		return err
+	}
+
+	// get already watched addresses
+	var watchedAddresses []string
+	for _, stmt := range stmts {
+		addressString, err := parseWatchedAddressStatement(stmt)
+		if err != nil {
+			return err
+		}
+
+		watchedAddresses = append(watchedAddresses, addressString)
+	}
+
+	// append statements for new addresses to existing statements
+	for _, arg := range args {
+		// ignore if already watched
+		if funk.Contains(watchedAddresses, arg.Address) {
+			continue
+		}
+
+		stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
+		stmts = append(stmts, stmt)
+	}
+
+	return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
+}
+
+// removeWatchedAddresses removes the given watched addresses from a file
+func (sqw *SQLWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
+	// load sql statements from watched addresses file
+	stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
+	if err != nil {
+		return err
+	}
+
+	// get rid of statements having addresses to be removed
+	var filteredStmts []string
+	for _, stmt := range stmts {
+		addressString, err := parseWatchedAddressStatement(stmt)
+		if err != nil {
+			return err
+		}
+
+		toRemove := funk.Contains(args, func(arg types.WatchAddressArg) bool {
+			return arg.Address == addressString
+		})
+
+		if !toRemove {
+			filteredStmts = append(filteredStmts, stmt)
+		}
+	}
+
+	return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, filteredStmts)
+}
+
+// setWatchedAddresses clears and inserts the given addresses into a file
+func (sqw *SQLWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
+	var stmts []string
+	for _, arg := range args {
+		stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
+		stmts = append(stmts, stmt)
+	}
+
+	return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
+}
+
+// loadWatchedAddressesStatements loads sql statements from the given file into a string slice
+func loadWatchedAddressesStatements(filePath string) ([]string, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return []string{}, nil
+		}
+
+		return nil, fmt.Errorf("error opening watched addresses file: %v", err)
+	}
+	defer file.Close()
+
+	stmts := []string{}
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		stmts = append(stmts, scanner.Text())
+	}
+
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("error loading watched addresses: %v", err)
+	}
+
+	return stmts, nil
+}
+
+// dumpWatchedAddressesStatements dumps sql statements to the given file
+func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
+	file, err := os.Create(filePath)
+	if err != nil {
+		return fmt.Errorf("error creating watched addresses file: %v", err)
+	}
+	defer file.Close()
+
+	for _, stmt := range stmts {
+		_, err := file.Write([]byte(stmt + "\n"))
+		if err != nil {
+			return fmt.Errorf("error inserting watched_addresses entry: %v", err)
+		}
+	}
+
+	return nil
+}
+
+// parseWatchedAddressStatement parses the given sql insert statement to extract the address argument
+func parseWatchedAddressStatement(stmt string) (string, error) {
+	parseResult, err := pg_query.Parse(stmt)
+	if err != nil {
+		return "", fmt.Errorf("error parsing sql stmt: %v", err)
+	}
+
+	// extract address argument from parse output for a SQL statement of form
+	// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
+	// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
+	addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
+		SelectStmt.GetSelectStmt().
+		ValuesLists[0].GetList().
+		Items[0].GetAConst().
+		GetVal().
+		GetString_().
+		Str
+
+	return addressString, nil
+}
diff --git a/statediff/types/schema.go b/statediff/types/schema.go
index e7eba3526075..ac61ea64da72 100644
--- a/statediff/types/schema.go
+++ b/statediff/types/schema.go
@@ -172,3 +172,13 @@ var TableStateAccount = Table{
 		{name: "storage_root", typ: varchar},
 	},
 }
+
+var TableWatchedAddresses = Table{
+	"eth_meta.watched_addresses",
+	[]column{
+		{name: "address", typ: varchar},
+		{name: "created_at", typ: bigint},
+		{name: "watched_at", typ: bigint},
+		{name: "last_filled_at", typ: bigint},
+	},
+}
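
Note on the CSV layout: the watched-addresses file written above is a plain four-column CSV matching eth_meta.watched_addresses (address, created_at, watched_at, last_filled_at), which is why the tests can load it into Postgres with a bare `COPY ... CSV`. The standalone sketch below illustrates that append-then-flush-then-read round-trip using only encoding/csv; the file name and addresses are made up for the example, and nothing in it is part of this change.

package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"os"
)

// appendRow appends one watched-address row and flushes immediately,
// mirroring insertWatchedAddresses above: csv.Writer buffers, so the
// row must be flushed before the file can be re-read.
func appendRow(path string, row []string) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer f.Close()

	w := csv.NewWriter(f)
	if err := w.Write(row); err != nil {
		return err
	}
	w.Flush()
	return w.Error()
}

// loadRows reads all rows back; a missing file is treated as an empty
// watch list, as in loadWatchedAddressesRows above.
func loadRows(path string) ([][]string, error) {
	f, err := os.Open(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return [][]string{}, nil
		}
		return nil, err
	}
	defer f.Close()
	return csv.NewReader(f).ReadAll()
}

func main() {
	path := "watched_addresses_demo.csv" // illustrative path, not the package default
	defer os.Remove(path)

	// columns: address, created_at, watched_at, last_filled_at
	rows := [][]string{
		{"0x5d663F5269090bD2A7DC2390c911dF6083D7b28F", "5", "10", "0"},
		{"0x6Eb7e5C66DB8af2E96159AC440cbc8901f196360", "5", "10", "0"},
	}
	for _, row := range rows {
		if err := appendRow(path, row); err != nil {
			panic(err)
		}
	}

	loaded, err := loadRows(path)
	if err != nil {
		panic(err)
	}
	for _, row := range loaded {
		fmt.Println(row[0]) // first column is the watched address
	}
}

The same flush-before-read discipline is what lets dumpWatchedAddressesCSVFileData in the tests hand the file straight to Postgres via COPY immediately after an insert.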