Skip to content

Commit

Permalink
service/ingest: repeated reingest of overlapping range causes duplica…
Browse files Browse the repository at this point in the history
…te issue on liquidity pool tables(#4114) (#4114)
  • Loading branch information
sreuland authored Dec 2, 2021
1 parent 1a8c282 commit e560a90
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 48 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

### Changes
* Return inner and outer result codes for fee bump transactions ([4081](https://github.com/stellar/go/pull/4081))
* Prevent duplicate errors related to liquidity pool tables during repeated reingestion of same range ([4114](https://github.com/stellar/go/pull/4114))

## v2.11.0

Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,11 +810,13 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
"history_ledgers": "id",
"history_operation_claimable_balances": "history_operation_id",
"history_operation_participants": "history_operation_id",
"history_operation_liquidity_pools": "history_operation_id",
"history_operations": "id",
"history_trades": "history_operation_id",
"history_trades_60000": "open_ledger_toid",
"history_transaction_claimable_balances": "history_transaction_id",
"history_transaction_participants": "history_transaction_id",
"history_transaction_liquidity_pools": "history_transaction_id",
"history_transactions": "id",
} {
err := q.DeleteRange(ctx, start, end, table, column)
Expand Down
28 changes: 5 additions & 23 deletions services/horizon/internal/docs/notes_for_developers.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,15 @@ close_ledger
payment :scott, :bartek, [:native, 5]
```

You can find more recipes in [`scc` examples](https://github.com/stellar/stellar_core_commander/tree/84d5ffb97202ecc3a0ed34a739c98e69536c0c2c/examples) and [horizon test scenarios](https://github.com/stellar/go/tree/master/services/horizon/internal/test/scenarios).
You can find more recipes in [`scc` examples](https://github.com/stellar/stellar_core_commander/tree/84d5ffb97202ecc3a0ed34a739c98e69536c0c2c/examples).

### Rebuilding scenarios
Scenarios are in [horizon test scenarios](https://github.com/stellar/go/tree/master/services/horizon/internal/test/scenarios). They are
used by many different integration tests.

1. Create a new or modify existing recipe. All new recipes should be added to [horizon test scenarios](https://github.com/stellar/go/tree/master/services/horizon/internal/test/scenarios) directory.
2. In `stellar/go` repository root directory run `./services/horizon/internal/scripts/build_test_scenarios.bash`.
3. The command above will rebuild all test scenarios. If you need to rebuild only one scenario modify `PACKAGES` environment variable temporarily in the script.
### Deprecated Scenario sql files

### Using test scenarios
1. Scenario .sql files are located in services/horizon/internal/test/scenarios and have been used in unit and integeration tests, however, they are deprecated and are not meant to be used or included in new development. They were manually maintained and have not been updated with more recent db schema changes and are not associated with db migrations.

In your `Test*` function execute:

```go
ht := StartHTTPTest(t, scenarioName)
defer ht.Finish()
```
where `scenarioName` is the name of the scenario you want to use. This will start test Horizon server with data loaded from the recipe.

When testing ingestion you can load scenario data without Horizon database like:

```go
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()
s := ingest(tt, true)
```

Check existing tests for more examples.

## <a name="tests"></a> Running Tests

Expand Down
138 changes: 122 additions & 16 deletions services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package integration
import (
"context"
"fmt"
horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/history"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stellar/go/clients/horizonclient"
"github.com/stellar/go/keypair"
horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/xdr"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/historyarchive"
Expand All @@ -21,22 +25,123 @@ import (
"github.com/stellar/go/txnbuild"
)

func initializeDBIntegrationTest(t *testing.T) (itest *integration.Test, reachedLedger int32) {
itest = integration.NewTest(t, protocol15Config)
func generateLiquidityPoolOps(itest *integration.Test, tt *assert.Assertions) (lastLedger int32) {

master := itest.Master()
keys, accounts := itest.CreateAccounts(2, "1000")
shareKeys, shareAccount := keys[0], accounts[0]
tradeKeys, tradeAccount := keys[1], accounts[1]

itest.MustSubmitMultiSigOperations(shareAccount, []*keypair.Full{shareKeys, master},
&txnbuild.ChangeTrust{
Line: txnbuild.ChangeTrustAssetWrapper{
Asset: txnbuild.CreditAsset{
Code: "USD",
Issuer: master.Address(),
},
},
Limit: txnbuild.MaxTrustlineLimit,
},
&txnbuild.ChangeTrust{
Line: txnbuild.LiquidityPoolShareChangeTrustAsset{
LiquidityPoolParameters: txnbuild.LiquidityPoolParameters{
AssetA: txnbuild.NativeAsset{},
AssetB: txnbuild.CreditAsset{
Code: "USD",
Issuer: master.Address(),
},
Fee: 30,
},
},
Limit: txnbuild.MaxTrustlineLimit,
},
&txnbuild.Payment{
SourceAccount: master.Address(),
Destination: shareAccount.GetAccountID(),
Asset: txnbuild.CreditAsset{
Code: "USD",
Issuer: master.Address(),
},
Amount: "1000",
},
)

poolID, err := xdr.NewPoolId(
xdr.MustNewNativeAsset(),
xdr.MustNewCreditAsset("USD", master.Address()),
30,
)
tt.NoError(err)
poolIDHexString := xdr.Hash(poolID).HexString()

itest.MustSubmitOperations(shareAccount, shareKeys,
&txnbuild.LiquidityPoolDeposit{
LiquidityPoolID: [32]byte(poolID),
MaxAmountA: "400",
MaxAmountB: "777",
MinPrice: "0.5",
MaxPrice: "2",
},
)

itest.MustSubmitOperations(tradeAccount, tradeKeys,
&txnbuild.ChangeTrust{
Line: txnbuild.ChangeTrustAssetWrapper{
Asset: txnbuild.CreditAsset{
Code: "USD",
Issuer: master.Address(),
},
},
Limit: txnbuild.MaxTrustlineLimit,
},
&txnbuild.PathPaymentStrictReceive{
SendAsset: txnbuild.NativeAsset{},
DestAsset: txnbuild.CreditAsset{
Code: "USD",
Issuer: master.Address(),
},
SendMax: "1000",
DestAmount: "2",
Destination: tradeKeys.Address(),
},
)

pool, err := itest.Client().LiquidityPoolDetail(horizonclient.LiquidityPoolRequest{
LiquidityPoolID: poolIDHexString,
})
tt.NoError(err)

txResp := itest.MustSubmitOperations(shareAccount, shareKeys,
&txnbuild.LiquidityPoolWithdraw{
LiquidityPoolID: [32]byte(poolID),
Amount: pool.TotalShares,
MinAmountA: "10",
MinAmountB: "20",
},
)

return txResp.Ledger
}

func generatePaymentOps(itest *integration.Test, tt *assert.Assertions) (lastLedger int32) {
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: itest.Master().Address(),
Amount: "10",
Asset: txnbuild.NativeAsset{},
},
)

return txResp.Ledger
}

func initializeDBIntegrationTest(t *testing.T) (itest *integration.Test, reachedLedger int32) {
config := integration.Config{ProtocolVersion: 18}
itest = integration.NewTest(t, config)
tt := assert.New(t)

// Initialize the database with some ledgers including some transactions we submit
op := txnbuild.Payment{
Destination: master.Address(),
Amount: "10",
Asset: txnbuild.NativeAsset{},
}
// TODO: should we enforce certain number of ledgers to be ingested?
for i := 0; i < 8; i++ {
txResp := itest.MustSubmitOperations(itest.MasterAccount(), master, &op)
reachedLedger = txResp.Ledger
}
generatePaymentOps(itest, tt)
reachedLedger = generateLiquidityPoolOps(itest, tt)

root, err := itest.Client().Root()
tt.NoError(err)
Expand All @@ -59,6 +164,7 @@ func TestReingestDB(t *testing.T) {
horizonConfig.DatabaseURL = freshHorizonPostgresURL
// Initialize the DB schema
dbConn, err := db.Open("postgres", freshHorizonPostgresURL)
tt.NoError(err)
defer dbConn.Close()
_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)
Expand Down Expand Up @@ -103,8 +209,8 @@ func TestReingestDB(t *testing.T) {
fmt.Sprintf("%d", toLedger),
))

// Reingest into the DB
tt.NoError(horizoncmd.RootCmd.Execute())
tt.NoError(horizoncmd.RootCmd.Execute(), "Repeat the same reingest range against db, should not have errors.")
}

func command(horizonConfig horizon.Config, args ...string) []string {
Expand Down
83 changes: 81 additions & 2 deletions services/horizon/internal/test/scenarios/base-horizon.sql
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,11 @@ INSERT INTO gorp_migrations VALUES ('41_add_sponsor_to_state_tables.sql', '2019-
INSERT INTO gorp_migrations VALUES ('45_add_claimable_balances_history.sql', '2019-11-30 14:19:49.163718+01');
INSERT INTO gorp_migrations VALUES ('46_add_muxed_accounts.sql', '2019-12-30 14:19:49.163718+01');
INSERT INTO gorp_migrations VALUES ('47_precompute_trade_aggregations.sql', '2019-12-30 14:19:49.163719+01');
INSERT INTO gorp_migrations VALUES ('48_rebuild_trade_aggregations.sql', '2021-12-02 01:33:33.428419+00');
INSERT INTO gorp_migrations VALUES ('49_add_brin_index_trade_aggregations.sql', '2021-12-02 01:33:33.43274+00');
INSERT INTO gorp_migrations VALUES ('50_liquidity_pools.sql', '2021-12-02 01:33:33.471893+00');
INSERT INTO gorp_migrations VALUES ('51_remove_ht_unused_indexes.sql', '2021-12-02 01:33:33.47903+00');



--
Expand Down Expand Up @@ -1408,8 +1413,11 @@ ALTER TABLE ONLY history_trades
ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id);


-- needed for migrations to work
--
-- The following DDL is basically any manipulations that happen to the schema since migration 47. If you need to include
-- an update of schema for this scenario, append the 'up' portion from newest migration at end here, otherwise, tests
-- will run into potential errors when then db migration up/downs are run automatically by the tests.
--
ALTER TABLE accounts ADD sponsor TEXT;
CREATE INDEX accounts_by_sponsor ON accounts USING BTREE(sponsor);

Expand Down Expand Up @@ -1443,6 +1451,77 @@ ALTER TABLE history_operations ADD source_account_muxed varchar(69) NULL;
ALTER TABLE history_effects ADD address_muxed varchar(69) NULL;


-- mgiration 49
CREATE INDEX IF NOT EXISTS htrd_agg_timestamp_brin ON history_trades_60000 USING brin(timestamp);

-- mgiration 50
CREATE TABLE liquidity_pools (
id text NOT NULL, -- hex-encoded PoolID
type smallint NOT NULL,
fee integer NOT NULL,
trustline_count bigint NOT NULL CHECK (trustline_count > 0),
share_count bigint NOT NULL DEFAULT 0 CHECK(share_count >= 0),
asset_reserves jsonb NOT NULL,
last_modified_ledger integer NOT NULL,
deleted boolean NOT NULL DEFAULT false,
PRIMARY KEY (id)
);

CREATE INDEX liquidity_pools_by_asset_reserves ON liquidity_pools USING gin(asset_reserves jsonb_path_ops);
CREATE INDEX live_liquidity_pools ON liquidity_pools USING BTREE (deleted, last_modified_ledger);

CREATE SEQUENCE history_liquidity_pools_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;

CREATE TABLE history_liquidity_pools (
id bigint NOT NULL DEFAULT nextval('history_liquidity_pools_id_seq'::regclass),
liquidity_pool_id text NOT NULL
);

CREATE UNIQUE INDEX index_history_liquidity_pools_on_id ON history_liquidity_pools USING btree (id);
CREATE UNIQUE INDEX index_history_liquidity_pools_on_liquidity_pool_id ON history_liquidity_pools USING btree (liquidity_pool_id);

CREATE TABLE history_operation_liquidity_pools (
history_operation_id bigint NOT NULL,
history_liquidity_pool_id bigint NOT NULL
);

CREATE UNIQUE INDEX index_history_operation_liquidity_pools_on_ids ON history_operation_liquidity_pools USING btree (history_operation_id , history_liquidity_pool_id);
CREATE INDEX index_history_operation_liquidity_pools_on_operation_id ON history_operation_liquidity_pools USING btree (history_operation_id);

CREATE TABLE history_transaction_liquidity_pools (
history_transaction_id bigint NOT NULL,
history_liquidity_pool_id bigint NOT NULL
);

CREATE UNIQUE INDEX index_history_transaction_liquidity_pools_on_ids ON history_transaction_liquidity_pools USING btree (history_transaction_id , history_liquidity_pool_id);
CREATE INDEX index_history_transaction_liquidity_pools_on_transaction_id ON history_transaction_liquidity_pools USING btree (history_transaction_id);

ALTER TABLE trust_lines ADD liquidity_pool_id text;
CREATE INDEX trust_lines_by_liquidity_pool_id ON trust_lines USING BTREE(liquidity_pool_id);

DROP INDEX htrd_by_offer;
DROP INDEX htrd_counter_lookup;

ALTER TABLE history_trades DROP offer_id,
ALTER base_account_id DROP NOT NULL,
ALTER counter_account_id DROP NOT NULL,
ADD base_liquidity_pool_id bigint,
ADD counter_liquidity_pool_id bigint,
ADD liquidity_pool_fee int;

CREATE INDEX htrd_by_base_liquidity_pool_id ON history_trades USING BTREE(base_liquidity_pool_id);
CREATE INDEX htrd_by_counter_liquidity_pool_id ON history_trades USING BTREE(counter_liquidity_pool_id);

-- mgiration 51
DROP INDEX IF EXISTS by_account;
DROP INDEX IF EXISTS by_fee_account;


--
-- PostgreSQL database dump complete
--
12 changes: 6 additions & 6 deletions services/horizon/internal/test/scenarios/bindata.go

Large diffs are not rendered by default.

Loading

0 comments on commit e560a90

Please sign in to comment.