from vnpy.trader.constant import Interval from vnpy.trader.utility import ArrayManager, BarGenerator, load_json, save_json from vnpy.trader.object import TickData, BarData from elite_optionstrategy import ( StrategyTemplate, Variable, Parameter, PortfolioData, ChainData, OptionData, OptionBarGenerator, ) class AdvancedSpreadStrategy(StrategyTemplate): """基于均线信号做空符合价差的策略""" author: str = "用Python的交易员" option_portfolio: str = Parameter("IO") # 期权产品代码 underlying_symbol: str = Parameter("IFJQ00.CFFEX") # 标的合约代码 fast_window: int = Parameter(5) # 快速均线周期 slow_window: int = Parameter(60) # 慢速均线周期 risk_level: int = Parameter(40) # 开仓风险度 percent_add: float = Parameter(0.02) # 委托超价比例 otm_level: int = Parameter(0) # 做空期权档位 leg1_ratio: int = Parameter(4) # 顺势腿的比例 leg2_ratio: int = Parameter(1) # 逆势腿的比例 ma_signal: int = Variable(0) # 当前信号多空 trading_size: int = Variable(1) # 当前交易数量 atm_strike: float = Variable(0) # 当前平值行权价 def on_init(self) -> None: """策略初始化""" self.write_log("策略初始化") # K线截面合成器 self.obg: OptionBarGenerator = OptionBarGenerator(self.on_bars) # 订阅行情 self.subscribe_options(self.option_portfolio) self.subscribe_data(self.underlying_symbol) # 标的信号对象 self.factor: MaFactor = MaFactor(self.underlying_symbol, self.fast_window, self.slow_window) # 加载标的历史数据初始化 bars: list[BarData] = self.load_bars(self.underlying_symbol, 40, Interval.MINUTE) for bar in bars: self.factor.update_bar(bar) # 缓存文件名称 self.data_filename: str = f"{self.name}_data.json" def on_start(self) -> None: """策略启动""" self.write_log("策略启动") data: dict = load_json(self.data_filename) self.ma_signal = data.get("ma_signal", 0) def on_stop(self) -> None: """策略停止""" self.write_log("策略停止") data: dict = {"ma_signal": self.ma_signal} save_json(self.data_filename, data) def on_tick(self, tick: TickData) -> None: """Tick推送""" self.obg.update_tick(tick) def on_bars(self, bars: dict[str, BarData]) -> None: """K线推送""" # 回测首先计算标的信号 underlying_bar: BarData = bars.pop(self.underlying_symbol, None) if underlying_bar: self.factor.update_bar(underlying_bar) # 获取期权组合对象 portfolio: PortfolioData = self.get_portfolio(self.option_portfolio) # 更新最新期权价格到组合 price_data: dict[str, float] = {} for bar in bars.values(): price_data[bar.vt_symbol] = bar.close_price portfolio.update_price(price_data) # 获取当月期权链 front_chain: ChainData = portfolio.get_chain_by_level(0) if not front_chain: self.write_log("无法获取当月期权链,请检查是否正确添加了期权合约") return # 计算平值期权 front_chain.calculate_atm() self.atm_strike = front_chain.atm_strike # 获取当前均线多空信号 ma_signal: int = self.factor.get_signal() # 如果均线多头排列,且尚未做多 if ma_signal > 0 and self.ma_signal <= 0: # 做空Put call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level) put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level) if call and put: # 清空之前的目标 self.clear_targets() atr_value: float = self.factor.get_atr() if atr_value: self.trading_size = int(self.risk_level / atr_value) self.trading_size = max(self.trading_size, 1) else: self.trading_size = 1 self.set_target(put.vt_symbol, -self.trading_size * self.leg1_ratio) self.set_target(call.vt_symbol, -self.trading_size * self.leg2_ratio) # 如果均线空头排列,且尚未做空 elif ma_signal < 0 and self.ma_signal >= 0: # 做空Call call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level) put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level) if call and put: # 清空之前的目标 self.clear_targets() atr_value: float = self.factor.get_atr() if atr_value: self.trading_size = int(self.risk_level / atr_value) self.trading_size = max(self.trading_size, 1) else: self.trading_size = 1 self.set_target(call.vt_symbol, -self.trading_size * self.leg1_ratio) self.set_target(put.vt_symbol, -self.trading_size * self.leg2_ratio) # 缓存均线多空信号 self.ma_signal = ma_signal # 执行具体的委托交易 self.execute_trading(price_data, self.percent_add) # 推送UI事件更新 self.put_event() class MaFactor: """标的物均线因子(基于均线输出多空信号)""" def __init__(self, vt_symbol: str, fast_window: int, slow_window: int) -> None: """构造函数""" self.vt_symbol: str = vt_symbol self.fast_window: int = fast_window self.slow_window: int = slow_window self.bg: BarGenerator = BarGenerator(self.update_bar, 30, self.update_window_bar) self.am: ArrayManager = ArrayManager(slow_window + 10) self.signal: int = 0 def update_tick(self, tick: TickData) -> None: """Tick更新""" self.bg.update_tick(tick) def update_bar(self, bar: BarData) -> None: """K线更新""" self.bg.update_bar(bar) def update_window_bar(self, bar: BarData) -> None: """K线更新""" self.am.update_bar(bar) if not self.am.inited: return # 计算均线 self.fast_ma = self.am.sma(self.fast_window) self.slow_ma = self.am.sma(self.slow_window) # 判断信号 if self.fast_ma > self.slow_ma: self.signal = 1 elif self.fast_ma < self.slow_ma: self.signal = -1 else: self.signal = 0 class TurtleSignalStrategy: def __init__(self, vt_symbol: str, entry_window: int, exit_window: int, atr_window: int) -> None: """构造函数""" self.vt_symbol: str = vt_symbol self.entry_window: int = entry_window self.exit_window: int = exit_window self.atr_window: int = atr_window self.bg: BarGenerator = BarGenerator(self.update_bar, 30, self.update_window_bar) self.am: ArrayManager = ArrayManager(slow_window + 10) def get_signal(self) -> int: """获取当前多空信号""" return self.signal def get_atr(self) -> float: """获取ATR风险度""" if self.am.inited: return self.am.atr(self.slow_window) else: return 0