Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(twap): prune records over multiple blocks #7427

Merged
merged 11 commits into from
Feb 9, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#7360](https://github.com/osmosis-labs/osmosis/pull/7360) Bump cometbft-db from 0.8.0 to 0.10.0
* [#7385](https://github.com/osmosis-labs/osmosis/pull/7385) Add missing protobuf interface
* [#7427](https://github.com/osmosis-labs/osmosis/pull/7427) Prune TWAP records over multiple blocks, instead of all at once at epoch

## v23.0.0

Expand Down
20 changes: 20 additions & 0 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,23 @@ message TwapRecord {
(gogoproto.moretags) = "yaml:\"last_error_time\""
];
}

// PruningState allows us to spread out the pruning of TWAP records over time,
// instead of pruning all at once at the end of the epoch.
message PruningState {
// is_pruning is true if the pruning process is ongoing.
// This tells the module to continue pruning the TWAP records
// at the EndBlock.
bool is_pruning = 1;
// last_kept_time is the time of the last kept TWAP record.
// This is used to determine all TWAP records that are older than
// last_kept_time and should be pruned.
google.protobuf.Timestamp last_kept_time = 2 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
// last_key_seen is the last key of the TWAP records that were pruned
// before reaching the block's prune limit
bytes last_key_seen = 3;
}
8 changes: 2 additions & 6 deletions x/twap/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ func (k Keeper) UpdateRecords(ctx sdk.Context, poolId uint64) error {
return k.updateRecords(ctx, poolId)
}

func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

func (k Keeper) PruneRecords(ctx sdk.Context) error {
return k.pruneRecords(ctx)
func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, state)
}

func (k Keeper) GetInterpolatedRecord(ctx sdk.Context, poolId uint64, asset0Denom string, asset1Denom string, t time.Time) (types.TwapRecord, error) {
Expand Down
29 changes: 29 additions & 0 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"

paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"

Expand Down Expand Up @@ -100,3 +101,31 @@ func (k Keeper) GetGeometricStrategy() *geometric {
func (k Keeper) GetArithmeticStrategy() *arithmetic {
return &arithmetic{k}
}

// GetPruningState gets the current pruning state, which is used to determine
// whether to prune historical records in the EndBlock. This allows us to spread
// out the computational cost of pruning over time rather than all at once at epoch.
func (k Keeper) GetPruningState(ctx sdk.Context) types.PruningState {
store := ctx.KVStore(k.storeKey)
state := types.PruningState{}

bz := store.Get(types.PruningStateKey)
if bz == nil {
return state
}
err := proto.Unmarshal(bz, &state)
if err != nil {
panic(err)
}
return state
}

func (k Keeper) SetPruningState(ctx sdk.Context, state types.PruningState) {
store := ctx.KVStore(k.storeKey)

bz, err := proto.Marshal(&state)
if err != nil {
panic(err)
}
store.Set(types.PruningStateKey, bz)
}
10 changes: 7 additions & 3 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/osmosis-labs/osmosis/osmomath"
concentratedliquiditytypes "github.com/osmosis-labs/osmosis/v23/x/concentrated-liquidity/types"
gammtypes "github.com/osmosis-labs/osmosis/v23/x/gamm/types"
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
epochtypes "github.com/osmosis-labs/osmosis/x/epochs/types"
)

Expand All @@ -24,9 +25,12 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {

func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
if err := hook.k.pruneRecords(ctx); err != nil {
ctx.Logger().Error("Error pruning old twaps at the epoch end", err)
}
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""),
})
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions x/twap/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,20 @@ func (s *TestSuite) TestAfterEpochEnd() {
err = s.App.TwapKeeper.EpochHooks().AfterEpochEnd(s.Ctx, allEpochs[i].Identifier, int64(1))
s.Require().NoError(err)

recordsAfterEpoch, err := s.twapkeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
lastKeptTime := s.Ctx.BlockTime().Add(-s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx))
pruneState := s.twapkeeper.GetPruningState(s.Ctx)

// old record should have been pruned here
// however, the newest younger than the prune threshold
// is kept.
// state entry should be set for pruning state
if allEpochs[i].Identifier == pruneEpochIdentifier {
s.Require().Equal(1, len(recordsAfterEpoch))
s.Require().Equal(newestRecord, recordsAfterEpoch[0])
s.Require().Equal(true, pruneState.IsPruning)
s.Require().Equal(lastKeptTime, pruneState.LastKeptTime)

// quit test once the record has been pruned
return
// reset pruning state to make sure other epochs do not modify it
s.twapkeeper.SetPruningState(s.Ctx, types.PruningState{})
} else { // pruning should not be triggered at first, not pruning epoch
s.Require().NoError(err)
s.Require().Equal(twapsBeforeEpoch, recordsAfterEpoch)
s.Require().Equal(false, pruneState.IsPruning)
s.Require().Equal(time.Time{}, pruneState.LastKeptTime)
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions x/twap/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (k Keeper) EndBlock(ctx sdk.Context) {
" Skipping record update. Underlying err: %w", id, err).Error())
}
}

state := k.GetPruningState(ctx)
if state.IsPruning {
err := k.pruneRecordsBeforeTimeButNewest(ctx, state)
if err != nil {
ctx.Logger().Error("Error pruning old twaps at the end block", err)
}
}
}

// updateRecords updates all records for a given pool id.
Expand Down Expand Up @@ -195,18 +203,6 @@ func (k Keeper) updateRecord(ctx sdk.Context, record types.TwapRecord) (types.Tw
return newRecord, nil
}

// pruneRecords prunes twap records that happened earlier than recordHistoryKeepPeriod
// before current block time while preserving the most recent record before the threshold.
// Such record is preserved for each pool.
// See TWAP keeper's `pruneRecordsBeforeTimeButNewest(...)` for more details about the reasons for
// keeping this record.
func (k Keeper) pruneRecords(ctx sdk.Context) error {
recordHistoryKeepPeriod := k.RecordHistoryKeepPeriod(ctx)

lastKeptTime := ctx.BlockTime().Add(-recordHistoryKeepPeriod)
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}
Comment on lines -198 to -208
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper func is no longer needed since we determine the lastKeptTime at epoch end and store in a state entry.

Copy link
Member

@ValarDragon ValarDragon Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should still keep this function so we can unit test it? (Just move the logic into here)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you think the current test is good / simple enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reimplement the test, I thought the state.go test tests the same thing, but I can double check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is tested in even more depth here

// TestPruneRecordsBeforeTime tests that all twap records earlier than
// current block time - given time are pruned from the store while
// the newest record for each pool before the time to keep is preserved.
func (s *TestSuite) TestPruneRecordsBeforeTimeButNewest() {
// N.B.: the records follow the following naming convention:
// <pool id><delta from base time in seconds><delta from base time in milliseconds>
// These are manually created to be able to refer to them by name
// for convenience.
// Create 6 records of 4 pools from base time, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SBaseMs, pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, pool3BaseSecBaseMs, pool4Plus1SBaseMs := s.createTestRecordsFromTime(baseTime)
// Create 6 records of 4 pools from base time - 1 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin1Ms, pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, pool3BaseSecMin1Ms, pool4Plus1SMin1Ms := s.createTestRecordsFromTime(baseTime.Add(-time.Millisecond))
// Create 6 records of 4 pools from base time - 2 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin2Ms, pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, pool3BaseSecMin2Ms, pool4Plus1SMin2Ms := s.createTestRecordsFromTime(baseTime.Add(2 * -time.Millisecond))
// Create 6 records of 4 pools from base time - 3 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin3Ms, pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, pool3BaseSecMin3Ms, pool4Plus1SMin3Ms := s.createTestRecordsFromTime(baseTime.Add(3 * -time.Millisecond))
// Create 12 records in the same pool from base time , each record with the difference of 1 second between them.
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC,
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC,
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC,
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC := s.CreateTestRecordsFromTimeInPool(baseTime, 5)
// Create 12 records in the same pool from base time - 1 ms, each record with the difference of 1 second between them
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC,
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC,
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC,
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC := s.CreateTestRecordsFromTimeInPool(baseTime.Add(-time.Millisecond), 5)
tests := map[string]struct {
// order does not follow any specific pattern
// across many test cases on purpose.
recordsToPreSet []types.TwapRecord
lastKeptTime time.Time
expectedKeptRecords []types.TwapRecord
overwriteLimit uint16
}{
"base time; across pool 3; 4 records; 3 before lastKeptTime; 2 deleted and newest kept": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool3BaseSecMin3Ms, // base time - 3ms; deleted
pool3BaseSecMin2Ms, // base time - 2ms; deleted
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{pool3BaseSecMin1Ms, pool3BaseSecBaseMs},
},
"base time - 1s - 2 ms; across pool 2; 12 records; 3 before lastKeptTime; none pruned since newest kept": {
recordsToPreSet: []types.TwapRecord{
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older than at lastKeptTime
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older than lastKeptTime
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
},
lastKeptTime: baseTime.Add(-time.Second).Add(2 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC,
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC,
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC,
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC,
},
},
"base time - 2s - 3 ms; across pool 1; 4 records; none before lastKeptTime; none pruned": {
recordsToPreSet: []types.TwapRecord{
pool1Min2SMin3Ms, // base time - 2s - 3ms; kept since older than lastKeptTime
pool1Min2SMin1Ms, // base time - 2s - 1ms; kept since older than lastKeptTime
pool1Min2SMin2Ms, // base time - 2s - 2ms; kept since older than lastKeptTime
pool1Min2SBaseMs, // base time - 2s; kept since older than lastKeptTime
},
lastKeptTime: baseTime.Add(2 * -time.Second).Add(3 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{pool1Min2SMin3Ms, pool1Min2SMin2Ms, pool1Min2SMin1Ms, pool1Min2SBaseMs},
},
"base time + 1s + 1ms; across pool 4; 4 records; all before lastKeptTime; 3 deleted and newest kept": {
recordsToPreSet: []types.TwapRecord{
pool4Plus1SBaseMs, // base time + 1s; kept since newest before lastKeptTime
pool4Plus1SMin3Ms, // base time + 1s - 3ms; deleted
pool4Plus1SMin1Ms, // base time + 1s -1ms; deleted
pool4Plus1SMin2Ms, // base time + 1s - 2ms; deleted
},
lastKeptTime: baseTime.Add(time.Second).Add(time.Millisecond),
expectedKeptRecords: []types.TwapRecord{pool4Plus1SBaseMs},
},
"base time; across pool 3 and pool 5; pool 3: 4 total records; 3 before lastKeptTime; 2 deleted and newest 2 kept. pool 5: 24 total records; 12 before lastKeptTime; 12 deleted and 12 kept": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin3Ms, // base time - 3ms; deleted
pool3BaseSecMin2Ms, // base time - 2ms; deleted
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; deleted
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; ; deleted
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since at lastKeptTime
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older than lastKeptTime
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC, // base time - 2s - 1ms; deleted
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; deleted
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since newest before lastKeptTime
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older than lastKeptTime
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{
pool3BaseSecMin1Ms,
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC,
pool3BaseSecBaseMs,
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC,
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC,
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC,
},
},
"base time - 1s - 2 ms; all pools; all test records": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin3Ms, // base time - 3ms; kept since older
pool3BaseSecMin2Ms, // base time - 2ms; kept since older
pool3BaseSecMin1Ms, // base time - 1ms; kept since older
pool3BaseSecBaseMs, // base time; kept since older
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older
pool1Min2SMin3Ms, // base time - 2s - 3ms; deleted
pool1Min2SMin2Ms, // base time - 2s - 2ms; deleted
pool1Min2SMin1Ms, // base time - 2s - 1ms; deleted
pool1Min2SBaseMs, // base time - 2s; kept since newest before lastKeptTime
pool4Plus1SMin3Ms, // base time + 1s - 3ms; kept since older
pool4Plus1SMin2Ms, // base time + 1s - 2ms; kept since older
pool4Plus1SMin1Ms, // base time + 1s -1ms; kept since older
pool4Plus1SBaseMs, // base time + 1s; kept since older
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; kept since newest before lastKeptTime
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; kept since older
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since older
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC, // base time - 2s - 1ms; deleted
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since older
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older
},
lastKeptTime: baseTime.Add(-time.Second).Add(2 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{
pool1Min2SBaseMs, // base time - 2s; kept since newest before lastKeptTime
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; kept since newest before lastKeptTime
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; kept since older
pool3BaseSecMin3Ms, // base time - 3ms; kept since older
pool3BaseSecMin2Ms, // base time - 2ms; kept since older
pool3BaseSecMin1Ms, // base time - 1ms; kept since older
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since older
pool3BaseSecBaseMs, // base time; kept since older
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since older
pool4Plus1SMin3Ms, // base time + 1s - 3ms; kept since older
pool4Plus1SMin2Ms, // base time + 1s - 2ms; kept since older
pool4Plus1SMin1Ms, // base time + 1s -1ms; kept since older
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older
pool4Plus1SBaseMs, // base time + 1s; kept since older
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older
},
},
"no pre-set records - no error": {
recordsToPreSet: []types.TwapRecord{},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{},
},
"base time; across pool 3; 4 records; 3 before lastKeptTime; only 1 deleted due to limit set to 1": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool3BaseSecMin3Ms, // base time - 3ms; in queue for deletion
pool3BaseSecMin2Ms, // base time - 2ms; deleted
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{pool3BaseSecMin3Ms, pool3BaseSecMin1Ms, pool3BaseSecBaseMs},
overwriteLimit: 1,
},
}
for name, tc := range tests {
s.Run(name, func() {
s.SetupTest()
s.preSetRecords(tc.recordsToPreSet)
ctx := s.Ctx
twapKeeper := s.twapkeeper
if tc.overwriteLimit != 0 {
originalLimit := twap.NumRecordsToPrunePerBlock
defer func() {
twap.NumRecordsToPrunePerBlock = originalLimit
}()
twap.NumRecordsToPrunePerBlock = tc.overwriteLimit
}
state := types.PruningState{
IsPruning: true,
LastKeptTime: tc.lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(tc.lastKeptTime, 0, "", ""),
}
err := twapKeeper.PruneRecordsBeforeTimeButNewest(ctx, state)
s.Require().NoError(err)
s.validateExpectedRecords(tc.expectedKeptRecords)
})
}
}

Deleting the func along with the test is fine imo


// recordWithUpdatedAccumulators returns a record, with updated accumulator values and time for provided newTime,
// otherwise referred to as "interpolating the record" to the target time.
// This does not mutate the passed in record.
Expand Down
56 changes: 0 additions & 56 deletions x/twap/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,62 +572,6 @@ type computeThreeAssetArithmeticTwapTestCase struct {
expErr bool
}

// TestPruneRecords tests that twap records earlier than
// current block time - RecordHistoryKeepPeriod are pruned from the store
// while keeping the newest record before the above time threshold.
// Such record is kept for each pool.
func (s *TestSuite) TestPruneRecords() {
recordHistoryKeepPeriod := s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx)

pool1OlderMin2MsRecord, // deleted
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC, // deleted
pool3OlderBaseRecord, // kept as newest under keep period
pool4OlderPlus1Record := // kept as newest under keep period
s.createTestRecordsFromTime(baseTime.Add(2 * -recordHistoryKeepPeriod))

pool1Min2MsRecord, // kept as newest under keep period
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC, // kept as newest under keep period
pool3BaseRecord, // kept as it is at the keep period boundary
pool4Plus1Record := // kept as it is above the keep period boundary
s.createTestRecordsFromTime(baseTime.Add(-recordHistoryKeepPeriod))

// non-ascending insertion order.
recordsToPreSet := []types.TwapRecord{
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC,
pool4Plus1Record,
pool4OlderPlus1Record,
pool3OlderBaseRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool1Min2MsRecord,
pool1OlderMin2MsRecord,
}

// tMin2Record is before the threshold and is pruned away.
// tmin1Record is the newest record before current block time - record history keep period.
// All other records happen after the threshold and are kept.
expectedKeptRecords := []types.TwapRecord{
pool3OlderBaseRecord,
pool4OlderPlus1Record,
pool1Min2MsRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool4Plus1Record,
}
s.SetupTest()
s.preSetRecords(recordsToPreSet)

ctx := s.Ctx
twapKeeper := s.twapkeeper

ctx = ctx.WithBlockTime(baseTime)

err := twapKeeper.PruneRecords(ctx)
s.Require().NoError(err)

s.validateExpectedRecords(expectedKeptRecords)
}

// TestUpdateRecords tests that the records are updated correctly.
// It tests the following:
// - two-asset pools
Expand Down
38 changes: 36 additions & 2 deletions x/twap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
)

// NumRecordsToPrunePerBlock is the number of records to prune per block.
// Two records are deleted per incentive record:
// 1. by time index
// 2. by pool index
// Therefore, setting this to 1000 means 500 complete incentive records are deleted per block.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: twap records

Copy link
Member

@p0mvn p0mvn Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we elaborate on the choice here? Why 1000/500?

var NumRecordsToPrunePerBlock uint16 = 1000

type timeTooOldError struct {
Time time.Time
}
Expand Down Expand Up @@ -73,15 +80,22 @@ func (k Keeper) StoreHistoricalTWAP(ctx sdk.Context, twap types.TwapRecord) {
// So, in order to have correct behavior for the desired guarantee,
// we keep the newest record that is older than the pruning time.
// This is why we would keep the -50 hour and -1hour twaps despite a 48hr pruning period
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
//
// If we reach the per block pruning limit, we store the last key seen in the pruning state.
// This is so that we can continue pruning from where we left off in the next block.
// If we have pruned all records, we set the pruning state to not pruning.
// There is a small bug here where we store more seenPoolAssetTriplets than we need to.
// Issue added here: https://github.com/osmosis-labs/osmosis/issues/7435
// The bloat is minimal though, and is not at risk of getting out of hand.
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
store := ctx.KVStore(k.storeKey)

// Reverse iterator guarantees that we iterate through the newest per pool first.
// Due to how it is indexed, we will only iterate times starting from
// lastKeptTime exclusively down to the oldest record.
iter := store.ReverseIterator(
[]byte(types.HistoricalTWAPTimeIndexPrefix),
types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""))
state.LastKeySeen)
defer iter.Close()

// We mark what (pool id, asset 0, asset 1) triplets we've seen.
Expand All @@ -93,6 +107,8 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
}
seenPoolAssetTriplets := map[uniqueTriplet]struct{}{}
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved

var numPruned uint16

for ; iter.Valid(); iter.Next() {
timeIndexKey := iter.Key()
timeS, poolId, asset0, asset1, err := types.ParseFieldsFromHistoricalTimeKey(timeIndexKey)
Expand All @@ -117,6 +133,24 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
store.Delete(timeIndexKey)
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKeyFromStrTime(poolId, asset0, asset1, timeS)
store.Delete(poolIndexKey)

// Increment the number of records pruned by 2, since we delete two records per iteration.
numPruned += 2

if numPruned >= NumRecordsToPrunePerBlock {
// We have hit the limit, so we stop pruning.
break
}
}

if !iter.Valid() {
// The iterator is exhausted, so we have pruned all records.
state.IsPruning = false
k.SetPruningState(ctx, state)
} else {
// We have not pruned all records, so we update the last key seen.
state.LastKeySeen = iter.Key()
k.SetPruningState(ctx, state)
Comment on lines +151 to +153
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may ahve missed it, but I don't think we have any logic testing this code path. (or rather that keys from this are working as we want)

From reading the code, this loosk right to me!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, I will add before merge

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test shows pruning happening over multiple blocks f84a716

It verifies

  1. We see the extra record that we save due to the bug
  2. Pruning eventually gets set to false when we iterate over records
  3. The correct records get pruned

@ValarDragon

}
return nil
}
Expand Down
Loading