Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Insert and query rows from his…
Browse files Browse the repository at this point in the history
…tory lookup tables with one query (#5415)
  • Loading branch information
tamirms authored Aug 23, 2024
1 parent 8257415 commit 2349c8f
Show file tree
Hide file tree
Showing 8 changed files with 466 additions and 546 deletions.
336 changes: 229 additions & 107 deletions services/horizon/internal/db2/history/account_loader.go

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAccountLoader(t *testing.T) {
future := loader.GetFuture(address)
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
assert.Contains(t, err.Error(), `invalid loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand Down Expand Up @@ -55,4 +55,35 @@ func TestAccountLoader(t *testing.T) {
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that Loader works when all the previous values are already
// present in the db and also add 10 more rows to insert
loader = NewAccountLoader()
for i := 0; i < 10; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

for _, address := range addresses {
future := loader.GetFuture(address)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 110,
Inserted: 10,
}, loader.Stats())

for _, address := range addresses {
var internalId int64
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

}
207 changes: 41 additions & 166 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
package history

import (
"context"
"database/sql/driver"
"fmt"
"sort"
"strings"

"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -40,26 +33,13 @@ func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
// A FutureAssetID is created by an AssetLoader and
// the asset id is available after calling Exec() on
// the AssetLoader.
type FutureAssetID struct {
asset AssetKey
loader *AssetLoader
}

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.GetNow(a.asset)
}
type FutureAssetID = future[AssetKey, Asset]

// AssetLoader will map assets to their history
// asset ids. If there is no existing mapping for a given sset,
// the AssetLoader will insert into the history_assets table to
// establish a mapping.
type AssetLoader struct {
sealed bool
set set.Set[AssetKey]
ids map[AssetKey]int64
stats LoaderStats
}
type AssetLoader = loader[AssetKey, Asset]

// NewAssetLoader will construct a new AssetLoader instance.
func NewAssetLoader() *AssetLoader {
Expand All @@ -68,152 +48,47 @@ func NewAssetLoader() *AssetLoader {
set: set.Set[AssetKey]{},
ids: map[AssetKey]int64{},
stats: LoaderStats{},
}
}

// GetFuture registers the given asset into the loader and
// returns a FutureAssetID which will hold the history asset id for
// the asset after Exec() is called.
func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
if a.sealed {
panic(errSealed)
}
a.set.Add(asset)
return FutureAssetID{
asset: asset,
loader: a,
}
}

// GetNow returns the history asset id for the given asset.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid asset loader state,
Exec was not called yet to properly seal and resolve %v id`, asset)
}
if internalID, ok := a.ids[asset]; !ok {
return 0, fmt.Errorf(`asset loader id %v was not found`, asset)
} else {
return internalID, nil
}
}

func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error {
var rows []Asset
for i := 0; i < len(keys); i += loaderLookupBatchSize {
end := ordered.Min(len(keys), i+loaderLookupBatchSize)
subset := keys[i:end]
args := make([]interface{}, 0, 3*len(subset))
placeHolders := make([]string, 0, len(subset))
for _, key := range subset {
args = append(args, key.Code, key.Type, key.Issuer)
placeHolders = append(placeHolders, "(?, ?, ?)")
}
rawSQL := fmt.Sprintf(
"SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s)",
strings.Join(placeHolders, ", "),
)
err := q.SelectRaw(ctx, &rows, rawSQL, args...)
if err != nil {
return errors.Wrap(err, "could not select assets")
}

for _, row := range rows {
a.ids[AssetKey{
Type: row.Type,
Code: row.Code,
Issuer: row.Issuer,
}] = row.ID
}
}
return nil
}

// Exec will look up all the history asset ids for the assets registered in the loader.
// If there are no history asset ids for a given set of assets, Exec will insert rows
// into the history_assets table.
func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) error {
a.sealed = true
if len(a.set) == 0 {
return nil
}
q := &Q{session}
keys := make([]AssetKey, 0, len(a.set))
for key := range a.set {
keys = append(keys, key)
}

if err := a.lookupKeys(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)

assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
continue
}
assetTypes = append(assetTypes, key.Type)
assetCodes = append(assetCodes, key.Code)
assetIssuers = append(assetIssuers, key.Issuer)
keys[insert] = key
insert++
}
if insert == 0 {
return nil
}
keys = keys[:insert]

err := bulkInsert(
ctx,
q,
"history_assets",
[]string{"asset_code", "asset_type", "asset_issuer"},
[]bulkInsertField{
{
name: "asset_code",
dbType: "character varying(12)",
objects: assetCodes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
{
name: "asset_type",
dbType: "character varying(64)",
objects: assetTypes,
},
name: "AssetLoader",
table: "history_assets",
columnsForKeys: func(keys []AssetKey) []columnValues {
assetTypes := make([]string, 0, len(keys))
assetCodes := make([]string, 0, len(keys))
assetIssuers := make([]string, 0, len(keys))
for _, key := range keys {
assetTypes = append(assetTypes, key.Type)
assetCodes = append(assetCodes, key.Code)
assetIssuers = append(assetIssuers, key.Issuer)
}

return []columnValues{
{
name: "asset_code",
dbType: "character varying(12)",
objects: assetCodes,
},
{
name: "asset_type",
dbType: "character varying(64)",
objects: assetTypes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
}
},
mappingFromRow: func(asset Asset) (AssetKey, int64) {
return AssetKey{
Type: asset.Type,
Code: asset.Code,
Issuer: asset.Issuer,
}, asset.ID
},
less: func(a AssetKey, b AssetKey) bool {
return a.String() < b.String()
},
)
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, keys)
}

// Stats returns the number of assets registered in the loader and the number of assets
// inserted into the history_assets table.
func (a *AssetLoader) Stats() LoaderStats {
return a.stats
}

func (a *AssetLoader) Name() string {
return "AssetLoader"
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
Expand Down
56 changes: 55 additions & 1 deletion services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestAssetLoader(t *testing.T) {
future := loader.GetFuture(key)
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
assert.Contains(t, err.Error(), `invalid loader state,`)
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}
Expand Down Expand Up @@ -106,4 +106,58 @@ func TestAssetLoader(t *testing.T) {
_, err = loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that Loader works when all the previous values are already
// present in the db and also add 10 more rows to insert
loader = NewAssetLoader()
for i := 0; i < 10; i++ {
var key AssetKey
if i%2 == 0 {
code := [4]byte{0, 0, 0, 0}
copy(code[:], fmt.Sprintf("ab%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})
} else {
code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
copy(code[:], fmt.Sprintf("abcdef%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum12,
AlphaNum12: &xdr.AlphaNum12{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})

}
keys = append(keys, key)
}

for _, key := range keys {
future := loader.GetFuture(key)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid loader state,`)
}
assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 110,
Inserted: 10,
}, loader.Stats())

for _, key := range keys {
var internalID int64
internalID, err = loader.GetNow(key)
assert.NoError(t, err)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
} else {
assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer)
}
var assetID int64
assetID, err = q.GetAssetID(context.Background(), assetXDR)
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}
}
Loading

0 comments on commit 2349c8f

Please sign in to comment.