Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Fix ReapLookupTables query #4525

Merged
merged 15 commits into from
Aug 11, 2022
86 changes: 60 additions & 26 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error)
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -839,18 +839,46 @@ type tableObjectFieldPair struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
map[string]int64, // deleted rows count
map[string]int64, // new offsets
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
return nil, nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 10000
const batchSize = 1000

deletedCount := make(map[string]int64)

if offsets == nil {
offsets = make(map[string]int64)
}

for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
Expand All @@ -874,31 +902,37 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
} {
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
return nil, nil, errors.Wrap(err, "error constructing a query")
}

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
} else {
return nil, nil, err
}
}

_, err = q.ExecRaw(
res, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
query,
)
if err != nil {
return nil, errors.Wrapf(err, "error running query: %s", query)
return nil, nil, errors.Wrapf(err, "error running query: %s", query)
}

offsets[table] += batchSize

// Check if offset exceeds table size and then reset it
var count int64
err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
rows, err := res.RowsAffected()
if err != nil {
return nil, err
return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
}

if offsets[table] > count {
offsets[table] = 0
}
deletedCount[table] = rows
offsets[table] = newOffset
}
return offsets, nil
return deletedCount, offsets, nil
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
Expand All @@ -908,13 +942,13 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
//
// (select id from
// (select id,
// (select count(*) from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id) as c1,
// (select count(*) from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id) as c2,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb order by id limit 100 offset 1000)
// as sub where c1 = 0 and c2 = 0 and 1=1);
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
//
// In short it checks the 100 rows omiting 1000 row of history_claimable_balances
// and counts occurences of each row in corresponding history tables.
Expand All @@ -937,7 +971,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select count(*) from %s where %s = hcb.id) as c%d, `,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice optimization 👍

historyTable.name,
historyTable.objectField,
i,
Expand All @@ -947,13 +981,13 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset)
_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d = 0 and ", i)
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
Expand Down
14 changes: 7 additions & 7 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func TestConstructReapLookupTablesQuery(t *testing.T) {
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+
"(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+
"(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+
"(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+
"(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+
"1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+
"where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query)
"(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+
"(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+
"(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+
"(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+
"(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+
"1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are negative ids possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the 0 in id >=0 condition is the offset param and it's there just to ensure we move forward when iterating over the tables in batches. So for next cycle it will be id >=1000 if the ID after 1000 rows (batch size) is equal 1000.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could probably clean this up with WITH clauses, and named sub-queries?

"where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query)
}
18 changes: 16 additions & 2 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestReapLookupTables(t *testing.T) {

var (
prevLedgers, curLedgers int
prevAccounts, curAccounts int
prevClaimableBalances, curClaimableBalances int
prevLiquidityPools, curLiquidityPools int
)
Expand All @@ -29,6 +30,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -45,7 +48,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin()
tt.Require.NoError(err)

newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
tt.Require.NoError(err)

err = q.Commit()
Expand All @@ -55,6 +58,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -63,12 +68,21 @@ func TestReapLookupTables(t *testing.T) {

tt.Assert.Equal(61, prevLedgers, "prevLedgers")
tt.Assert.Equal(1, curLedgers, "curLedgers")

tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`)

tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")
tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(newOffsets, 2)
tt.Assert.Len(newOffsets, 3)
tt.Assert.Equal(int64(0), newOffsets["history_accounts"])
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
}
23 changes: 23 additions & 0 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up

ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_asset_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_asset_id_fkey;

-- +migrate Down

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_account_id_fkey FOREIGN KEY (base_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_asset_id_fkey FOREIGN KEY (base_asset_id) REFERENCES history_assets(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_account_id_fkey FOREIGN KEY (counter_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id);
18 changes: 17 additions & 1 deletion services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
HistoryArchiveURL string

DisableStateVerification bool
EnableReapLookupTables bool
EnableExtendedLogLedgerStats bool

ReingestEnabled bool
Expand Down Expand Up @@ -680,6 +681,10 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
}

func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
if !s.config.EnableReapLookupTables {
return
}

// Check if lastIngestedLedger is the last one available in the backend
sequence, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx)
if err != nil {
Expand Down Expand Up @@ -712,7 +717,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
if err != nil {
log.WithField("err", err).Warn("Error reaping lookup tables")
return
Expand All @@ -724,6 +729,17 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
return
}

totalDeleted := int64(0)
reapLog := log
for table, c := range deletedCount {
totalDeleted += c
reapLog = reapLog.WithField(table, c)
}

if totalDeleted > 0 {
reapLog.Info("Reaper deleted rows from lookup tables")
}

s.reapOffsets = newOffsets
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
Expand Down
12 changes: 8 additions & 4 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,16 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder(maxBatchSize int) history.TradeBatc
return args.Get(0).(history.TradeBatchInsertBuilder)
}

func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) {
args := m.Called(ctx, offsets)
if args.Get(0) == nil {
return nil, args.Error(1)
var r1, r2 map[string]int64
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]int64)
}
if args.Get(1) != nil {
r1 = args.Get(1).(map[string]int64)
}
return args.Get(0).(map[string]int64), args.Error(1)
return r1, r2, args.Error(2)
}

func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {
Expand Down
Loading