Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
fix for issue #146; mark header checked for contract if it doesnt have
Browse files Browse the repository at this point in the history
any logs at that header but other contracts do; test
  • Loading branch information
i-norden committed Oct 28, 2019
1 parent 3fff289 commit 11b5efb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 16 deletions.
40 changes: 38 additions & 2 deletions integration_test/contract_watcher_header_sync_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var _ = Describe("contractWatcher headerSync transformer", func() {
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
var headerID int64
var ensAddr = strings.ToLower(constants.EnsContractAddress)
var tusdAddr = strings.ToLower(constants.TusdContractAddress)
var ensAddr = strings.ToLower(constants.EnsContractAddress) // 0x314159265dd8dbb310642f98f50c066173c1259b
var tusdAddr = strings.ToLower(constants.TusdContractAddress) // 0x8dd5fbce2f6a956c3022ba3663759011dd51e73e

BeforeEach(func() {
db, blockChain = test_helpers.SetupDBandBC()
Expand Down Expand Up @@ -377,6 +377,42 @@ var _ = Describe("contractWatcher headerSync transformer", func() {
Expect(transferLog.Value).To(Equal("2800000000000000000000"))
})

It("Marks header checked for a contract that has no logs at that header", func() {
t := transformer.NewTransformer(test_helpers.ENSandTusdConfig, blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(t.Start).To(Equal(int64(6885702)))

newOwnerLog := test_helpers.HeaderSyncNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog)
Expect(err).ToNot(HaveOccurred())
transferLog := test_helpers.HeaderSyncTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.transfer_event", tusdAddr)).StructScan(&transferLog)
Expect(err).ToNot(HaveOccurred())
Expect(transferLog.HeaderID).ToNot(Equal(newOwnerLog.HeaderID))

type checkedHeader struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
NewOwner int64 `db:"newowner_0x314159265dd8dbb310642f98f50c066173c1259b"`
Transfer int64 `db:"transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e"`
}

transferCheckedHeader := new(checkedHeader)
err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", transferLog.HeaderID).StructScan(transferCheckedHeader)
Expect(err).ToNot(HaveOccurred())
Expect(transferCheckedHeader.Transfer).To(Equal(int64(1)))
Expect(transferCheckedHeader.NewOwner).To(Equal(int64(1)))

newOwnerCheckedHeader := new(checkedHeader)
err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", newOwnerLog.HeaderID).StructScan(newOwnerCheckedHeader)
Expect(err).ToNot(HaveOccurred())
Expect(newOwnerCheckedHeader.NewOwner).To(Equal(int64(1)))
Expect(newOwnerCheckedHeader.Transfer).To(Equal(int64(1)))
})

It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() {
var testConf config.ContractConfig
testConf = test_helpers.ENSandTusdConfig
Expand Down
10 changes: 0 additions & 10 deletions pkg/contract_watcher/header/repository/header_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

"github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/core"
Expand Down Expand Up @@ -268,12 +267,3 @@ func continuousHeaders(headers []core.Header) []core.Header {
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}

// Used to mark a header checked as part of some external transaction so as to group into one commit
func (r *headerRepository) MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = checked_headers.`+eventID+` + 1`, headerID, 1)
return err
}
17 changes: 15 additions & 2 deletions pkg/contract_watcher/header/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,19 +249,32 @@ func (tr *Transformer) Execute() error {
}

// Sort logs by the contract they belong to
// Start by adding every contract addr to the map
// So that if we don't have any logs for it, it is caught and the header is still marked checked for its events
for _, addr := range tr.contractAddresses {
sortedLogs[addr] = nil
}
for _, log := range allLogs {
addr := strings.ToLower(log.Address.Hex())
sortedLogs[addr] = append(sortedLogs[addr], log)
}

// Process logs for each contract
for conAddr, logs := range sortedLogs {
if logs == nil {
con := tr.Contracts[conAddr]
if len(logs) < 1 {
eventIds := make([]string, 0)
for eventName := range con.Events {
eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address))
}
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue
}
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con)

// Convert logs into batches of log mappings (eventName => []types.Logs
Expand Down
3 changes: 1 addition & 2 deletions pkg/contract_watcher/shared/helpers/test_helpers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ func TearDown(db *postgres.DB) {

_, err = tx.Exec(`CREATE TABLE checked_headers (
id SERIAL PRIMARY KEY,
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
check_count INTEGER NOT NULL DEFAULT 1);`)
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE);`)
Expect(err).NotTo(HaveOccurred())

_, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`)
Expand Down

0 comments on commit 11b5efb

Please sign in to comment.