from vnpy_ctastrategy import ( CtaTemplate, TargetPosTemplate, StopOrder, TickData, BarData, TradeData, OrderData, BarGenerator, ArrayManager, ) import numpy as np import numpy as np import talib class vip15(CtaTemplate): author = "quant789.com" period = 5 pds = 24 trailing_stop_rate = 65 lots = 1 kg = 0 parameters = ["period", "pds", "trailing_stop_rate", "lots"] variables = ["sdo_value", "lots", "barcout", "kg", "dd", "ss"] def __init__(self, cta_engine, strategy_name, vt_symbol, setting): super().__init__(cta_engine, strategy_name, vt_symbol, setting) self.bg = BarGenerator(self.on_bar) self.am = ArrayManager(300) # Adjust size as needed self.sdo_array = 0 self.barcout = 0 self.dd = 0 self.ss = 0 self.over_bought = 40 self.over_sold = -40 self.lb_period = 50 self.highest_low_after_entry = 0 self.lowest_high_after_entry = 0 self.liq_ka = 1 self.dliq_point = 0 self.kliq_point = 0 self.ent_bar = 0 self.ent_price = 0 def calculate_sdo(self): if len(self.am.close_array) < self.lb_period: return [] close = self.am.close_array dist = np.abs(close - np.roll(close, self.period)) lowest_dist = np.minimum.accumulate(dist[::-1])[::-1] highest_dist = np.maximum.accumulate(dist[::-1])[::-1] # 避免除以零 denom = highest_dist - lowest_dist denom[denom == 0] = 1e-10 # 将零值替换为一个很小的数 dval = np.where(denom != 0, (dist - lowest_dist) / denom, 0) ddval = np.zeros_like(dval) ddval[close > np.roll(close, self.period)] = dval[close > np.roll(close, self.period)] ddval[close < np.roll(close, self.period)] = -dval[close < np.roll(close, self.period)] sdo = np.convolve(ddval, np.ones(self.pds), 'valid') / self.pds * 100 return sdo def on_init(self): self.write_log("策略初始化") self.load_bar(10) def on_start(self): self.write_log("策略启动") def on_stop(self): self.write_log("策略停止") def on_tick(self, tick: TickData): self.bg.update_tick(tick) def on_bar(self, bar: BarData): self.am.update_bar(bar) if not self.am.inited: return self.sdo_array = self.calculate_sdo() cond1 = self.sdo_array[-2] > self.over_bought and self.sdo_array[-1] <= self.over_bought cond2 = self.sdo_array[-2] < self.over_sold and self.sdo_array[-1] >= self.over_sold hh = self.am.high_array[-self.period:].max() ll = self.am.low_array[-self.period:].min() if cond1==True or cond2==True: self.dd = hh self.ss = ll self.kg = 1 self.barcout = 0 if self.kg > 0: self.barcout += 1 if self.barcout > self.pds: self.barcout = 0 self.dd = 0 self.ss = 0 self.kg = 0 if self.kg > 0 and bar.high_price >= self.dd and self.dd>0 : # self.buy(bar.close_price, self.lots) self.kg = 0 self.barcout = 0 self.highest_low_after_entry = bar.close_price self.initialize_vars_on_entry(bar) if self.kg > 0 and bar.low_price <= self.ss and self.ss>0 : # self.short(bar.close_price, self.lots) self.kg = 0 self.barcout = 0 self.lowest_high_after_entry = bar.close_price self.initialize_vars_on_entry(bar) else: self.trailing_stop(bar) self.put_event() def trailing_stop(self, bar: BarData): if self.pos > 0: self.highest_low_after_entry = max(self.highest_low_after_entry, bar.low_price) self.dliq_point = self.highest_low_after_entry - (bar.open_price * self.trailing_stop_rate / 10000) * self.liq_ka if bar.low_price <= self.dliq_point and bar.datetime.timestamp() > self.ent_bar: self.sell(bar.close_price , abs(self.pos)) elif self.pos < 0: self.lowest_high_after_entry = min(self.lowest_high_after_entry, bar.high_price) self.kliq_point = self.lowest_high_after_entry + (bar.open_price * self.trailing_stop_rate / 10000) * self.liq_ka if bar.high_price >= self.kliq_point and bar.datetime.timestamp() > self.ent_bar: self.cover(bar.close_price , abs(self.pos)) def initialize_vars_on_entry(self, bar: BarData): self.ent_bar = bar.datetime.timestamp() self.ent_price = bar.close_price self.liq_ka = 1 def on_order(self, order: OrderData): pass def on_stop_order(self, stop_order: StopOrder): pass