Skip to content

Commit

Permalink
services/horizon: Reap history object tables when ingestion is idle (#…
Browse files Browse the repository at this point in the history
…4518)

While Horizon removes history data when `--history-retention-count` flag is set
it doesn't clear lookup historical tables. Lookup tables are `[id, key name]`
pairs that allow setting pointers to keys in historical tables, thus saving disk
space. This data can occupy a vast space on disk and is never used when old
historical data is deleted.

This commit adds code responsible for clearing orphaned rows in lookup
historical tables. Orphaned rows can appear when old data is removed by reaper.
The new code is separate from the existing reaper code (see "Alternative
solutions" below) and activates after each ledger if there are no more ledgers
to ingest in the backend. This has two advantages: it does not slow down catchup
and it works only when ingestion is idle which shouldn't affect ingestion at
all. To ensure performance is not affected, the `ReapLookupTables` method is
called with context with 5 seconds timeout which means that if it does not
finish the work in specified time it will simply be cancelled.

The solution here requires new indexes added in c2d52f0 (without it finding the
rows to delete is slow). For each lookup table, we check the number of
occurrences of a given lookup ID in all the tables in which the lookup table is
used. If no occurrences are found, the row is removed from the lookup table.

Rows are removed in batches of 10000 rows (can be modified in the future). The
cursor is updated when a table is processed, so after the next ledger ingestion
the next chunk of rows is checked. When the cursor reaches the end of the table
it is reset back to 0. This ensures that all the orphaned rows are removed
eventually (some rows can be skipped because new rows are added to lookup tables
by ingestion and some are removed by the reaper, so `offset` does not always skip
to the place it should to cover the entire table).

#### Alternative solutions

While working on this I tried to implement @fons's idea from #4396, which was
to remove, before clearing historical data, the rows which are not present in
other ranges. There is a general problem with this solution. The lookup tables
are actively used by ingestion, which means that if rows are deleted while
ingestion reads a given row it can create inconsistent data. We could modify the
reaper to acquire the ingestion lock, but if there are many ledgers to remove it
can affect ingestion.

We could also write a query that finds and removes all the orphaned rows but
it's too slow to be executed between ingestion of two consecutive ledgers.
  • Loading branch information
bartekn authored Aug 9, 2022
1 parent f38ecd8 commit ee063a7
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 3 deletions.
140 changes: 140 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -825,6 +826,145 @@ func (q *Q) CloneIngestionQ() IngestionQ {
return &Q{q.Clone()}
}

// tableObjectFieldPair identifies one column of a history table that stores
// ids pointing into a lookup table.
type tableObjectFieldPair struct {
	// name is the name of the history table that references the lookup table.
	name string
	// objectField is the column of that history table which holds the lookup
	// table id.
	objectField string
}

// ReapLookupTables removes rows from lookup tables like history_claimable_balances
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
//
// offsets maps a lookup table name to the row offset at which the next batch
// should start; a nil map (or a missing key) means "start from the beginning".
// Each call inspects one batch of rows per lookup table and returns a new map
// with the offsets to pass to the next call. An offset is reset to 0 once it
// moves past the current row count of its table.
//
// The offsets argument is never modified. Callers should replace their stored
// offsets with the returned map only after the surrounding transaction has
// been committed, so that a failed or rolled back run is retried from the same
// position. On error the returned map is nil.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
	if q.GetTx() == nil {
		return nil, errors.New("cannot be called outside of an ingestion transaction")
	}

	const batchSize = 10000

	// Maps are reference types: advancing offsets in place would move the
	// caller's cursor forward even if a later statement fails or the
	// transaction is rolled back. Work on a private copy instead.
	newOffsets := make(map[string]int64, len(offsets))
	for table, offset := range offsets {
		newOffsets[table] = offset
	}

	for table, historyTables := range map[string][]tableObjectFieldPair{
		"history_claimable_balances": {
			{
				name:        "history_operation_claimable_balances",
				objectField: "history_claimable_balance_id",
			},
			{
				name:        "history_transaction_claimable_balances",
				objectField: "history_claimable_balance_id",
			},
		},
		"history_liquidity_pools": {
			{
				name:        "history_operation_liquidity_pools",
				objectField: "history_liquidity_pool_id",
			},
			{
				name:        "history_transaction_liquidity_pools",
				objectField: "history_liquidity_pool_id",
			},
		},
	} {
		query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, newOffsets[table])
		if err != nil {
			return nil, errors.Wrap(err, "error constructing a query")
		}

		_, err = q.ExecRaw(
			context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
			query,
		)
		if err != nil {
			return nil, errors.Wrapf(err, "error running query: %s", query)
		}

		newOffsets[table] += batchSize

		// Check if offset exceeds table size and then reset it
		var count int64
		err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
		if err != nil {
			return nil, errors.Wrapf(err, "error counting rows in %s", table)
		}

		if newOffsets[table] > count {
			newOffsets[table] = 0
		}
	}
	return newOffsets, nil
}

// constructReapLookupTablesQuery creates the statement that deletes orphaned
// rows from one batch of a lookup table. Using history_claimable_balances as
// an example, with batchSize = 100 and offset = 1000, the result is (shown
// here with line breaks added; the real query is a single line):
//
//	delete from history_claimable_balances where id IN
//	  (select id from
//	    (select id,
//	      (select count(*) from history_operation_claimable_balances
//	        where history_claimable_balance_id = hcb.id) as c0,
//	      (select count(*) from history_transaction_claimable_balances
//	        where history_claimable_balance_id = hcb.id) as c1,
//	      1 as cx
//	    from history_claimable_balances hcb order by id limit 100 offset 1000)
//	  as sub where c0 = 0 and c1 = 0 and 1=1);
//
// In short, it checks 100 rows of history_claimable_balances, skipping the
// first 1000, and counts the occurrences of each row's id in the corresponding
// history tables. If there are no history rows for a given id, the row in
// history_claimable_balances is removed.
//
// The offset param should be increased before each execution. Given that
// some rows will be removed and some will be added by ingestion it's
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
//
// table and the entries of historyTables are interpolated into the SQL text
// verbatim, so they must be trusted identifiers and never user input. The
// returned error is always nil; it is kept so the signature stays stable for
// existing callers.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) {
	var sb strings.Builder

	// Writes to a strings.Builder never fail, so the results of fmt.Fprintf
	// and WriteString are intentionally not checked.
	fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table)

	// One correlated count sub-select per referencing history column, exposed
	// as c0, c1, ...
	for i, historyTable := range historyTables {
		fmt.Fprintf(
			&sb,
			`(select count(*) from %s where %s = hcb.id) as c%d, `,
			historyTable.name,
			historyTable.objectField,
			i,
		)
	}

	// "1 as cx" terminates the select list so every counter above can be
	// emitted with a trailing comma.
	fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset)

	for i := range historyTables {
		fmt.Fprintf(&sb, "c%d = 0 and ", i)
	}

	// "1=1" terminates the condition list for the same reason as "1 as cx".
	sb.WriteString("1=1);")

	return sb.String(), nil
}

// DeleteRangeAll deletes a range of rows from all history tables between
// `start` and `end` (exclusive).
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
Expand Down
44 changes: 44 additions & 0 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLatestLedger(t *testing.T) {
Expand Down Expand Up @@ -66,3 +68,45 @@ func TestElderLedger(t *testing.T) {
tt.Assert.Equal(1, seq)
}
}

func TestConstructReapLookupTablesQuery(t *testing.T) {
query, err := constructReapLookupTablesQuery(
"history_accounts",
[]tableObjectFieldPair{
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
10,
0,
)

require.NoError(t, err)
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+
"(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+
"(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+
"(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+
"(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+
"1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+
"where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query)
}
74 changes: 74 additions & 0 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package history_test

import (
"testing"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/reap"
"github.com/stellar/go/services/horizon/internal/test"
)

// TestReapLookupTables loads the "kahuna" scenario, reaps history down to a
// single retained ledger and then verifies that ReapLookupTables removes the
// lookup rows which are no longer referenced and returns reset offsets.
func TestReapLookupTables(t *testing.T) {
	tt := test.Start(t)
	defer tt.Finish()

	ledgerState := &ledger.State{}
	ledgerState.SetStatus(tt.Scenario("kahuna"))

	db := tt.HorizonSession()
	sys := reap.New(0, db, ledgerState)

	// rowCount returns the current number of rows in the given table and
	// fails the test immediately if the query cannot be executed.
	rowCount := func(table string) int {
		var n int
		err := db.GetRaw(tt.Ctx, &n, `SELECT COUNT(*) FROM `+table)
		tt.Require.NoError(err)
		return n
	}

	// Row counts before anything is reaped.
	prevLedgers := rowCount("history_ledgers")
	prevClaimableBalances := rowCount("history_claimable_balances")
	prevLiquidityPools := rowCount("history_liquidity_pools")

	// Reap history, keeping a single ledger.
	ledgerState.SetStatus(tt.LoadLedgerStatus())
	sys.RetentionCount = 1
	tt.Require.NoError(sys.DeleteUnretainedHistory(tt.Ctx))

	// ReapLookupTables refuses to run outside of a transaction.
	q := &history.Q{tt.HorizonSession()}
	tt.Require.NoError(q.Begin())

	newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
	tt.Require.NoError(err)

	tt.Require.NoError(q.Commit())

	// Row counts after both reapers have run.
	curLedgers := rowCount("history_ledgers")
	curClaimableBalances := rowCount("history_claimable_balances")
	curLiquidityPools := rowCount("history_liquidity_pools")

	tt.Assert.Equal(61, prevLedgers, "prevLedgers")
	tt.Assert.Equal(1, curLedgers, "curLedgers")
	tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
	tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
	tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
	tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")

	tt.Assert.Len(newOffsets, 2)
	tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
	tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
}
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ func (r resumeState) run(s *system) (transition, error) {
localLog.Info("Processed ledger")

s.maybeVerifyState(ingestLedger)
s.maybeReapLookupTables(ingestLedger)

return resumeImmediately(ingestLedger), nil
}
Expand Down
62 changes: 62 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ type Metrics struct {
// duration of rebuilding trade aggregation buckets.
LedgerIngestionTradeAggregationDuration prometheus.Summary

// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
// duration of reaping lookup tables.
LedgerIngestionReapLookupTablesDuration prometheus.Summary

// StateVerifyDuration exposes timing metrics about the rate and
// duration of state verification.
StateVerifyDuration prometheus.Summary
Expand Down Expand Up @@ -201,6 +205,8 @@ type system struct {
disableStateVerification bool

checkpointManager historyarchive.CheckpointManager

reapOffsets map[string]int64
}

func NewSystem(config Config) (System, error) {
Expand Down Expand Up @@ -313,6 +319,11 @@ func (s *system) initMetrics() {
Help: "ledger ingestion trade aggregation rebuild durations, sliding window = 10m",
})

s.metrics.LedgerIngestionReapLookupTablesDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "ledger_ingestion_reap_lookup_tables_duration_seconds",
Help: "ledger ingestion reap lookup tables durations, sliding window = 10m",
})

s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds",
Help: "state verification durations, sliding window = 10m",
Expand Down Expand Up @@ -445,6 +456,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LocalLatestLedger)
registry.MustRegister(s.metrics.LedgerIngestionDuration)
registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration)
registry.MustRegister(s.metrics.LedgerIngestionReapLookupTablesDuration)
registry.MustRegister(s.metrics.StateVerifyDuration)
registry.MustRegister(s.metrics.StateInvalidGauge)
registry.MustRegister(s.metrics.LedgerStatsCounter)
Expand Down Expand Up @@ -663,6 +675,56 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
}
}

// maybeReapLookupTables reaps one batch of orphaned lookup table rows, but
// only when lastIngestedLedger is the latest ledger the backend reports, i.e.
// when ingestion is not catching up. All failures are logged and otherwise
// ignored; the stored reap offsets are replaced only after the transaction
// has been committed.
func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
	latest, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx)
	if err != nil {
		log.WithField("err", err).Error("Error getting latest ledger sequence from backend")
		return
	}

	// More ledgers are waiting in the backend: we are catching up, so skip
	// reaping in this cycle.
	if latest != lastIngestedLedger {
		return
	}

	if err = s.historyQ.Begin(); err != nil {
		log.WithField("err", err).Error("Error starting a transaction")
		return
	}
	// Rollback runs on every exit path, including after a successful Commit.
	defer s.historyQ.Rollback()

	// Reading the last ingested ledger inside the transaction is what blocks
	// ingestion in the cluster while tables are reaped (presumably via a row
	// lock taken by the query; confirm against GetLastLedgerIngest).
	if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
		log.WithField("err", err).Error(getLastIngestedErrMsg)
		return
	}

	// Cap reaping at 5s, which is the average ledger closing time.
	reapCtx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
	defer cancel()

	startedAt := time.Now()
	updatedOffsets, err := s.historyQ.ReapLookupTables(reapCtx, s.reapOffsets)
	if err != nil {
		log.WithField("err", err).Warn("Error reaping lookup tables")
		return
	}

	if err = s.historyQ.Commit(); err != nil {
		log.WithField("err", err).Error("Error commiting a transaction")
		return
	}

	s.reapOffsets = updatedOffsets
	// The observed duration covers both the reaping queries and the commit.
	s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(time.Since(startedAt).Seconds())
}

func (s *system) incrementStateVerificationErrors() int {
s.stateVerificationMutex.Lock()
defer s.stateVerificationMutex.Unlock()
Expand Down
Loading

0 comments on commit ee063a7

Please sign in to comment.