Files
Quant_Code/1.交易策略/3.期权策略/advanced_spread_strategy_SuperComboStrategy.py

216 lines
7.3 KiB
Python

from vnpy.trader.constant import Interval
from vnpy.trader.utility import ArrayManager, BarGenerator, load_json, save_json
from vnpy.trader.object import TickData, BarData
from elite_optionstrategy import (
StrategyTemplate,
Variable,
Parameter,
PortfolioData,
ChainData,
OptionData,
OptionBarGenerator,
)
class AdvancedSpreadStrategy(StrategyTemplate):
"""基于均线信号做空符合价差的策略"""
author: str = "用Python的交易员"
option_portfolio: str = Parameter("IO") # 期权产品代码
underlying_symbol: str = Parameter("IFJQ00.CFFEX") # 标的合约代码
fast_window: int = Parameter(5) # 快速均线周期
slow_window: int = Parameter(60) # 慢速均线周期
risk_level: int = Parameter(40) # 开仓风险度
percent_add: float = Parameter(0.02) # 委托超价比例
otm_level: int = Parameter(0) # 做空期权档位
leg1_ratio: int = Parameter(4) # 顺势腿的比例
leg2_ratio: int = Parameter(1) # 逆势腿的比例
ma_signal: int = Variable(0) # 当前信号多空
trading_size: int = Variable(1) # 当前交易数量
atm_strike: float = Variable(0) # 当前平值行权价
def on_init(self) -> None:
"""策略初始化"""
self.write_log("策略初始化")
# K线截面合成器
self.obg: OptionBarGenerator = OptionBarGenerator(self.on_bars)
# 订阅行情
self.subscribe_options(self.option_portfolio)
self.subscribe_data(self.underlying_symbol)
# 标的信号对象
self.factor: MaFactor = MaFactor(self.underlying_symbol, self.fast_window, self.slow_window)
# 加载标的历史数据初始化
bars: list[BarData] = self.load_bars(self.underlying_symbol, 40, Interval.MINUTE)
for bar in bars:
self.factor.update_bar(bar)
# 缓存文件名称
self.data_filename: str = f"{self.name}_data.json"
def on_start(self) -> None:
"""策略启动"""
self.write_log("策略启动")
data: dict = load_json(self.data_filename)
self.ma_signal = data.get("ma_signal", 0)
def on_stop(self) -> None:
"""策略停止"""
self.write_log("策略停止")
data: dict = {"ma_signal": self.ma_signal}
save_json(self.data_filename, data)
def on_tick(self, tick: TickData) -> None:
"""Tick推送"""
self.obg.update_tick(tick)
def on_bars(self, bars: dict[str, BarData]) -> None:
"""K线推送"""
# 回测首先计算标的信号
underlying_bar: BarData = bars.pop(self.underlying_symbol, None)
if underlying_bar:
self.factor.update_bar(underlying_bar)
# 获取期权组合对象
portfolio: PortfolioData = self.get_portfolio(self.option_portfolio)
# 更新最新期权价格到组合
price_data: dict[str, float] = {}
for bar in bars.values():
price_data[bar.vt_symbol] = bar.close_price
portfolio.update_price(price_data)
# 获取当月期权链
front_chain: ChainData = portfolio.get_chain_by_level(0)
if not front_chain:
self.write_log("无法获取当月期权链,请检查是否正确添加了期权合约")
return
# 计算平值期权
front_chain.calculate_atm()
self.atm_strike = front_chain.atm_strike
# 获取当前均线多空信号
ma_signal: int = self.factor.get_signal()
# 如果均线多头排列,且尚未做多
if ma_signal > 0 and self.ma_signal <= 0:
# 做空Put
call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level)
put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level)
if call and put:
# 清空之前的目标
self.clear_targets()
atr_value: float = self.factor.get_atr()
if atr_value:
self.trading_size = int(self.risk_level / atr_value)
self.trading_size = max(self.trading_size, 1)
else:
self.trading_size = 1
self.set_target(put.vt_symbol, -self.trading_size * self.leg1_ratio)
self.set_target(call.vt_symbol, -self.trading_size * self.leg2_ratio)
# 如果均线空头排列,且尚未做空
elif ma_signal < 0 and self.ma_signal >= 0:
# 做空Call
call: OptionData = front_chain.get_option_by_level(cp=1, level=self.otm_level)
put: OptionData = front_chain.get_option_by_level(cp=-1, level=self.otm_level)
if call and put:
# 清空之前的目标
self.clear_targets()
atr_value: float = self.factor.get_atr()
if atr_value:
self.trading_size = int(self.risk_level / atr_value)
self.trading_size = max(self.trading_size, 1)
else:
self.trading_size = 1
self.set_target(call.vt_symbol, -self.trading_size * self.leg1_ratio)
self.set_target(put.vt_symbol, -self.trading_size * self.leg2_ratio)
# 缓存均线多空信号
self.ma_signal = ma_signal
# 执行具体的委托交易
self.execute_trading(price_data, self.percent_add)
# 推送UI事件更新
self.put_event()
class MaFactor:
"""标的物均线因子(基于均线输出多空信号)"""
def __init__(self, vt_symbol: str, fast_window: int, slow_window: int) -> None:
"""构造函数"""
self.vt_symbol: str = vt_symbol
self.fast_window: int = fast_window
self.slow_window: int = slow_window
self.bg: BarGenerator = BarGenerator(self.update_bar, 30, self.update_window_bar)
self.am: ArrayManager = ArrayManager(slow_window + 10)
self.signal: int = 0
def update_tick(self, tick: TickData) -> None:
"""Tick更新"""
self.bg.update_tick(tick)
def update_bar(self, bar: BarData) -> None:
"""K线更新"""
self.bg.update_bar(bar)
def update_window_bar(self, bar: BarData) -> None:
"""K线更新"""
self.am.update_bar(bar)
if not self.am.inited:
return
# 计算均线
self.fast_ma = self.am.sma(self.fast_window)
self.slow_ma = self.am.sma(self.slow_window)
# 判断信号
if self.fast_ma > self.slow_ma:
self.signal = 1
elif self.fast_ma < self.slow_ma:
self.signal = -1
else:
self.signal = 0
class TurtleSignalStrategy:
def __init__(self, vt_symbol: str, entry_window: int, exit_window: int, atr_window: int) -> None:
"""构造函数"""
self.vt_symbol: str = vt_symbol
self.entry_window: int = entry_window
self.exit_window: int = exit_window
self.atr_window: int = atr_window
self.bg: BarGenerator = BarGenerator(self.update_bar, 30, self.update_window_bar)
self.am: ArrayManager = ArrayManager(slow_window + 10)
def get_signal(self) -> int:
"""获取当前多空信号"""
return self.signal
def get_atr(self) -> float:
"""获取ATR风险度"""
if self.am.inited:
return self.am.atr(self.slow_window)
else:
return 0