Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Increase logging in contract watcher #145

Merged
merged 1 commit into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/contract_watcher/full/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package transformer
import (
"errors"

"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever"
Expand Down Expand Up @@ -106,7 +108,11 @@ func (tr *Transformer) Init() error {

// Get contract name if it has one
var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}

// Remove any potential accidental duplicate inputs in arg filter values
eventArgs := map[string]bool{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/header/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in

// Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs
func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
eventsToLogs := make(map[string][]types.Log)
for _, event := range events {
eventsToLogs[event.Name] = make([]types.Log, 0, len(logs))
Expand All @@ -141,7 +141,7 @@ func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
// If the log is of this event type, process it as such
if event.Sig() == log.Topics[0] {
values := make(map[string]interface{})
err := contract.UnpackLogIntoMap(values, event.Name, log)
err := boundContract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/contract_watcher/header/repository/header_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
Expand Down Expand Up @@ -148,7 +149,10 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [
pgStr = pgStr[:len(pgStr)-2]
_, err = tx.Exec(pgStr, header.Id)
if err != nil {
tx.Rollback()
rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error())
}
return err
}
}
Expand Down Expand Up @@ -246,6 +250,7 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
// Returns a continuous set of headers
func continuousHeaders(headers []core.Header) []core.Header {
if len(headers) < 1 {
logrus.Trace("no headers to arrange continuously")
return headers
}
previousHeader := headers[0].BlockNumber
Expand Down
14 changes: 7 additions & 7 deletions pkg/contract_watcher/header/repository/header_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ var _ = Describe("Repository", func() {
h1 := missingHeaders[0]
h2 := missingHeaders[1]
h3 := missingHeaders[2]
Expect(h1.BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(h2.BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(h3.BlockNumber).To(Equal(int64(mocks.MockHeader3.BlockNumber)))
Expect(h1.BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(h2.BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
Expect(h3.BlockNumber).To(Equal(mocks.MockHeader3.BlockNumber))
})

It("Returns only contiguous chunks of headers", func() {
Expand All @@ -150,8 +150,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
})

It("Fails if eventID does not yet exist in check_headers table", func() {
Expand Down Expand Up @@ -199,8 +199,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
})

It("returns headers after starting header if starting header not missing", func() {
Expand Down
18 changes: 12 additions & 6 deletions pkg/contract_watcher/header/retriever/block_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ var _ = Describe("Block Retriever", func() {

Describe("RetrieveFirstBlock", func() {
It("Retrieves block number of earliest header in the database", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
Expect(err).ToNot(HaveOccurred())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())

i, err := r.RetrieveFirstBlock()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -61,9 +64,12 @@ var _ = Describe("Block Retriever", func() {

Describe("RetrieveMostRecentBlock", func() {
It("Retrieves the latest header's block number", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())

i, err := r.RetrieveMostRecentBlock()
Expect(err).ToNot(HaveOccurred())
Expand Down
102 changes: 56 additions & 46 deletions pkg/contract_watcher/header/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package transformer

import (
"errors"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter"
Expand Down Expand Up @@ -107,22 +109,22 @@ func (tr *Transformer) Init() error {
// Configure Abi
if tr.Config.Abis[contractAddr] == "" {
// If no abi is given in the config, this method will try fetching from internal look-up table and etherscan
err := tr.Parser.Parse(contractAddr)
if err != nil {
return err
parseErr := tr.Parser.Parse(contractAddr)
if parseErr != nil {
return fmt.Errorf("error parsing contract by address: %s", parseErr.Error())
}
} else {
// If we have an abi from the config, load that into the parser
err := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
if err != nil {
return err
parseErr := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
if parseErr != nil {
return fmt.Errorf("error parsing contract abi: %s", parseErr.Error())
}
}

// Get first block and most recent block number in the header repo
firstBlock, err := tr.Retriever.RetrieveFirstBlock()
if err != nil {
return err
firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock()
if retrieveErr != nil {
return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())
}

// Set to specified range if it falls within the bounds
Expand All @@ -132,7 +134,11 @@ func (tr *Transformer) Init() error {

// Get contract name if it has one
var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}

// Remove any potential accidental duplicate inputs
eventArgs := map[string]bool{}
Expand Down Expand Up @@ -165,9 +171,9 @@ func (tr *Transformer) Init() error {
tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events))
for _, event := range con.Events {
eventId := strings.ToLower(event.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(eventId)
if err != nil {
return err
addColumnErr := tr.HeaderRepository.AddCheckColumn(eventId)
if addColumnErr != nil {
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
}
// Keep track of this event id; sorted and unsorted
tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId)
Expand All @@ -180,9 +186,9 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods))
for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(methodId)
if err != nil {
return err
addColumnErr := tr.HeaderRepository.AddCheckColumn(methodId)
if addColumnErr != nil {
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
}
tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId)
}
Expand All @@ -202,9 +208,9 @@ func (tr *Transformer) Execute() error {
}

// Find unchecked headers for all events across all contracts; these are returned in asc order
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if err != nil {
return err
missingHeaders, missingHeadersErr := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if missingHeadersErr != nil {
return fmt.Errorf("error getting missing headers: %s", missingHeadersErr.Error())
}

// Iterate over headers
Expand All @@ -216,23 +222,24 @@ func (tr *Transformer) Execute() error {
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing
sortedLogs := make(map[string][]gethTypes.Log)
// And fetch all event logs across contracts at this header
allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if err != nil {
return err
allLogs, fetchErr := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if fetchErr != nil {
return fmt.Errorf("error fetching logs: %s", fetchErr.Error())
}

// If no logs are found mark the header checked for all of these eventIDs
// and continue to method polling and onto the next iteration
if len(allLogs) < 1 {
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
err = tr.methodPolling(header, tr.sortedMethodIds)
if err != nil {
return err
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if pollingErr != nil {
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
}
tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
logrus.Tracef("no logs found for block %d, continuing", header.BlockNumber)
continue
}

Expand All @@ -245,41 +252,43 @@ func (tr *Transformer) Execute() error {
// Process logs for each contract
for conAddr, logs := range sortedLogs {
if logs == nil {
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue
}
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con)

// Convert logs into batches of log mappings (eventName => []types.Logs
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil {
return err
convertedLogs, convertErr := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if convertErr != nil {
return fmt.Errorf("error converting logs: %s", convertErr.Error())
}
// Cycle through each type of event log and persist them
for eventName, logs := range convertedLogs {
// If logs for this event are empty, mark them checked at this header and continue
if len(logs) < 1 {
eventId := strings.ToLower(eventName + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber)
continue
}
// If logs aren't empty, persist them
// Header is marked checked in the transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if persistErr != nil {
return fmt.Errorf("error persisting logs: %s", persistErr.Error())
}
}
}

// Poll contracts at this block height
err = tr.methodPolling(header, tr.sortedMethodIds)
if err != nil {
return err
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if pollingErr != nil {
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
}
// Success; setup to start at the next header
tr.Start = header.BlockNumber + 1
Expand All @@ -294,19 +303,20 @@ func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[str
// Skip method polling processes if no methods are specified
// Also don't try to poll methods below this contract's specified starting block
if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock {
logrus.Tracef("not polling contract: %s", con.Address)
continue
}

// Poll all methods for this contract at this header
err := tr.Poller.PollContractAt(*con, header.BlockNumber)
if err != nil {
return err
pollingErr := tr.Poller.PollContractAt(*con, header.BlockNumber)
if pollingErr != nil {
return fmt.Errorf("error polling contract %s: %s", con.Address, pollingErr.Error())
}

// Mark this header checked for the methods
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/header/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var _ = Describe("Transformer", func() {
err := t.Init()

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
})
})

Expand Down Expand Up @@ -109,7 +109,7 @@ var _ = Describe("Transformer", func() {
err := t.Init()

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/contract_watcher/shared/constants/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]`

// Individual event interfaces for constructing ABI from
var SupportsInterace = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
var SupportsInterface = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}`
var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}`
var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}`
Expand Down
2 changes: 1 addition & 1 deletion pkg/contract_watcher/shared/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newFetcherError(err error, fetchMethod string) *fetcherError {

// Fetcher struct
type Fetcher struct {
BlockChain core.BlockChain // Underyling Blockchain
BlockChain core.BlockChain // Underlying Blockchain
}

// Fetcher error
Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/shared/helpers/test_helpers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) {
rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC)
ethClient := ethclient.NewClient(rawRpcClient)
blockChainClient := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
madeNode := node.MakeNode(rpcClient)
transactionConverter := rpc2.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter)
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, madeNode, transactionConverter)

db, err := postgres.NewDB(config.Database{
Hostname: "localhost",
Expand Down
Loading