Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(twap): prune records over multiple blocks #7427

Merged
merged 11 commits into from
Feb 9, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#7360](https://github.com/osmosis-labs/osmosis/pull/7360) Bump cometbft-db from 0.8.0 to 0.10.0
* [#7385](https://github.com/osmosis-labs/osmosis/pull/7385) Add missing protobuf interface
* [#7427](https://github.com/osmosis-labs/osmosis/pull/7427) Prune TWAP records over multiple blocks, instead of all at once at epoch

## v22.0.5

Expand Down
17 changes: 17 additions & 0 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,20 @@ message TwapRecord {
(gogoproto.moretags) = "yaml:\"last_error_time\""
];
}

// PruningState allows us to spread out the pruning of TWAP records over time,
// instead of pruning all at once at the end of the epoch.
message PruningState {
// is_pruning is true if the pruning process is ongoing.
// This tells the module to continue pruning the TWAP records
// at the EndBlock.
bool is_pruning = 1;
// last_kept_time is the time of the last kept TWAP record.
// This is used to determine all TWAP records that are older than
// last_kept_time and should be pruned.
google.protobuf.Timestamp last_kept_time = 2 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
}
4 changes: 0 additions & 4 deletions x/twap/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

func (k Keeper) PruneRecords(ctx sdk.Context) error {
return k.pruneRecords(ctx)
}

func (k Keeper) GetInterpolatedRecord(ctx sdk.Context, poolId uint64, asset0Denom string, asset1Denom string, t time.Time) (types.TwapRecord, error) {
return k.getInterpolatedRecord(ctx, poolId, t, asset0Denom, asset1Denom)
}
Expand Down
29 changes: 29 additions & 0 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"

paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"

Expand Down Expand Up @@ -100,3 +101,31 @@ func (k Keeper) GetGeometricStrategy() *geometric {
func (k Keeper) GetArithmeticStrategy() *arithmetic {
return &arithmetic{k}
}

// GetPruningState gets the current pruning state, which is used to determine
// whether to prune historical records in the EndBlock. This allows us to spread
// out the computational cost of pruning over time rather than all at once at epoch.
func (k Keeper) GetPruningState(ctx sdk.Context) types.PruningState {
store := ctx.KVStore(k.storeKey)
state := types.PruningState{}

bz := store.Get(types.PruningStateKey)
if bz == nil {
return state
}
err := proto.Unmarshal(bz, &state)
if err != nil {
panic(err)
}
return state
}

func (k Keeper) SetPruningState(ctx sdk.Context, state types.PruningState) {
store := ctx.KVStore(k.storeKey)

bz, err := proto.Marshal(&state)
if err != nil {
panic(err)
}
store.Set(types.PruningStateKey, bz)
}
9 changes: 6 additions & 3 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/osmosis-labs/osmosis/osmomath"
concentratedliquiditytypes "github.com/osmosis-labs/osmosis/v22/x/concentrated-liquidity/types"
gammtypes "github.com/osmosis-labs/osmosis/v22/x/gamm/types"
"github.com/osmosis-labs/osmosis/v22/x/twap/types"
epochtypes "github.com/osmosis-labs/osmosis/x/epochs/types"
)

Expand All @@ -24,9 +25,11 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {

func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
if err := hook.k.pruneRecords(ctx); err != nil {
ctx.Logger().Error("Error pruning old twaps at the epoch end", err)
}
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
})
}
return nil
}
Expand Down
20 changes: 8 additions & 12 deletions x/twap/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (k Keeper) EndBlock(ctx sdk.Context) {
" Skipping record update. Underlying err: %w", id, err).Error())
}
}

state := k.GetPruningState(ctx)
if state.IsPruning {
err := k.pruneRecordsBeforeTimeButNewest(ctx, state.LastKeptTime)
if err != nil {
ctx.Logger().Error("Error pruning old twaps at the end block", err)
}
}
}

// updateRecords updates all records for a given pool id.
Expand Down Expand Up @@ -195,18 +203,6 @@ func (k Keeper) updateRecord(ctx sdk.Context, record types.TwapRecord) (types.Tw
return newRecord, nil
}

// pruneRecords prunes twap records that happened earlier than recordHistoryKeepPeriod
// before current block time while preserving the most recent record before the threshold.
// Such record is preserved for each pool.
// See TWAP keeper's `pruneRecordsBeforeTimeButNewest(...)` for more details about the reasons for
// keeping this record.
func (k Keeper) pruneRecords(ctx sdk.Context) error {
recordHistoryKeepPeriod := k.RecordHistoryKeepPeriod(ctx)

lastKeptTime := ctx.BlockTime().Add(-recordHistoryKeepPeriod)
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}
Comment on lines -198 to -208
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper func is no longer needed since we determine the lastKeptTime at epoch end and store in a state entry.

Copy link
Member

@ValarDragon ValarDragon Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should still keep this function so we can unit test it? (Just move the logic into here)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you think the current test is good / simple enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reimplement the test, I thought the state.go test tests the same thing, but I can double check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is tested in even more depth here

// TestPruneRecordsBeforeTime tests that all twap records earlier than
// current block time - given time are pruned from the store while
// the newest record for each pool before the time to keep is preserved.
func (s *TestSuite) TestPruneRecordsBeforeTimeButNewest() {
// N.B.: the records follow the following naming convention:
// <pool id><delta from base time in seconds><delta from base time in milliseconds>
// These are manually created to be able to refer to them by name
// for convenience.
// Create 6 records of 4 pools from base time, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SBaseMs, pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, pool3BaseSecBaseMs, pool4Plus1SBaseMs := s.createTestRecordsFromTime(baseTime)
// Create 6 records of 4 pools from base time - 1 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin1Ms, pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, pool3BaseSecMin1Ms, pool4Plus1SMin1Ms := s.createTestRecordsFromTime(baseTime.Add(-time.Millisecond))
// Create 6 records of 4 pools from base time - 2 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin2Ms, pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, pool3BaseSecMin2Ms, pool4Plus1SMin2Ms := s.createTestRecordsFromTime(baseTime.Add(2 * -time.Millisecond))
// Create 6 records of 4 pools from base time - 3 ms, each in different pool with the difference of 1 second between them. Pool 2 is a 3 asset pool.
pool1Min2SMin3Ms, pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, pool3BaseSecMin3Ms, pool4Plus1SMin3Ms := s.createTestRecordsFromTime(baseTime.Add(3 * -time.Millisecond))
// Create 12 records in the same pool from base time , each record with the difference of 1 second between them.
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC,
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC,
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC,
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC := s.CreateTestRecordsFromTimeInPool(baseTime, 5)
// Create 12 records in the same pool from base time - 1 ms, each record with the difference of 1 second between them
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC,
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC,
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC,
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC := s.CreateTestRecordsFromTimeInPool(baseTime.Add(-time.Millisecond), 5)
tests := map[string]struct {
// order does not follow any specific pattern
// across many test cases on purpose.
recordsToPreSet []types.TwapRecord
lastKeptTime time.Time
expectedKeptRecords []types.TwapRecord
overwriteLimit uint16
}{
"base time; across pool 3; 4 records; 3 before lastKeptTime; 2 deleted and newest kept": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool3BaseSecMin3Ms, // base time - 3ms; deleted
pool3BaseSecMin2Ms, // base time - 2ms; deleted
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{pool3BaseSecMin1Ms, pool3BaseSecBaseMs},
},
"base time - 1s - 2 ms; across pool 2; 12 records; 3 before lastKeptTime; none pruned since newest kept": {
recordsToPreSet: []types.TwapRecord{
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older than at lastKeptTime
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older than lastKeptTime
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
},
lastKeptTime: baseTime.Add(-time.Second).Add(2 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC,
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC,
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC,
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC,
},
},
"base time - 2s - 3 ms; across pool 1; 4 records; none before lastKeptTime; none pruned": {
recordsToPreSet: []types.TwapRecord{
pool1Min2SMin3Ms, // base time - 2s - 3ms; kept since older than lastKeptTime
pool1Min2SMin1Ms, // base time - 2s - 1ms; kept since older than lastKeptTime
pool1Min2SMin2Ms, // base time - 2s - 2ms; kept since older than lastKeptTime
pool1Min2SBaseMs, // base time - 2s; kept since older than lastKeptTime
},
lastKeptTime: baseTime.Add(2 * -time.Second).Add(3 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{pool1Min2SMin3Ms, pool1Min2SMin2Ms, pool1Min2SMin1Ms, pool1Min2SBaseMs},
},
"base time + 1s + 1ms; across pool 4; 4 records; all before lastKeptTime; 3 deleted and newest kept": {
recordsToPreSet: []types.TwapRecord{
pool4Plus1SBaseMs, // base time + 1s; kept since newest before lastKeptTime
pool4Plus1SMin3Ms, // base time + 1s - 3ms; deleted
pool4Plus1SMin1Ms, // base time + 1s -1ms; deleted
pool4Plus1SMin2Ms, // base time + 1s - 2ms; deleted
},
lastKeptTime: baseTime.Add(time.Second).Add(time.Millisecond),
expectedKeptRecords: []types.TwapRecord{pool4Plus1SBaseMs},
},
"base time; across pool 3 and pool 5; pool 3: 4 total records; 3 before lastKeptTime; 2 deleted and newest 2 kept. pool 5: 24 total records; 12 before lastKeptTime; 12 deleted and 12 kept": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin3Ms, // base time - 3ms; deleted
pool3BaseSecMin2Ms, // base time - 2ms; deleted
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; deleted
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; ; deleted
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since at lastKeptTime
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older than lastKeptTime
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC, // base time - 2s - 1ms; deleted
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; deleted
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since newest before lastKeptTime
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older than lastKeptTime
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{
pool3BaseSecMin1Ms,
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC,
pool3BaseSecBaseMs,
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC,
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC,
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC,
},
},
"base time - 1s - 2 ms; all pools; all test records": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin3Ms, // base time - 3ms; kept since older
pool3BaseSecMin2Ms, // base time - 2ms; kept since older
pool3BaseSecMin1Ms, // base time - 1ms; kept since older
pool3BaseSecBaseMs, // base time; kept since older
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older
pool1Min2SMin3Ms, // base time - 2s - 3ms; deleted
pool1Min2SMin2Ms, // base time - 2s - 2ms; deleted
pool1Min2SMin1Ms, // base time - 2s - 1ms; deleted
pool1Min2SBaseMs, // base time - 2s; kept since newest before lastKeptTime
pool4Plus1SMin3Ms, // base time + 1s - 3ms; kept since older
pool4Plus1SMin2Ms, // base time + 1s - 2ms; kept since older
pool4Plus1SMin1Ms, // base time + 1s -1ms; kept since older
pool4Plus1SBaseMs, // base time + 1s; kept since older
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; kept since newest before lastKeptTime
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; kept since older
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since older
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older
pool5Min2SMin1MsAB, pool5Min2SMin1MsAC, pool5Min2SMin1MsBC, // base time - 2s - 1ms; deleted
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since older
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older
},
lastKeptTime: baseTime.Add(-time.Second).Add(2 * -time.Millisecond),
expectedKeptRecords: []types.TwapRecord{
pool1Min2SBaseMs, // base time - 2s; kept since newest before lastKeptTime
pool5Min2SBaseMsAB, pool5Min2SBaseMsAC, pool5Min2SBaseMsBC, // base time - 2s; kept since newest before lastKeptTime
pool2Min1SMin3MsAB, pool2Min1SMin3MsAC, pool2Min1SMin3MsBC, // base time - 1s - 3ms; kept since newest before lastKeptTime
pool2Min1SMin2MsAB, pool2Min1SMin2MsAC, pool2Min1SMin2MsBC, // base time - 1s - 2ms; kept since at lastKeptTime
pool2Min1SMin1MsAB, pool2Min1SMin1MsAC, pool2Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool5Min1SMin1MsAB, pool5Min1SMin1MsAC, pool5Min1SMin1MsBC, // base time - 1s - 1ms; kept since older
pool2Min1SBaseMsAB, pool2Min1SBaseMsAC, pool2Min1SBaseMsBC, // base time - 1s; kept since older
pool5Min1SBaseMsAB, pool5Min1SBaseMsAC, pool5Min1SBaseMsBC, // base time - 1s; kept since older
pool3BaseSecMin3Ms, // base time - 3ms; kept since older
pool3BaseSecMin2Ms, // base time - 2ms; kept since older
pool3BaseSecMin1Ms, // base time - 1ms; kept since older
pool5BaseSecMin1MsAB, pool5BaseSecMin1MsAC, pool5BaseSecMin1MsBC, // base time - 1ms; kept since older
pool3BaseSecBaseMs, // base time; kept since older
pool5BaseSecBaseMsAB, pool5BaseSecBaseMsAC, pool5BaseSecBaseMsBC, // base time; kept since older
pool4Plus1SMin3Ms, // base time + 1s - 3ms; kept since older
pool4Plus1SMin2Ms, // base time + 1s - 2ms; kept since older
pool4Plus1SMin1Ms, // base time + 1s -1ms; kept since older
pool5Plus1SMin1MsAB, pool5Plus1SMin1MsAC, pool5Plus1SMin1MsBC, // base time + 1s - 1ms; kept since older
pool4Plus1SBaseMs, // base time + 1s; kept since older
pool5Plus1SBaseMsAB, pool5Plus1SBaseMsAC, pool5Plus1SBaseMsBC, // base time + 1s; kept since older
},
},
"no pre-set records - no error": {
recordsToPreSet: []types.TwapRecord{},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{},
},
"base time; across pool 3; 4 records; 3 before lastKeptTime; only 1 deleted due to limit set to 1": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool3BaseSecMin3Ms, // base time - 3ms; in queue for deletion
pool3BaseSecMin2Ms, // base time - 2ms; deleted
},
lastKeptTime: baseTime,
expectedKeptRecords: []types.TwapRecord{pool3BaseSecMin3Ms, pool3BaseSecMin1Ms, pool3BaseSecBaseMs},
overwriteLimit: 1,
},
}
for name, tc := range tests {
s.Run(name, func() {
s.SetupTest()
s.preSetRecords(tc.recordsToPreSet)
ctx := s.Ctx
twapKeeper := s.twapkeeper
if tc.overwriteLimit != 0 {
originalLimit := twap.NumRecordsToPrunePerBlock
defer func() {
twap.NumRecordsToPrunePerBlock = originalLimit
}()
twap.NumRecordsToPrunePerBlock = tc.overwriteLimit
}
state := types.PruningState{
IsPruning: true,
LastKeptTime: tc.lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(tc.lastKeptTime, 0, "", ""),
}
err := twapKeeper.PruneRecordsBeforeTimeButNewest(ctx, state)
s.Require().NoError(err)
s.validateExpectedRecords(tc.expectedKeptRecords)
})
}
}

Deleting the func along with the test is fine imo


// recordWithUpdatedAccumulators returns a record, with updated accumulator values and time for provided newTime,
// otherwise referred to as "interpolating the record" to the target time.
// This does not mutate the passed in record.
Expand Down
56 changes: 0 additions & 56 deletions x/twap/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,62 +572,6 @@ type computeThreeAssetArithmeticTwapTestCase struct {
expErr bool
}

// TestPruneRecords tests that twap records earlier than
// current block time - RecordHistoryKeepPeriod are pruned from the store
// while keeping the newest record before the above time threshold.
// Such record is kept for each pool.
func (s *TestSuite) TestPruneRecords() {
recordHistoryKeepPeriod := s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx)

pool1OlderMin2MsRecord, // deleted
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC, // deleted
pool3OlderBaseRecord, // kept as newest under keep period
pool4OlderPlus1Record := // kept as newest under keep period
s.createTestRecordsFromTime(baseTime.Add(2 * -recordHistoryKeepPeriod))

pool1Min2MsRecord, // kept as newest under keep period
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC, // kept as newest under keep period
pool3BaseRecord, // kept as it is at the keep period boundary
pool4Plus1Record := // kept as it is above the keep period boundary
s.createTestRecordsFromTime(baseTime.Add(-recordHistoryKeepPeriod))

// non-ascending insertion order.
recordsToPreSet := []types.TwapRecord{
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC,
pool4Plus1Record,
pool4OlderPlus1Record,
pool3OlderBaseRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool1Min2MsRecord,
pool1OlderMin2MsRecord,
}

// tMin2Record is before the threshold and is pruned away.
// tmin1Record is the newest record before current block time - record history keep period.
// All other records happen after the threshold and are kept.
expectedKeptRecords := []types.TwapRecord{
pool3OlderBaseRecord,
pool4OlderPlus1Record,
pool1Min2MsRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool4Plus1Record,
}
s.SetupTest()
s.preSetRecords(recordsToPreSet)

ctx := s.Ctx
twapKeeper := s.twapkeeper

ctx = ctx.WithBlockTime(baseTime)

err := twapKeeper.PruneRecords(ctx)
s.Require().NoError(err)

s.validateExpectedRecords(expectedKeptRecords)
}

// TestUpdateRecords tests that the records are updated correctly.
// It tests the following:
// - two-asset pools
Expand Down
17 changes: 17 additions & 0 deletions x/twap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/osmosis-labs/osmosis/v22/x/twap/types"
)

var NumRecordsToPrunePerBlock uint16 = 1000

type timeTooOldError struct {
Time time.Time
}
Expand Down Expand Up @@ -93,6 +95,8 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
}
seenPoolAssetTriplets := map[uniqueTriplet]struct{}{}
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved

var numPruned uint16

for ; iter.Valid(); iter.Next() {
timeIndexKey := iter.Key()
timeS, poolId, asset0, asset1, err := types.ParseFieldsFromHistoricalTimeKey(timeIndexKey)
Expand All @@ -117,6 +121,19 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
store.Delete(timeIndexKey)
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKeyFromStrTime(poolId, asset0, asset1, timeS)
store.Delete(poolIndexKey)

numPruned++
czarcas7ic marked this conversation as resolved.
Show resolved Hide resolved
czarcas7ic marked this conversation as resolved.
Show resolved Hide resolved
if numPruned >= NumRecordsToPrunePerBlock {
// We have hit the limit, so we stop pruning.
break
}
}

if !iter.Valid() {
// The iterator is exhausted, so we have pruned all records.
state := k.GetPruningState(ctx)
state.IsPruning = false
k.SetPruningState(ctx, state)
}
return nil
}
Expand Down
24 changes: 24 additions & 0 deletions x/twap/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ func (s *TestSuite) TestPruneRecordsBeforeTimeButNewest() {
lastKeptTime time.Time

expectedKeptRecords []types.TwapRecord

overwriteLimit uint16
}{
"base time; across pool 3; 4 records; 3 before lastKeptTime; 2 deleted and newest kept": {
recordsToPreSet: []types.TwapRecord{
Expand Down Expand Up @@ -507,6 +509,20 @@ func (s *TestSuite) TestPruneRecordsBeforeTimeButNewest() {

expectedKeptRecords: []types.TwapRecord{},
},
"base time; across pool 3; 4 records; 3 before lastKeptTime; only 1 deleted due to limit set to 1": {
recordsToPreSet: []types.TwapRecord{
pool3BaseSecMin1Ms, // base time - 1ms; kept since newest before lastKeptTime
pool3BaseSecBaseMs, // base time; kept since at lastKeptTime
pool3BaseSecMin3Ms, // base time - 3ms; in queue for deletion
pool3BaseSecMin2Ms, // base time - 2ms; deleted
},

lastKeptTime: baseTime,

expectedKeptRecords: []types.TwapRecord{pool3BaseSecMin3Ms, pool3BaseSecMin1Ms, pool3BaseSecBaseMs},

overwriteLimit: 1,
},
}
for name, tc := range tests {
s.Run(name, func() {
Expand All @@ -516,6 +532,14 @@ func (s *TestSuite) TestPruneRecordsBeforeTimeButNewest() {
ctx := s.Ctx
twapKeeper := s.twapkeeper

if tc.overwriteLimit != 0 {
originalLimit := twap.NumRecordsToPrunePerBlock
defer func() {
twap.NumRecordsToPrunePerBlock = originalLimit
}()
twap.NumRecordsToPrunePerBlock = tc.overwriteLimit
}

err := twapKeeper.PruneRecordsBeforeTimeButNewest(ctx, tc.lastKeptTime)
s.Require().NoError(err)

Expand Down
1 change: 1 addition & 0 deletions x/twap/types/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
)

var (
PruningStateKey = []byte{0x01}
mostRecentTWAPsNoSeparator = "recent_twap"
historicalTWAPTimeIndexNoSeparator = "historical_time_index"
historicalTWAPPoolIndexNoSeparator = "historical_pool_index"
Expand Down
Loading