From aedf6371dc364ef65ffa7e3257330d13d7d32603 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 25 Mar 2021 14:07:45 +0000 Subject: [PATCH 01/18] Refactor AssetStatProcessor to make space for handling new change types --- .../processors/asset_stats_processor.go | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index 72989fd133..59586cc424 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -40,27 +40,35 @@ func (p *AssetStatsProcessor) reset() { } func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { - if change.Type != xdr.LedgerEntryTypeTrustline { + switch change.Type { + case xdr.LedgerEntryTypeTrustline: + if p.useLedgerEntryCache { + return p.useCachedChange(change) + } + return p.processTrustlineChange(change) + default: return nil } +} - if p.useLedgerEntryCache { - err := p.cache.AddChange(change) - if err != nil { - return errors.Wrap(err, "error adding to ledgerCache") - } +func (p *AssetStatsProcessor) useCachedChange(change ingest.Change) error { + err := p.cache.AddChange(change) + if err != nil { + return errors.Wrap(err, "error adding to ledgerCache") + } - if p.cache.Size() > maxBatchSize { - err = p.Commit() - if err != nil { - return errors.Wrap(err, "error in Commit") - } - p.reset() + if p.cache.Size() > maxBatchSize { + err = p.Commit() + if err != nil { + return errors.Wrap(err, "error in Commit") } - return nil + p.reset() } + return nil +} - if !(change.Pre == nil && change.Post != nil) { +func (p *AssetStatsProcessor) processTrustlineChange(change ingest.Change) error { + if change.Pre != nil || change.Post == nil { return errors.New("AssetStatsProcessor is in insert only mode") } From 8b6b47a7690cbe8b4064f8d0877b928fa5720da2 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 26 Mar 2021 15:09:56 +0000 Subject: [PATCH 02/18] WIP --adding claimable balances --- services/horizon/internal/db2/history/main.go | 5 +- .../processors/asset_stats_processor.go | 42 ++++++- .../processors/asset_stats_processor_test.go | 15 +++ .../ingest/processors/asset_stats_set.go | 103 +++++++++--------- .../ingest/processors/asset_stats_set_test.go | 15 ++- 5 files changed, 117 insertions(+), 63 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 4c685d3175..778506c64a 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -347,6 +347,7 @@ func (e ExpAssetStat) PagingToken() string { type ExpAssetStatAccounts struct { Authorized int32 `json:"authorized"` AuthorizedToMaintainLiabilities int32 `json:"authorized_to_maintain_liabilities"` + ClaimableBalances int32 `json:"claimable_balances"` Unauthorized int32 `json:"unauthorized"` } @@ -367,18 +368,20 @@ func (a ExpAssetStatAccounts) Add(b ExpAssetStatAccounts) ExpAssetStatAccounts { return ExpAssetStatAccounts{ Authorized: a.Authorized + b.Authorized, AuthorizedToMaintainLiabilities: a.AuthorizedToMaintainLiabilities + b.AuthorizedToMaintainLiabilities, + ClaimableBalances: a.ClaimableBalances + b.ClaimableBalances, Unauthorized: a.Unauthorized + b.Unauthorized, } } func (a ExpAssetStatAccounts) IsZero() bool { - return a.Authorized == 0 && a.AuthorizedToMaintainLiabilities == 0 && a.Unauthorized == 0 + return a == ExpAssetStatAccounts{} } // ExpAssetStatBalances represents the summarized balances for a single Asset type ExpAssetStatBalances struct { Authorized string `json:"authorized"` AuthorizedToMaintainLiabilities string `json:"authorized_to_maintain_liabilities"` + ClaimableBalances string `json:"claimable_balances"` Unauthorized string `json:"unauthorized"` } diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index 59586cc424..344e4726d4 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -41,6 +41,11 @@ func (p *AssetStatsProcessor) reset() { func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { switch change.Type { + case xdr.LedgerEntryTypeClaimableBalance: + if p.useLedgerEntryCache { + return p.useCachedChange(change) + } + return p.processClaimableBalanceChange(change) case xdr.LedgerEntryTypeTrustline: if p.useLedgerEntryCache { return p.useCachedChange(change) @@ -67,6 +72,31 @@ func (p *AssetStatsProcessor) useCachedChange(change ingest.Change) error { return nil } +func (p *AssetStatsProcessor) processClaimableBalanceChange(change ingest.Change) error { + switch { + case change.Pre == nil && change.Post != nil: + // Created + // newCb := change.Post.Data.MustClaimableBalance() + // Figure out the asset + // Exclude XLM + // increase claimable balance stats + case change.Pre != nil && change.Post != nil: + // Not used now. Handled for the future, just in case. + // Figure out the asset + // Exclude XLM + case change.Pre != nil && change.Post == nil: + // Removed + // oldCb := change.Pre.Data.MustClaimableBalance() + // Figure out the asset + // Exclude XLM + // decrease claimable balance stats + default: + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") + } + + return nil +} + func (p *AssetStatsProcessor) processTrustlineChange(change ingest.Change) error { if change.Pre != nil || change.Post == nil { return errors.New("AssetStatsProcessor is in insert only mode") @@ -227,8 +257,8 @@ func (p *AssetStatsProcessor) adjustAssetStat( preTrustline *xdr.TrustLineEntry, postTrustline *xdr.TrustLineEntry, ) error { - deltaAccounts := map[xdr.Uint32]int32{} - deltaBalances := map[xdr.Uint32]int64{} + deltaAccounts := deltas{} + deltaBalances := deltas{} if preTrustline == nil && postTrustline == nil { return ingest.NewStateError(errors.New("both pre and post trustlines cannot be nil")) @@ -237,13 +267,13 @@ func (p *AssetStatsProcessor) adjustAssetStat( var trustline xdr.TrustLineEntry if preTrustline != nil { trustline = *preTrustline - deltaAccounts[preTrustline.Flags] -= 1 - deltaBalances[preTrustline.Flags] -= int64(preTrustline.Balance) + deltaAccounts.AddByFlags(preTrustline.Flags, -1) + deltaBalances.AddByFlags(preTrustline.Flags, -int64(preTrustline.Balance)) } if postTrustline != nil { trustline = *postTrustline - deltaAccounts[postTrustline.Flags] += 1 - deltaBalances[postTrustline.Flags] += int64(postTrustline.Balance) + deltaAccounts.AddByFlags(postTrustline.Flags, 1) + deltaBalances.AddByFlags(postTrustline.Flags, int64(postTrustline.Balance)) } err := p.assetStatSet.AddDelta(trustline.Asset, deltaBalances, deltaAccounts) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go index cf7fb320c9..c3f30e1c5d 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go @@ -63,6 +63,7 @@ func (s *AssetStatsProcessorTestSuiteState) TestCreateTrustLine() { Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 1, @@ -99,6 +100,7 @@ func (s *AssetStatsProcessorTestSuiteState) TestCreateTrustLineUnauthorized() { Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -249,6 +251,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestInsertTrustLine() { Authorized: "10", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "10", NumAccounts: 1, @@ -270,6 +273,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestInsertTrustLine() { Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "10", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -326,6 +330,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLine() { Authorized: "100", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "100", NumAccounts: 1, @@ -339,6 +344,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLine() { Authorized: "110", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "110", NumAccounts: 1, @@ -462,6 +468,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "100", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -477,6 +484,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "10", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "10", NumAccounts: 1, @@ -497,6 +505,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "100", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "100", NumAccounts: 1, @@ -512,6 +521,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "10", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -532,6 +542,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "100", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "100", NumAccounts: 1, @@ -547,6 +558,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() Authorized: "0", AuthorizedToMaintainLiabilities: "10", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -607,6 +619,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestRemoveTrustLine() { Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 1, @@ -632,6 +645,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestRemoveTrustLine() { Authorized: "0", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "0", NumAccounts: 0, @@ -709,6 +723,7 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestProcessUpgradeChange() { Authorized: "10", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "10", NumAccounts: 1, diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index d8c7858d9c..8f4b7fe500 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -23,6 +23,7 @@ type assetStatValue struct { type assetStatBalances struct { Authorized *big.Int AuthorizedToMaintainLiabilities *big.Int + ClaimableBalances *big.Int Unauthorized *big.Int } @@ -39,6 +40,12 @@ func (a *assetStatBalances) Parse(b *history.ExpAssetStatBalances) error { } a.AuthorizedToMaintainLiabilities = authorizedToMaintainLiabilities + claimableBalances, ok := new(big.Int).SetString(b.ClaimableBalances, 10) + if !ok { + return errors.New("Error parsing: " + b.ClaimableBalances) + } + a.ClaimableBalances = claimableBalances + unauthorized, ok := new(big.Int).SetString(b.Unauthorized, 10) if !ok { return errors.New("Error parsing: " + b.Unauthorized) @@ -52,18 +59,23 @@ func (a assetStatBalances) Add(b assetStatBalances) assetStatBalances { return assetStatBalances{ Authorized: big.NewInt(0).Add(a.Authorized, b.Authorized), AuthorizedToMaintainLiabilities: big.NewInt(0).Add(a.AuthorizedToMaintainLiabilities, b.AuthorizedToMaintainLiabilities), + ClaimableBalances: big.NewInt(0).Add(a.ClaimableBalances, b.ClaimableBalances), Unauthorized: big.NewInt(0).Add(a.Unauthorized, b.Unauthorized), } } func (a assetStatBalances) IsZero() bool { - return a.Authorized.Cmp(big.NewInt(0)) == 0 && a.AuthorizedToMaintainLiabilities.Cmp(big.NewInt(0)) == 0 && a.Unauthorized.Cmp(big.NewInt(0)) == 0 + return a.Authorized.Cmp(big.NewInt(0)) == 0 && + a.AuthorizedToMaintainLiabilities.Cmp(big.NewInt(0)) == 0 && + a.ClaimableBalances.Cmp(big.NewInt(0)) == 0 && + a.Unauthorized.Cmp(big.NewInt(0)) == 0 } func (a assetStatBalances) ConvertToHistoryObject() history.ExpAssetStatBalances { return history.ExpAssetStatBalances{ Authorized: a.Authorized.String(), AuthorizedToMaintainLiabilities: a.AuthorizedToMaintainLiabilities.String(), + ClaimableBalances: a.ClaimableBalances.String(), Unauthorized: a.Unauthorized.String(), } } @@ -86,31 +98,39 @@ type AssetStatSet map[assetStatKey]*assetStatValue // Add updates the set with a trustline entry from a history archive snapshot. func (s AssetStatSet) Add(trustLine xdr.TrustLineEntry) error { - flags := trustLine.Flags - return s.AddDelta( - trustLine.Asset, - map[xdr.Uint32]int64{flags: int64(trustLine.Balance)}, - map[xdr.Uint32]int32{flags: 1}, - ) + var deltaBalances deltas + var deltaAccounts deltas + deltaBalances.AddByFlags(trustLine.Flags, int64(trustLine.Balance)) + deltaAccounts.AddByFlags(trustLine.Flags, 1) + + return s.AddDelta(trustLine.Asset, deltaBalances, deltaAccounts) } -// AddDelta adds a delta balance and delta accounts to a given asset trustline. -func (s AssetStatSet) AddDelta(asset xdr.Asset, deltaBalances map[xdr.Uint32]int64, deltaAccounts map[xdr.Uint32]int32) error { - accountsEmpty := true - for _, v := range deltaAccounts { - if v != 0 { - accountsEmpty = false - break - } - } - balancesEmpty := true - for _, v := range deltaBalances { - if v != 0 { - balancesEmpty = false - break - } +type deltas struct { + Authorized int64 + AuthorizedToMaintainLiabilities int64 + Unauthorized int64 + ClaimableBalances int64 +} + +func (d *deltas) AddByFlags(flags xdr.Uint32, amount int64) { + switch xdr.TrustLineFlags(flags) { + case xdr.TrustLineFlagsAuthorizedFlag: + d.Authorized = amount + case xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag: + d.AuthorizedToMaintainLiabilities = amount + default: + d.Unauthorized = amount } - if accountsEmpty && balancesEmpty { +} + +func (d deltas) isEmpty() bool { + return d == deltas{} +} + +// AddDelta adds a delta balance and delta accounts to a given asset trustline. +func (s AssetStatSet) AddDelta(asset xdr.Asset, deltaBalances, deltaAccounts deltas) error { + if deltaBalances.isEmpty() && deltaBalances.isEmpty() { return nil } @@ -124,44 +144,27 @@ func (s AssetStatSet) AddDelta(asset xdr.Asset, deltaBalances map[xdr.Uint32]int current = &assetStatValue{assetStatKey: key, balances: assetStatBalances{ Authorized: big.NewInt(0), AuthorizedToMaintainLiabilities: big.NewInt(0), + ClaimableBalances: big.NewInt(0), Unauthorized: big.NewInt(0), }} s[key] = current } - for k, v := range deltaAccounts { - flags := xdr.TrustLineFlags(k) - if flags.IsAuthorized() { - current.accounts.Authorized += v - } else if flags.IsAuthorizedToMaintainLiabilitiesFlag() { - current.accounts.AuthorizedToMaintainLiabilities += v - } else { - current.accounts.Unauthorized += v - } - } + current.accounts.Authorized += int32(deltaAccounts.Authorized) + current.accounts.AuthorizedToMaintainLiabilities += int32(deltaAccounts.AuthorizedToMaintainLiabilities) + current.accounts.ClaimableBalances += int32(deltaAccounts.ClaimableBalances) + current.accounts.Unauthorized += int32(deltaAccounts.Unauthorized) - for k, v := range deltaBalances { - flags := xdr.TrustLineFlags(k) - bigV := big.NewInt(v) - if flags.IsAuthorized() { - current.balances.Authorized.Add(current.balances.Authorized, bigV) - } else if flags.IsAuthorizedToMaintainLiabilitiesFlag() { - current.balances.AuthorizedToMaintainLiabilities.Add(current.balances.AuthorizedToMaintainLiabilities, bigV) - } else { - current.balances.Unauthorized.Add(current.balances.Unauthorized, bigV) - } - } + current.balances.Authorized.Add(current.balances.Authorized, big.NewInt(deltaBalances.Authorized)) + current.balances.AuthorizedToMaintainLiabilities.Add(current.balances.AuthorizedToMaintainLiabilities, big.NewInt(deltaBalances.AuthorizedToMaintainLiabilities)) + current.balances.ClaimableBalances.Add(current.balances.ClaimableBalances, big.NewInt(deltaBalances.ClaimableBalances)) + current.balances.Unauthorized.Add(current.balances.Unauthorized, big.NewInt(deltaBalances.Unauthorized)) // Note: it's possible that after operations above: // numAccounts != 0 && amount == 0 (ex. two accounts send some of their assets to third account) // OR // numAccounts == 0 && amount != 0 (ex. issuer issued an asset) - if current.balances.Authorized.Cmp(big.NewInt(0)) == 0 && - current.balances.AuthorizedToMaintainLiabilities.Cmp(big.NewInt(0)) == 0 && - current.balances.Unauthorized.Cmp(big.NewInt(0)) == 0 && - current.accounts.Authorized == 0 && - current.accounts.AuthorizedToMaintainLiabilities == 0 && - current.accounts.Unauthorized == 0 { + if current.balances.IsZero() && current.accounts.IsZero() { delete(s, key) } diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index 211b6b9152..8083117f3f 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -5,6 +5,8 @@ import ( "sort" "testing" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/xdr" ) @@ -27,16 +29,12 @@ func TestEmptyAssetStatSet(t *testing.T) { func assertAllEquals(t *testing.T, set AssetStatSet, expected []history.ExpAssetStat) { all := set.All() - if len(all) != len(expected) { - t.Fatalf("expected list of %v asset stats but got %v", len(expected), all) - } + assert.Len(t, all, len(expected)) sort.Slice(all, func(i, j int) bool { return all[i].AssetCode < all[j].AssetCode }) for i, got := range all { - if expected[i] != got { - t.Fatalf("expected asset stat to be %v but got %v", expected[i], got) - } + assert.Equal(t, expected[i], got) } } @@ -54,6 +52,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { Authorized: "1", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "1", NumAccounts: 1, @@ -143,6 +142,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { Authorized: "3", AuthorizedToMaintainLiabilities: "4", Unauthorized: "5", + ClaimableBalances: "0", }, Amount: "3", NumAccounts: 1, @@ -159,6 +159,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { Authorized: "10", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "10", NumAccounts: 1, @@ -207,6 +208,7 @@ func TestOverflowAssetStatSet(t *testing.T) { Authorized: "9223372036854775807", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "9223372036854775807", NumAccounts: 1, @@ -240,6 +242,7 @@ func TestOverflowAssetStatSet(t *testing.T) { Authorized: "18446744073709551614", AuthorizedToMaintainLiabilities: "0", Unauthorized: "0", + ClaimableBalances: "0", }, Amount: "18446744073709551614", NumAccounts: 2, From 8f52d459f16f08a599fed027d8ddfa1bcab837b9 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 16:24:31 +0100 Subject: [PATCH 03/18] Fix two bugs --- .../horizon/internal/ingest/processors/asset_stats_set.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index 8f4b7fe500..c712e5dbb3 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -116,11 +116,11 @@ type deltas struct { func (d *deltas) AddByFlags(flags xdr.Uint32, amount int64) { switch xdr.TrustLineFlags(flags) { case xdr.TrustLineFlagsAuthorizedFlag: - d.Authorized = amount + d.Authorized += amount case xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag: - d.AuthorizedToMaintainLiabilities = amount + d.AuthorizedToMaintainLiabilities += amount default: - d.Unauthorized = amount + d.Unauthorized += amount } } @@ -130,7 +130,7 @@ func (d deltas) isEmpty() bool { // AddDelta adds a delta balance and delta accounts to a given asset trustline. func (s AssetStatSet) AddDelta(asset xdr.Asset, deltaBalances, deltaAccounts deltas) error { - if deltaBalances.isEmpty() && deltaBalances.isEmpty() { + if deltaBalances.isEmpty() && deltaAccounts.isEmpty() { return nil } From 2c9a7847190b2c935af59372e471651ba494edb8 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 17:31:42 +0100 Subject: [PATCH 04/18] Add support for claimable balance in processor also add tests --- .../processors/asset_stats_processor.go | 165 ++++++---- .../processors/asset_stats_processor_test.go | 296 ++++++++++++++++++ .../ingest/processors/asset_stats_set_test.go | 120 +++---- 3 files changed, 473 insertions(+), 108 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index 344e4726d4..c20644ac7b 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -45,12 +45,12 @@ func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { if p.useLedgerEntryCache { return p.useCachedChange(change) } - return p.processClaimableBalanceChange(change) + return p.addNewClaimableBalance(change) case xdr.LedgerEntryTypeTrustline: if p.useLedgerEntryCache { return p.useCachedChange(change) } - return p.processTrustlineChange(change) + return p.addNewTrustline(change) default: return nil } @@ -72,38 +72,28 @@ func (p *AssetStatsProcessor) useCachedChange(change ingest.Change) error { return nil } -func (p *AssetStatsProcessor) processClaimableBalanceChange(change ingest.Change) error { - switch { - case change.Pre == nil && change.Post != nil: - // Created - // newCb := change.Post.Data.MustClaimableBalance() - // Figure out the asset - // Exclude XLM - // increase claimable balance stats - case change.Pre != nil && change.Post != nil: - // Not used now. Handled for the future, just in case. - // Figure out the asset - // Exclude XLM - case change.Pre != nil && change.Post == nil: - // Removed - // oldCb := change.Pre.Data.MustClaimableBalance() - // Figure out the asset - // Exclude XLM - // decrease claimable balance stats - default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") +func (p *AssetStatsProcessor) addNewClaimableBalance(change ingest.Change) error { + if change.Pre != nil || change.Post == nil { + return errors.New("AssetStatsProcessor is in insert only mode") + } + + post := change.Post.Data.MustClaimableBalance() + + err := p.adjustAssetStatForClaimableBalance(nil, &post) + if err != nil { + return errors.Wrap(err, "Error adjusting asset stat") } return nil } -func (p *AssetStatsProcessor) processTrustlineChange(change ingest.Change) error { +func (p *AssetStatsProcessor) addNewTrustline(change ingest.Change) error { if change.Pre != nil || change.Post == nil { return errors.New("AssetStatsProcessor is in insert only mode") } postTrustLine := change.Post.Data.MustTrustLine() - err := p.adjustAssetStat(nil, &postTrustLine) + err := p.adjustAssetStatForTrustline(nil, &postTrustLine) if err != nil { return errors.Wrap(err, "Error adjusting asset stat") } @@ -111,6 +101,46 @@ func (p *AssetStatsProcessor) processTrustlineChange(change ingest.Change) error return nil } +func (p *AssetStatsProcessor) commitClaimableBalanceChange(change ingest.Change) error { + switch { + case change.Pre == nil && change.Post != nil: + // Created + post := change.Post.Data.MustClaimableBalance() + return p.adjustAssetStatForClaimableBalance(nil, &post) + case change.Pre != nil && change.Post != nil: + // Updated + pre := change.Pre.Data.MustClaimableBalance() + post := change.Post.Data.MustClaimableBalance() + return p.adjustAssetStatForClaimableBalance(&pre, &post) + case change.Pre != nil && change.Post == nil: + // Removed + pre := change.Pre.Data.MustClaimableBalance() + return p.adjustAssetStatForClaimableBalance(&pre, nil) + default: + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") + } +} + +func (p *AssetStatsProcessor) commitTrustlineChange(change ingest.Change) error { + switch { + case change.Pre == nil && change.Post != nil: + // Created + post := change.Post.Data.MustTrustLine() + return p.adjustAssetStatForTrustline(nil, &post) + case change.Pre != nil && change.Post != nil: + // Updated + pre := change.Pre.Data.MustTrustLine() + post := change.Post.Data.MustTrustLine() + return p.adjustAssetStatForTrustline(&pre, &post) + case change.Pre != nil && change.Post == nil: + // Removed + pre := change.Pre.Data.MustTrustLine() + return p.adjustAssetStatForTrustline(&pre, nil) + default: + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") + } +} + func (p *AssetStatsProcessor) Commit() error { if !p.useLedgerEntryCache { return p.assetStatsQ.InsertAssetStats(p.assetStatSet.All(), maxBatchSize) @@ -119,23 +149,13 @@ func (p *AssetStatsProcessor) Commit() error { changes := p.cache.GetChanges() for _, change := range changes { var err error - - switch { - case change.Pre == nil && change.Post != nil: - // Created - postTrustLine := change.Post.Data.MustTrustLine() - err = p.adjustAssetStat(nil, &postTrustLine) - case change.Pre != nil && change.Post != nil: - // Updated - preTrustLine := change.Pre.Data.MustTrustLine() - postTrustLine := change.Post.Data.MustTrustLine() - err = p.adjustAssetStat(&preTrustLine, &postTrustLine) - case change.Pre != nil && change.Post == nil: - // Removed - preTrustLine := change.Pre.Data.MustTrustLine() - err = p.adjustAssetStat(&preTrustLine, nil) + switch change.Type { + case xdr.LedgerEntryTypeClaimableBalance: + err = p.commitClaimableBalanceChange(change) + case xdr.LedgerEntryTypeTrustline: + err = p.commitTrustlineChange(change) default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") + return errors.Errorf("Change type %v is unexpected", change.Type) } if err != nil { @@ -182,6 +202,13 @@ func (p *AssetStatsProcessor) Commit() error { delta.AssetCode, delta.AssetIssuer, )) + } else if delta.Accounts.ClaimableBalances < 0 { + return ingest.NewStateError(errors.Errorf( + "Claimable balance accounts negative but DB entry does not exist for asset: %s %s %s", + delta.AssetType, + delta.AssetCode, + delta.AssetIssuer, + )) } // Insert @@ -253,30 +280,60 @@ func (p *AssetStatsProcessor) Commit() error { return nil } -func (p *AssetStatsProcessor) adjustAssetStat( - preTrustline *xdr.TrustLineEntry, - postTrustline *xdr.TrustLineEntry, +func (p *AssetStatsProcessor) adjustAssetStatForTrustline( + pre *xdr.TrustLineEntry, + post *xdr.TrustLineEntry, ) error { deltaAccounts := deltas{} deltaBalances := deltas{} - if preTrustline == nil && postTrustline == nil { + if pre == nil && post == nil { return ingest.NewStateError(errors.New("both pre and post trustlines cannot be nil")) } - var trustline xdr.TrustLineEntry - if preTrustline != nil { - trustline = *preTrustline - deltaAccounts.AddByFlags(preTrustline.Flags, -1) - deltaBalances.AddByFlags(preTrustline.Flags, -int64(preTrustline.Balance)) + var asset xdr.Asset + if pre != nil { + asset = pre.Asset + deltaAccounts.AddByFlags(pre.Flags, -1) + deltaBalances.AddByFlags(pre.Flags, -int64(pre.Balance)) + } + if post != nil { + asset = post.Asset + deltaAccounts.AddByFlags(post.Flags, 1) + deltaBalances.AddByFlags(post.Flags, int64(post.Balance)) + } + + err := p.assetStatSet.AddDelta(asset, deltaBalances, deltaAccounts) + if err != nil { + return errors.Wrap(err, "error running AssetStatSet.AddDelta") + } + return nil +} + +func (p *AssetStatsProcessor) adjustAssetStatForClaimableBalance( + pre *xdr.ClaimableBalanceEntry, + post *xdr.ClaimableBalanceEntry, +) error { + deltaAccounts := deltas{} + deltaBalances := deltas{} + + if pre == nil && post == nil { + return ingest.NewStateError(errors.New("both pre and post claimable balances cannot be nil")) + } + + var asset xdr.Asset + if pre != nil { + asset = pre.Asset + deltaAccounts.ClaimableBalances-- + deltaBalances.ClaimableBalances -= int64(pre.Amount) } - if postTrustline != nil { - trustline = *postTrustline - deltaAccounts.AddByFlags(postTrustline.Flags, 1) - deltaBalances.AddByFlags(postTrustline.Flags, int64(postTrustline.Balance)) + if post != nil { + asset = post.Asset + deltaAccounts.ClaimableBalances++ + deltaBalances.ClaimableBalances += int64(post.Amount) } - err := p.assetStatSet.AddDelta(trustline.Asset, deltaBalances, deltaAccounts) + err := p.assetStatSet.AddDelta(asset, deltaBalances, deltaAccounts) if err != nil { return errors.Wrap(err, "error running AssetStatSet.AddDelta") } diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go index c3f30e1c5d..417d80119e 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go @@ -128,6 +128,125 @@ func (s *AssetStatsProcessorTestSuiteLedger) TearDownTest() { s.mockQ.AssertExpectations(s.T()) } +func (s *AssetStatsProcessorTestSuiteLedger) TestInsertClaimableBalance() { + claimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), + Amount: 12, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{1, 2, 3}, + }, + } + lastModifiedLedgerSeq := xdr.Uint32(1234) + + // test inserts + + err := s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &claimableBalance, + }, + }, + }) + s.Assert().NoError(err) + + usdClaimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewCreditAsset("USD", trustLineIssuer.Address()), + Amount: 46, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{4, 5, 3}, + }, + } + + err = s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &usdClaimableBalance, + }, + }, + }) + s.Assert().NoError(err) + + // test updates + + updatedClaimableBalance := claimableBalance + updatedClaimableBalance.Amount *= 2 + + err = s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &claimableBalance, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &updatedClaimableBalance, + }, + }, + }) + s.Assert().NoError(err) + + s.mockQ.On("GetAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "EUR", + trustLineIssuer.Address(), + ).Return(history.ExpAssetStat{}, sql.ErrNoRows).Once() + s.mockQ.On("InsertAssetStat", history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Accounts: history.ExpAssetStatAccounts{ + ClaimableBalances: 1, + }, + Balances: history.ExpAssetStatBalances{ + Authorized: "0", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "24", + }, + Amount: "0", + NumAccounts: 0, + }).Return(int64(1), nil).Once() + + s.mockQ.On("GetAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "USD", + trustLineIssuer.Address(), + ).Return(history.ExpAssetStat{}, sql.ErrNoRows).Once() + s.mockQ.On("InsertAssetStat", history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "USD", + Accounts: history.ExpAssetStatAccounts{ + ClaimableBalances: 1, + }, + Balances: history.ExpAssetStatBalances{ + Authorized: "0", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "46", + }, + Amount: "0", + NumAccounts: 0, + }).Return(int64(1), nil).Once() + + s.Assert().NoError(s.processor.Commit()) +} + func (s *AssetStatsProcessorTestSuiteLedger) TestInsertTrustLine() { // should be ignored because it's not an trust line type err := s.processor.ProcessChange(ingest.Change{ @@ -282,6 +401,76 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestInsertTrustLine() { s.Assert().NoError(s.processor.Commit()) } +func (s *AssetStatsProcessorTestSuiteLedger) TestInsertClaimableBalanceAndTrustline() { + claimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), + Amount: 12, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{1, 2, 3}, + }, + } + + trustLine := xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), + Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), + Balance: 9, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), + } + lastModifiedLedgerSeq := xdr.Uint32(1234) + + err := s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &claimableBalance, + }, + }, + }) + s.Assert().NoError(err) + + err = s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeTrustline, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTrustline, + TrustLine: &trustLine, + }, + }, + }) + s.Assert().NoError(err) + + s.mockQ.On("GetAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "EUR", + trustLineIssuer.Address(), + ).Return(history.ExpAssetStat{}, sql.ErrNoRows).Once() + s.mockQ.On("InsertAssetStat", history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Accounts: history.ExpAssetStatAccounts{ + ClaimableBalances: 1, + Authorized: 1, + }, + Balances: history.ExpAssetStatBalances{ + Authorized: "9", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "12", + }, + Amount: "9", + NumAccounts: 1, + }).Return(int64(1), nil).Once() + + s.Assert().NoError(s.processor.Commit()) +} + func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLine() { lastModifiedLedgerSeq := xdr.Uint32(1234) @@ -567,6 +756,113 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() s.Assert().NoError(s.processor.Commit()) } +func (s *AssetStatsProcessorTestSuiteLedger) TestRemoveClaimableBalance() { + claimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), + Amount: 12, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{1, 2, 3}, + }, + } + usdClaimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewCreditAsset("USD", trustLineIssuer.Address()), + Amount: 21, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{4, 5, 6}, + }, + } + + err := s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &claimableBalance, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + err = s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &usdClaimableBalance, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + s.mockQ.On("GetAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "EUR", + trustLineIssuer.Address(), + ).Return(history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Accounts: history.ExpAssetStatAccounts{ + ClaimableBalances: 1, + }, + Balances: history.ExpAssetStatBalances{ + Authorized: "0", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "12", + }, + Amount: "0", + NumAccounts: 0, + }, nil).Once() + s.mockQ.On("RemoveAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "EUR", + trustLineIssuer.Address(), + ).Return(int64(1), nil).Once() + + s.mockQ.On("GetAssetStat", + xdr.AssetTypeAssetTypeCreditAlphanum4, + "USD", + trustLineIssuer.Address(), + ).Return(history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "USD", + Accounts: history.ExpAssetStatAccounts{ + Unauthorized: 1, + ClaimableBalances: 1, + }, + Balances: history.ExpAssetStatBalances{ + Authorized: "0", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "21", + }, + Amount: "0", + NumAccounts: 0, + }, nil).Once() + s.mockQ.On("UpdateAssetStat", history.ExpAssetStat{ + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "USD", + Accounts: history.ExpAssetStatAccounts{Unauthorized: 1}, + Balances: history.ExpAssetStatBalances{ + Authorized: "0", + AuthorizedToMaintainLiabilities: "0", + Unauthorized: "0", + ClaimableBalances: "0", + }, + Amount: "0", + NumAccounts: 0, + }).Return(int64(1), nil).Once() + + s.Assert().NoError(s.processor.Commit()) +} + func (s *AssetStatsProcessorTestSuiteLedger) TestRemoveTrustLine() { authorizedTrustLine := xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index 8083117f3f..b15ed13920 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -58,26 +58,39 @@ func TestAddAndRemoveAssetStats(t *testing.T) { NumAccounts: 1, } - err := set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), - Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), - Balance: 1, - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), + Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), + Balance: 1, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), + }), + ) assertAllEquals(t, set, []history.ExpAssetStat{eurAssetStat}) - err = set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), - Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), - Balance: 24, - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + eurAssetStat.Accounts.ClaimableBalances++ + eurAssetStat.Balances.ClaimableBalances = "23" + assert.NoError( + t, + set.AddDelta( + xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), + deltas{ClaimableBalances: 23}, + deltas{ClaimableBalances: 1}, + ), + ) + + assertAllEquals(t, set, []history.ExpAssetStat{eurAssetStat}) + + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), + Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), + Balance: 24, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), + }), + ) eurAssetStat.Balances.Authorized = "25" eurAssetStat.Amount = "25" @@ -86,48 +99,47 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assertAllEquals(t, set, []history.ExpAssetStat{eurAssetStat}) usd := "USD" - err = set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Asset: xdr.MustNewCreditAsset(usd, trustLineIssuer.Address()), - Balance: 10, - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Asset: xdr.MustNewCreditAsset(usd, trustLineIssuer.Address()), + Balance: 10, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), + }), + ) ether := "ETHER" - err = set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), - Balance: 3, - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), + Balance: 3, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), + }), + ) // Add an authorized_to_maintain_liabilities trust line - err = set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), - Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), - Balance: 4, - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag), - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), + Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), + Balance: 4, + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag), + }), + ) // Add an unauthorized trust line - err = set.Add(xdr.TrustLineEntry{ - AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), - Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), - Balance: 5, - }) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - + assert.NoError( + t, + set.Add(xdr.TrustLineEntry{ + AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), + Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), + Balance: 5, + }), + ) expected := []history.ExpAssetStat{ { AssetType: xdr.AssetTypeAssetTypeCreditAlphanum12, From 6df55a8ea5415c55bfebe615ff306578ab6fdc25 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 17:44:25 +0100 Subject: [PATCH 05/18] Add API implementation and tests --- protocols/horizon/main.go | 2 ++ services/horizon/internal/actions/asset_test.go | 16 ++++++++++++++++ .../internal/resourceadapter/asset_stat.go | 5 +++++ .../internal/resourceadapter/asset_stat_test.go | 4 ++++ 4 files changed, 27 insertions(+) diff --git a/protocols/horizon/main.go b/protocols/horizon/main.go index deee9e9c70..3d9b32c0be 100644 --- a/protocols/horizon/main.go +++ b/protocols/horizon/main.go @@ -185,6 +185,7 @@ func (res AssetStat) PagingToken() string { type AssetStatBalances struct { Authorized string `json:"authorized"` AuthorizedToMaintainLiabilities string `json:"authorized_to_maintain_liabilities"` + ClaimableBalances string `json:"claimable_balances"` Unauthorized string `json:"unauthorized"` } @@ -192,6 +193,7 @@ type AssetStatBalances struct { type AssetStatAccounts struct { Authorized int32 `json:"authorized"` AuthorizedToMaintainLiabilities int32 `json:"authorized_to_maintain_liabilities"` + ClaimableBalances int32 `json:"claimable_balances"` Unauthorized int32 `json:"unauthorized"` } diff --git a/services/horizon/internal/actions/asset_test.go b/services/horizon/internal/actions/asset_test.go index 7663bd4e29..985b8655ba 100644 --- a/services/horizon/internal/actions/asset_test.go +++ b/services/horizon/internal/actions/asset_test.go @@ -129,11 +129,13 @@ func TestAssetStats(t *testing.T) { Authorized: 2, AuthorizedToMaintainLiabilities: 3, Unauthorized: 4, + ClaimableBalances: 1, }, Balances: history.ExpAssetStatBalances{ Authorized: "1", AuthorizedToMaintainLiabilities: "2", Unauthorized: "3", + ClaimableBalances: "10", }, Amount: "1", NumAccounts: 2, @@ -143,11 +145,13 @@ func TestAssetStats(t *testing.T) { Authorized: usdAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: usdAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: usdAssetStat.Accounts.Unauthorized, + ClaimableBalances: usdAssetStat.Accounts.ClaimableBalances, }, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", + ClaimableBalances: "0.0000010", }, Amount: "0.0000001", NumAccounts: usdAssetStat.NumAccounts, @@ -168,11 +172,13 @@ func TestAssetStats(t *testing.T) { Authorized: 1, AuthorizedToMaintainLiabilities: 2, Unauthorized: 3, + ClaimableBalances: 0, }, Balances: history.ExpAssetStatBalances{ Authorized: "23", AuthorizedToMaintainLiabilities: "46", Unauthorized: "92", + ClaimableBalances: "0", }, Amount: "23", NumAccounts: 1, @@ -182,11 +188,13 @@ func TestAssetStats(t *testing.T) { Authorized: etherAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: etherAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: etherAssetStat.Accounts.Unauthorized, + ClaimableBalances: etherAssetStat.Accounts.ClaimableBalances, }, Balances: horizon.AssetStatBalances{ Authorized: "0.0000023", AuthorizedToMaintainLiabilities: "0.0000046", Unauthorized: "0.0000092", + ClaimableBalances: "0.0000000", }, Amount: "0.0000023", NumAccounts: etherAssetStat.NumAccounts, @@ -207,11 +215,13 @@ func TestAssetStats(t *testing.T) { Authorized: 2, AuthorizedToMaintainLiabilities: 3, Unauthorized: 4, + ClaimableBalances: 0, }, Balances: history.ExpAssetStatBalances{ Authorized: "1", AuthorizedToMaintainLiabilities: "2", Unauthorized: "3", + ClaimableBalances: "0", }, Amount: "1", NumAccounts: 2, @@ -221,11 +231,13 @@ func TestAssetStats(t *testing.T) { Authorized: otherUSDAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: otherUSDAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: otherUSDAssetStat.Accounts.Unauthorized, + ClaimableBalances: otherUSDAssetStat.Accounts.ClaimableBalances, }, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", + ClaimableBalances: "0.0000000", }, Amount: "0.0000001", NumAccounts: otherUSDAssetStat.NumAccounts, @@ -248,11 +260,13 @@ func TestAssetStats(t *testing.T) { Authorized: 3, AuthorizedToMaintainLiabilities: 4, Unauthorized: 5, + ClaimableBalances: 0, }, Balances: history.ExpAssetStatBalances{ Authorized: "111", AuthorizedToMaintainLiabilities: "222", Unauthorized: "333", + ClaimableBalances: "0", }, Amount: "111", NumAccounts: 3, @@ -262,11 +276,13 @@ func TestAssetStats(t *testing.T) { Authorized: eurAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: eurAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: eurAssetStat.Accounts.Unauthorized, + ClaimableBalances: eurAssetStat.Accounts.ClaimableBalances, }, Balances: horizon.AssetStatBalances{ Authorized: "0.0000111", AuthorizedToMaintainLiabilities: "0.0000222", Unauthorized: "0.0000333", + ClaimableBalances: "0.0000000", }, Amount: "0.0000111", NumAccounts: eurAssetStat.NumAccounts, diff --git a/services/horizon/internal/resourceadapter/asset_stat.go b/services/horizon/internal/resourceadapter/asset_stat.go index 7b0735a71f..f3c228f4c7 100644 --- a/services/horizon/internal/resourceadapter/asset_stat.go +++ b/services/horizon/internal/resourceadapter/asset_stat.go @@ -67,5 +67,10 @@ func populateAssetStatBalances(res *protocol.AssetStatBalances, row history.ExpA return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.Unauthorized) } + res.ClaimableBalances, err = amount.IntStringToAmount(row.ClaimableBalances) + if err != nil { + return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.ClaimableBalances) + } + return nil } diff --git a/services/horizon/internal/resourceadapter/asset_stat_test.go b/services/horizon/internal/resourceadapter/asset_stat_test.go index 2acd29c157..9cf9582dec 100644 --- a/services/horizon/internal/resourceadapter/asset_stat_test.go +++ b/services/horizon/internal/resourceadapter/asset_stat_test.go @@ -20,11 +20,13 @@ func TestPopulateExpAssetStat(t *testing.T) { Authorized: 429, AuthorizedToMaintainLiabilities: 214, Unauthorized: 107, + ClaimableBalances: 12, }, Balances: history.ExpAssetStatBalances{ Authorized: "100000000000000000000", AuthorizedToMaintainLiabilities: "50000000000000000000", Unauthorized: "2500000000000000000", + ClaimableBalances: "1200000000000000000", }, Amount: "100000000000000000000", // 10T NumAccounts: 429, @@ -45,9 +47,11 @@ func TestPopulateExpAssetStat(t *testing.T) { assert.Equal(t, int32(429), res.Accounts.Authorized) assert.Equal(t, int32(214), res.Accounts.AuthorizedToMaintainLiabilities) assert.Equal(t, int32(107), res.Accounts.Unauthorized) + assert.Equal(t, int32(12), res.Accounts.ClaimableBalances) assert.Equal(t, "10000000000000.0000000", res.Balances.Authorized) assert.Equal(t, "5000000000000.0000000", res.Balances.AuthorizedToMaintainLiabilities) assert.Equal(t, "250000000000.0000000", res.Balances.Unauthorized) + assert.Equal(t, "120000000000.0000000", res.Balances.ClaimableBalances) assert.Equal(t, "10000000000000.0000000", res.Amount) assert.Equal(t, int32(429), res.NumAccounts) assert.Equal(t, horizon.AccountFlags{}, res.Flags) From 1c1b7b4c6bb6ac61087e081bf9c7b3871ae7a546 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 18:09:01 +0100 Subject: [PATCH 06/18] attempting to refactor --- services/horizon/internal/ingest/main.go | 3 ++- .../processors/asset_stats_processor.go | 16 +++++------ .../ingest/processors/asset_stats_set.go | 27 +++++++++++-------- .../ingest/processors/asset_stats_set_test.go | 26 +++++++++--------- services/horizon/internal/ingest/verify.go | 9 +++++-- .../horizon/internal/ingest/verify/main.go | 2 +- .../ingest/verify_range_state_test.go | 2 +- 7 files changed, 48 insertions(+), 37 deletions(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index a65db1273c..9df1afcefe 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -50,7 +50,8 @@ const ( // - 12: Trigger state rebuild due to `absTime` -> `abs_time` rename // in ClaimableBalances predicates. // - 13: Trigger state rebuild to include more than just authorized assets. - CurrentVersion = 13 + // - 14: Trigger state rebuild to include claimable balances in the asset stats processor. + CurrentVersion = 14 // MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion: // * Ledger ingestion, diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index c20644ac7b..328e00b3fc 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -284,8 +284,8 @@ func (p *AssetStatsProcessor) adjustAssetStatForTrustline( pre *xdr.TrustLineEntry, post *xdr.TrustLineEntry, ) error { - deltaAccounts := deltas{} - deltaBalances := deltas{} + deltaAccounts := delta{} + deltaBalances := delta{} if pre == nil && post == nil { return ingest.NewStateError(errors.New("both pre and post trustlines cannot be nil")) @@ -303,9 +303,9 @@ func (p *AssetStatsProcessor) adjustAssetStatForTrustline( deltaBalances.AddByFlags(post.Flags, int64(post.Balance)) } - err := p.assetStatSet.AddDelta(asset, deltaBalances, deltaAccounts) + err := p.assetStatSet.addDelta(asset, deltaBalances, deltaAccounts) if err != nil { - return errors.Wrap(err, "error running AssetStatSet.AddDelta") + return errors.Wrap(err, "error running AssetStatSet.addDelta") } return nil } @@ -314,8 +314,8 @@ func (p *AssetStatsProcessor) adjustAssetStatForClaimableBalance( pre *xdr.ClaimableBalanceEntry, post *xdr.ClaimableBalanceEntry, ) error { - deltaAccounts := deltas{} - deltaBalances := deltas{} + deltaAccounts := delta{} + deltaBalances := delta{} if pre == nil && post == nil { return ingest.NewStateError(errors.New("both pre and post claimable balances cannot be nil")) @@ -333,9 +333,9 @@ func (p *AssetStatsProcessor) adjustAssetStatForClaimableBalance( deltaBalances.ClaimableBalances += int64(post.Amount) } - err := p.assetStatSet.AddDelta(asset, deltaBalances, deltaAccounts) + err := p.assetStatSet.addDelta(asset, deltaBalances, deltaAccounts) if err != nil { - return errors.Wrap(err, "error running AssetStatSet.AddDelta") + return errors.Wrap(err, "error running AssetStatSet.addDelta") } return nil } diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index c712e5dbb3..a5344909f0 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -96,24 +96,29 @@ func (value assetStatValue) ConvertToHistoryObject() history.ExpAssetStat { // AssetStatSet represents a collection of asset stats type AssetStatSet map[assetStatKey]*assetStatValue -// Add updates the set with a trustline entry from a history archive snapshot. -func (s AssetStatSet) Add(trustLine xdr.TrustLineEntry) error { - var deltaBalances deltas - var deltaAccounts deltas +// AddTrustline updates the set with a trustline entry from a history archive snapshot. +func (s AssetStatSet) AddTrustline(trustLine xdr.TrustLineEntry) error { + var deltaBalances delta + var deltaAccounts delta deltaBalances.AddByFlags(trustLine.Flags, int64(trustLine.Balance)) deltaAccounts.AddByFlags(trustLine.Flags, 1) - return s.AddDelta(trustLine.Asset, deltaBalances, deltaAccounts) + return s.addDelta(trustLine.Asset, deltaBalances, deltaAccounts) } -type deltas struct { +// AddClaimableBalance updates the set with a claimable balance entry from a history archive snapshot. +func (s AssetStatSet) AddClaimableBalance(cBalance xdr.ClaimableBalanceEntry) error { + return s.addDelta(cBalance.Asset, delta{ClaimableBalances: int64(cBalance.Amount)}, delta{ClaimableBalances: 1}) +} + +type delta struct { Authorized int64 AuthorizedToMaintainLiabilities int64 Unauthorized int64 ClaimableBalances int64 } -func (d *deltas) AddByFlags(flags xdr.Uint32, amount int64) { +func (d *delta) AddByFlags(flags xdr.Uint32, amount int64) { switch xdr.TrustLineFlags(flags) { case xdr.TrustLineFlagsAuthorizedFlag: d.Authorized += amount @@ -124,12 +129,12 @@ func (d *deltas) AddByFlags(flags xdr.Uint32, amount int64) { } } -func (d deltas) isEmpty() bool { - return d == deltas{} +func (d delta) isEmpty() bool { + return d == delta{} } -// AddDelta adds a delta balance and delta accounts to a given asset trustline. -func (s AssetStatSet) AddDelta(asset xdr.Asset, deltaBalances, deltaAccounts deltas) error { +// addDelta adds a delta balance and delta accounts to a given asset trustline. +func (s AssetStatSet) addDelta(asset xdr.Asset, deltaBalances, deltaAccounts delta) error { if deltaBalances.isEmpty() && deltaAccounts.isEmpty() { return nil } diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index b15ed13920..4703249e01 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -60,7 +60,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 1, @@ -73,10 +73,10 @@ func TestAddAndRemoveAssetStats(t *testing.T) { eurAssetStat.Balances.ClaimableBalances = "23" assert.NoError( t, - set.AddDelta( + set.addDelta( xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), - deltas{ClaimableBalances: 23}, - deltas{ClaimableBalances: 1}, + delta{ClaimableBalances: 23}, + delta{ClaimableBalances: 1}, ), ) @@ -84,7 +84,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 24, @@ -101,7 +101,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { usd := "USD" assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(usd, trustLineIssuer.Address()), Balance: 10, @@ -112,7 +112,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { ether := "ETHER" assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 3, @@ -120,10 +120,10 @@ func TestAddAndRemoveAssetStats(t *testing.T) { }), ) - // Add an authorized_to_maintain_liabilities trust line + // AddTrustline an authorized_to_maintain_liabilities trust line assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 4, @@ -131,10 +131,10 @@ func TestAddAndRemoveAssetStats(t *testing.T) { }), ) - // Add an unauthorized trust line + // AddTrustline an unauthorized trust line assert.NoError( t, - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 5, @@ -195,7 +195,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { func TestOverflowAssetStatSet(t *testing.T) { set := AssetStatSet{} eur := "EUR" - err := set.Add(xdr.TrustLineEntry{ + err := set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, @@ -229,7 +229,7 @@ func TestOverflowAssetStatSet(t *testing.T) { t.Fatalf("expected asset stat to be %v but got %v", eurAssetStat, all[0]) } - err = set.Add(xdr.TrustLineEntry{ + err = set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index d6e1d2d0cd..b51fadddb1 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -27,7 +27,7 @@ const assetStatsBatchSize = 500 // check them. // There is a test that checks it, to fix it: update the actual `verifyState` // method instead of just updating this value! -const stateVerifierExpectedIngestionVersion = 13 +const stateVerifierExpectedIngestionVersion = 14 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already @@ -539,7 +539,7 @@ func addTrustLinesToStateVerifier( if err := verifier.Write(entry); err != nil { return err } - if err := assetStats.Add(trustline); err != nil { + if err := assetStats.AddTrustline(trustline); err != nil { return ingest.NewStateError( errors.Wrap(err, "could not add trustline to asset stats"), ) @@ -601,6 +601,11 @@ func addClaimableBalanceToStateVerifier( if err := verifier.Write(entry); err != nil { return err } + if err := assetStats.AddClaimableBalance(cBalance); err != nil { + return ingest.NewStateError( + errors.Wrap(err, "could not add claimable balance to asset stats"), + ) + } } return nil diff --git a/services/horizon/internal/ingest/verify/main.go b/services/horizon/internal/ingest/verify/main.go index 941c81ddb5..e4951d49b3 100644 --- a/services/horizon/internal/ingest/verify/main.go +++ b/services/horizon/internal/ingest/verify/main.go @@ -20,7 +20,7 @@ import ( // that will be used for equality check. type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) -// StateVerifier verifies if ledger entries provided by Add method are the same +// StateVerifier verifies if ledger entries provided by AddTrustline method are the same // as in the checkpoint ledger entries provided by CheckpointChangeReader. // The algorithm works in the following way: // 0. Develop TransformFunction. It should remove all fields and objects not diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index b522267f55..b3ed2af418 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -440,7 +440,7 @@ func (s *VerifyRangeStateTestSuite) TestVerifyFailsWhenAssetStatsMismatch() { set := processors.AssetStatSet{} trustLineIssuer := xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") - set.Add(xdr.TrustLineEntry{ + set.AddTrustline(xdr.TrustLineEntry{ AccountId: xdr.MustAddress(keypair.MustRandom().Address()), Balance: 123, Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), From af0ddd5805b06487cd0fb2930898de8215996990 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 18:27:03 +0100 Subject: [PATCH 07/18] Handle special case of native asset --- .../processors/asset_stats_processor.go | 4 ++++ .../processors/asset_stats_processor_test.go | 22 +++++++++++++++++++ .../ingest/processors/asset_stats_set.go | 4 ++++ .../ingest/processors/asset_stats_set_test.go | 12 ++++++++++ 4 files changed, 42 insertions(+) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index 328e00b3fc..a37d570d9b 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -333,6 +333,10 @@ func (p *AssetStatsProcessor) adjustAssetStatForClaimableBalance( deltaBalances.ClaimableBalances += int64(post.Amount) } + if asset.Type == xdr.AssetTypeAssetTypeNative { + return nil + } + err := p.assetStatSet.addDelta(asset, deltaBalances, deltaAccounts) if err != nil { return errors.Wrap(err, "error running AssetStatSet.addDelta") diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go index 417d80119e..53c15d9c3a 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor_test.go @@ -137,6 +137,15 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestInsertClaimableBalance() { V0: &xdr.Hash{1, 2, 3}, }, } + + nativeClaimableBalance := xdr.ClaimableBalanceEntry{ + Asset: xdr.MustNewNativeAsset(), + Amount: 100000000, + BalanceId: xdr.ClaimableBalanceId{ + Type: 0, + V0: &xdr.Hash{1, 2, 43}, + }, + } lastModifiedLedgerSeq := xdr.Uint32(1234) // test inserts @@ -154,6 +163,19 @@ func (s *AssetStatsProcessorTestSuiteLedger) TestInsertClaimableBalance() { }) s.Assert().NoError(err) + err = s.processor.ProcessChange(ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &nativeClaimableBalance, + }, + }, + }) + s.Assert().NoError(err) + usdClaimableBalance := xdr.ClaimableBalanceEntry{ Asset: xdr.MustNewCreditAsset("USD", trustLineIssuer.Address()), Amount: 46, diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index a5344909f0..8278e2dd2c 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -108,6 +108,10 @@ func (s AssetStatSet) AddTrustline(trustLine xdr.TrustLineEntry) error { // AddClaimableBalance updates the set with a claimable balance entry from a history archive snapshot. func (s AssetStatSet) AddClaimableBalance(cBalance xdr.ClaimableBalanceEntry) error { + if cBalance.Asset.Type == xdr.AssetTypeAssetTypeNative { + return nil + } + return s.addDelta(cBalance.Asset, delta{ClaimableBalances: int64(cBalance.Amount)}, delta{ClaimableBalances: 1}) } diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index 4703249e01..3470fecc48 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -38,6 +38,18 @@ func assertAllEquals(t *testing.T, set AssetStatSet, expected []history.ExpAsset } } +func TestAddNativeClaimableBalance(t *testing.T) { + set := AssetStatSet{} + claimableBalance := xdr.ClaimableBalanceEntry{ + BalanceId: xdr.ClaimableBalanceId{}, + Claimants: nil, + Asset: xdr.MustNewNativeAsset(), + Amount: 100, + } + assert.NoError(t, set.AddClaimableBalance(claimableBalance)) + assert.Empty(t, set.All()) +} + func TestAddAndRemoveAssetStats(t *testing.T) { set := AssetStatSet{} eur := "EUR" From e063933ab53c6b707ed71ecd498530860ec056e2 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 26 Mar 2021 17:36:51 +0000 Subject: [PATCH 08/18] Update docs --- .../docs/reference/endpoints/assets-all.md | 16 +++++++++++----- .../internal/docs/reference/resources/asset.md | 10 ++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/services/horizon/internal/docs/reference/endpoints/assets-all.md b/services/horizon/internal/docs/reference/endpoints/assets-all.md index 9c6463d744..37c18a158d 100644 --- a/services/horizon/internal/docs/reference/endpoints/assets-all.md +++ b/services/horizon/internal/docs/reference/endpoints/assets-all.md @@ -81,12 +81,14 @@ If called normally this endpoint responds with a [page](../resources/page.md) of "accounts": { "authorized": 2126, "authorized_to_maintain_liabilities": 32, - "unauthorized": 5 + "unauthorized": 5, + "claimable_balances": 18 }, "balances": { "authorized": "10000.0000000", "authorized_to_maintain_liabilities": "3000.0000000", - "unauthorized": "4000.0000000" + "unauthorized": "4000.0000000", + "claimable_balances": "2380.0000000" }, "flags": { "auth_required": true, @@ -106,12 +108,14 @@ If called normally this endpoint responds with a [page](../resources/page.md) of "accounts": { "authorized": 32, "authorized_to_maintain_liabilities": 124, - "unauthorized": 6 + "unauthorized": 6, + "claimable_balances": 18 }, "balances": { "authorized": "5000.0000000", "authorized_to_maintain_liabilities": "8000.0000000", "unauthorized": "2000.0000000" + "claimable_balances": "1200.0000000" }, "flags": { "auth_required": false, @@ -131,12 +135,14 @@ If called normally this endpoint responds with a [page](../resources/page.md) of "accounts": { "authorized": 91547871, "authorized_to_maintain_liabilities": 45773935, - "unauthorized": 22886967 + "unauthorized": 22886967, + "claimable_balances": 11443483 }, "balances": { "authorized": "1000000000.0000000", "authorized_to_maintain_liabilities": "500000000.0000000", - "unauthorized": "250000000.0000000" + "unauthorized": "250000000.0000000", + "claimable_balances": "12500000.0000000" }, "flags": { "auth_required": false, diff --git a/services/horizon/internal/docs/reference/resources/asset.md b/services/horizon/internal/docs/reference/resources/asset.md index 35bec0d292..730b0de73a 100644 --- a/services/horizon/internal/docs/reference/resources/asset.md +++ b/services/horizon/internal/docs/reference/resources/asset.md @@ -16,8 +16,8 @@ To learn more about the concept of assets in the Stellar network, take a look at | asset_type | string | The type of this asset: "credit_alphanum4", or "credit_alphanum12". | | asset_code | string | The code of this asset. | | asset_issuer | string | The issuer of this asset. | -| accounts | object | The number of accounts holding this asset, summarized by each state of the trust line flags. | -| balances | object | The number of units of credit issued, summarized by each state of the trust line flags. | +| accounts | object | The number of accounts and claimable balances holding this asset. Accounts are summarized by each state of the trust line flags. | +| balances | object | The number of units of credit issued, summarized by each state of the trust line flags, or if they are in a claimable balance. | | flags | object | The flags denote the enabling/disabling of certain asset issuer privileges. | | paging_token | string | A [paging token](./page.md) suitable for use as the `cursor` parameter to transaction collection resources. | @@ -49,12 +49,14 @@ To learn more about the concept of assets in the Stellar network, take a look at "accounts": { "authorized": 91547871, "authorized_to_maintain_liabilities": 45773935, - "unauthorized": 22886967 + "unauthorized": 22886967, + "claimable_balances": 11443483 }, "balances": { "authorized": "100.0000000", "authorized_to_maintain_liabilities": "50.0000000", - "unauthorized": "25.0000000" + "unauthorized": "25.0000000", + "claimable_balances": "12.5000000" }, "flags": { "auth_required": false, From 16201189a89de8da795c6036df41919c8ccd45f8 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 26 Mar 2021 18:35:22 +0000 Subject: [PATCH 09/18] update CHANGELOG --- services/horizon/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 104109aa4c..900481d20c 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,6 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +* Add `claimable_balances` to asset stat summaries at `/assets` ([3502](https://github.com/stellar/go/pull/3502)). * Add an endpoint which determines if Horizon is healthy enough to receive traffic ([3435](https://github.com/stellar/go/pull/3435)). * Sanitize route regular expressions for Prometheus metrics ([3459](https://github.com/stellar/go/pull/3459)). * Add asset stat summaries per trust-line flag category ([3454](https://github.com/stellar/go/pull/3454)). From a77aa02e609d35c7f9939e03f902edfddfef3463 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 26 Mar 2021 18:39:15 +0000 Subject: [PATCH 10/18] Fixing test --- services/horizon/internal/actions/asset_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/horizon/internal/actions/asset_test.go b/services/horizon/internal/actions/asset_test.go index 985b8655ba..f324d3d562 100644 --- a/services/horizon/internal/actions/asset_test.go +++ b/services/horizon/internal/actions/asset_test.go @@ -435,11 +435,13 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) { Authorized: 2, AuthorizedToMaintainLiabilities: 3, Unauthorized: 4, + ClaimableBalances: 0, }, Balances: history.ExpAssetStatBalances{ Authorized: "1", AuthorizedToMaintainLiabilities: "2", Unauthorized: "3", + ClaimableBalances: "0", }, Amount: "1", NumAccounts: 2, @@ -457,11 +459,13 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) { Authorized: 2, AuthorizedToMaintainLiabilities: 3, Unauthorized: 4, + ClaimableBalances: 0, }, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", + ClaimableBalances: "0.0000000", }, Amount: "0.0000001", NumAccounts: usdAssetStat.NumAccounts, From 608e36769f6809140f3d146b253a9017441e6952 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Fri, 26 Mar 2021 18:52:02 +0000 Subject: [PATCH 11/18] rename function --- .../internal/ingest/processors/asset_stats_processor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index a37d570d9b..c64c2aaf0a 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -43,12 +43,12 @@ func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { switch change.Type { case xdr.LedgerEntryTypeClaimableBalance: if p.useLedgerEntryCache { - return p.useCachedChange(change) + return p.addToCache(change) } return p.addNewClaimableBalance(change) case xdr.LedgerEntryTypeTrustline: if p.useLedgerEntryCache { - return p.useCachedChange(change) + return p.addToCache(change) } return p.addNewTrustline(change) default: @@ -56,7 +56,7 @@ func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { } } -func (p *AssetStatsProcessor) useCachedChange(change ingest.Change) error { +func (p *AssetStatsProcessor) addToCache(change ingest.Change) error { err := p.cache.AddChange(change) if err != nil { return errors.Wrap(err, "error adding to ledgerCache") From cd8bab79172c83923fb55efe0dccd70aa037dfb4 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 20:24:44 +0100 Subject: [PATCH 12/18] Refactor AssetStatSet --- .../processors/asset_stats_processor.go | 125 +++--------------- .../ingest/processors/asset_stats_set.go | 88 +++++++++--- .../ingest/processors/asset_stats_set_test.go | 18 +-- services/horizon/internal/ingest/verify.go | 4 +- .../ingest/verify_range_state_test.go | 2 +- 5 files changed, 98 insertions(+), 139 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index c64c2aaf0a..ed5c789e3e 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -40,17 +40,21 @@ func (p *AssetStatsProcessor) reset() { } func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { + if change.Type != xdr.LedgerEntryTypeClaimableBalance && change.Type != xdr.LedgerEntryTypeTrustline { + return nil + } + if p.useLedgerEntryCache { + return p.addToCache(change) + } + if change.Pre != nil || change.Post == nil { + return errors.New("AssetStatsProcessor is in insert only mode") + } + switch change.Type { case xdr.LedgerEntryTypeClaimableBalance: - if p.useLedgerEntryCache { - return p.addToCache(change) - } - return p.addNewClaimableBalance(change) + return p.assetStatSet.AddClaimableBalance(nil, change.Post.Data.ClaimableBalance) case xdr.LedgerEntryTypeTrustline: - if p.useLedgerEntryCache { - return p.addToCache(change) - } - return p.addNewTrustline(change) + return p.assetStatSet.AddTrustline(nil, change.Post.Data.TrustLine) default: return nil } @@ -72,50 +76,21 @@ func (p *AssetStatsProcessor) addToCache(change ingest.Change) error { return nil } -func (p *AssetStatsProcessor) addNewClaimableBalance(change ingest.Change) error { - if change.Pre != nil || change.Post == nil { - return errors.New("AssetStatsProcessor is in insert only mode") - } - - post := change.Post.Data.MustClaimableBalance() - - err := p.adjustAssetStatForClaimableBalance(nil, &post) - if err != nil { - return errors.Wrap(err, "Error adjusting asset stat") - } - - return nil -} - -func (p *AssetStatsProcessor) addNewTrustline(change ingest.Change) error { - if change.Pre != nil || change.Post == nil { - return errors.New("AssetStatsProcessor is in insert only mode") - } - - postTrustLine := change.Post.Data.MustTrustLine() - err := p.adjustAssetStatForTrustline(nil, &postTrustLine) - if err != nil { - return errors.Wrap(err, "Error adjusting asset stat") - } - - return nil -} - func (p *AssetStatsProcessor) commitClaimableBalanceChange(change ingest.Change) error { switch { case change.Pre == nil && change.Post != nil: // Created post := change.Post.Data.MustClaimableBalance() - return p.adjustAssetStatForClaimableBalance(nil, &post) + return p.assetStatSet.AddClaimableBalance(nil, &post) case change.Pre != nil && change.Post != nil: // Updated pre := change.Pre.Data.MustClaimableBalance() post := change.Post.Data.MustClaimableBalance() - return p.adjustAssetStatForClaimableBalance(&pre, &post) + return p.assetStatSet.AddClaimableBalance(&pre, &post) case change.Pre != nil && change.Post == nil: // Removed pre := change.Pre.Data.MustClaimableBalance() - return p.adjustAssetStatForClaimableBalance(&pre, nil) + return p.assetStatSet.AddClaimableBalance(&pre, nil) default: return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } @@ -126,16 +101,16 @@ func (p *AssetStatsProcessor) commitTrustlineChange(change ingest.Change) error case change.Pre == nil && change.Post != nil: // Created post := change.Post.Data.MustTrustLine() - return p.adjustAssetStatForTrustline(nil, &post) + return p.assetStatSet.AddTrustline(nil, &post) case change.Pre != nil && change.Post != nil: // Updated pre := change.Pre.Data.MustTrustLine() post := change.Post.Data.MustTrustLine() - return p.adjustAssetStatForTrustline(&pre, &post) + return p.assetStatSet.AddTrustline(&pre, &post) case change.Pre != nil && change.Post == nil: // Removed pre := change.Pre.Data.MustTrustLine() - return p.adjustAssetStatForTrustline(&pre, nil) + return p.assetStatSet.AddTrustline(&pre, nil) default: return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } @@ -279,67 +254,3 @@ func (p *AssetStatsProcessor) Commit() error { return nil } - -func (p *AssetStatsProcessor) adjustAssetStatForTrustline( - pre *xdr.TrustLineEntry, - post *xdr.TrustLineEntry, -) error { - deltaAccounts := delta{} - deltaBalances := delta{} - - if pre == nil && post == nil { - return ingest.NewStateError(errors.New("both pre and post trustlines cannot be nil")) - } - - var asset xdr.Asset - if pre != nil { - asset = pre.Asset - deltaAccounts.AddByFlags(pre.Flags, -1) - deltaBalances.AddByFlags(pre.Flags, -int64(pre.Balance)) - } - if post != nil { - asset = post.Asset - deltaAccounts.AddByFlags(post.Flags, 1) - deltaBalances.AddByFlags(post.Flags, int64(post.Balance)) - } - - err := p.assetStatSet.addDelta(asset, deltaBalances, deltaAccounts) - if err != nil { - return errors.Wrap(err, "error running AssetStatSet.addDelta") - } - return nil -} - -func (p *AssetStatsProcessor) adjustAssetStatForClaimableBalance( - pre *xdr.ClaimableBalanceEntry, - post *xdr.ClaimableBalanceEntry, -) error { - deltaAccounts := delta{} - deltaBalances := delta{} - - if pre == nil && post == nil { - return ingest.NewStateError(errors.New("both pre and post claimable balances cannot be nil")) - } - - var asset xdr.Asset - if pre != nil { - asset = pre.Asset - deltaAccounts.ClaimableBalances-- - deltaBalances.ClaimableBalances -= int64(pre.Amount) - } - if post != nil { - asset = post.Asset - deltaAccounts.ClaimableBalances++ - deltaBalances.ClaimableBalances += int64(post.Amount) - } - - if asset.Type == xdr.AssetTypeAssetTypeNative { - return nil - } - - err := p.assetStatSet.addDelta(asset, deltaBalances, deltaAccounts) - if err != nil { - return errors.Wrap(err, "error running AssetStatSet.addDelta") - } - return nil -} diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index 8278e2dd2c..236c08bf7f 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -1,6 +1,7 @@ package processors import ( + "github.com/stellar/go/ingest" "math/big" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -96,25 +97,6 @@ func (value assetStatValue) ConvertToHistoryObject() history.ExpAssetStat { // AssetStatSet represents a collection of asset stats type AssetStatSet map[assetStatKey]*assetStatValue -// AddTrustline updates the set with a trustline entry from a history archive snapshot. -func (s AssetStatSet) AddTrustline(trustLine xdr.TrustLineEntry) error { - var deltaBalances delta - var deltaAccounts delta - deltaBalances.AddByFlags(trustLine.Flags, int64(trustLine.Balance)) - deltaAccounts.AddByFlags(trustLine.Flags, 1) - - return s.addDelta(trustLine.Asset, deltaBalances, deltaAccounts) -} - -// AddClaimableBalance updates the set with a claimable balance entry from a history archive snapshot. -func (s AssetStatSet) AddClaimableBalance(cBalance xdr.ClaimableBalanceEntry) error { - if cBalance.Asset.Type == xdr.AssetTypeAssetTypeNative { - return nil - } - - return s.addDelta(cBalance.Asset, delta{ClaimableBalances: int64(cBalance.Amount)}, delta{ClaimableBalances: 1}) -} - type delta struct { Authorized int64 AuthorizedToMaintainLiabilities int64 @@ -122,7 +104,7 @@ type delta struct { ClaimableBalances int64 } -func (d *delta) AddByFlags(flags xdr.Uint32, amount int64) { +func (d *delta) addByFlags(flags xdr.Uint32, amount int64) { switch xdr.TrustLineFlags(flags) { case xdr.TrustLineFlagsAuthorizedFlag: d.Authorized += amount @@ -180,6 +162,72 @@ func (s AssetStatSet) addDelta(asset xdr.Asset, deltaBalances, deltaAccounts del return nil } +// AddTrustline updates the set with a trustline entry from a history archive snapshot. +func (s AssetStatSet) AddTrustline( + pre *xdr.TrustLineEntry, + post *xdr.TrustLineEntry, +) error { + deltaAccounts := delta{} + deltaBalances := delta{} + + if pre == nil && post == nil { + return ingest.NewStateError(errors.New("both pre and post trustlines cannot be nil")) + } + + var asset xdr.Asset + if pre != nil { + asset = pre.Asset + deltaAccounts.addByFlags(pre.Flags, -1) + deltaBalances.addByFlags(pre.Flags, -int64(pre.Balance)) + } + if post != nil { + asset = post.Asset + deltaAccounts.addByFlags(post.Flags, 1) + deltaBalances.addByFlags(post.Flags, int64(post.Balance)) + } + + err := s.addDelta(asset, deltaBalances, deltaAccounts) + if err != nil { + return errors.Wrap(err, "error running AssetStatSet.addDelta") + } + return nil +} + +// AddClaimableBalance updates the set with a claimable balance entry from a history archive snapshot. +func (s AssetStatSet) AddClaimableBalance( + pre *xdr.ClaimableBalanceEntry, + post *xdr.ClaimableBalanceEntry, +) error { + deltaAccounts := delta{} + deltaBalances := delta{} + + if pre == nil && post == nil { + return ingest.NewStateError(errors.New("both pre and post claimable balances cannot be nil")) + } + + var asset xdr.Asset + if pre != nil { + asset = pre.Asset + deltaAccounts.ClaimableBalances-- + deltaBalances.ClaimableBalances -= int64(pre.Amount) + } + if post != nil { + asset = post.Asset + deltaAccounts.ClaimableBalances++ + deltaBalances.ClaimableBalances += int64(post.Amount) + } + + if asset.Type == xdr.AssetTypeAssetTypeNative { + return nil + } + + err := s.addDelta(asset, deltaBalances, deltaAccounts) + if err != nil { + return errors.Wrap(err, "error running AssetStatSet.addDelta") + } + return nil +} + // Remove deletes an asset stat from the set func (s AssetStatSet) Remove(assetType xdr.AssetType, assetCode string, assetIssuer string) (history.ExpAssetStat, bool) { key := assetStatKey{assetType: assetType, assetIssuer: assetIssuer, assetCode: assetCode} diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index 3470fecc48..f5232dce5d 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -46,7 +46,7 @@ func TestAddNativeClaimableBalance(t *testing.T) { Asset: xdr.MustNewNativeAsset(), Amount: 100, } - assert.NoError(t, set.AddClaimableBalance(claimableBalance)) + assert.NoError(t, set.AddClaimableBalance(nil, &claimableBalance)) assert.Empty(t, set.All()) } @@ -72,7 +72,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 1, @@ -96,7 +96,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 24, @@ -113,7 +113,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { usd := "USD" assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(usd, trustLineIssuer.Address()), Balance: 10, @@ -124,7 +124,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { ether := "ETHER" assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 3, @@ -135,7 +135,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { // AddTrustline an authorized_to_maintain_liabilities trust line assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 4, @@ -146,7 +146,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { // AddTrustline an unauthorized trust line assert.NoError( t, - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 5, @@ -207,7 +207,7 @@ func TestAddAndRemoveAssetStats(t *testing.T) { func TestOverflowAssetStatSet(t *testing.T) { set := AssetStatSet{} eur := "EUR" - err := set.AddTrustline(xdr.TrustLineEntry{ + err := set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, @@ -241,7 +241,7 @@ func TestOverflowAssetStatSet(t *testing.T) { t.Fatalf("expected asset stat to be %v but got %v", eurAssetStat, all[0]) } - err = set.AddTrustline(xdr.TrustLineEntry{ + err = set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index b51fadddb1..cf9ab3ee07 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -539,7 +539,7 @@ func addTrustLinesToStateVerifier( if err := verifier.Write(entry); err != nil { return err } - if err := assetStats.AddTrustline(trustline); err != nil { + if err := assetStats.AddTrustline(nil, &trustline); err != nil { return ingest.NewStateError( errors.Wrap(err, "could not add trustline to asset stats"), ) @@ -601,7 +601,7 @@ func addClaimableBalanceToStateVerifier( if err := verifier.Write(entry); err != nil { return err } - if err := assetStats.AddClaimableBalance(cBalance); err != nil { + if err := assetStats.AddClaimableBalance(nil, &cBalance); err != nil { return ingest.NewStateError( errors.Wrap(err, "could not add claimable balance to asset stats"), ) diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index b3ed2af418..35882690f0 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -440,7 +440,7 @@ func (s *VerifyRangeStateTestSuite) TestVerifyFailsWhenAssetStatsMismatch() { set := processors.AssetStatSet{} trustLineIssuer := xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") - set.AddTrustline(xdr.TrustLineEntry{ + set.AddTrustline(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress(keypair.MustRandom().Address()), Balance: 123, Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), From d7247afba822a11972bc50bb1d0fdc807d4f0f91 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Mar 2021 20:32:10 +0100 Subject: [PATCH 13/18] improve commit functions --- .../processors/asset_stats_processor.go | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index ed5c789e3e..0fc71f4722 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -80,17 +80,13 @@ func (p *AssetStatsProcessor) commitClaimableBalanceChange(change ingest.Change) switch { case change.Pre == nil && change.Post != nil: // Created - post := change.Post.Data.MustClaimableBalance() - return p.assetStatSet.AddClaimableBalance(nil, &post) + return p.assetStatSet.AddClaimableBalance(nil, change.Post.Data.ClaimableBalance) case change.Pre != nil && change.Post != nil: // Updated - pre := change.Pre.Data.MustClaimableBalance() - post := change.Post.Data.MustClaimableBalance() - return p.assetStatSet.AddClaimableBalance(&pre, &post) + return p.assetStatSet.AddClaimableBalance(change.Pre.Data.ClaimableBalance, change.Post.Data.ClaimableBalance) case change.Pre != nil && change.Post == nil: // Removed - pre := change.Pre.Data.MustClaimableBalance() - return p.assetStatSet.AddClaimableBalance(&pre, nil) + return p.assetStatSet.AddClaimableBalance(change.Pre.Data.ClaimableBalance, nil) default: return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } @@ -100,17 +96,13 @@ func (p *AssetStatsProcessor) commitTrustlineChange(change ingest.Change) error switch { case change.Pre == nil && change.Post != nil: // Created - post := change.Post.Data.MustTrustLine() - return p.assetStatSet.AddTrustline(nil, &post) + return p.assetStatSet.AddTrustline(nil, change.Post.Data.TrustLine) case change.Pre != nil && change.Post != nil: // Updated - pre := change.Pre.Data.MustTrustLine() - post := change.Post.Data.MustTrustLine() - return p.assetStatSet.AddTrustline(&pre, &post) + return p.assetStatSet.AddTrustline(change.Pre.Data.TrustLine, change.Post.Data.TrustLine) case change.Pre != nil && change.Post == nil: // Removed - pre := change.Pre.Data.MustTrustLine() - return p.assetStatSet.AddTrustline(&pre, nil) + return p.assetStatSet.AddTrustline(change.Pre.Data.TrustLine, nil) default: return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } From 6e07a4f75dd42ace4754cd0f4912f3743b9a61d8 Mon Sep 17 00:00:00 2001 From: tamirms Date: Sat, 27 Mar 2021 10:00:11 +0100 Subject: [PATCH 14/18] Fix godoc strings --- .../internal/ingest/processors/asset_stats_set.go | 10 ++++++++-- services/horizon/internal/ingest/verify/main.go | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index 236c08bf7f..2fe2032112 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -162,7 +162,9 @@ func (s AssetStatSet) addDelta(asset xdr.Asset, deltaBalances, deltaAccounts del return nil } -// AddTrustline updates the set with a trustline entry from a history archive snapshot. +// AddTrustline updates the set to account for how a given trustline has changed. +// pre is the trustline before the change (nil indicates that this change introduced the trustline into the ledger) +// post is the trustline after the change (nil indicates that this change removed the trustline from the ledger) func (s AssetStatSet) AddTrustline( pre *xdr.TrustLineEntry, post *xdr.TrustLineEntry, @@ -193,7 +195,11 @@ func (s AssetStatSet) AddTrustline( return nil } -// AddClaimableBalance updates the set with a claimable balance entry from a history archive snapshot. +// AddClaimableBalance updates the set to account for how a given claimable balance has changed. +// pre is the claimable balance before the change (nil indicates that this change introduced the +// claimable balance into the ledger) +// post is the claimable balance after the change (nil indicates that this change removed the +// claimable balance from the ledger) func (s AssetStatSet) AddClaimableBalance( pre *xdr.ClaimableBalanceEntry, post *xdr.ClaimableBalanceEntry, diff --git a/services/horizon/internal/ingest/verify/main.go b/services/horizon/internal/ingest/verify/main.go index e4951d49b3..941c81ddb5 100644 --- a/services/horizon/internal/ingest/verify/main.go +++ b/services/horizon/internal/ingest/verify/main.go @@ -20,7 +20,7 @@ import ( // that will be used for equality check. type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) -// StateVerifier verifies if ledger entries provided by AddTrustline method are the same +// StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by CheckpointChangeReader. // The algorithm works in the following way: // 0. Develop TransformFunction. It should remove all fields and objects not From 923a76863bec171c44bc22381a09e34bb5c9957a Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Wed, 31 Mar 2021 17:16:22 +0100 Subject: [PATCH 15/18] Update services/horizon/internal/docs/reference/endpoints/assets-all.md Co-authored-by: Bartek Nowotarski --- .../horizon/internal/docs/reference/endpoints/assets-all.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/docs/reference/endpoints/assets-all.md b/services/horizon/internal/docs/reference/endpoints/assets-all.md index 37c18a158d..c5281a6b32 100644 --- a/services/horizon/internal/docs/reference/endpoints/assets-all.md +++ b/services/horizon/internal/docs/reference/endpoints/assets-all.md @@ -114,7 +114,7 @@ If called normally this endpoint responds with a [page](../resources/page.md) of "balances": { "authorized": "5000.0000000", "authorized_to_maintain_liabilities": "8000.0000000", - "unauthorized": "2000.0000000" + "unauthorized": "2000.0000000", "claimable_balances": "1200.0000000" }, "flags": { From 92bc98ac464b68d0d0a94e16e2c1a6b6ad29b5f4 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Wed, 31 Mar 2021 17:18:23 +0100 Subject: [PATCH 16/18] Update changelog --- services/horizon/CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 38f9e3efb7..936b0e30ed 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +* Add `claimable_balances` to asset stat summaries at `/assets` ([3502](https://github.com/stellar/go/pull/3502)). + ## v2.1.0 ### DB State Migration @@ -13,7 +17,6 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ### New features -* Add `claimable_balances` to asset stat summaries at `/assets` ([3502](https://github.com/stellar/go/pull/3502)). * Add an endpoint which determines if Horizon is healthy enough to receive traffic ([3435](https://github.com/stellar/go/pull/3435)). * Sanitize route regular expressions for Prometheus metrics ([3459](https://github.com/stellar/go/pull/3459)). * Add asset stat summaries per trust-line flag category ([3454](https://github.com/stellar/go/pull/3454)). From 148b6264875422d5d8695489d7e373bbecd0ecb1 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 1 Apr 2021 15:33:37 +0100 Subject: [PATCH 17/18] Move claimable balance balance and accounts out of the structs --- protocols/horizon/main.go | 12 +++--- .../horizon/internal/actions/asset_test.go | 40 +++++++++---------- .../internal/resourceadapter/asset_stat.go | 30 ++++++++------ .../resourceadapter/asset_stat_test.go | 4 +- 4 files changed, 46 insertions(+), 40 deletions(-) diff --git a/protocols/horizon/main.go b/protocols/horizon/main.go index 3d9b32c0be..12d96d74b1 100644 --- a/protocols/horizon/main.go +++ b/protocols/horizon/main.go @@ -166,11 +166,13 @@ type AssetStat struct { } `json:"_links"` base.Asset - PT string `json:"paging_token"` - Accounts AssetStatAccounts `json:"accounts"` + PT string `json:"paging_token"` + Accounts AssetStatAccounts `json:"accounts"` + NumClaimableBalances int32 `json:"num_claimable_balances"` // Action needed in release: horizon-v3.0.0: deprecated field - Amount string `json:"amount"` - Balances AssetStatBalances `json:"balances"` + Amount string `json:"amount"` + Balances AssetStatBalances `json:"balances"` + ClaimableBalancesAmount string `json:"claimable_balances_amount"` // Action needed in release: horizon-v3.0.0: deprecated field NumAccounts int32 `json:"num_accounts"` Flags AccountFlags `json:"flags"` @@ -185,7 +187,6 @@ func (res AssetStat) PagingToken() string { type AssetStatBalances struct { Authorized string `json:"authorized"` AuthorizedToMaintainLiabilities string `json:"authorized_to_maintain_liabilities"` - ClaimableBalances string `json:"claimable_balances"` Unauthorized string `json:"unauthorized"` } @@ -193,7 +194,6 @@ type AssetStatBalances struct { type AssetStatAccounts struct { Authorized int32 `json:"authorized"` AuthorizedToMaintainLiabilities int32 `json:"authorized_to_maintain_liabilities"` - ClaimableBalances int32 `json:"claimable_balances"` Unauthorized int32 `json:"unauthorized"` } diff --git a/services/horizon/internal/actions/asset_test.go b/services/horizon/internal/actions/asset_test.go index f324d3d562..c8416a5292 100644 --- a/services/horizon/internal/actions/asset_test.go +++ b/services/horizon/internal/actions/asset_test.go @@ -145,16 +145,16 @@ func TestAssetStats(t *testing.T) { Authorized: usdAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: usdAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: usdAssetStat.Accounts.Unauthorized, - ClaimableBalances: usdAssetStat.Accounts.ClaimableBalances, }, + NumClaimableBalances: usdAssetStat.Accounts.ClaimableBalances, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", - ClaimableBalances: "0.0000010", }, - Amount: "0.0000001", - NumAccounts: usdAssetStat.NumAccounts, + ClaimableBalancesAmount: "0.0000010", + Amount: "0.0000001", + NumAccounts: usdAssetStat.NumAccounts, Asset: base.Asset{ Type: "credit_alphanum4", Code: usdAssetStat.AssetCode, @@ -188,16 +188,16 @@ func TestAssetStats(t *testing.T) { Authorized: etherAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: etherAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: etherAssetStat.Accounts.Unauthorized, - ClaimableBalances: etherAssetStat.Accounts.ClaimableBalances, }, + NumClaimableBalances: etherAssetStat.Accounts.ClaimableBalances, Balances: horizon.AssetStatBalances{ Authorized: "0.0000023", AuthorizedToMaintainLiabilities: "0.0000046", Unauthorized: "0.0000092", - ClaimableBalances: "0.0000000", }, - Amount: "0.0000023", - NumAccounts: etherAssetStat.NumAccounts, + ClaimableBalancesAmount: "0.0000000", + Amount: "0.0000023", + NumAccounts: etherAssetStat.NumAccounts, Asset: base.Asset{ Type: "credit_alphanum4", Code: etherAssetStat.AssetCode, @@ -231,16 +231,16 @@ func TestAssetStats(t *testing.T) { Authorized: otherUSDAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: otherUSDAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: otherUSDAssetStat.Accounts.Unauthorized, - ClaimableBalances: otherUSDAssetStat.Accounts.ClaimableBalances, }, + NumClaimableBalances: otherUSDAssetStat.Accounts.ClaimableBalances, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", - ClaimableBalances: "0.0000000", }, - Amount: "0.0000001", - NumAccounts: otherUSDAssetStat.NumAccounts, + ClaimableBalancesAmount: "0.0000000", + Amount: "0.0000001", + NumAccounts: otherUSDAssetStat.NumAccounts, Asset: base.Asset{ Type: "credit_alphanum4", Code: otherUSDAssetStat.AssetCode, @@ -276,16 +276,16 @@ func TestAssetStats(t *testing.T) { Authorized: eurAssetStat.Accounts.Authorized, AuthorizedToMaintainLiabilities: eurAssetStat.Accounts.AuthorizedToMaintainLiabilities, Unauthorized: eurAssetStat.Accounts.Unauthorized, - ClaimableBalances: eurAssetStat.Accounts.ClaimableBalances, }, + NumClaimableBalances: eurAssetStat.Accounts.ClaimableBalances, Balances: horizon.AssetStatBalances{ Authorized: "0.0000111", AuthorizedToMaintainLiabilities: "0.0000222", Unauthorized: "0.0000333", - ClaimableBalances: "0.0000000", }, - Amount: "0.0000111", - NumAccounts: eurAssetStat.NumAccounts, + ClaimableBalancesAmount: "0.0000000", + Amount: "0.0000111", + NumAccounts: eurAssetStat.NumAccounts, Asset: base.Asset{ Type: "credit_alphanum4", Code: eurAssetStat.AssetCode, @@ -459,16 +459,16 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) { Authorized: 2, AuthorizedToMaintainLiabilities: 3, Unauthorized: 4, - ClaimableBalances: 0, }, + NumClaimableBalances: 0, Balances: horizon.AssetStatBalances{ Authorized: "0.0000001", AuthorizedToMaintainLiabilities: "0.0000002", Unauthorized: "0.0000003", - ClaimableBalances: "0.0000000", }, - Amount: "0.0000001", - NumAccounts: usdAssetStat.NumAccounts, + ClaimableBalancesAmount: "0.0000000", + Amount: "0.0000001", + NumAccounts: usdAssetStat.NumAccounts, Asset: base.Asset{ Type: "credit_alphanum4", Code: usdAssetStat.AssetCode, diff --git a/services/horizon/internal/resourceadapter/asset_stat.go b/services/horizon/internal/resourceadapter/asset_stat.go index f3c228f4c7..1d13bf362d 100644 --- a/services/horizon/internal/resourceadapter/asset_stat.go +++ b/services/horizon/internal/resourceadapter/asset_stat.go @@ -23,16 +23,17 @@ func PopulateAssetStat( res.Asset.Type = xdr.AssetTypeToString[row.AssetType] res.Asset.Code = row.AssetCode res.Asset.Issuer = row.AssetIssuer - res.Accounts = protocol.AssetStatAccounts(row.Accounts) - res.Amount, err = amount.IntStringToAmount(row.Amount) - if err != nil { - return errors.Wrap(err, "Invalid amount in PopulateAssetStat") + res.Accounts = protocol.AssetStatAccounts{ + Authorized: row.Accounts.Authorized, + AuthorizedToMaintainLiabilities: row.Accounts.AuthorizedToMaintainLiabilities, + Unauthorized: row.Accounts.Unauthorized, } - err = populateAssetStatBalances(&res.Balances, row.Balances) + res.NumClaimableBalances = row.Accounts.ClaimableBalances + res.NumAccounts = row.NumAccounts + err = populateAssetStatBalances(res, row.Balances) if err != nil { return err } - res.NumAccounts = row.NumAccounts flags := int8(issuer.Flags) res.Flags = protocol.AccountFlags{ (flags & int8(xdr.AccountFlagsAuthRequiredFlag)) != 0, @@ -51,25 +52,30 @@ func PopulateAssetStat( return } -func populateAssetStatBalances(res *protocol.AssetStatBalances, row history.ExpAssetStatBalances) (err error) { - res.Authorized, err = amount.IntStringToAmount(row.Authorized) +func populateAssetStatBalances(res *protocol.AssetStat, row history.ExpAssetStatBalances) (err error) { + res.Amount, err = amount.IntStringToAmount(row.Authorized) + if err != nil { + return errors.Wrap(err, "Invalid amount in PopulateAssetStat") + } + + res.Balances.Authorized, err = amount.IntStringToAmount(row.Authorized) if err != nil { return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.Authorized) } - res.AuthorizedToMaintainLiabilities, err = amount.IntStringToAmount(row.AuthorizedToMaintainLiabilities) + res.Balances.AuthorizedToMaintainLiabilities, err = amount.IntStringToAmount(row.AuthorizedToMaintainLiabilities) if err != nil { return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.AuthorizedToMaintainLiabilities) } - res.Unauthorized, err = amount.IntStringToAmount(row.Unauthorized) + res.Balances.Unauthorized, err = amount.IntStringToAmount(row.Unauthorized) if err != nil { return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.Unauthorized) } - res.ClaimableBalances, err = amount.IntStringToAmount(row.ClaimableBalances) + res.ClaimableBalancesAmount, err = amount.IntStringToAmount(row.ClaimableBalances) if err != nil { - return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.ClaimableBalances) + return errors.Wrapf(err, "Invalid amount in PopulateAssetStatBalances: %q", row.Unauthorized) } return nil diff --git a/services/horizon/internal/resourceadapter/asset_stat_test.go b/services/horizon/internal/resourceadapter/asset_stat_test.go index 9cf9582dec..760b1b1689 100644 --- a/services/horizon/internal/resourceadapter/asset_stat_test.go +++ b/services/horizon/internal/resourceadapter/asset_stat_test.go @@ -47,11 +47,11 @@ func TestPopulateExpAssetStat(t *testing.T) { assert.Equal(t, int32(429), res.Accounts.Authorized) assert.Equal(t, int32(214), res.Accounts.AuthorizedToMaintainLiabilities) assert.Equal(t, int32(107), res.Accounts.Unauthorized) - assert.Equal(t, int32(12), res.Accounts.ClaimableBalances) + assert.Equal(t, int32(12), res.NumClaimableBalances) assert.Equal(t, "10000000000000.0000000", res.Balances.Authorized) assert.Equal(t, "5000000000000.0000000", res.Balances.AuthorizedToMaintainLiabilities) assert.Equal(t, "250000000000.0000000", res.Balances.Unauthorized) - assert.Equal(t, "120000000000.0000000", res.Balances.ClaimableBalances) + assert.Equal(t, "120000000000.0000000", res.ClaimableBalancesAmount) assert.Equal(t, "10000000000000.0000000", res.Amount) assert.Equal(t, int32(429), res.NumAccounts) assert.Equal(t, horizon.AccountFlags{}, res.Flags) From 44d52de87f31069078e40d3e5803dd353be40936 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 1 Apr 2021 16:10:52 +0100 Subject: [PATCH 18/18] use ingest.Change in assetStatSet.Add* methods --- .../processors/asset_stats_processor.go | 40 ++---------- .../ingest/processors/asset_stats_set.go | 34 +++++----- .../ingest/processors/asset_stats_set_test.go | 63 ++++++++++++++----- services/horizon/internal/ingest/verify.go | 21 ++++++- .../ingest/verify_range_state_test.go | 20 ++++-- 5 files changed, 103 insertions(+), 75 deletions(-) diff --git a/services/horizon/internal/ingest/processors/asset_stats_processor.go b/services/horizon/internal/ingest/processors/asset_stats_processor.go index 0fc71f4722..f8f20cbc83 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_processor.go +++ b/services/horizon/internal/ingest/processors/asset_stats_processor.go @@ -52,9 +52,9 @@ func (p *AssetStatsProcessor) ProcessChange(change ingest.Change) error { switch change.Type { case xdr.LedgerEntryTypeClaimableBalance: - return p.assetStatSet.AddClaimableBalance(nil, change.Post.Data.ClaimableBalance) + return p.assetStatSet.AddClaimableBalance(change) case xdr.LedgerEntryTypeTrustline: - return p.assetStatSet.AddTrustline(nil, change.Post.Data.TrustLine) + return p.assetStatSet.AddTrustline(change) default: return nil } @@ -76,38 +76,6 @@ func (p *AssetStatsProcessor) addToCache(change ingest.Change) error { return nil } -func (p *AssetStatsProcessor) commitClaimableBalanceChange(change ingest.Change) error { - switch { - case change.Pre == nil && change.Post != nil: - // Created - return p.assetStatSet.AddClaimableBalance(nil, change.Post.Data.ClaimableBalance) - case change.Pre != nil && change.Post != nil: - // Updated - return p.assetStatSet.AddClaimableBalance(change.Pre.Data.ClaimableBalance, change.Post.Data.ClaimableBalance) - case change.Pre != nil && change.Post == nil: - // Removed - return p.assetStatSet.AddClaimableBalance(change.Pre.Data.ClaimableBalance, nil) - default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") - } -} - -func (p *AssetStatsProcessor) commitTrustlineChange(change ingest.Change) error { - switch { - case change.Pre == nil && change.Post != nil: - // Created - return p.assetStatSet.AddTrustline(nil, change.Post.Data.TrustLine) - case change.Pre != nil && change.Post != nil: - // Updated - return p.assetStatSet.AddTrustline(change.Pre.Data.TrustLine, change.Post.Data.TrustLine) - case change.Pre != nil && change.Post == nil: - // Removed - return p.assetStatSet.AddTrustline(change.Pre.Data.TrustLine, nil) - default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") - } -} - func (p *AssetStatsProcessor) Commit() error { if !p.useLedgerEntryCache { return p.assetStatsQ.InsertAssetStats(p.assetStatSet.All(), maxBatchSize) @@ -118,9 +86,9 @@ func (p *AssetStatsProcessor) Commit() error { var err error switch change.Type { case xdr.LedgerEntryTypeClaimableBalance: - err = p.commitClaimableBalanceChange(change) + err = p.assetStatSet.AddClaimableBalance(change) case xdr.LedgerEntryTypeTrustline: - err = p.commitTrustlineChange(change) + err = p.assetStatSet.AddTrustline(change) default: return errors.Errorf("Change type %v is unexpected", change.Type) } diff --git a/services/horizon/internal/ingest/processors/asset_stats_set.go b/services/horizon/internal/ingest/processors/asset_stats_set.go index 2fe2032112..3df1da3b10 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set.go @@ -163,12 +163,16 @@ func (s AssetStatSet) addDelta(asset xdr.Asset, deltaBalances, deltaAccounts del } // AddTrustline updates the set to account for how a given trustline has changed. -// pre is the trustline before the change (nil indicates that this change introduced the trustline into the ledger) -// post is the trustline after the change (nil indicates that this change removed the trustline from the ledger) -func (s AssetStatSet) AddTrustline( - pre *xdr.TrustLineEntry, - post *xdr.TrustLineEntry, -) error { +// change must be a xdr.LedgerEntryTypeTrustLine type. +func (s AssetStatSet) AddTrustline(change ingest.Change) error { + var pre, post *xdr.TrustLineEntry + if change.Pre != nil { + pre = change.Pre.Data.TrustLine + } + if change.Post != nil { + post = change.Post.Data.TrustLine + } + deltaAccounts := delta{} deltaBalances := delta{} @@ -196,14 +200,16 @@ func (s AssetStatSet) AddTrustline( } // AddClaimableBalance updates the set to account for how a given claimable balance has changed. -// pre is the claimable balance before the change (nil indicates that this change introduced the -// claimable balance into the ledger) -// post is the claimable balance after the change (nil indicates that this change removed the -// claimable balance from the ledger) -func (s AssetStatSet) AddClaimableBalance( - pre *xdr.ClaimableBalanceEntry, - post *xdr.ClaimableBalanceEntry, -) error { +// change must be a xdr.LedgerEntryTypeClaimableBalance type. +func (s AssetStatSet) AddClaimableBalance(change ingest.Change) error { + var pre, post *xdr.ClaimableBalanceEntry + if change.Pre != nil { + pre = change.Pre.Data.ClaimableBalance + } + if change.Post != nil { + post = change.Post.Data.ClaimableBalance + } + deltaAccounts := delta{} deltaBalances := delta{} diff --git a/services/horizon/internal/ingest/processors/asset_stats_set_test.go b/services/horizon/internal/ingest/processors/asset_stats_set_test.go index f5232dce5d..e1f24b799a 100644 --- a/services/horizon/internal/ingest/processors/asset_stats_set_test.go +++ b/services/horizon/internal/ingest/processors/asset_stats_set_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/xdr" ) @@ -46,10 +47,37 @@ func TestAddNativeClaimableBalance(t *testing.T) { Asset: xdr.MustNewNativeAsset(), Amount: 100, } - assert.NoError(t, set.AddClaimableBalance(nil, &claimableBalance)) + assert.NoError(t, set.AddClaimableBalance( + ingest.Change{ + Post: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + ClaimableBalance: &claimableBalance, + }, + }, + }, + )) assert.Empty(t, set.All()) } +func trustlineChange(pre, post *xdr.TrustLineEntry) ingest.Change { + c := ingest.Change{} + if pre != nil { + c.Pre = &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + TrustLine: pre, + }, + } + } + if post != nil { + c.Post = &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + TrustLine: post, + }, + } + } + return c +} + func TestAddAndRemoveAssetStats(t *testing.T) { set := AssetStatSet{} eur := "EUR" @@ -72,12 +100,13 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 1, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }), + }, + )), ) assertAllEquals(t, set, []history.ExpAssetStat{eurAssetStat}) @@ -96,12 +125,12 @@ func TestAddAndRemoveAssetStats(t *testing.T) { assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: 24, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }), + })), ) eurAssetStat.Balances.Authorized = "25" @@ -113,44 +142,44 @@ func TestAddAndRemoveAssetStats(t *testing.T) { usd := "USD" assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(usd, trustLineIssuer.Address()), Balance: 10, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }), + })), ) ether := "ETHER" assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 3, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }), + })), ) // AddTrustline an authorized_to_maintain_liabilities trust line assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 4, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag), - }), + })), ) // AddTrustline an unauthorized trust line assert.NoError( t, - set.AddTrustline(nil, &xdr.TrustLineEntry{ + set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(ether, trustLineIssuer.Address()), Balance: 5, - }), + })), ) expected := []history.ExpAssetStat{ { @@ -207,12 +236,12 @@ func TestAddAndRemoveAssetStats(t *testing.T) { func TestOverflowAssetStatSet(t *testing.T) { set := AssetStatSet{} eur := "EUR" - err := set.AddTrustline(nil, &xdr.TrustLineEntry{ + err := set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) + })) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -241,12 +270,12 @@ func TestOverflowAssetStatSet(t *testing.T) { t.Fatalf("expected asset stat to be %v but got %v", eurAssetStat, all[0]) } - err = set.AddTrustline(nil, &xdr.TrustLineEntry{ + err = set.AddTrustline(trustlineChange(nil, &xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset(eur, trustLineIssuer.Address()), Balance: math.MaxInt64, Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag), - }) + })) if err != nil { t.Fatalf("unexpected error %v", err) } diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index cf9ab3ee07..7d239aa0cd 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -539,7 +539,15 @@ func addTrustLinesToStateVerifier( if err := verifier.Write(entry); err != nil { return err } - if err := assetStats.AddTrustline(nil, &trustline); err != nil { + if err := assetStats.AddTrustline( + ingest.Change{ + Post: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + TrustLine: &trustline, + }, + }, + }, + ); err != nil { return ingest.NewStateError( errors.Wrap(err, "could not add trustline to asset stats"), ) @@ -601,7 +609,16 @@ func addClaimableBalanceToStateVerifier( if err := verifier.Write(entry); err != nil { return err } - if err := assetStats.AddClaimableBalance(nil, &cBalance); err != nil { + + if err := assetStats.AddClaimableBalance( + ingest.Change{ + Post: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + ClaimableBalance: &cBalance, + }, + }, + }, + ); err != nil { return ingest.NewStateError( errors.Wrap(err, "could not add claimable balance to asset stats"), ) diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index 35882690f0..294d484b7c 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -440,12 +440,20 @@ func (s *VerifyRangeStateTestSuite) TestVerifyFailsWhenAssetStatsMismatch() { set := processors.AssetStatSet{} trustLineIssuer := xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") - set.AddTrustline(nil, &xdr.TrustLineEntry{ - AccountId: xdr.MustAddress(keypair.MustRandom().Address()), - Balance: 123, - Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), - Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag), - }) + set.AddTrustline( + ingest.Change{ + Post: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + TrustLine: &xdr.TrustLineEntry{ + AccountId: xdr.MustAddress(keypair.MustRandom().Address()), + Balance: 123, + Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), + Flags: xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag), + }, + }, + }, + }, + ) stat := history.ExpAssetStat{ AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4,