From 9397573bbd795db815ba325e439d2af989c32267 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:01:12 +0100 Subject: [PATCH 01/14] Make owner count upserts idempotent --- models/events/sale.go | 24 ++++----- models/events/transfer.go | 33 ++++++++---- models/graph/nft.go | 2 - models/id/event.go | 14 +++++ models/jobs/addition.go | 2 - models/results/parsing.go | 10 ++-- service/parsers/erc1155_batch.go | 2 + service/parsers/erc1155_transfer.go | 2 + service/parsers/erc721_transfer.go | 2 + service/pipeline/addition_stage.go | 10 +--- service/pipeline/parsing_stage.go | 77 ++++++++++++++++----------- service/pipeline/stores.go | 10 ++-- service/workers/addition_handler.go | 4 -- service/workers/parsing_handler.go | 52 ++---------------- sql/01_tables_graph_database.sql | 3 +- sql/02_tables_events_database.sql | 1 + storage/events/transfer_repository.go | 2 + storage/graph/nft_repository.go | 27 +++++----- storage/graph/owner_repository.go | 75 ++++++-------------------- storage/graph/trait_repository.go | 2 +- storage/jobs/failure_repository.go | 2 - 21 files changed, 148 insertions(+), 208 deletions(-) create mode 100644 models/id/event.go diff --git a/models/events/sale.go b/models/events/sale.go index e54c4c25..cf24380e 100644 --- a/models/events/sale.go +++ b/models/events/sale.go @@ -5,16 +5,16 @@ import ( ) type Sale struct { - ID string `gorm:"column:id" json:"id"` - ChainID uint64 `gorm:"column:chain_id" json:"chain_id"` - MarketplaceAddress string `gorm:"column:marketplace_address" json:"marketplace_address"` - CollectionAddress string `gorm:"column:collection_address" json:"collection_address"` - TokenID string `gorm:"column:token_id" json:"token_id"` - BlockNumber uint64 `gorm:"column:block_number" json:"block_number"` - TransactionHash string `gorm:"column:transaction_hash" json:"transaction_hash"` - EventIndex uint `gorm:"column:event_index" json:"event_index"` - SellerAddress string `gorm:"column:seller_address" json:"seller_address"` - BuyerAddress string `gorm:"column:buyer_address" json:"buyer_address"` - TradePrice string `gorm:"column:trade_price" json:"trade_price"` - EmittedAt time.Time `gorm:"column:emitted_at" json:"emitted_at"` + ID string `json:"id"` + ChainID uint64 `json:"chain_id"` + MarketplaceAddress string `json:"marketplace_address"` + CollectionAddress string `json:"collection_address"` + TokenID string `json:"token_id"` + BlockNumber uint64 `json:"block_number"` + TransactionHash string `json:"transaction_hash"` + EventIndex uint `json:"event_index"` + SellerAddress string `json:"seller_address"` + BuyerAddress string `json:"buyer_address"` + TradePrice string `json:"trade_price"` + EmittedAt time.Time `json:"emitted_at"` } diff --git a/models/events/transfer.go b/models/events/transfer.go index 50df780e..b0a2e313 100644 --- a/models/events/transfer.go +++ b/models/events/transfer.go @@ -2,18 +2,29 @@ package events import ( "time" + + "github.com/NFT-com/indexer/models/id" ) type Transfer struct { - ID string `gorm:"column:id" json:"id"` - ChainID uint64 `gorm:"column:chain_id" json:"chain_id"` - CollectionAddress string `gorm:"column:collection_address" json:"collection_address"` - TokenID string `gorm:"column:token_id" json:"token_id"` - BlockNumber uint64 `gorm:"column:block_number" json:"block_number"` - TransactionHash string `gorm:"column:transaction_hash" json:"transaction_hash"` - EventIndex uint `gorm:"column:event_index" json:"event_index"` - SenderAddress string `gorm:"column:sender_address" json:"sender_address"` - ReceiverAddress string `gorm:"column:receiver_address" json:"receiver_address"` - TokenCount uint `gorm:"column:token_count" json:"token_count"` - EmittedAt time.Time `gorm:"column:emitted_at" json:"emitted_at"` + ID string `json:"id"` + ChainID uint64 `json:"chain_id"` + TokenStandard string `json:"token_standard"` + CollectionAddress string `json:"collection_address"` + TokenID string `json:"token_id"` + BlockNumber uint64 `json:"block_number"` + TransactionHash string `json:"transaction_hash"` + EventIndex uint `json:"event_index"` + SenderAddress string `json:"sender_address"` + ReceiverAddress string `json:"receiver_address"` + TokenCount uint `json:"token_count"` + EmittedAt time.Time `json:"emitted_at"` +} + +func (t Transfer) NFTID() string { + return id.NFT(t.ChainID, t.CollectionAddress, t.TokenID) +} + +func (t Transfer) EventID() string { + return id.Event(t.TransactionHash, t.EventIndex) } diff --git a/models/graph/nft.go b/models/graph/nft.go index 285644ef..4c085053 100644 --- a/models/graph/nft.go +++ b/models/graph/nft.go @@ -8,6 +8,4 @@ type NFT struct { URI string `json:"uri"` Image string `json:"image"` Description string `json:"description"` - Owner string `json:"owner"` - Number uint `json:"number"` } diff --git a/models/id/event.go b/models/id/event.go new file mode 100644 index 00000000..3e407570 --- /dev/null +++ b/models/id/event.go @@ -0,0 +1,14 @@ +package id + +import ( + "fmt" + + "github.com/google/uuid" + "golang.org/x/crypto/sha3" +) + +func Event(hash string, index uint) string { + eventHash := sha3.Sum256([]byte(fmt.Sprintf("%s-%d", hash, index))) + eventID := uuid.Must(uuid.FromBytes(eventHash[:])) + return eventID.String() +} diff --git a/models/jobs/addition.go b/models/jobs/addition.go index 74a2cfd2..4a3bd1e5 100644 --- a/models/jobs/addition.go +++ b/models/jobs/addition.go @@ -11,8 +11,6 @@ type Addition struct { ContractAddress string `json:"contract_address"` TokenID string `json:"token_id"` TokenStandard string `json:"token_standard"` - OwnerAddress string `json:"owner_address"` - TokenCount uint `json:"token_count"` } func (a Addition) NFTID() string { diff --git a/models/results/parsing.go b/models/results/parsing.go index 4dc07d4f..d4a8d145 100644 --- a/models/results/parsing.go +++ b/models/results/parsing.go @@ -6,10 +6,8 @@ import ( ) type Parsing struct { - Job *jobs.Parsing `json:"job"` - Transfers []*events.Transfer `json:"transfers"` - Sales []*events.Sale `json:"sales"` - Additions []*jobs.Addition `json:"additions"` - Modifications []*jobs.Modification `json:"modifications"` - Requests uint `json:"requests"` + Job *jobs.Parsing `json:"job"` + Transfers []*events.Transfer `json:"transfers"` + Sales []*events.Sale `json:"sales"` + Requests uint `json:"requests"` } diff --git a/service/parsers/erc1155_batch.go b/service/parsers/erc1155_batch.go index dad4703a..01fbc4ab 100644 --- a/service/parsers/erc1155_batch.go +++ b/service/parsers/erc1155_batch.go @@ -13,6 +13,7 @@ import ( "github.com/NFT-com/indexer/models/abis" "github.com/NFT-com/indexer/models/events" + "github.com/NFT-com/indexer/models/jobs" ) func ERC1155Batch(log types.Log) ([]*events.Transfer, error) { @@ -47,6 +48,7 @@ func ERC1155Batch(log types.Log) ([]*events.Transfer, error) { transfer := events.Transfer{ ID: transferID.String(), // ChainID set after parsing + TokenStandard: jobs.StandardERC1155, CollectionAddress: log.Address.Hex(), TokenID: tokenID.String(), BlockNumber: log.BlockNumber, diff --git a/service/parsers/erc1155_transfer.go b/service/parsers/erc1155_transfer.go index 7d28098d..42a4c9a9 100644 --- a/service/parsers/erc1155_transfer.go +++ b/service/parsers/erc1155_transfer.go @@ -13,6 +13,7 @@ import ( "github.com/NFT-com/indexer/models/abis" "github.com/NFT-com/indexer/models/events" + "github.com/NFT-com/indexer/models/jobs" ) func ERC1155Transfer(log types.Log) (*events.Transfer, error) { @@ -42,6 +43,7 @@ func ERC1155Transfer(log types.Log) (*events.Transfer, error) { transfer := events.Transfer{ ID: transferID.String(), // ChainID set after parsing + TokenStandard: jobs.StandardERC1155, CollectionAddress: log.Address.Hex(), TokenID: tokenID.String(), BlockNumber: log.BlockNumber, diff --git a/service/parsers/erc721_transfer.go b/service/parsers/erc721_transfer.go index 4b5ea3f5..f6878b25 100644 --- a/service/parsers/erc721_transfer.go +++ b/service/parsers/erc721_transfer.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/NFT-com/indexer/models/events" + "github.com/NFT-com/indexer/models/jobs" ) func ERC721Transfer(log types.Log) (*events.Transfer, error) { @@ -24,6 +25,7 @@ func ERC721Transfer(log types.Log) (*events.Transfer, error) { transfer := events.Transfer{ ID: transferID.String(), // ChainID set after parsing + TokenStandard: jobs.StandardERC721, CollectionAddress: log.Address.Hex(), TokenID: log.Topics[3].Big().String(), BlockNumber: log.BlockNumber, diff --git a/service/pipeline/addition_stage.go b/service/pipeline/addition_stage.go index 27bff3b4..a2e11101 100644 --- a/service/pipeline/addition_stage.go +++ b/service/pipeline/addition_stage.go @@ -125,18 +125,14 @@ func (a *AdditionStage) process(payload []byte) error { // Finally, we make the necessary changes to the DB: insert the NFT, the traits // and apply the necessary ownership changes. - err = a.nfts.Insert(result.NFT) + err = a.nfts.Upsert(result.NFT) if err != nil { return fmt.Errorf("could not insert NFT: %w", err) } - err = a.traits.Insert(result.Traits...) + err = a.traits.Upsert(result.Traits...) if err != nil { return fmt.Errorf("could not insert traits: %w", err) } - err = a.owners.Add(&result) - if err != nil { - return fmt.Errorf("could not add owner: %w", err) - } a.log.Info(). Str("job_id", result.Job.ID). @@ -144,8 +140,6 @@ func (a *AdditionStage) process(payload []byte) error { Str("contract_address", result.Job.ContractAddress). Str("token_id", result.Job.TokenID). Str("token_standard", result.Job.TokenStandard). - Str("owner_address", result.Job.OwnerAddress). - Uint("token_count", result.Job.TokenCount). Int("traits", len(result.Traits)). Msg("addition job processed") diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index 3a53cd15..63bd60ef 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + "github.com/google/uuid" "github.com/nsqio/go-nsq" "github.com/rs/zerolog" "go.uber.org/ratelimit" @@ -13,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/lambda" "github.com/NFT-com/indexer/config/params" + "github.com/NFT-com/indexer/models/graph" "github.com/NFT-com/indexer/models/jobs" "github.com/NFT-com/indexer/models/results" ) @@ -130,35 +132,48 @@ func (p *ParsingStage) process(payload []byte) error { return fmt.Errorf("could not decode parsing result: %w", err) } - // As the Lambda has no access to the DB, we need to fill in the collection ID - // for all addition and modification jobs here. This could be done in a batch - // request to improve performance, but it shouldn't have a big impact. - for _, addition := range result.Additions { - collection, err := p.collections.One(addition.ChainID, addition.ContractAddress) - if err != nil { - return fmt.Errorf("could not get collection for addition: %w", err) + // We can go through the transfers and process those with zero address as mints. + var touches []*graph.NFT + var payloads [][]byte + for _, transfer := range result.Transfers { + + // Skip transfers that do not originate from the zero address so we process + // only mints. + if transfer.SenderAddress != params.AddressZero { + continue } - addition.CollectionID = collection.ID - } - for _, modification := range result.Modifications { - collection, err := p.collections.One(modification.ChainID, modification.ContractAddress) + + // Get the collection ID based on chain ID and collection address, so we can + // reference it directly for the addition job and the NFT insertion. + collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress) if err != nil { - return fmt.Errorf("could not get collection for modification: %w", err) + return fmt.Errorf("could not get collection for transfer: %w", err) } - modification.CollectionID = collection.ID - } - // As a first step of processing the results, we want to push all of the addition - // jobs into the addition pipeline, so that the addition dispatcher can start - // its work as soon as possible. - payloads := make([][]byte, 0, len(result.Additions)) - for _, addition := range result.Additions { + // Create a placeholder NFT that we will create in the DB. + touch := graph.NFT{ + ID: transfer.NFTID(), + CollectionID: collection.ID, + TokenID: transfer.TokenID, + } + touches = append(touches, &touch) + + // Create an addition job to complete the data for the NFT. + addition := jobs.Addition{ + ID: uuid.NewString(), + ChainID: transfer.ChainID, + CollectionID: collection.ID, + ContractAddress: transfer.CollectionAddress, + TokenID: transfer.TokenID, + TokenStandard: transfer.TokenStandard, + } payload, err := json.Marshal(addition) if err != nil { return fmt.Errorf("could not encode addition job: %w", err) } payloads = append(payloads, payload) } + if len(payloads) > 0 { err = p.additions.MultiPublish(params.TopicAddition, payloads) if err != nil { @@ -166,8 +181,14 @@ func (p *ParsingStage) process(payload []byte) error { } } - // In a second step, we insert all of the transfers and sales that were parsed - // into the events database. + // Touch all the NFTs that have been created, so that we can apply owner changes + // out of order, before the full NFT information is available from the addition. + err = p.nfts.Touch(touches...) + if err != nil { + return fmt.Errorf("could not touch NFTs: %w", err) + } + + // Next, we can store all the raw events for transfers and sales. err = p.transfers.Upsert(result.Transfers...) if err != nil { return fmt.Errorf("could not upsert transfers: %w", err) @@ -177,16 +198,10 @@ func (p *ParsingStage) process(payload []byte) error { return fmt.Errorf("could not upsert sales: %w", err) } - // Finally, we make sure that we have all of the NFTs that were modified in the - // database already, at least as placeholders, and we apply the ownership changes - // for them in one batch operation. - err = p.nfts.Touch(result.Modifications...) - if err != nil { - return fmt.Errorf("could not touch NFTs: %w", err) - } - err = p.owners.Change(result.Modifications...) + // Last but not least, we can upsert the owner change updates for each transfer. + err = p.owners.Upsert(result.Transfers...) if err != nil { - return fmt.Errorf("could not change owners: %w", err) + return fmt.Errorf("could not upsert owners: %w", err) } p.log.Info(). @@ -198,8 +213,6 @@ func (p *ParsingStage) process(payload []byte) error { Strs("event_hashes", result.Job.EventHashes). Int("transfers", len(result.Transfers)). Int("sales", len(result.Sales)). - Int("additions", len(result.Additions)). - Int("modifications", len(result.Modifications)). Msg("parsing job processed") // As we can't know in advance how many requests a Lambda will make, we will diff --git a/service/pipeline/stores.go b/service/pipeline/stores.go index 1db98bff..89a9fa30 100644 --- a/service/pipeline/stores.go +++ b/service/pipeline/stores.go @@ -4,21 +4,19 @@ import ( "github.com/NFT-com/indexer/models/events" "github.com/NFT-com/indexer/models/graph" "github.com/NFT-com/indexer/models/jobs" - "github.com/NFT-com/indexer/models/results" ) type NFTStore interface { - Touch(modifications ...*jobs.Modification) error - Insert(nft *graph.NFT) error + Touch(nfts ...*graph.NFT) error + Upsert(nft *graph.NFT) error } type TraitStore interface { - Insert(traits ...*graph.Trait) error + Upsert(traits ...*graph.Trait) error } type OwnerStore interface { - Add(additions ...*results.Addition) error - Change(modifications ...*jobs.Modification) error + Upsert(transfers ...*events.Transfer) error } type BoundaryStore interface { diff --git a/service/workers/addition_handler.go b/service/workers/addition_handler.go index 01e1c07c..d3ea57af 100644 --- a/service/workers/addition_handler.go +++ b/service/workers/addition_handler.go @@ -42,8 +42,6 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( Str("contract_address", addition.ContractAddress). Str("token_id", addition.TokenID). Str("token_standard", addition.TokenStandard). - Str("owner_address", addition.OwnerAddress). - Uint("token_count", addition.TokenCount). Logger() log.Info(). @@ -142,8 +140,6 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( URI: tokenURI, Image: token.Image, Description: token.Description, - Owner: addition.OwnerAddress, - Number: addition.TokenCount, } result := results.Addition{ diff --git a/service/workers/parsing_handler.go b/service/workers/parsing_handler.go index 3c6bae24..226f2d41 100644 --- a/service/workers/parsing_handler.go +++ b/service/workers/parsing_handler.go @@ -8,7 +8,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/config" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/ethereum/go-ethereum/ethclient" @@ -94,7 +93,6 @@ func (p *ParsingHandler) Handle(ctx context.Context, parsing *jobs.Parsing) (*re var transfers []*events.Transfer var sales []*events.Sale timestamps := make(map[uint64]time.Time) - standards := make(map[string]string) for _, log := range logs { // skip logs for reverted transactions @@ -119,7 +117,6 @@ func (p *ParsingHandler) Handle(ctx context.Context, parsing *jobs.Parsing) (*re return nil, fmt.Errorf("could not parse ERC721 transfer: %w", err) } transfers = append(transfers, transfer) - standards[transfer.ID] = jobs.StandardERC721 p.log.Trace(). Str("transaction", log.TxHash.Hex()). @@ -138,7 +135,6 @@ func (p *ParsingHandler) Handle(ctx context.Context, parsing *jobs.Parsing) (*re return nil, fmt.Errorf("could not parse ERC1155 transfer: %w", err) } transfers = append(transfers, transfer) - standards[transfer.ID] = jobs.StandardERC1155 p.log.Trace(). Str("transaction", log.TxHash.Hex()). @@ -159,8 +155,6 @@ func (p *ParsingHandler) Handle(ctx context.Context, parsing *jobs.Parsing) (*re transfers = append(transfers, batch...) for _, transfer := range batch { - standards[transfer.ID] = jobs.StandardERC1155 - p.log.Trace(). Str("transaction", log.TxHash.Hex()). Uint("index", log.Index). @@ -217,50 +211,12 @@ func (p *ParsingHandler) Handle(ctx context.Context, parsing *jobs.Parsing) (*re sale.EmittedAt = timestamps[sale.BlockNumber] } - // Go through all transfers and convert them to mints/burns where appropriate. - var additions []*jobs.Addition - var modifications []*jobs.Modification - for _, transfer := range transfers { - switch { - - case transfer.SenderAddress == params.AddressZero: - - addition := jobs.Addition{ - ID: uuid.NewString(), - ChainID: transfer.ChainID, - // CollectionID added later - ContractAddress: transfer.CollectionAddress, - TokenID: transfer.TokenID, - TokenStandard: standards[transfer.ID], - OwnerAddress: transfer.ReceiverAddress, - TokenCount: transfer.TokenCount, - } - additions = append(additions, &addition) - - default: - - modification := jobs.Modification{ - ID: uuid.NewString(), - ChainID: transfer.ChainID, - // CollectionID added later - ContractAddress: transfer.CollectionAddress, - TokenID: transfer.TokenID, - SenderAddress: transfer.SenderAddress, - ReceiverAddress: transfer.ReceiverAddress, - TokenCount: transfer.TokenCount, - } - modifications = append(modifications, &modification) - } - } - // Put everything together for the result. result := results.Parsing{ - Job: parsing, - Sales: sales, - Transfers: transfers, - Additions: additions, - Modifications: modifications, - Requests: requests, + Job: parsing, + Sales: sales, + Transfers: transfers, + Requests: requests, } return &result, nil diff --git a/sql/01_tables_graph_database.sql b/sql/01_tables_graph_database.sql index 9d962926..20e22e3a 100644 --- a/sql/01_tables_graph_database.sql +++ b/sql/01_tables_graph_database.sql @@ -50,8 +50,9 @@ CREATE TABLE owners ( owner VARCHAR(128) NOT NULL, nft_id UUID NOT NULL REFERENCES nfts ON DELETE CASCADE, + event_id UUID NOT NULL, number NUMERIC NOT NULL, - PRIMARY KEY (owner, nft_id) + PRIMARY KEY (owner, nft_id, event_id) ); CREATE INDEX owners_nft_id_idx ON owners(nft_id); diff --git a/sql/02_tables_events_database.sql b/sql/02_tables_events_database.sql index f161e49e..9d1ef15d 100644 --- a/sql/02_tables_events_database.sql +++ b/sql/02_tables_events_database.sql @@ -2,6 +2,7 @@ CREATE TABLE transfers ( id UUID PRIMARY KEY, chain_id NUMERIC NOT NULL, + token_standard VARCHAR(128) NOT NULL, collection_address VARCHAR(128) NOT NULL, token_id VARCHAR(128) NOT NULL, block_number NUMERIC NOT NULL, diff --git a/storage/events/transfer_repository.go b/storage/events/transfer_repository.go index c3a926ec..dd43bdc5 100644 --- a/storage/events/transfer_repository.go +++ b/storage/events/transfer_repository.go @@ -34,6 +34,7 @@ func (t *TransferRepository) Upsert(transfers ...*events.Transfer) error { Columns( "id", "chain_id", + "token_standard", "collection_address", "token_id", "block_number", @@ -50,6 +51,7 @@ func (t *TransferRepository) Upsert(transfers ...*events.Transfer) error { query = query.Values( transfer.ID, transfer.ChainID, + transfer.TokenStandard, transfer.CollectionAddress, transfer.TokenID, transfer.BlockNumber, diff --git a/storage/graph/nft_repository.go b/storage/graph/nft_repository.go index c87f3f3f..97675ece 100644 --- a/storage/graph/nft_repository.go +++ b/storage/graph/nft_repository.go @@ -7,7 +7,6 @@ import ( "github.com/Masterminds/squirrel" "github.com/NFT-com/indexer/models/graph" - "github.com/NFT-com/indexer/models/jobs" ) type NFTRepository struct { @@ -24,20 +23,20 @@ func NewNFTRepository(db *sql.DB) *NFTRepository { return &n } -func (n *NFTRepository) Touch(modifications ...*jobs.Modification) error { +func (n *NFTRepository) Touch(nfts ...*graph.NFT) error { - if len(modifications) == 0 { + if len(nfts) == 0 { return nil } - set := make(map[string]*jobs.Modification, len(modifications)) - for _, modification := range modifications { - set[modification.NFTID()] = modification + set := make(map[string]*graph.NFT, len(nfts)) + for _, nft := range nfts { + set[nft.ID] = nft } - modifications = make([]*jobs.Modification, 0, len(set)) - for _, modification := range set { - modifications = append(modifications, modification) + nfts = make([]*graph.NFT, 0, len(set)) + for _, nft := range set { + nfts = append(nfts, nft) } query := n.build. @@ -55,11 +54,11 @@ func (n *NFTRepository) Touch(modifications ...*jobs.Modification) error { Suffix("ON CONFLICT (id) DO UPDATE SET " + "updated_at = EXCLUDED.updated_at") - for _, modification := range modifications { + for _, nft := range nfts { query = query.Values( - modification.NFTID(), - modification.CollectionID, - modification.TokenID, + nft.ID, + nft.CollectionID, + nft.TokenID, "", "", "", @@ -75,7 +74,7 @@ func (n *NFTRepository) Touch(modifications ...*jobs.Modification) error { return nil } -func (n *NFTRepository) Insert(nft *graph.NFT) error { +func (n *NFTRepository) Upsert(nft *graph.NFT) error { _, err := n.build. Insert("nfts"). diff --git a/storage/graph/owner_repository.go b/storage/graph/owner_repository.go index 0df1f582..4a26989e 100644 --- a/storage/graph/owner_repository.go +++ b/storage/graph/owner_repository.go @@ -6,9 +6,7 @@ import ( "github.com/Masterminds/squirrel" - "github.com/NFT-com/indexer/models/graph" - "github.com/NFT-com/indexer/models/jobs" - "github.com/NFT-com/indexer/models/results" + "github.com/NFT-com/indexer/models/events" ) type OwnerRepository struct { @@ -25,43 +23,34 @@ func NewOwnerRepository(db *sql.DB) *OwnerRepository { return &n } -func (n *OwnerRepository) Add(additions ...*results.Addition) error { +func (n *OwnerRepository) Upsert(transfers ...*events.Transfer) error { - if len(additions) == 0 { + if len(transfers) == 0 { return nil } - set := make(map[string]*results.Addition, len(additions)) - for _, addition := range additions { - key := fmt.Sprintf("%s-%s", addition.NFT.Owner, addition.NFT.ID) - existing, ok := set[key] - if ok { - existing.NFT.Number += addition.NFT.Number - continue - } - set[key] = addition - } - - additions = make([]*results.Addition, 0, len(set)) - for _, addition := range set { - additions = append(additions, addition) - } - query := n.build. Insert("owners"). Columns( "owner", "nft_id", + "event_id", "number", ). - Suffix("ON CONFLICT (nft_id, owner) DO UPDATE SET " + - "number = (owners.number + EXCLUDED.number)") + Suffix("ON CONFLICT DO NOTHING") - for _, addition := range additions { + for _, transfer := range transfers { + query = query.Values( + transfer.SenderAddress, + transfer.NFTID(), + transfer.EventID(), + -int(transfer.TokenCount), + ) query = query.Values( - addition.NFT.Owner, - addition.NFT.ID, - addition.NFT.Number, + transfer.ReceiverAddress, + transfer.NFTID(), + transfer.EventID(), + int(transfer.TokenCount), ) } @@ -72,35 +61,3 @@ func (n *OwnerRepository) Add(additions ...*results.Addition) error { return nil } - -func (o *OwnerRepository) Change(modifications ...*jobs.Modification) error { - - if len(modifications) == 0 { - return nil - } - - additions := make([]*results.Addition, 0, 2*len(modifications)) - for _, modification := range modifications { - - // First we add the count to the new owner. - addition := graph.NFT{ - ID: modification.NFTID(), - Owner: modification.ReceiverAddress, - Number: modification.TokenCount, - } - additions = append(additions, &results.Addition{NFT: &addition}) - - // Then we remove it from the old owner. - removal := addition - removal.Owner = modification.SenderAddress - removal.Number = -removal.Number - additions = append(additions, &results.Addition{NFT: &removal}) - } - - err := o.Add(additions...) - if err != nil { - return fmt.Errorf("could not add counts: %w", err) - } - - return nil -} diff --git a/storage/graph/trait_repository.go b/storage/graph/trait_repository.go index 298b593b..aac8ab5f 100644 --- a/storage/graph/trait_repository.go +++ b/storage/graph/trait_repository.go @@ -23,7 +23,7 @@ func NewTraitRepository(db *sql.DB) *TraitRepository { return &s } -func (t *TraitRepository) Insert(traits ...*graph.Trait) error { +func (t *TraitRepository) Upsert(traits ...*graph.Trait) error { if len(traits) == 0 { return nil diff --git a/storage/jobs/failure_repository.go b/storage/jobs/failure_repository.go index 3b09ac7c..0f074aae 100644 --- a/storage/jobs/failure_repository.go +++ b/storage/jobs/failure_repository.go @@ -76,8 +76,6 @@ func (b *FailureRepository) Addition(addition *jobs.Addition, message string) er addition.ContractAddress, addition.TokenID, addition.TokenStandard, - addition.OwnerAddress, - addition.TokenCount, message, ). Suffix("ON CONFLICT (id) DO UPDATE SET failure_message = EXCLUDED.failure_message") From 0a1d5065f3b7590686336398a62370c7b17c6131 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:08:58 +0100 Subject: [PATCH 02/14] Remove unnecessary modification job file --- models/jobs/modification.go | 20 -------------------- 1 file changed, 20 deletions(-) delete mode 100644 models/jobs/modification.go diff --git a/models/jobs/modification.go b/models/jobs/modification.go deleted file mode 100644 index 560445b0..00000000 --- a/models/jobs/modification.go +++ /dev/null @@ -1,20 +0,0 @@ -package jobs - -import ( - "github.com/NFT-com/indexer/models/id" -) - -type Modification struct { - ID string `json:"id"` - ChainID uint64 `json:"chain_id"` - CollectionID string `json:"collection_id"` - ContractAddress string `json:"contract_address"` - TokenID string `json:"token_id"` - SenderAddress string `json:"sender_address"` - ReceiverAddress string `json:"receiver_address"` - TokenCount uint `json:"token_count"` -} - -func (m Modification) NFTID() string { - return id.NFT(m.ChainID, m.ContractAddress, m.TokenID) -} From c7d1000eca3ee2a7527547b6ecfb2155027185c5 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:12:53 +0100 Subject: [PATCH 03/14] Fix NFT repo naming of parameters --- service/pipeline/stores.go | 2 +- storage/graph/nft_repository.go | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/service/pipeline/stores.go b/service/pipeline/stores.go index 89a9fa30..c8f5338f 100644 --- a/service/pipeline/stores.go +++ b/service/pipeline/stores.go @@ -7,7 +7,7 @@ import ( ) type NFTStore interface { - Touch(nfts ...*graph.NFT) error + Touch(dummies ...*graph.NFT) error Upsert(nft *graph.NFT) error } diff --git a/storage/graph/nft_repository.go b/storage/graph/nft_repository.go index 97675ece..3c1ccabf 100644 --- a/storage/graph/nft_repository.go +++ b/storage/graph/nft_repository.go @@ -23,20 +23,20 @@ func NewNFTRepository(db *sql.DB) *NFTRepository { return &n } -func (n *NFTRepository) Touch(nfts ...*graph.NFT) error { +func (n *NFTRepository) Touch(dummies ...*graph.NFT) error { - if len(nfts) == 0 { + if len(dummies) == 0 { return nil } - set := make(map[string]*graph.NFT, len(nfts)) - for _, nft := range nfts { - set[nft.ID] = nft + set := make(map[string]*graph.NFT, len(dummies)) + for _, dummy := range dummies { + set[dummy.ID] = dummy } - nfts = make([]*graph.NFT, 0, len(set)) - for _, nft := range set { - nfts = append(nfts, nft) + dummies = make([]*graph.NFT, 0, len(set)) + for _, dummy := range set { + dummies = append(dummies, dummy) } query := n.build. @@ -54,11 +54,11 @@ func (n *NFTRepository) Touch(nfts ...*graph.NFT) error { Suffix("ON CONFLICT (id) DO UPDATE SET " + "updated_at = EXCLUDED.updated_at") - for _, nft := range nfts { + for _, dummy := range dummies { query = query.Values( - nft.ID, - nft.CollectionID, - nft.TokenID, + dummy.ID, + dummy.CollectionID, + dummy.TokenID, "", "", "", From bb95733e13d1c3658d26e096e44aacb5f2f5c89d Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:25:55 +0100 Subject: [PATCH 04/14] Fix UUID for events --- models/id/event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/id/event.go b/models/id/event.go index 3e407570..76d72510 100644 --- a/models/id/event.go +++ b/models/id/event.go @@ -9,6 +9,6 @@ import ( func Event(hash string, index uint) string { eventHash := sha3.Sum256([]byte(fmt.Sprintf("%s-%d", hash, index))) - eventID := uuid.Must(uuid.FromBytes(eventHash[:])) + eventID := uuid.Must(uuid.FromBytes(eventHash[:16])) return eventID.String() } From 6df85fc8458925481c5446294ce48feb64517ea0 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:31:37 +0100 Subject: [PATCH 05/14] Add debug logs for NFT IDs --- service/pipeline/parsing_stage.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index 63bd60ef..38ece2ff 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -181,6 +181,13 @@ func (p *ParsingStage) process(payload []byte) error { } } + for _, touch := range touches { + p.log.Debug().Str("touch_id", touch.ID).Msg("touching NFT ID") + } + for _, transfer := range result.Transfers { + p.log.Debug().Str("transfer_id", transfer.NFTID()).Msg("transfer NFT ID") + } + // Touch all the NFTs that have been created, so that we can apply owner changes // out of order, before the full NFT information is available from the addition. err = p.nfts.Touch(touches...) From d4d6a5dca45179de4a1db9cdb39faca6a88ee31c Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 18:42:50 +0100 Subject: [PATCH 06/14] Add transfer details --- service/pipeline/parsing_stage.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index 38ece2ff..d5b25a02 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -133,10 +133,21 @@ func (p *ParsingStage) process(payload []byte) error { } // We can go through the transfers and process those with zero address as mints. - var touches []*graph.NFT + var dummies []*graph.NFT var payloads [][]byte for _, transfer := range result.Transfers { + p.log.Debug(). + Str("sender_address", transfer.SenderAddress). + Uint64("chain_id", transfer.ChainID). + Str("collection_address", transfer.CollectionAddress). + Str("token_id", transfer.TokenID). + Str("nft_id", transfer.NFTID()). + Str("tx_hash", transfer.TransactionHash). + Uint("event_index", transfer.EventIndex). + Str("event_id", transfer.EventID()). + Msg("transfer details") + // Skip transfers that do not originate from the zero address so we process // only mints. if transfer.SenderAddress != params.AddressZero { @@ -151,12 +162,14 @@ func (p *ParsingStage) process(payload []byte) error { } // Create a placeholder NFT that we will create in the DB. - touch := graph.NFT{ + dummy := graph.NFT{ ID: transfer.NFTID(), CollectionID: collection.ID, TokenID: transfer.TokenID, } - touches = append(touches, &touch) + dummies = append(dummies, &dummy) + + p.log.Debug().Str("dummy_id", dummy.ID).Msg("dummy details") // Create an addition job to complete the data for the NFT. addition := jobs.Addition{ @@ -181,18 +194,11 @@ func (p *ParsingStage) process(payload []byte) error { } } - for _, touch := range touches { - p.log.Debug().Str("touch_id", touch.ID).Msg("touching NFT ID") - } - for _, transfer := range result.Transfers { - p.log.Debug().Str("transfer_id", transfer.NFTID()).Msg("transfer NFT ID") - } - // Touch all the NFTs that have been created, so that we can apply owner changes // out of order, before the full NFT information is available from the addition. - err = p.nfts.Touch(touches...) + err = p.nfts.Touch(dummies...) if err != nil { - return fmt.Errorf("could not touch NFTs: %w", err) + return fmt.Errorf("could not touch dummies: %w", err) } // Next, we can store all the raw events for transfers and sales. From 4b438273675fd49c1db0ab64edc336f7b6e7a38e Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 20:55:22 +0100 Subject: [PATCH 07/14] Create dummy for all transfers --- service/pipeline/parsing_stage.go | 34 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index d5b25a02..b11e37ea 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -137,23 +137,6 @@ func (p *ParsingStage) process(payload []byte) error { var payloads [][]byte for _, transfer := range result.Transfers { - p.log.Debug(). - Str("sender_address", transfer.SenderAddress). - Uint64("chain_id", transfer.ChainID). - Str("collection_address", transfer.CollectionAddress). - Str("token_id", transfer.TokenID). - Str("nft_id", transfer.NFTID()). - Str("tx_hash", transfer.TransactionHash). - Uint("event_index", transfer.EventIndex). - Str("event_id", transfer.EventID()). - Msg("transfer details") - - // Skip transfers that do not originate from the zero address so we process - // only mints. - if transfer.SenderAddress != params.AddressZero { - continue - } - // Get the collection ID based on chain ID and collection address, so we can // reference it directly for the addition job and the NFT insertion. collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress) @@ -171,6 +154,23 @@ func (p *ParsingStage) process(payload []byte) error { p.log.Debug().Str("dummy_id", dummy.ID).Msg("dummy details") + p.log.Debug(). + Str("sender_address", transfer.SenderAddress). + Uint64("chain_id", transfer.ChainID). + Str("collection_address", transfer.CollectionAddress). + Str("token_id", transfer.TokenID). + Str("nft_id", transfer.NFTID()). + Str("tx_hash", transfer.TransactionHash). + Uint("event_index", transfer.EventIndex). + Str("event_id", transfer.EventID()). + Msg("transfer details") + + // Skip transfers that do not originate from the zero address so we process + // only mints. + if transfer.SenderAddress != params.AddressZero { + continue + } + // Create an addition job to complete the data for the NFT. addition := jobs.Addition{ ID: uuid.NewString(), From 3acedaf5d729a3ec4d8076441d016ab7dc449da2 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 22:14:01 +0100 Subject: [PATCH 08/14] Handle transfers to self --- service/pipeline/parsing_stage.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index b11e37ea..e2e70cac 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -137,6 +137,13 @@ func (p *ParsingStage) process(payload []byte) error { var payloads [][]byte for _, transfer := range result.Transfers { + // We skip transfers from and to the same address; for mints, this should + // never be the case, or they never really exist. For everything else, we + // already have an addition job and we don't need to change owner info. + if transfer.SenderAddress == transfer.ReceiverAddress { + continue + } + // Get the collection ID based on chain ID and collection address, so we can // reference it directly for the addition job and the NFT insertion. collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress) @@ -152,19 +159,6 @@ func (p *ParsingStage) process(payload []byte) error { } dummies = append(dummies, &dummy) - p.log.Debug().Str("dummy_id", dummy.ID).Msg("dummy details") - - p.log.Debug(). - Str("sender_address", transfer.SenderAddress). - Uint64("chain_id", transfer.ChainID). - Str("collection_address", transfer.CollectionAddress). - Str("token_id", transfer.TokenID). - Str("nft_id", transfer.NFTID()). - Str("tx_hash", transfer.TransactionHash). - Uint("event_index", transfer.EventIndex). - Str("event_id", transfer.EventID()). - Msg("transfer details") - // Skip transfers that do not originate from the zero address so we process // only mints. if transfer.SenderAddress != params.AddressZero { From d67cd4a0adffb8e551c6ff4ed395ea4e4b67a16d Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 22:47:38 +0100 Subject: [PATCH 09/14] Improve skipping if no-op transfers --- service/pipeline/parsing_stage.go | 7 ------- storage/graph/owner_repository.go | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index e2e70cac..a16b4d93 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -137,13 +137,6 @@ func (p *ParsingStage) process(payload []byte) error { var payloads [][]byte for _, transfer := range result.Transfers { - // We skip transfers from and to the same address; for mints, this should - // never be the case, or they never really exist. For everything else, we - // already have an addition job and we don't need to change owner info. - if transfer.SenderAddress == transfer.ReceiverAddress { - continue - } - // Get the collection ID based on chain ID and collection address, so we can // reference it directly for the addition job and the NFT insertion. collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress) diff --git a/storage/graph/owner_repository.go b/storage/graph/owner_repository.go index 4a26989e..61ff2af6 100644 --- a/storage/graph/owner_repository.go +++ b/storage/graph/owner_repository.go @@ -40,12 +40,19 @@ func (n *OwnerRepository) Upsert(transfers ...*events.Transfer) error { Suffix("ON CONFLICT DO NOTHING") for _, transfer := range transfers { + + // skip transfers that have same sender & receiver + if transfer.SenderAddress == transfer.ReceiverAddress { + continue + } + query = query.Values( transfer.SenderAddress, transfer.NFTID(), transfer.EventID(), -int(transfer.TokenCount), ) + query = query.Values( transfer.ReceiverAddress, transfer.NFTID(), From dae6deff9110a79f76e735c14c66b34d94d1878f Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 22:52:56 +0100 Subject: [PATCH 10/14] Improve conflict handling on owner changes --- storage/graph/owner_repository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/graph/owner_repository.go b/storage/graph/owner_repository.go index 61ff2af6..f1ee992c 100644 --- a/storage/graph/owner_repository.go +++ b/storage/graph/owner_repository.go @@ -37,7 +37,7 @@ func (n *OwnerRepository) Upsert(transfers ...*events.Transfer) error { "event_id", "number", ). - Suffix("ON CONFLICT DO NOTHING") + Suffix("ON CONFLICT (owner, nft_id, event_id) DO UPDATE SET number = owners.number + EXCLUDED.number") for _, transfer := range transfers { From 1ba8045eb91c7b3c94f9c708638b40b679f78415 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 23:06:00 +0100 Subject: [PATCH 11/14] Make contract indices case insensitive --- sql/01_tables_graph_database.sql | 2 +- sql/02_tables_events_database.sql | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/01_tables_graph_database.sql b/sql/01_tables_graph_database.sql index 20e22e3a..37596944 100644 --- a/sql/01_tables_graph_database.sql +++ b/sql/01_tables_graph_database.sql @@ -30,7 +30,7 @@ CREATE TABLE collections UNIQUE (network_id, contract_address) ); -CREATE INDEX collections_contract_address_idx ON collections(contract_address); +CREATE INDEX collections_contract_address_idx ON collections(LOWER(contract_address)); CREATE TABLE nfts ( diff --git a/sql/02_tables_events_database.sql b/sql/02_tables_events_database.sql index 9d1ef15d..c263a35f 100644 --- a/sql/02_tables_events_database.sql +++ b/sql/02_tables_events_database.sql @@ -16,7 +16,7 @@ CREATE TABLE transfers UNIQUE (block_number, transaction_hash, event_index) ); -CREATE INDEX transfers_collection_address_idx ON transfers(collection_address); +CREATE INDEX transfers_collection_address_idx ON transfers(LOWER(collection_address)); CREATE INDEX transfers_token_id_idx ON transfers(token_id); @@ -38,8 +38,8 @@ CREATE TABLE sales UNIQUE (block_number, transaction_hash, event_index) ); -CREATE INDEX sales_marketplace_address_idx ON sales(marketplace_address); +CREATE INDEX sales_marketplace_address_idx ON sales(LOWER(marketplace_address)); -CREATE INDEX sales_collection_address_idx ON sales(collection_address); +CREATE INDEX sales_collection_address_idx ON sales(LOWER(collection_address)); CREATE INDEX sales_token_id_idx ON sales(token_id); \ No newline at end of file From d5a6a424161c64c9505d6da4460408f9de75e19c Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 23:20:10 +0100 Subject: [PATCH 12/14] Fix addition failure table & inserts --- sql/03_tables_jobs_database.sql | 2 -- storage/graph/owner_repository.go | 5 ----- storage/jobs/failure_repository.go | 2 -- 3 files changed, 9 deletions(-) diff --git a/sql/03_tables_jobs_database.sql b/sql/03_tables_jobs_database.sql index cfb72834..468bad80 100644 --- a/sql/03_tables_jobs_database.sql +++ b/sql/03_tables_jobs_database.sql @@ -27,7 +27,5 @@ CREATE TABLE addition_failures contract_address VARCHAR(128) NOT NULL, token_id VARCHAR(256) NOT NULL, token_standard VARCHAR(256) NOT NULL, - owner_address VARCHAR(128) NOT NULL, - token_count NUMERIC NOT NULL, failure_message TEXT NOT NULL ); diff --git a/storage/graph/owner_repository.go b/storage/graph/owner_repository.go index f1ee992c..52a4ba79 100644 --- a/storage/graph/owner_repository.go +++ b/storage/graph/owner_repository.go @@ -41,11 +41,6 @@ func (n *OwnerRepository) Upsert(transfers ...*events.Transfer) error { for _, transfer := range transfers { - // skip transfers that have same sender & receiver - if transfer.SenderAddress == transfer.ReceiverAddress { - continue - } - query = query.Values( transfer.SenderAddress, transfer.NFTID(), diff --git a/storage/jobs/failure_repository.go b/storage/jobs/failure_repository.go index 0f074aae..4a22a6a6 100644 --- a/storage/jobs/failure_repository.go +++ b/storage/jobs/failure_repository.go @@ -66,8 +66,6 @@ func (b *FailureRepository) Addition(addition *jobs.Addition, message string) er "contract_address", "token_id", "token_standard", - "owner_address", - "token_count", "failure_message", ). Values( From 5878090aa81e4af1fd8bc417ae37f1e17b1584ad Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 23:38:58 +0100 Subject: [PATCH 13/14] Actually fix owner changes properly --- service/pipeline/parsing_stage.go | 11 ++++++++++- sql/02_tables_events_database.sql | 2 +- storage/graph/owner_repository.go | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/service/pipeline/parsing_stage.go b/service/pipeline/parsing_stage.go index a16b4d93..2e656ccd 100644 --- a/service/pipeline/parsing_stage.go +++ b/service/pipeline/parsing_stage.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/lambda" "github.com/NFT-com/indexer/config/params" + "github.com/NFT-com/indexer/models/events" "github.com/NFT-com/indexer/models/graph" "github.com/NFT-com/indexer/models/jobs" "github.com/NFT-com/indexer/models/results" @@ -135,8 +136,16 @@ func (p *ParsingStage) process(payload []byte) error { // We can go through the transfers and process those with zero address as mints. var dummies []*graph.NFT var payloads [][]byte + var owners []*events.Transfer for _, transfer := range result.Transfers { + if transfer.SenderAddress == transfer.ReceiverAddress { + continue + } + + // Collect transfers that are not no-ops for owner changes. + owners = append(owners, transfer) + // Get the collection ID based on chain ID and collection address, so we can // reference it directly for the addition job and the NFT insertion. collection, err := p.collections.One(transfer.ChainID, transfer.CollectionAddress) @@ -199,7 +208,7 @@ func (p *ParsingStage) process(payload []byte) error { } // Last but not least, we can upsert the owner change updates for each transfer. - err = p.owners.Upsert(result.Transfers...) + err = p.owners.Upsert(owners...) if err != nil { return fmt.Errorf("could not upsert owners: %w", err) } diff --git a/sql/02_tables_events_database.sql b/sql/02_tables_events_database.sql index c263a35f..45e80e15 100644 --- a/sql/02_tables_events_database.sql +++ b/sql/02_tables_events_database.sql @@ -42,4 +42,4 @@ CREATE INDEX sales_marketplace_address_idx ON sales(LOWER(marketplace_address)); CREATE INDEX sales_collection_address_idx ON sales(LOWER(collection_address)); -CREATE INDEX sales_token_id_idx ON sales(token_id); \ No newline at end of file +CREATE INDEX sales_token_id_idx ON sales(token_id); diff --git a/storage/graph/owner_repository.go b/storage/graph/owner_repository.go index 52a4ba79..e33975c0 100644 --- a/storage/graph/owner_repository.go +++ b/storage/graph/owner_repository.go @@ -37,7 +37,7 @@ func (n *OwnerRepository) Upsert(transfers ...*events.Transfer) error { "event_id", "number", ). - Suffix("ON CONFLICT (owner, nft_id, event_id) DO UPDATE SET number = owners.number + EXCLUDED.number") + Suffix("ON CONFLICT DO NOTHING") for _, transfer := range transfers { From bbb65056c51622010e6926fe4e1d61c2ebfe31b0 Mon Sep 17 00:00:00 2001 From: Max Wolter Date: Thu, 30 Jun 2022 23:42:04 +0100 Subject: [PATCH 14/14] Rename parameter for event ID --- models/id/event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/id/event.go b/models/id/event.go index 76d72510..6225f1a7 100644 --- a/models/id/event.go +++ b/models/id/event.go @@ -7,8 +7,8 @@ import ( "golang.org/x/crypto/sha3" ) -func Event(hash string, index uint) string { - eventHash := sha3.Sum256([]byte(fmt.Sprintf("%s-%d", hash, index))) +func Event(tx string, index uint) string { + eventHash := sha3.Sum256([]byte(fmt.Sprintf("%s-%d", tx, index))) eventID := uuid.Must(uuid.FromBytes(eventHash[:16])) return eventID.String() }