from datetime import time

from vnpy.trader.constant import Interval
from vnpy.trader.utility import ArrayManager, BarGenerator, load_json, save_json
from vnpy.trader.object import TickData, BarData

from elite_optionstrategy import (
    StrategyTemplate,
    Variable,
    Parameter,
    PortfolioData,
    ChainData,
    OptionData,
    OptionBarGenerator,
)


class AdvancedSpreadStrategy(StrategyTemplate):
    """Short a ratio option spread whose direction follows a DualThrust-style signal.

    On a bullish flip the strategy targets short puts (``leg1_ratio`` lots per
    unit) plus short calls (``leg2_ratio`` lots per unit); on a bearish flip the
    two legs swap roles. The last signal is persisted to a JSON file on stop
    and restored on start.
    """

    author: str = "用Python的交易员"

    option_portfolio: str = Parameter("MO")                 # option product code
    underlying_symbol: str = Parameter("IMJQ00.CFFEX")      # underlying contract symbol
    k1: float = Parameter(0.3)                              # DualThrust k1 coefficient
    k2: float = Parameter(0.9)                              # DualThrust k2 coefficient
    risk_level: int = Parameter(40)                         # risk budget (ATR sizing is currently disabled)
    percent_add: float = Parameter(0.02)                    # price offset ratio passed to execute_trading
    otm_level: int = Parameter(0)                           # strike level of the options sold
    leg1_ratio: int = Parameter(4)                          # lots per unit on the with-trend leg
    leg2_ratio: int = Parameter(1)                          # lots per unit on the counter-trend leg

    dt_signal: int = Variable(0)                            # last acted-on signal direction
    trading_size: int = Variable(1)                         # current unit size
    atm_strike: float = Variable(0)                         # current at-the-money strike

    def on_init(self) -> None:
        """Create helpers, subscribe to data and warm up the signal factor."""
        self.write_log("策略初始化")

        # Cross-section bar generator; completed bar sets are delivered to on_bars.
        self.obg: OptionBarGenerator = OptionBarGenerator(self.on_bars)

        # Market data subscriptions.
        self.subscribe_options(self.option_portfolio)
        self.subscribe_data(self.underlying_symbol)

        # Signal object fed with bars of the underlying.
        self.factor: DualThrustFactor = DualThrustFactor(
            self.underlying_symbol, self.k1, self.k2
        )

        # Replay historical bars of the underlying to initialise the factor.
        history: list[BarData] = self.load_bars(
            self.underlying_symbol, 40, Interval.MINUTE
        )
        for bar in history:
            self.factor.update_bar(bar)

        # Name of the JSON file used to persist the signal between runs.
        self.data_filename: str = f"{self.name}_data.json"

    def on_start(self) -> None:
        """Restore the persisted signal (defaults to 0 when nothing is stored)."""
        self.write_log("策略启动")

        data: dict = load_json(self.data_filename)
        self.dt_signal = data.get("dt_signal", 0)

    def on_stop(self) -> None:
        """Persist the current signal so the next start can resume from it."""
        self.write_log("策略停止")

        data: dict = {"dt_signal": self.dt_signal}
        save_json(self.data_filename, data)

    def on_tick(self, tick: TickData) -> None:
        """Forward every tick to the cross-section bar generator."""
        self.obg.update_tick(tick)

    def on_bars(self, bars: dict[str, BarData]) -> None:
        """Process one cross-section of bars and refresh the position targets.

        Args:
            bars: Bars keyed by symbol. The underlying's bar, if present, is
                popped from the dict (the caller's dict is modified) so that
                only the remaining bars feed the portfolio prices.
        """
        # Update the signal from the underlying first.
        underlying_bar: BarData | None = bars.pop(self.underlying_symbol, None)
        if underlying_bar is not None:
            self.factor.update_bar(underlying_bar)

        portfolio: PortfolioData = self.get_portfolio(self.option_portfolio)

        # Push the latest close prices of the remaining bars into the portfolio.
        price_data: dict[str, float] = {
            bar.vt_symbol: bar.close_price for bar in bars.values()
        }
        portfolio.update_price(price_data)

        # Front chain (level 0).
        front_chain: ChainData = portfolio.get_chain_by_level(0)
        if not front_chain:
            self.write_log("无法获取当月期权链,请检查是否正确添加了期权合约")
            return

        # Refresh the at-the-money strike.
        front_chain.calculate_atm()
        self.atm_strike = front_chain.atm_strike

        dt_signal: int = self.factor.get_signal()

        if dt_signal > 0 and self.dt_signal <= 0:
            # Bullish flip: the put is the with-trend leg.
            self._short_spread(front_chain, trend_cp=-1)
        elif dt_signal < 0 and self.dt_signal >= 0:
            # Bearish flip: the call is the with-trend leg.
            self._short_spread(front_chain, trend_cp=1)

        # Remember the signal that has just been handled.
        self.dt_signal = dt_signal

        # Work orders towards the targets, then refresh the UI.
        self.execute_trading(price_data, self.percent_add)
        self.put_event()

    def _short_spread(self, chain: ChainData, trend_cp: int) -> None:
        """Set short targets on both legs; ``trend_cp`` picks the with-trend leg.

        Args:
            chain: Option chain to select the call and the put from.
            trend_cp: -1 to make the put the ``leg1_ratio`` leg, 1 for the call.

        Nothing is changed when either option cannot be found.
        """
        call: OptionData = chain.get_option_by_level(cp=1, level=self.otm_level)
        put: OptionData = chain.get_option_by_level(cp=-1, level=self.otm_level)
        if not (call and put):
            return

        trend_leg, counter_leg = (put, call) if trend_cp < 0 else (call, put)

        # Drop the previous targets before setting the new spread.
        self.clear_targets()

        # Unit size is fixed at 1. ATR-based sizing (risk_level / ATR, floored
        # at 1) was disabled in this strategy, so the ATR is not queried here.
        self.trading_size = 1

        self.set_target(trend_leg.vt_symbol, -self.trading_size * self.leg1_ratio)
        self.set_target(counter_leg.vt_symbol, -self.trading_size * self.leg2_ratio)


class DualThrustFactor:
    """Direction signal for the underlying, built on 30-minute window bars.

    The factor tracks the running high/low of the current day and, on the
    first window bar of a new day, derives the previous day's range and the
    DualThrust entry levels. NOTE(review): ``long_entry`` / ``short_entry``
    are computed but the emitted signal only compares the close with the
    day's open - confirm that this is intended.
    """

    AM_SIZE: int = 10       # number of window bars buffered by the ArrayManager
    ATR_WINDOW: int = 14    # preferred ATR period (clamped to what the buffer can hold)

    def __init__(self, vt_symbol: str, k1: float, k2: float) -> None:
        """Store the parameters and create the bar helpers.

        Args:
            vt_symbol: Symbol of the underlying this factor follows.
            k1: Multiplier of the day range for the long entry level.
            k2: Multiplier of the day range for the short entry level.
        """
        self.vt_symbol: str = vt_symbol
        self.k1: float = k1
        self.k2: float = k2

        # update_bar feeds the generator; finished 30-bar windows go to
        # update_window_bar.
        self.bg: BarGenerator = BarGenerator(
            self.update_bar, 30, self.update_window_bar
        )
        self.am: ArrayManager = ArrayManager(self.AM_SIZE)

        # Day statistics; 0 doubles as "not set yet".
        self.day_open: float = 0
        self.day_high: float = 0
        self.day_low: float = 0
        self.day_range: float = 0

        self.long_entry: float = 0
        self.short_entry: float = 0
        self.exit_time: time = time(hour=14, minute=55)

        self.long_entered: bool = False
        self.short_entered: bool = False

        self.signal: int = 0
        self.bars: list[BarData] = []   # the two most recent window bars

    def update_tick(self, tick: TickData) -> None:
        """Feed a tick into the bar generator."""
        self.bg.update_tick(tick)

    def update_bar(self, bar: BarData) -> None:
        """Feed a bar into the bar generator for window aggregation."""
        self.bg.update_bar(bar)

    def update_window_bar(self, bar: BarData) -> None:
        """Update day statistics and the signal from a finished window bar."""
        self.am.update_bar(bar)
        if not self.am.inited:
            return

        # Keep only the two most recent window bars.
        self.bars.append(bar)
        if len(self.bars) > 2:
            self.bars.pop(0)

        # Two bars are required to detect a change of date.
        if len(self.bars) < 2:
            return
        last_bar: BarData = self.bars[-2]

        if last_bar.datetime.date() != bar.datetime.date():
            # First window bar of a new day: derive the levels from the day
            # that just ended, provided a high has been recorded for it.
            if self.day_high:
                self.day_range = self.day_high - self.day_low
                self.long_entry = bar.open_price + self.k1 * self.day_range
                self.short_entry = bar.open_price - self.k2 * self.day_range

            self.day_open = bar.open_price
            self.day_high = bar.high_price
            self.day_low = bar.low_price

            self.long_entered = False
            self.short_entered = False
        else:
            self.day_high = max(self.day_high, bar.high_price)
            # day_low starts at 0 ("unset"); taking min() against that 0 would
            # pin the low at 0 and inflate the first day_range to day_high.
            if self.day_low:
                self.day_low = min(self.day_low, bar.low_price)
            else:
                self.day_low = bar.low_price

        # No signal until a day range is available.
        if not self.day_range:
            return

        if bar.datetime.time() < self.exit_time:
            self.signal = 1 if bar.close_price > self.day_open else -1
        else:
            self.signal = 0

    def get_signal(self) -> int:
        """Return the current signal: 1, -1 or 0."""
        return self.signal

    def get_atr(self) -> float:
        """Return the ATR of the window bars, or 0 before the buffer is full.

        The period is clamped to ``AM_SIZE - 1``: an N-period ATR needs more
        than N bars, so requesting ``ATR_WINDOW`` from the smaller buffer
        would never yield a valid number.
        """
        if not self.am.inited:
            return 0.0

        window: int = min(self.ATR_WINDOW, self.AM_SIZE - 1)
        return self.am.atr(window)