Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use new ingestion data in /accounts/{account_id} #1868

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion services/horizon/internal/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package horizon

import (
"context"
"database/sql"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -175,7 +176,30 @@ type showActionQueryParams struct {

// getAccountInfo returns the information about an account based on the provided param.
func (w *web) getAccountInfo(ctx context.Context, qp *showActionQueryParams) (interface{}, error) {
return actions.AccountInfo(ctx, &core.Q{w.coreSession(ctx)}, qp.AccountID)
// Use AppFromContext to prevent larger refactoring of actions code. Will
// be removed once this endpoint is migrated to use new actions design.
app := AppFromContext(ctx)
var historyQ *history.Q

if app.config.EnableExperimentalIngestion {
horizonSession, err := w.horizonSession(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting horizon db session")
}

err = horizonSession.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "error starting transaction")
}

defer horizonSession.Rollback()
historyQ = &history.Q{horizonSession}
}

return actions.AccountInfo(ctx, &core.Q{w.coreSession(ctx)}, historyQ, qp.AccountID, app.config.EnableExperimentalIngestion)
}

// getTransactionPage returns a page containing the transaction records of an account or a ledger.
Expand Down
9 changes: 7 additions & 2 deletions services/horizon/internal/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ func TestGetAccountInfo(t *testing.T) {

w := mustInitWeb(context.Background(), &history.Q{tt.HorizonSession()}, &core.Q{tt.CoreSession()}, time.Duration(5), 0, true)

res, err := w.getAccountInfo(tt.Ctx, &showActionQueryParams{AccountID: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU"})
ctx := withAppContext(tt.Ctx, &App{
config: Config{
EnableExperimentalIngestion: false,
},
})
res, err := w.getAccountInfo(ctx, &showActionQueryParams{AccountID: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU"})
tt.Assert.NoError(err)

account, ok := res.(*horizon.Account)
Expand All @@ -46,7 +51,7 @@ func TestGetAccountInfo(t *testing.T) {
}
}

_, err = w.getAccountInfo(tt.Ctx, &showActionQueryParams{AccountID: "GDBAPLDCAEJV6LSEDFEAUDAVFYSNFRUYZ4X75YYJJMMX5KFVUOHX46SQ"})
_, err = w.getAccountInfo(ctx, &showActionQueryParams{AccountID: "GDBAPLDCAEJV6LSEDFEAUDAVFYSNFRUYZ4X75YYJJMMX5KFVUOHX46SQ"})
tt.Assert.Equal(errors.Cause(err), sql.ErrNoRows)
}

Expand Down
199 changes: 197 additions & 2 deletions services/horizon/internal/actions/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package actions

import (
"context"
"encoding/json"
"fmt"
"net/http"

protocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/db2/core"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/resourceadapter"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/render/hal"
"github.com/stellar/go/support/render/problem"
"github.com/stellar/go/xdr"
)

// AccountInfo returns the information about an account identified by addr.
func AccountInfo(ctx context.Context, cq *core.Q, addr string) (*protocol.Account, error) {
func AccountInfo(ctx context.Context, cq *core.Q, hq *history.Q, addr string, enableExperimentalIngestion bool) (*protocol.Account, error) {
var (
coreRecord core.Account
coreData []core.AccountData
Expand Down Expand Up @@ -52,8 +55,200 @@ func AccountInfo(ctx context.Context, cq *core.Q, addr string) (*protocol.Accoun
coreSigners,
coreTrustlines,
)
if err != nil {
return nil, errors.Wrap(err, "populating account")
}

if enableExperimentalIngestion {
c, err := json.Marshal(resource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not pass in the resource directly to compareAccountResults() ? you end up unmarshaling the bytes in compareAccountResults() so you could skip that step by sending the protocol.Account instance to compareAccountResults()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's explained in the comments:

  // We send JSON bytes to compareAccountResults to prevent modifying
  // `resource` in any way.

It's to minimize chances of rendering data from expingest instead of stellar-core by accident. The compare code will be removed when expingest goes to production.

if err != nil {
return nil, errors.Wrap(err, "error marshaling resource")
}

// We send JSON bytes to compareAccountResults to prevent modifying
// `resource` in any way.
err = compareAccountResults(ctx, hq, c, addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check that both ingestion systems are at the same ledger before comparing results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do check this using LastModifiedLedger in accountResourcesEqual.

if err != nil {
log.Ctx(ctx).WithFields(log.F{
"err": err,
"accounts_check": true, // So it's easy to find all diffs
}).Warn("error comparing core and horizon accounts")
}
}

return &resource, nil
}

func compareAccountResults(
ctx context.Context,
hq *history.Q,
expectedResourceBytes []byte,
addr string,
) error {
var (
horizonRecord history.AccountEntry
horizonData []history.Data
horizonSigners []history.AccountSigner
horizonTrustLines []history.TrustLine
newResource protocol.Account
)

horizonRecord, err := hq.GetAccountByID(addr)
if err != nil {
return err
}

horizonData, err = hq.GetAccountDataByAccountID(addr)
if err != nil {
return err
}

return &resource, errors.Wrap(err, "populating account")
horizonSigners, err = hq.GetAccountSignersByAccountID(addr)
if err != nil {
return err
}

horizonTrustLines, err = hq.GetTrustLinesByAccountID(addr)
if err != nil {
return err
}

err = resourceadapter.PopulateAccountEntry(
ctx,
&newResource,
horizonRecord,
horizonData,
horizonSigners,
horizonTrustLines,
)
if err != nil {
return err
}

var expectedResource protocol.Account
err = json.Unmarshal(expectedResourceBytes, &expectedResource)
if err != nil {
return errors.Wrap(err, "Error unmarshaling expectedResourceBytes")
}

if err = accountResourcesEqual(newResource, expectedResource); err != nil {
return errors.Wrap(
err,
fmt.Sprintf(
"Core and Horizon accounts responses do not match: %+v %+v",
expectedResource, newResource,
))

}

return nil
}

// accountResourcesEqual compares two protocol.Account objects and returns an
// error if they are different but only if `LastModifiedLedger` fields are the
// same.
func accountResourcesEqual(actual, expected protocol.Account) error {
if actual.Links != expected.Links {
return errors.New("Links are different")
}

if actual.LastModifiedLedger != expected.LastModifiedLedger {
// Modified at different ledgers so values will be different
return nil
}

if actual.ID != expected.ID ||
actual.AccountID != expected.AccountID ||
actual.Sequence != expected.Sequence ||
actual.SubentryCount != expected.SubentryCount ||
actual.InflationDestination != expected.InflationDestination ||
actual.HomeDomain != expected.HomeDomain ||
actual.Thresholds != expected.Thresholds ||
actual.Flags != expected.Flags {
return errors.New("Main fields are different")
}

// Ignore PT

// Balances
balances := map[string]protocol.Balance{}
for _, balance := range expected.Balances {
id := balance.Asset.Type + balance.Asset.Code + balance.Asset.Issuer
balances[id] = balance
}

for _, actualBalance := range actual.Balances {
id := actualBalance.Asset.Type + actualBalance.Asset.Code + actualBalance.Asset.Issuer
expectedBalance := balances[id]
delete(balances, id)

if expectedBalance.LastModifiedLedger != actualBalance.LastModifiedLedger {
// Modified at different ledgers so values will be different
continue
}

if expectedBalance.Balance != actualBalance.Balance ||
expectedBalance.Limit != actualBalance.Limit ||
expectedBalance.BuyingLiabilities != actualBalance.BuyingLiabilities ||
expectedBalance.SellingLiabilities != actualBalance.SellingLiabilities {
return errors.New("Balance " + id + " is different")
}

if expectedBalance.IsAuthorized == nil && actualBalance.IsAuthorized == nil {
continue
}

if expectedBalance.IsAuthorized != nil && actualBalance.IsAuthorized != nil &&
*expectedBalance.IsAuthorized == *actualBalance.IsAuthorized {
continue
}

return errors.New("IsAuthorized is different for " + id)
}

if len(balances) > 0 {
return errors.New("Some extra balances")
}

// Signers
signers := map[string]protocol.Signer{}
for _, signer := range expected.Signers {
signers[signer.Key] = signer
}

for _, actualSigner := range actual.Signers {
expectedSigner := signers[actualSigner.Key]
delete(signers, actualSigner.Key)

if expectedSigner != actualSigner {
return errors.New("Signer is different")
}
}

if len(signers) > 0 {
return errors.New("Extra signers")
}

// Data
data := map[string]string{}
for key, value := range expected.Data {
data[key] = value
}

for actualKey, actualValue := range actual.Data {
expectedValue := data[actualKey]
delete(data, actualKey)

if expectedValue != actualValue {
return errors.New("Data is different")
}
}

if len(data) > 0 {
return errors.New("Extra data")
}

return nil
}

// AccountsQuery query struct for accounts end-point
Expand Down
8 changes: 7 additions & 1 deletion services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ func TestAccountInfo(t *testing.T) {
tt := test.Start(t).Scenario("allow_trust")
defer tt.Finish()

account, err := AccountInfo(tt.Ctx, &core.Q{tt.CoreSession()}, signer)
account, err := AccountInfo(
tt.Ctx,
&core.Q{tt.CoreSession()},
&history.Q{tt.HorizonSession()},
signer,
false,
)
tt.Assert.NoError(err)

tt.Assert.Equal("8589934593", account.Sequence)
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/internal/db2/history/account_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ func (q *Q) CountAccountsData() (int, error) {
return count, nil
}

// GetAccountDataByAccountID loads account data for a given account ID
func (q *Q) GetAccountDataByAccountID(id string) ([]Data, error) {
abuiles marked this conversation as resolved.
Show resolved Hide resolved
var data []Data
sql := selectAccountData.Where(sq.Eq{"account": id})
err := q.Select(&data, sql)
return data, err
}

// GetAccountDataByKeys loads a row from the `accounts_data` table, selected by multiple keys.
func (q *Q) GetAccountDataByKeys(keys []xdr.LedgerKeyData) ([]Data, error) {
var data []Data
Expand Down
Loading