From 63c4e8dacec814a4a1972ebf0995ecfec202eade Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 15:27:08 +0200 Subject: [PATCH 01/12] services/horizon: Fix ReapLookupTables query --- services/horizon/internal/db2/history/main.go | 22 +++++++++---------- .../horizon/internal/db2/history/main_test.go | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 8bbfe93a0b..5534e8112a 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -884,18 +884,18 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ return nil, errors.Wrapf(err, "error running query: %s", query) } - offsets[table] += batchSize - - // Check if offset exceeds table size and then reset it - var count int64 - err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + // Find new offset + var newOffset int64 + err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) if err != nil { - return nil, err + if q.NoRows(err) { + newOffset = 0 + } else { + return nil, err + } } - if offsets[table] > count { - offsets[table] = 0 - } + offsets[table] = newOffset } return offsets, nil } @@ -911,7 +911,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ // (select count(*) from history_transaction_claimable_balances // where history_claimable_balance_id = hcb.id) as c2, // 1 as cx, -// from history_claimable_balances hcb order by id limit 100 offset 1000) +// from history_claimable_balances hcb where id > 1000 order by id limit 100) // as sub where c1 = 0 and c2 = 0 and 1=1); // // In short it checks the 100 rows omiting 1000 row of history_claimable_balances @@ -945,7 +945,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie } } - _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset) + _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize) if err != nil { return "", err } diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 8123f4c786..cb6579be81 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -107,6 +107,6 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { "(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+ "(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+ "(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+ - "1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+ + "1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+ "where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query) } From fde02a6520e47bf779c4446e5ea1bfc11d5b92ad Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 15:36:33 +0200 Subject: [PATCH 02/12] Find new offsets before running a query --- services/horizon/internal/db2/history/main.go | 18 +++++++++--------- services/horizon/internal/ingest/main.go | 2 ++ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 5534e8112a..bd92d11cb0 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -876,15 +876,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ return nil, errors.Wrap(err, "error constructing a query") } - _, err = q.ExecRaw( - context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), - query, - ) - if err != nil { - return nil, errors.Wrapf(err, "error running query: %s", query) - } - - // Find new offset + // Find new offset before removing the rows var newOffset int64 err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) if err != nil { @@ -895,6 +887,14 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ } } + _, err = q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), + query, + ) + if err != nil { + return nil, errors.Wrapf(err, "error running query: %s", query) + } + offsets[table] = newOffset } return offsets, nil diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 0baad13355..4068bbe5e2 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -721,6 +721,8 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { } s.reapOffsets = newOffsets + // Remove before merging + log.WithField("offsets", newOffsets).Info("New offsets") reapDuration := time.Since(reapStart).Seconds() s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration)) } From 12ef5b97c3347e9c411e764eb19ae23b925621b5 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 16:32:45 +0200 Subject: [PATCH 03/12] Improve query --- services/horizon/internal/db2/history/main.go | 14 +++++++------- services/horizon/internal/db2/history/main_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index bd92d11cb0..b6f1cf4304 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -906,13 +906,13 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ // delete from history_claimable_balances where id in // (select id from // (select id, -// (select count(*) from history_operation_claimable_balances -// where history_claimable_balance_id = hcb.id) as c1, -// (select count(*) from history_transaction_claimable_balances -// where history_claimable_balance_id = hcb.id) as c2, +// (select 1 from history_operation_claimable_balances +// where history_claimable_balance_id = hcb.id limit 1) as c1, +// (select 1 from history_transaction_claimable_balances +// where history_claimable_balance_id = hcb.id limit 1) as c2, // 1 as cx, // from history_claimable_balances hcb where id > 1000 order by id limit 100) -// as sub where c1 = 0 and c2 = 0 and 1=1); +// as sub where c1 IS NULL and c2 IS NULL and 1=1); // // In short it checks the 100 rows omiting 1000 row of history_claimable_balances // and counts occurences of each row in corresponding history tables. @@ -935,7 +935,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie for i, historyTable := range historyTables { _, err = fmt.Fprintf( &sb, - `(select count(*) from %s where %s = hcb.id) as c%d, `, + `(select 1 from %s where %s = hcb.id limit 1) as c%d, `, historyTable.name, historyTable.objectField, i, @@ -951,7 +951,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie } for i := range historyTables { - _, err = fmt.Fprintf(&sb, "c%d = 0 and ", i) + _, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i) if err != nil { return "", err } diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index cb6579be81..792f9826aa 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -102,11 +102,11 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { assert.Equal(t, "delete from history_accounts where id IN "+ "(select id from "+ - "(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+ - "(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+ - "(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+ - "(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+ - "(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+ + "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ + "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ + "(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+ + "(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+ + "(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+ "1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+ - "where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query) + "where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query) } From afaf1e952c6fdb056f078f0198fee7e1cdeb3075 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 16:41:54 +0200 Subject: [PATCH 04/12] Improve query even more --- services/horizon/internal/db2/history/main.go | 40 +++++++++++++++++++ .../horizon/internal/db2/history/main_test.go | 2 +- .../horizon/internal/db2/history/reap_test.go | 18 ++++++++- 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index b6f1cf4304..1061e0582e 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -850,6 +850,46 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ } for table, historyTables := range map[string][]tableObjectFieldPair{ + "history_accounts": { + { + name: "history_effects", + objectField: "history_account_id", + }, + { + name: "history_operation_participants", + objectField: "history_account_id", + }, + { + name: "history_trades", + objectField: "base_account_id", + }, + { + name: "history_trades", + objectField: "counter_account_id", + }, + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + }, + "history_assets": { + { + name: "history_trades", + objectField: "base_asset_id", + }, + { + name: "history_trades", + objectField: "counter_asset_id", + }, + { + name: "history_trades_60000", + objectField: "base_asset_id", + }, + { + name: "history_trades_60000", + objectField: "counter_asset_id", + }, + }, "history_claimable_balances": { { name: "history_operation_claimable_balances", diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 792f9826aa..a053bbc35e 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -100,7 +100,7 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { require.NoError(t, err) assert.Equal(t, - "delete from history_accounts where id IN "+ + "delee from history_accounts where id IN "+ "(select id from "+ "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index cdd16b5387..0936194a78 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -21,6 +21,8 @@ func TestReapLookupTables(t *testing.T) { var ( prevLedgers, curLedgers int + prevAccounts, curAccounts int + prevAssets, curAssets int prevClaimableBalances, curClaimableBalances int prevLiquidityPools, curLiquidityPools int ) @@ -29,6 +31,10 @@ func TestReapLookupTables(t *testing.T) { { err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -55,6 +61,10 @@ func TestReapLookupTables(t *testing.T) { { err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -63,12 +73,18 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(61, prevLedgers, "prevLedgers") tt.Assert.Equal(1, curLedgers, "curLedgers") + tt.Assert.Equal(25, prevAccounts, "prevAccounts") + tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(7, prevAssets, "prevAssets") + tt.Assert.Equal(0, curAssets, "curAssets") tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") - tt.Assert.Len(newOffsets, 2) + tt.Assert.Len(newOffsets, 4) + tt.Assert.Equal(int64(0), newOffsets["history_accounts"]) + tt.Assert.Equal(int64(0), newOffsets["history_assets"]) tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"]) tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"]) } From 9b1cf595929213092b585f8ae3846ba9f001fc15 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 16:46:33 +0200 Subject: [PATCH 05/12] Fix test --- services/horizon/internal/db2/history/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index a053bbc35e..792f9826aa 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -100,7 +100,7 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { require.NoError(t, err) assert.Equal(t, - "delee from history_accounts where id IN "+ + "delete from history_accounts where id IN "+ "(select id from "+ "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ From 25267c2dfcb4f52f28d787d22e8621c4d1e8809f Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 16:58:06 +0200 Subject: [PATCH 06/12] Remove foreign constraints --- .../horizon/internal/db2/schema/bindata.go | 23 +++++++++++++++++++ .../59_remove_foreign_key_constraints.sql | 20 ++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql diff --git a/services/horizon/internal/db2/schema/bindata.go b/services/horizon/internal/db2/schema/bindata.go index eae5d78fd7..9b2c6ae6a5 100644 --- a/services/horizon/internal/db2/schema/bindata.go +++ b/services/horizon/internal/db2/schema/bindata.go @@ -54,6 +54,7 @@ // migrations/56_txsub_read_only.sql (784B) // migrations/57_trade_aggregation_autovac.sql (282B) // migrations/58_add_index_by_id_optimization.sql (868B) +// migrations/59_remove_foreign_key_constraints.sql (981B) // migrations/5_create_trades_table.sql (1.1kB) // migrations/6_create_assets_table.sql (366B) // migrations/7_modify_trades_table.sql (2.303kB) @@ -1208,6 +1209,26 @@ func migrations58_add_index_by_id_optimizationSql() (*asset, error) { return a, nil } +var _migrations59_remove_foreign_key_constraintsSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x92\xd1\x4a\xc3\x30\x18\x85\xef\xf3\x14\xe7\x72\x43\xf6\x04\xbb\x8a\xcb\x3f\x19\x96\x44\xb2\x78\xb1\xab\x10\xd7\xa8\x41\x5c\x25\x89\xc8\xde\x5e\x94\x52\xda\xda\xa2\x42\xd8\xf5\x39\x1c\xbe\x7c\xf9\x57\x2b\x5c\xbd\x86\xa7\xe8\xb2\xc7\xfd\x1b\x63\xbc\x32\xa4\x61\xf8\x75\x45\x50\xb2\x3a\xe0\x39\xa4\xdc\xc4\xb3\xcd\xd1\xd5\x3e\x41\x68\x75\x87\x8d\x92\x7b\xa3\xf9\x4e\x9a\x51\x6c\x1f\x5c\xf2\xd6\x1d\x8f\xcd\xfb\x29\xdb\x50\xdb\xc7\x17\x7f\x5e\x97\x19\x4d\xc9\x97\x9b\xfc\x06\xf4\xb1\x38\x6a\xb7\x3b\xa4\x65\x7d\xcd\xa2\xf9\x38\xfd\x2a\x9a\x01\x00\x17\xe2\x9f\xae\xb1\x55\x9a\x76\x37\x12\xb7\x74\xc0\x62\xd4\x58\x42\xd3\x96\x34\xc9\x0d\xed\xbb\xb1\x36\x4e\x8b\x50\x2f\xd7\xe5\xb0\xfa\xef\x9f\x82\x6a\xf3\x69\xa4\xaf\xb0\x1c\xd0\xcc\x5f\x0f\xa9\x7e\x96\x2e\x63\x6b\xf2\x60\x66\xd0\xfe\xe8\x0c\xec\x33\x00\x00\xff\xff\xd7\x39\x62\x6b\xd5\x03\x00\x00") + +func migrations59_remove_foreign_key_constraintsSqlBytes() ([]byte, error) { + return bindataRead( + _migrations59_remove_foreign_key_constraintsSql, + "migrations/59_remove_foreign_key_constraints.sql", + ) +} + +func migrations59_remove_foreign_key_constraintsSql() (*asset, error) { + bytes, err := migrations59_remove_foreign_key_constraintsSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "migrations/59_remove_foreign_key_constraints.sql", size: 0, mode: os.FileMode(0), modTime: time.Unix(0, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xef, 0x36, 0x64, 0x26, 0xdf, 0xec, 0x49, 0x62, 0x47, 0xca, 0x8c, 0x1e, 0x29, 0x37, 0x7c, 0x9f, 0xc8, 0x47, 0x94, 0x28, 0xc9, 0xdb, 0x3e, 0x95, 0xdb, 0x8d, 0xc8, 0xf0, 0x38, 0xf1, 0x8c}} + return a, nil +} + var _migrations5_create_trades_tableSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x94\x51\x6f\xaa\x40\x10\x85\xdf\xf9\x15\x13\x9f\x30\x17\x93\x7b\x6f\x5a\x5f\x4c\x9a\x58\x25\xad\xa9\xc1\xd6\x4a\xd2\x37\xb2\xb0\x23\x6c\xa2\x2c\x99\x1d\xda\xf0\xef\x1b\x68\x69\x10\x57\xad\xaf\x9c\x39\x67\x38\xbb\x5f\x76\x34\x82\x3f\x7b\x95\x92\x60\x84\xb0\x70\x66\x6b\x7f\xba\xf1\x61\x33\xbd\x5f\xfa\x90\x29\xc3\x9a\xaa\x88\x49\x48\x34\xe0\x3a\x00\xf0\xf3\x51\x17\x48\x82\x95\xce\x23\x25\x21\x56\xa9\xca\x19\x82\xd5\x06\x82\x70\xb9\xf4\x9a\xc9\x81\x26\x89\x34\x00\x95\x33\xa6\x48\x1d\xb5\x91\xf5\x76\x8b\x64\x35\x37\xb2\xc1\xdd\xee\x84\x5e\xcb\x71\x59\x9d\x75\xeb\x9d\x8c\x84\x31\xc8\x11\x57\x05\x42\x92\x09\x12\x09\x23\xc1\xbb\xa0\x4a\xe5\xa9\x3b\xbe\x19\xf6\x22\x3b\x1e\x65\x4c\x89\x64\x71\xdd\x8e\xcf\xb8\x12\x2d\x6d\x9b\xfe\xfd\xb7\x7b\xf6\xba\xcc\xb9\xff\xff\x30\x7b\xf4\x67\x4f\xe0\x76\x47\xee\xe0\xef\xf0\xbb\x57\xac\xcb\x34\xe3\x6b\x9b\x1d\xb8\xae\xe8\x76\xe0\xfb\x75\xbb\xd6\x75\xb6\xdf\xe1\x50\xdd\xd0\x19\x4e\x9c\x96\xbf\x30\x58\xbc\x84\x3e\x2c\x82\xb9\xff\x06\x19\x93\x8c\x0a\x25\x61\x15\xf4\x91\x0c\x5f\x17\xc1\x03\xc4\x4c\x88\xe0\xda\xc8\xf4\x5a\x0a\x3b\xe1\x9d\xd4\xb8\x8a\x1a\x0c\x2f\x45\xb7\xac\xda\x52\xea\x90\xfa\xb6\x2e\x65\xf4\x90\xf4\xfa\xe4\x78\xc7\x00\x9e\x5a\xf7\x75\x78\x97\x16\x1e\xb1\xe2\x1d\x5f\xa8\x67\x63\xa3\x5e\xdb\x7d\x17\xe6\xfa\x23\x77\xe6\xeb\xd5\xb3\xfd\x5d\x48\x84\x49\x84\xc4\x89\xf3\x19\x00\x00\xff\xff\x79\x87\x24\x6b\x4c\x04\x00\x00") func migrations5_create_trades_tableSqlBytes() ([]byte, error) { @@ -1473,6 +1494,7 @@ var _bindata = map[string]func() (*asset, error){ "migrations/56_txsub_read_only.sql": migrations56_txsub_read_onlySql, "migrations/57_trade_aggregation_autovac.sql": migrations57_trade_aggregation_autovacSql, "migrations/58_add_index_by_id_optimization.sql": migrations58_add_index_by_id_optimizationSql, + "migrations/59_remove_foreign_key_constraints.sql": migrations59_remove_foreign_key_constraintsSql, "migrations/5_create_trades_table.sql": migrations5_create_trades_tableSql, "migrations/6_create_assets_table.sql": migrations6_create_assets_tableSql, "migrations/7_modify_trades_table.sql": migrations7_modify_trades_tableSql, @@ -1577,6 +1599,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "56_txsub_read_only.sql": &bintree{migrations56_txsub_read_onlySql, map[string]*bintree{}}, "57_trade_aggregation_autovac.sql": &bintree{migrations57_trade_aggregation_autovacSql, map[string]*bintree{}}, "58_add_index_by_id_optimization.sql": &bintree{migrations58_add_index_by_id_optimizationSql, map[string]*bintree{}}, + "59_remove_foreign_key_constraints.sql": &bintree{migrations59_remove_foreign_key_constraintsSql, map[string]*bintree{}}, "5_create_trades_table.sql": &bintree{migrations5_create_trades_tableSql, map[string]*bintree{}}, "6_create_assets_table.sql": &bintree{migrations6_create_assets_tableSql, map[string]*bintree{}}, "7_modify_trades_table.sql": &bintree{migrations7_modify_trades_tableSql, map[string]*bintree{}}, diff --git a/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql b/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql new file mode 100644 index 0000000000..7faebe98fe --- /dev/null +++ b/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql @@ -0,0 +1,20 @@ +-- +migrate Up + +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_account_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_asset_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_account_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_asset_id_fkey; + +-- +migrate Down + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_base_account_id_fkey FOREIGN KEY (base_account_id) REFERENCES history_accounts(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_base_asset_id_fkey FOREIGN KEY (base_asset_id) REFERENCES history_assets(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_counter_account_id_fkey FOREIGN KEY (counter_account_id) REFERENCES history_accounts(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id); From 0b3b8d972feb7848a8fe96e48063f86afac57098 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 17:08:43 +0200 Subject: [PATCH 07/12] Remove history_assets --- services/horizon/internal/db2/history/main.go | 18 ------------------ .../horizon/internal/db2/history/reap_test.go | 10 +--------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 1061e0582e..dfa02a4d97 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -872,24 +872,6 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ objectField: "history_account_id", }, }, - "history_assets": { - { - name: "history_trades", - objectField: "base_asset_id", - }, - { - name: "history_trades", - objectField: "counter_asset_id", - }, - { - name: "history_trades_60000", - objectField: "base_asset_id", - }, - { - name: "history_trades_60000", - objectField: "counter_asset_id", - }, - }, "history_claimable_balances": { { name: "history_operation_claimable_balances", diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 0936194a78..04fc9eb0ef 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -22,7 +22,6 @@ func TestReapLookupTables(t *testing.T) { var ( prevLedgers, curLedgers int prevAccounts, curAccounts int - prevAssets, curAssets int prevClaimableBalances, curClaimableBalances int prevLiquidityPools, curLiquidityPools int ) @@ -33,8 +32,6 @@ func TestReapLookupTables(t *testing.T) { tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -63,8 +60,6 @@ func TestReapLookupTables(t *testing.T) { tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -75,16 +70,13 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(1, curLedgers, "curLedgers") tt.Assert.Equal(25, prevAccounts, "prevAccounts") tt.Assert.Equal(1, curAccounts, "curAccounts") - tt.Assert.Equal(7, prevAssets, "prevAssets") - tt.Assert.Equal(0, curAssets, "curAssets") tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") - tt.Assert.Len(newOffsets, 4) + tt.Assert.Len(newOffsets, 3) tt.Assert.Equal(int64(0), newOffsets["history_accounts"]) - tt.Assert.Equal(int64(0), newOffsets["history_assets"]) tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"]) tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"]) } From 98e68be7a434f7166930b0e2fc28dbb86ee924ea Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 21:59:32 +0200 Subject: [PATCH 08/12] Decrease batchSize --- services/horizon/internal/db2/history/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index dfa02a4d97..7f2c037c1c 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -843,7 +843,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ return nil, errors.New("cannot be called outside of an ingestion transaction") } - const batchSize = 10000 + const batchSize = 1000 if offsets == nil { offsets = make(map[string]int64) From a4b617262c937a5bc0647d1272f4fc740f380e40 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 9 Aug 2022 21:59:32 +0200 Subject: [PATCH 09/12] Decrease batchSize --- services/horizon/internal/db2/history/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index dfa02a4d97..7f2c037c1c 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -843,7 +843,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ return nil, errors.New("cannot be called outside of an ingestion transaction") } - const batchSize = 10000 + const batchSize = 1000 if offsets == nil { offsets = make(map[string]int64) From d32a2fb5b25a168fda49c22141bcbaccaef685de Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Wed, 10 Aug 2022 13:08:53 +0200 Subject: [PATCH 10/12] Remove debug log --- services/horizon/internal/ingest/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index e5993b965b..d67c4f082b 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -724,8 +724,6 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { } s.reapOffsets = newOffsets - // Remove before merging - log.WithField("offsets", newOffsets).Info("New offsets") reapDuration := time.Since(reapStart).Seconds() s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration)) } From 8c2e40cccd0d79cf515cdc1fa3b7b9f8823c677e Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Wed, 10 Aug 2022 13:51:29 +0200 Subject: [PATCH 11/12] Add deleted rows count --- services/horizon/internal/db2/history/main.go | 28 +++++++++++++------ .../horizon/internal/db2/history/reap_test.go | 8 +++++- services/horizon/internal/ingest/main.go | 13 ++++++++- services/horizon/internal/ingest/main_test.go | 12 +++++--- .../internal/ingest/resume_state_test.go | 2 +- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 7f2c037c1c..c28e902b6c 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -261,7 +261,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) + ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -838,13 +838,19 @@ type tableObjectFieldPair struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) { +func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( + map[string]int64, // deleted rows count + map[string]int64, // new offsets + error, +) { if q.GetTx() == nil { - return nil, errors.New("cannot be called outside of an ingestion transaction") + return nil, nil, errors.New("cannot be called outside of an ingestion transaction") } const batchSize = 1000 + deletedCount := make(map[string]int64) + if offsets == nil { offsets = make(map[string]int64) } @@ -895,7 +901,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ } { query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) if err != nil { - return nil, errors.Wrap(err, "error constructing a query") + return nil, nil, errors.Wrap(err, "error constructing a query") } // Find new offset before removing the rows @@ -905,21 +911,27 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ if q.NoRows(err) { newOffset = 0 } else { - return nil, err + return nil, nil, err } } - _, err = q.ExecRaw( + res, err := q.ExecRaw( context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), query, ) if err != nil { - return nil, errors.Wrapf(err, "error running query: %s", query) + return nil, nil, errors.Wrapf(err, "error running query: %s", query) + } + + rows, err := res.RowsAffected() + if err != nil { + return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) } + deletedCount[table] = rows offsets[table] = newOffset } - return offsets, nil + return deletedCount, offsets, nil } // constructReapLookupTablesQuery creates a query like (using history_claimable_balances diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 04fc9eb0ef..5e62dc606f 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -48,7 +48,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin() tt.Require.NoError(err) - newOffsets, err := q.ReapLookupTables(tt.Ctx, nil) + deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil) tt.Require.NoError(err) err = q.Commit() @@ -68,12 +68,18 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(61, prevLedgers, "prevLedgers") tt.Assert.Equal(1, curLedgers, "curLedgers") + tt.Assert.Equal(25, prevAccounts, "prevAccounts") tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`) + tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") + tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`) + tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") + tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`) tt.Assert.Len(newOffsets, 3) tt.Assert.Equal(int64(0), newOffsets["history_accounts"]) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index d67c4f082b..a9517742f9 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -711,7 +711,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets) + deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets) if err != nil { log.WithField("err", err).Warn("Error reaping lookup tables") return @@ -723,6 +723,17 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { return } + totalDeleted := int64(0) + reapLog := log + for table, c := range deletedCount { + totalDeleted += c + reapLog = reapLog.WithField(table, c) + } + + if totalDeleted > 0 { + reapLog.Info("Reaper deleted rows from lookup tables") + } + s.reapOffsets = newOffsets reapDuration := time.Since(reapStart).Seconds() s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration)) diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index b5d0dfb78c..7b930e80a2 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -374,12 +374,16 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder(maxBatchSize int) history.TradeBatc return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) { +func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) { args := m.Called(ctx, offsets) - if args.Get(0) == nil { - return nil, args.Error(1) + var r1, r2 map[string]int64 + if args.Get(0) != nil { + r1 = args.Get(0).(map[string]int64) + } + if args.Get(1) != nil { + r1 = args.Get(1).(map[string]int64) } - return args.Get(0).(map[string]int64), args.Error(1) + return r1, r2, args.Error(2) } func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index eadae11bc5..953c7c3e74 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -413,7 +413,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(101), nil) s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, errors.New("error reaping objects")).Once() + s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, nil, errors.New("error reaping objects")).Once() s.historyQ.On("Rollback").Return(nil).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) From 0317d5a4902295e3bc25728f2b4dac7be51803da Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Wed, 10 Aug 2022 14:43:17 +0200 Subject: [PATCH 12/12] Activate reaping only when --history-retention-count enabled --- services/horizon/internal/ingest/main.go | 5 ++ .../internal/ingest/resume_state_test.go | 48 +++++++++++++++++-- services/horizon/internal/init.go | 4 +- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index a9517742f9..8d6fa27354 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -85,6 +85,7 @@ type Config struct { HistoryArchiveURL string DisableStateVerification bool + EnableReapLookupTables bool EnableExtendedLogLedgerStats bool ReingestEnabled bool @@ -679,6 +680,10 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) { } func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { + if !s.config.EnableReapLookupTables { + return + } + // Check if lastIngestedLedger is the last one available in the backend sequence, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx) if err != nil { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 953c7c3e74..7f336a79ae 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -283,8 +283,6 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { ).Return(nil).Once() s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() - // Reap lookup tables - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), nil) } func (s *ResumeTestTestSuite) TestBumpIngestLedger() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} @@ -368,8 +366,46 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - // Reap lookup tables - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), nil) + + next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal( + transition{ + node: resumeState{latestSuccessfullyProcessedLedger: 101}, + sleepDuration: 0, + }, + next, + ) +} + +func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() + s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() + s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil) + + s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). + Run(func(args mock.Arguments) { + meta := args.Get(0).(xdr.LedgerCloseMeta) + s.Assert().Equal(uint32(101), meta.LedgerSequence()) + }). + Return( + ledgerStats{}, + nil, + ).Once() + s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + + s.stellarCoreClient.On( + "SetCursor", + mock.AnythingOfType("*context.timerCtx"), + defaultCoreCursorName, + int32(101), + ).Return(nil).Once() + + s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() + // Reap lookup tables not executed next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().NoError(err) @@ -383,6 +419,10 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { + s.system.config.EnableReapLookupTables = true + defer func() { + s.system.config.EnableReapLookupTables = false + }() s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 3c548691e5..4150a1e2a7 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -2,10 +2,11 @@ package horizon import ( "context" - "github.com/stellar/go/services/horizon/internal/paths" "net/http" "runtime" + "github.com/stellar/go/services/horizon/internal/paths" + "github.com/getsentry/raven-go" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/exp/orderbook" @@ -116,6 +117,7 @@ func initIngester(app *App) { RemoteCaptiveCoreURL: app.config.RemoteCaptiveCoreURL, EnableCaptiveCore: app.config.EnableCaptiveCoreIngestion, DisableStateVerification: app.config.IngestDisableStateVerification, + EnableReapLookupTables: app.config.HistoryRetentionCount > 0, EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats, RoundingSlippageFilter: app.config.RoundingSlippageFilter, EnableIngestionFiltering: app.config.EnableIngestionFiltering,