Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make owner count changes idempotent #164

Merged
merged 14 commits into from
Jul 1, 2022
24 changes: 12 additions & 12 deletions models/events/sale.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
)

type Sale struct {
ID string `gorm:"column:id" json:"id"`
ChainID uint64 `gorm:"column:chain_id" json:"chain_id"`
MarketplaceAddress string `gorm:"column:marketplace_address" json:"marketplace_address"`
CollectionAddress string `gorm:"column:collection_address" json:"collection_address"`
TokenID string `gorm:"column:token_id" json:"token_id"`
BlockNumber uint64 `gorm:"column:block_number" json:"block_number"`
TransactionHash string `gorm:"column:transaction_hash" json:"transaction_hash"`
EventIndex uint `gorm:"column:event_index" json:"event_index"`
SellerAddress string `gorm:"column:seller_address" json:"seller_address"`
BuyerAddress string `gorm:"column:buyer_address" json:"buyer_address"`
TradePrice string `gorm:"column:trade_price" json:"trade_price"`
EmittedAt time.Time `gorm:"column:emitted_at" json:"emitted_at"`
ID string `json:"id"`
ChainID uint64 `json:"chain_id"`
MarketplaceAddress string `json:"marketplace_address"`
CollectionAddress string `json:"collection_address"`
TokenID string `json:"token_id"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
EventIndex uint `json:"event_index"`
SellerAddress string `json:"seller_address"`
BuyerAddress string `json:"buyer_address"`
TradePrice string `json:"trade_price"`
EmittedAt time.Time `json:"emitted_at"`
}
33 changes: 22 additions & 11 deletions models/events/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ package events

import (
"time"

"github.com/NFT-com/indexer/models/id"
)

type Transfer struct {
ID string `gorm:"column:id" json:"id"`
ChainID uint64 `gorm:"column:chain_id" json:"chain_id"`
CollectionAddress string `gorm:"column:collection_address" json:"collection_address"`
TokenID string `gorm:"column:token_id" json:"token_id"`
BlockNumber uint64 `gorm:"column:block_number" json:"block_number"`
TransactionHash string `gorm:"column:transaction_hash" json:"transaction_hash"`
EventIndex uint `gorm:"column:event_index" json:"event_index"`
SenderAddress string `gorm:"column:sender_address" json:"sender_address"`
ReceiverAddress string `gorm:"column:receiver_address" json:"receiver_address"`
TokenCount uint `gorm:"column:token_count" json:"token_count"`
EmittedAt time.Time `gorm:"column:emitted_at" json:"emitted_at"`
ID string `json:"id"`
ChainID uint64 `json:"chain_id"`
TokenStandard string `json:"token_standard"`
CollectionAddress string `json:"collection_address"`
TokenID string `json:"token_id"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
EventIndex uint `json:"event_index"`
SenderAddress string `json:"sender_address"`
ReceiverAddress string `json:"receiver_address"`
TokenCount uint `json:"token_count"`
EmittedAt time.Time `json:"emitted_at"`
}

func (t Transfer) NFTID() string {
return id.NFT(t.ChainID, t.CollectionAddress, t.TokenID)
}

func (t Transfer) EventID() string {
return id.Event(t.TransactionHash, t.EventIndex)
}
2 changes: 0 additions & 2 deletions models/graph/nft.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ type NFT struct {
URI string `json:"uri"`
Image string `json:"image"`
Description string `json:"description"`
Owner string `json:"owner"`
Number uint `json:"number"`
}
14 changes: 14 additions & 0 deletions models/id/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package id

import (
"fmt"

"github.com/google/uuid"
"golang.org/x/crypto/sha3"
)

func Event(tx string, index uint) string {
eventHash := sha3.Sum256([]byte(fmt.Sprintf("%s-%d", tx, index)))
eventID := uuid.Must(uuid.FromBytes(eventHash[:16]))
return eventID.String()
}
2 changes: 0 additions & 2 deletions models/jobs/addition.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ type Addition struct {
ContractAddress string `json:"contract_address"`
TokenID string `json:"token_id"`
TokenStandard string `json:"token_standard"`
OwnerAddress string `json:"owner_address"`
TokenCount uint `json:"token_count"`
}

func (a Addition) NFTID() string {
Expand Down
20 changes: 0 additions & 20 deletions models/jobs/modification.go

This file was deleted.

10 changes: 4 additions & 6 deletions models/results/parsing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ import (
)

type Parsing struct {
Job *jobs.Parsing `json:"job"`
Transfers []*events.Transfer `json:"transfers"`
Sales []*events.Sale `json:"sales"`
Additions []*jobs.Addition `json:"additions"`
Modifications []*jobs.Modification `json:"modifications"`
Requests uint `json:"requests"`
Job *jobs.Parsing `json:"job"`
Transfers []*events.Transfer `json:"transfers"`
Sales []*events.Sale `json:"sales"`
Requests uint `json:"requests"`
}
2 changes: 2 additions & 0 deletions service/parsers/erc1155_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/NFT-com/indexer/models/abis"
"github.com/NFT-com/indexer/models/events"
"github.com/NFT-com/indexer/models/jobs"
)

func ERC1155Batch(log types.Log) ([]*events.Transfer, error) {
Expand Down Expand Up @@ -47,6 +48,7 @@ func ERC1155Batch(log types.Log) ([]*events.Transfer, error) {
transfer := events.Transfer{
ID: transferID.String(),
// ChainID set after parsing
TokenStandard: jobs.StandardERC1155,
CollectionAddress: log.Address.Hex(),
TokenID: tokenID.String(),
BlockNumber: log.BlockNumber,
Expand Down
2 changes: 2 additions & 0 deletions service/parsers/erc1155_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/NFT-com/indexer/models/abis"
"github.com/NFT-com/indexer/models/events"
"github.com/NFT-com/indexer/models/jobs"
)

func ERC1155Transfer(log types.Log) (*events.Transfer, error) {
Expand Down Expand Up @@ -42,6 +43,7 @@ func ERC1155Transfer(log types.Log) (*events.Transfer, error) {
transfer := events.Transfer{
ID: transferID.String(),
// ChainID set after parsing
TokenStandard: jobs.StandardERC1155,
CollectionAddress: log.Address.Hex(),
TokenID: tokenID.String(),
BlockNumber: log.BlockNumber,
Expand Down
2 changes: 2 additions & 0 deletions service/parsers/erc721_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"

"github.com/NFT-com/indexer/models/events"
"github.com/NFT-com/indexer/models/jobs"
)

func ERC721Transfer(log types.Log) (*events.Transfer, error) {
Expand All @@ -24,6 +25,7 @@ func ERC721Transfer(log types.Log) (*events.Transfer, error) {
transfer := events.Transfer{
ID: transferID.String(),
// ChainID set after parsing
TokenStandard: jobs.StandardERC721,
CollectionAddress: log.Address.Hex(),
TokenID: log.Topics[3].Big().String(),
BlockNumber: log.BlockNumber,
Expand Down
10 changes: 2 additions & 8 deletions service/pipeline/addition_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,21 @@ func (a *AdditionStage) process(payload []byte) error {

// Finally, we make the necessary changes to the DB: insert the NFT, the traits
// and apply the necessary ownership changes.
err = a.nfts.Insert(result.NFT)
err = a.nfts.Upsert(result.NFT)
if err != nil {
return fmt.Errorf("could not insert NFT: %w", err)
}
err = a.traits.Insert(result.Traits...)
err = a.traits.Upsert(result.Traits...)
if err != nil {
return fmt.Errorf("could not insert traits: %w", err)
}
err = a.owners.Add(&result)
if err != nil {
return fmt.Errorf("could not add owner: %w", err)
}

a.log.Info().
Str("job_id", result.Job.ID).
Uint64("chain_id", result.Job.ChainID).
Str("contract_address", result.Job.ContractAddress).
Str("token_id", result.Job.TokenID).
Str("token_standard", result.Job.TokenStandard).
Str("owner_address", result.Job.OwnerAddress).
Uint("token_count", result.Job.TokenCount).
Int("traits", len(result.Traits)).
Msg("addition job processed")

Expand Down
86 changes: 54 additions & 32 deletions service/pipeline/parsing_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/google/uuid"
"github.com/nsqio/go-nsq"
"github.com/rs/zerolog"
"go.uber.org/ratelimit"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/lambda"

"github.com/NFT-com/indexer/config/params"
"github.com/NFT-com/indexer/models/events"
"github.com/NFT-com/indexer/models/graph"
"github.com/NFT-com/indexer/models/jobs"
"github.com/NFT-com/indexer/models/results"
)
Expand Down Expand Up @@ -130,44 +133,71 @@ func (p *ParsingStage) process(payload []byte) error {
return fmt.Errorf("could not decode parsing result: %w", err)
}

// As the Lambda has no access to the DB, we need to fill in the collection ID
// for all addition and modification jobs here. This could be done in a batch
// request to improve performance, but it shouldn't have a big impact.
for _, addition := range result.Additions {
collection, err := p.collections.One(addition.ChainID, addition.ContractAddress)
if err != nil {
return fmt.Errorf("could not get collection for addition: %w", err)
// We can go through the transfers and process those with zero address as mints.
var dummies []*graph.NFT
var payloads [][]byte
var owners []*events.Transfer
for _, transfer := range result.Transfers {

if transfer.SenderAddress == transfer.ReceiverAddress {
continue
}
addition.CollectionID = collection.ID
}
for _, modification := range result.Modifications {
collection, err := p.collections.One(modification.ChainID, modification.ContractAddress)

// Collect transfers that are not no-ops for owner changes.
owners = append(owners, transfer)

// Get the collection ID based on chain ID and collection address, so we can
// reference it directly for the addition job and the NFT insertion.
collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress)
if err != nil {
return fmt.Errorf("could not get collection for modification: %w", err)
return fmt.Errorf("could not get collection for transfer: %w", err)
}
modification.CollectionID = collection.ID
}

// As a first step of processing the results, we want to push all of the addition
// jobs into the addition pipeline, so that the addition dispatcher can start
// its work as soon as possible.
payloads := make([][]byte, 0, len(result.Additions))
for _, addition := range result.Additions {
// Create a placeholder NFT that we will create in the DB.
dummy := graph.NFT{
ID: transfer.NFTID(),
CollectionID: collection.ID,
TokenID: transfer.TokenID,
}
dummies = append(dummies, &dummy)

// Skip transfers that do not originate from the zero address so we process
// only mints.
if transfer.SenderAddress != params.AddressZero {
continue
}

// Create an addition job to complete the data for the NFT.
addition := jobs.Addition{
ID: uuid.NewString(),
ChainID: transfer.ChainID,
CollectionID: collection.ID,
ContractAddress: transfer.CollectionAddress,
TokenID: transfer.TokenID,
TokenStandard: transfer.TokenStandard,
}
payload, err := json.Marshal(addition)
if err != nil {
return fmt.Errorf("could not encode addition job: %w", err)
}
payloads = append(payloads, payload)
}

if len(payloads) > 0 {
err = p.additions.MultiPublish(params.TopicAddition, payloads)
if err != nil {
return fmt.Errorf("could not publish addition jobs: %w", err)
}
}

// In a second step, we insert all of the transfers and sales that were parsed
// into the events database.
// Touch all the NFTs that have been created, so that we can apply owner changes
// out of order, before the full NFT information is available from the addition.
err = p.nfts.Touch(dummies...)
if err != nil {
return fmt.Errorf("could not touch dummies: %w", err)
}

// Next, we can store all the raw events for transfers and sales.
err = p.transfers.Upsert(result.Transfers...)
if err != nil {
return fmt.Errorf("could not upsert transfers: %w", err)
Expand All @@ -177,16 +207,10 @@ func (p *ParsingStage) process(payload []byte) error {
return fmt.Errorf("could not upsert sales: %w", err)
}

// Finally, we make sure that we have all of the NFTs that were modified in the
// database already, at least as placeholders, and we apply the ownership changes
// for them in one batch operation.
err = p.nfts.Touch(result.Modifications...)
if err != nil {
return fmt.Errorf("could not touch NFTs: %w", err)
}
err = p.owners.Change(result.Modifications...)
// Last but not least, we can upsert the owner change updates for each transfer.
err = p.owners.Upsert(owners...)
if err != nil {
return fmt.Errorf("could not change owners: %w", err)
return fmt.Errorf("could not upsert owners: %w", err)
}

p.log.Info().
Expand All @@ -198,8 +222,6 @@ func (p *ParsingStage) process(payload []byte) error {
Strs("event_hashes", result.Job.EventHashes).
Int("transfers", len(result.Transfers)).
Int("sales", len(result.Sales)).
Int("additions", len(result.Additions)).
Int("modifications", len(result.Modifications)).
Msg("parsing job processed")

// As we can't know in advance how many requests a Lambda will make, we will
Expand Down
10 changes: 4 additions & 6 deletions service/pipeline/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ import (
"github.com/NFT-com/indexer/models/events"
"github.com/NFT-com/indexer/models/graph"
"github.com/NFT-com/indexer/models/jobs"
"github.com/NFT-com/indexer/models/results"
)

type NFTStore interface {
Touch(modifications ...*jobs.Modification) error
Insert(nft *graph.NFT) error
Touch(dummies ...*graph.NFT) error
Upsert(nft *graph.NFT) error
}

type TraitStore interface {
Insert(traits ...*graph.Trait) error
Upsert(traits ...*graph.Trait) error
}

type OwnerStore interface {
Add(additions ...*results.Addition) error
Change(modifications ...*jobs.Modification) error
Upsert(transfers ...*events.Transfer) error
}

type BoundaryStore interface {
Expand Down
4 changes: 0 additions & 4 deletions service/workers/addition_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) (
Str("contract_address", addition.ContractAddress).
Str("token_id", addition.TokenID).
Str("token_standard", addition.TokenStandard).
Str("owner_address", addition.OwnerAddress).
Uint("token_count", addition.TokenCount).
Logger()

log.Info().
Expand Down Expand Up @@ -142,8 +140,6 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) (
URI: tokenURI,
Image: token.Image,
Description: token.Description,
Owner: addition.OwnerAddress,
Number: addition.TokenCount,
}

result := results.Addition{
Expand Down
Loading