Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 87 additions & 66 deletions qlib/contrib/strategy/cost_control.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,122 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
This strategy is not well maintained
"""


from .order_generator import OrderGenWInteract
from .signal_strategy import WeightStrategyBase
import copy


class SoftTopkStrategy(WeightStrategyBase):
def __init__(
self,
model,
dataset,
topk,
model=None,
dataset=None,
topk=None,
order_generator_cls_or_obj=OrderGenWInteract,
max_sold_weight=1.0,
trade_impact_limit=None,
priority="IMPACT_FIRST",
risk_degree=0.95,
buy_method="first_fill",
trade_exchange=None,
level_infra=None,
common_infra=None,
**kwargs,
):
"""
Refactored SoftTopkStrategy with a budget-constrained rebalancing engine.

Parameters
----------
topk : int
top-N stocks to buy
The number of top-N stocks to be held in the portfolio.
trade_impact_limit : float
Maximum weight change for each stock in one trade.
priority : str
"COMPLIANCE_FIRST" or "IMPACT_FIRST".
risk_degree : float
position percentage of total value buy_method:

rank_fill: assign the weight stocks that rank high first(1/topk max)
average_fill: assign the weight to the stocks rank high averagely.
The target percentage of total value to be invested.
"""
super(SoftTopkStrategy, self).__init__(
model, dataset, order_generator_cls_or_obj, trade_exchange, level_infra, common_infra, **kwargs
model=model, dataset=dataset, order_generator_cls_or_obj=order_generator_cls_or_obj, **kwargs
)

self.topk = topk
self.max_sold_weight = max_sold_weight
self.trade_impact_limit = trade_impact_limit if trade_impact_limit is not None else max_sold_weight
self.priority = priority.upper()
self.risk_degree = risk_degree
self.buy_method = buy_method

def get_risk_degree(self, trade_step=None):
"""get_risk_degree
Return the proportion of your total value you will used in investment.
Dynamically risk_degree will result in Market timing
"""
# It will use 95% amount of your total value by default
return self.risk_degree

def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time):
def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time, **kwargs):
"""
Generates target position using Proportional Budget Allocation.
Ensures deterministic sells and synchronized buys under impact limits.
"""
Parameters
----------
score:
pred score for this trade date, pd.Series, index is stock_id, contain 'score' column
current:
current position, use Position() class
trade_date:
trade date

generate target position from score for this date and the current position
if self.topk is None or self.topk <= 0:
return {}

The cache is not considered in the position
"""
# TODO:
# If the current stock list is more than topk(eg. The weights are modified
# by risk control), the weight will not be handled correctly.
buy_signal_stocks = set(score.sort_values(ascending=False).iloc[: self.topk].index)
cur_stock_weight = current.get_stock_weight_dict(only_stock=True)

if len(cur_stock_weight) == 0:
final_stock_weight = {code: 1 / self.topk for code in buy_signal_stocks}
else:
final_stock_weight = copy.deepcopy(cur_stock_weight)
sold_stock_weight = 0.0
for stock_id in final_stock_weight:
if stock_id not in buy_signal_stocks:
sw = min(self.max_sold_weight, final_stock_weight[stock_id])
sold_stock_weight += sw
final_stock_weight[stock_id] -= sw
if self.buy_method == "first_fill":
for stock_id in buy_signal_stocks:
add_weight = min(
max(1 / self.topk - final_stock_weight.get(stock_id, 0), 0.0),
sold_stock_weight,
)
final_stock_weight[stock_id] = final_stock_weight.get(stock_id, 0.0) + add_weight
sold_stock_weight -= add_weight
elif self.buy_method == "average_fill":
for stock_id in buy_signal_stocks:
final_stock_weight[stock_id] = final_stock_weight.get(stock_id, 0.0) + sold_stock_weight / len(
buy_signal_stocks
ideal_per_stock = self.risk_degree / self.topk
ideal_list = score.sort_values(ascending=False).iloc[: self.topk].index.tolist()

cur_weights = current.get_stock_weight_dict(only_stock=True)
initial_total_weight = sum(cur_weights.values())

# --- Case A: Cold Start ---
if not cur_weights:
fill = (
ideal_per_stock
if self.priority == "COMPLIANCE_FIRST"
else min(ideal_per_stock, self.trade_impact_limit)
)
return {code: fill for code in ideal_list}

# --- Case B: Rebalancing ---
all_tickers = set(cur_weights.keys()) | set(ideal_list)
next_weights = {t: cur_weights.get(t, 0.0) for t in all_tickers}

# Phase 1: Deterministic Sell Phase
released_cash = 0.0
for t in list(next_weights.keys()):
cur = next_weights[t]
if cur <= 1e-8:
continue

if t not in ideal_list:
sell = cur if self.priority == "COMPLIANCE_FIRST" else min(cur, self.trade_impact_limit)
next_weights[t] -= sell
released_cash += sell
elif cur > ideal_per_stock + 1e-8:
excess = cur - ideal_per_stock
sell = excess if self.priority == "COMPLIANCE_FIRST" else min(excess, self.trade_impact_limit)
next_weights[t] -= sell
released_cash += sell

# Phase 2: Budget Calculation
# Budget = Cash from sells + Available space from target risk degree
total_budget = released_cash + (self.risk_degree - initial_total_weight)

# Phase 3: Proportional Buy Allocation
if total_budget > 1e-8:
shortfalls = {
t: (ideal_per_stock - next_weights.get(t, 0.0))
for t in ideal_list
if next_weights.get(t, 0.0) < ideal_per_stock - 1e-8
}

if shortfalls:
total_shortfall = sum(shortfalls.values())
# Normalize total_budget to not exceed total_shortfall
available_to_spend = min(total_budget, total_shortfall)

for t, shortfall in shortfalls.items():
# Every stock gets its fair share based on its distance to target
share_of_budget = (shortfall / total_shortfall) * available_to_spend

# Capped by impact limit or compliance priority
max_buy_cap = (
shortfall if self.priority == "COMPLIANCE_FIRST" else min(shortfall, self.trade_impact_limit)
)
else:
raise ValueError("Buy method not found")
return final_stock_weight

next_weights[t] += min(share_of_budget, max_buy_cap)

return {k: v for k, v in next_weights.items() if v > 1e-8}
57 changes: 57 additions & 0 deletions tests/test_soft_topk_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pandas as pd
import pytest
from qlib.contrib.strategy.cost_control import SoftTopkStrategy


class MockPosition:
def __init__(self, weights):
self.weights = weights

def get_stock_weight_dict(self, only_stock=True):
return self.weights


def test_soft_topk_logic():
# Initial: A=0.8, B=0.2 (Total=1.0). Target Risk=0.95.
# Scores: A and B are low, C and D are topk.
scores = pd.Series({"C": 0.9, "D": 0.8, "A": 0.1, "B": 0.1})
current_pos = MockPosition({"A": 0.8, "B": 0.2})

topk = 2
risk_degree = 0.95
impact_limit = 0.1 # Max change per step

def create_test_strategy(priority):
strat = SoftTopkStrategy.__new__(SoftTopkStrategy)
strat.topk = topk
strat.risk_degree = risk_degree
strat.trade_impact_limit = impact_limit
strat.priority = priority.upper()
return strat

# 1. Test IMPACT_FIRST: Expect deterministic sell and limited buy
strat_i = create_test_strategy("IMPACT_FIRST")
res_i = strat_i.generate_target_weight_position(scores, current_pos, None, None)

# A should be exactly 0.8 - 0.1 = 0.7
assert abs(res_i["A"] - 0.7) < 1e-8
# B should be exactly 0.2 - 0.1 = 0.1
assert abs(res_i["B"] - 0.1) < 1e-8
# Total sells = 0.2 released. New budget = 0.2 + (0.95 - 1.0) = 0.15.
# C and D share 0.15 -> 0.075 each.
assert abs(res_i["C"] - 0.075) < 1e-8
assert abs(res_i["D"] - 0.075) < 1e-8

# 2. Test COMPLIANCE_FIRST: Expect full liquidation and full target fill
strat_c = create_test_strategy("COMPLIANCE_FIRST")
res_c = strat_c.generate_target_weight_position(scores, current_pos, None, None)

# A, B not in topk -> Liquidated
assert "A" not in res_c and "B" not in res_c
# C, D should reach ideal_per_stock (0.95/2 = 0.475)
assert abs(res_c["C"] - 0.475) < 1e-8
assert abs(res_c["D"] - 0.475) < 1e-8


if __name__ == "__main__":
pytest.main([__file__])
Loading