245 lines
8.1 KiB
Python
245 lines
8.1 KiB
Python
from vnpy.trader.constant import Interval
|
||
from vnpy.trader.utility import ArrayManager, BarGenerator, load_json, save_json
|
||
from vnpy.trader.object import TickData, BarData
|
||
|
||
from elite_optionstrategy import (
|
||
StrategyTemplate,
|
||
Variable,
|
||
Parameter,
|
||
PortfolioData,
|
||
ChainData,
|
||
OptionData,
|
||
OptionBarGenerator,
|
||
)
|
||
|
||
from datetime import time
|
||
|
||
class AdvancedSpreadStrategy(StrategyTemplate):
|
||
"""基于DualThrust信号做空符合价差的策略"""
|
||
|
||
author: str = "用Python的交易员"
|
||
|
||
option_portfolio: str = Parameter("MO") # 期权产品代码
|
||
underlying_symbol: str = Parameter("IMJQ00.CFFEX") # 标的合约代码
|
||
k1: float = Parameter(0.3) # DualThrust参数k1
|
||
k2: float = Parameter(0.9) # DualThrust参数k2
|
||
risk_level: int = Parameter(40) # 开仓风险度
|
||
percent_add: float = Parameter(0.02) # 委托超价比例
|
||
otm_level: int = Parameter(0) # 做空期权档位
|
||
leg1_ratio: int = Parameter(4) # 顺势腿的比例
|
||
leg2_ratio: int = Parameter(1) # 逆势腿的比例
|
||
|
||
dt_signal: int = Variable(0) # 当前信号多空
|
||
trading_size: int = Variable(1) # 当前交易数量
|
||
atm_strike: float = Variable(0) # 当前平值行权价
|
||
|
||
def on_init(self) -> None:
|
||
"""策略初始化"""
|
||
self.write_log("策略初始化")
|
||
|
||
# K线截面合成器
|
||
self.obg: OptionBarGenerator = OptionBarGenerator(self.on_bars)
|
||
|
||
# 订阅行情
|
||
self.subscribe_options(self.option_portfolio)
|
||
self.subscribe_data(self.underlying_symbol)
|
||
|
||
# 标的信号对象
|
||
self.factor: DualThrustFactor = DualThrustFactor(self.underlying_symbol, self.k1, self.k2)
|
||
|
||
# 加载标的历史数据初始化
|
||
bars: list[BarData] = self.load_bars(self.underlying_symbol, 40, Interval.MINUTE)
|
||
for bar in bars:
|
||
self.factor.update_bar(bar)
|
||
|
||
# 缓存文件名称
|
||
self.data_filename: str = f"{self.name}_data.json"
|
||
|
||
def on_start(self) -> None:
|
||
"""策略启动"""
|
||
self.write_log("策略启动")
|
||
|
||
data: dict = load_json(self.data_filename)
|
||
self.dt_signal = data.get("dt_signal", 0)
|
||
|
||
def on_stop(self) -> None:
|
||
"""策略停止"""
|
||
self.write_log("策略停止")
|
||
|
||
data: dict = {"dt_signal": self.dt_signal}
|
||
save_json(self.data_filename, data)
|
||
|
||
def on_tick(self, tick: TickData) -> None:
|
||
"""Tick推送"""
|
||
self.obg.update_tick(tick)
|
||
|
||
def on_bars(self, bars: dict[str, BarData]) -> None:
|
||
"""K线推送"""
|
||
# 回测首先计算标的信号
|
||
underlying_bar: BarData = bars.pop(self.underlying_symbol, None)
|
||
if underlying_bar:
|
||
self.factor.update_bar(underlying_bar)
|
||
|
||
# 获取期权组合对象
|
||
portfolio: PortfolioData = self.get_portfolio(self.option_portfolio)
|
||
|
||
# 更新最新期权价格到组合
|
||
price_data: dict[str, float] = {}
|
||
for bar in bars.values():
|
||
price_data[bar.vt_symbol] = bar.close_price
|
||
|
||
portfolio.update_price(price_data)
|
||
|
||
# 获取当月期权链
|
||
front_chain: ChainData = portfolio.get_chain_by_level(0)
|
||
if not front_chain:
|
||
self.write_log("无法获取当月期权链,请检查是否正确添加了期权合约")
|
||
return
|
||
|
||
# 计算平值期权
|
||
front_chain.calculate_atm()
|
||
self.atm_strike = front_chain.atm_strike
|
||
|
||
# 获取当前DualThrust多空信号
|
||
dt_signal: int = self.factor.get_signal()
|
||
|
||
# 如果DualThrust多头信号,且尚未做多
|
||
if dt_signal > 0 and self.dt_signal <= 0:
|
||
# 做空Put
|
||
call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level)
|
||
put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level)
|
||
|
||
if call and put:
|
||
# 清空之前的目标
|
||
self.clear_targets()
|
||
|
||
atr_value: float = self.factor.get_atr()
|
||
if atr_value:
|
||
# self.trading_size = int(self.risk_level / atr_value)
|
||
self.trading_size = 1 # max(self.trading_size, 1)
|
||
else:
|
||
self.trading_size = 1
|
||
|
||
self.set_target(put.vt_symbol, -self.trading_size * self.leg1_ratio)
|
||
self.set_target(call.vt_symbol, -self.trading_size * self.leg2_ratio)
|
||
# 如果DualThrust空头信号,且尚未做空
|
||
elif dt_signal < 0 and self.dt_signal >= 0:
|
||
# 做空Call
|
||
call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level)
|
||
put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level)
|
||
|
||
if call and put:
|
||
# 清空之前的目标
|
||
self.clear_targets()
|
||
|
||
atr_value: float = self.factor.get_atr()
|
||
if atr_value:
|
||
# self.trading_size = int(self.risk_level / atr_value)
|
||
self.trading_size = 1 #max(self.trading_size, 1)
|
||
else:
|
||
self.trading_size = 1
|
||
|
||
self.set_target(call.vt_symbol, -self.trading_size * self.leg1_ratio)
|
||
self.set_target(put.vt_symbol, -self.trading_size * self.leg2_ratio)
|
||
|
||
# 缓存DualThrust多空信号
|
||
self.dt_signal = dt_signal
|
||
|
||
# 执行具体的委托交易
|
||
self.execute_trading(price_data, self.percent_add)
|
||
|
||
# 推送UI事件更新
|
||
self.put_event()
|
||
|
||
|
||
class DualThrustFactor:
|
||
"""标的物DualThrust因子(基于DualThrust输出多空信号)"""
|
||
|
||
def __init__(self, vt_symbol: str, k1: float, k2: float) -> None:
|
||
"""构造函数"""
|
||
self.vt_symbol: str = vt_symbol
|
||
self.k1: float = k1
|
||
self.k2: float = k2
|
||
|
||
self.bg: BarGenerator = BarGenerator(self.update_bar, 30, self.update_window_bar)
|
||
self.am: ArrayManager = ArrayManager(10)
|
||
|
||
self.day_open = 0
|
||
self.day_high = 0
|
||
self.day_low = 0
|
||
self.day_range = 0
|
||
self.long_entry = 0
|
||
self.short_entry = 0
|
||
self.exit_time = time(hour=14, minute=55)
|
||
|
||
self.long_entered = False
|
||
self.short_entered = False
|
||
|
||
self.signal: int = 0
|
||
self.bars: list[BarData] = [] # 手动维护最近两根K线
|
||
|
||
def update_tick(self, tick: TickData) -> None:
|
||
"""Tick更新"""
|
||
self.bg.update_tick(tick)
|
||
|
||
def update_bar(self, bar: BarData) -> None:
|
||
"""K线更新"""
|
||
self.bg.update_bar(bar)
|
||
|
||
def update_window_bar(self, bar: BarData) -> None:
|
||
"""K线更新"""
|
||
self.am.update_bar(bar)
|
||
if not self.am.inited:
|
||
return
|
||
|
||
# 维护最近两根K线
|
||
self.bars.append(bar)
|
||
if len(self.bars) > 2:
|
||
self.bars.pop(0)
|
||
|
||
# 确保至少有两根K线用于日期比较
|
||
if len(self.bars) < 2:
|
||
return
|
||
|
||
last_bar = self.bars[-2]
|
||
# bar = self.bars[-1]
|
||
|
||
# 检查是否新的一天
|
||
if last_bar.datetime.date() != bar.datetime.date():
|
||
if self.day_high:
|
||
self.day_range = self.day_high - self.day_low
|
||
self.long_entry = bar.open_price + self.k1 * self.day_range
|
||
self.short_entry = bar.open_price - self.k2 * self.day_range
|
||
|
||
self.day_open = bar.open_price
|
||
self.day_high = bar.high_price
|
||
self.day_low = bar.low_price
|
||
|
||
self.long_entered = False
|
||
self.short_entered = False
|
||
else:
|
||
self.day_high = max(self.day_high, bar.high_price)
|
||
self.day_low = min(self.day_low, bar.low_price)
|
||
|
||
if not self.day_range:
|
||
return
|
||
|
||
# 生成信号逻辑
|
||
if bar.datetime.time() < self.exit_time:
|
||
if bar.close_price > self.day_open:
|
||
self.signal = 1
|
||
else:
|
||
self.signal = -1
|
||
else:
|
||
self.signal = 0
|
||
|
||
def get_signal(self) -> int:
|
||
"""获取当前多空信号"""
|
||
return self.signal
|
||
|
||
def get_atr(self) -> float:
|
||
"""获取ATR风险度"""
|
||
if self.am.inited:
|
||
return self.am.atr(14)
|
||
else:
|
||
return 0 |