Files

264 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from vnpy_ctastrategy import (
CtaTemplate,
TargetPosTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager,
)
import pandas as pd
import numpy as np
import math
class vip11(TargetPosTemplate):
""""""
author = "松鼠Quant"
#默认螺纹888 1H
length = 15
M=35
smoothingFactor = 0.1
TS = 50 #移动止损止盈幅度 参数范围及步长: 5-2005
lots=1 #下单手数
fastLength = 12
slowLength = 26
MACDLength =9
current_bar = 0
Highup=0
Lowdown=0
HigherAfterEntry = float('inf')
LowerAfterEntry = -float('inf')
liQKA = 0
DliqPoint = 0
KliqPoint = 0
OBVValue=[0]
MAOBV=[0]
M2=[0]
cond1=[0]*2
cond2=[0]*2
cond3=[0]*2
kong_cond1=[0]*2
kong_cond2=[0]*2
kong_cond3=[0]*2
MACDValue=[0]*3
normalizedMACD=[1]*3
smoothedMACD=[1]*3
smoothedNormalizedMACD=[1]*3
STCValue=[1]*3
MA1=0
MA2=0
MA3=0
gobro=0
parameters = ["length",'M','smoothingFactor',"TS",'lots']
variables = ["HigherAfterEntry","LowerAfterEntry","liQKA","DliqPoint","KliqPoint"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
#df: pd.DataFrame = pd.read_csv(r"index_contract.csv")
#self.contracts_sizes = {row.vt_symbol.split('.')[0]: row.contract_size for _, row in df.iterrows()}
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager(300)
self.pos=0
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(1)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def calculate_con(self,Price1, Price2, Length):
Matches = 0
Price2=Price2-Length
Price2list=[]
Price2list.extend([Price2 + j for j in range(1, Length)]) # 使用extend将元素添加到列表中
# 检查是否有足够的数据进行计算
if len(Price1) < Length :
return 0
else:
for i in range(1,Length-1):
# 计算Con条件
if (Price1[i] >= Price1[i-1] and Price2list[i] >= Price2list[i-1]) or \
(Price1[i] < Price1[i-1] and Price2list[i] < Price2list[i-1]):
Matches += 1
# 计算结果并返回
return 2 * Matches / Length - 1
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
# 如果历史数据还没有初始化完毕,则直接返回
# 注意am 是交易系统中的一个数据管理器,可能用于管理历史数据
# bar 是当前的K线数据其中包含了开盘价、最高价、最低价、收盘价和成交量等信息
self.current_bar=self.current_bar+1 #Bar线计数
#print(bar.datetime,"current_bar:",self.current_bar)
# 如果当前Bar的计数器值小于self.M*2
if(self.current_bar<self.M*2):
return
#均线
self.MA1=am.ema(self.M,array=True)
self.MA2=am.ema(self.M*2,array=True)
# 如果当前Bar的计数器值小于self.MACDLength
if(self.current_bar<self.MACDLength):
return
a1 = 2 / (self.fastLength + 1)
a2 = 2 / (self.slowLength + 1)
# 计算相关系数
r2 = 0.5 * math.pow(self.calculate_con(am.close_array[-self.MACDLength:], self.current_bar, self.MACDLength), 2) + 0.5
kk = r2 * ((1 - a1) * (1 - a2)) + (1 - r2) * ((1 - a1) / (1 - a2))
# 计算MACD值
self.MACDValue.append((am.close_array[-1] - am.close_array[-2]) * (a1 - a2) + (-a2 - a1 + 2) * self.MACDValue[-1] - kk * self.MACDValue[-2])
# 如果当前Bar的计数器值小于self.fastLength
if(self.current_bar<self.fastLength):
return
lowestMACD = min(self.MACDValue[-self.fastLength:])
highestMACD = max(self.MACDValue[-self.fastLength:]) - lowestMACD
# 归一化MACD
self.normalizedMACD.append((self.MACDValue[-1] - lowestMACD) / highestMACD * 100 if highestMACD > 0 else self.normalizedMACD[-1])
# 平滑MACD
self.smoothedMACD.append(self.normalizedMACD[-1] if self.smoothedMACD[-1]==1 else self.smoothedMACD[-2] + self.smoothingFactor * (self.normalizedMACD[-1] - self.smoothedMACD[-2]))
# 计算平滑后MACD的最低值和最高值
lowestSmoothedMACD = min(self.smoothedMACD[-self.fastLength:])
highestSmoothedMACD = max(self.smoothedMACD[-self.fastLength:]) - lowestSmoothedMACD
# 归一化平滑后MACD
self.smoothedNormalizedMACD.append((self.smoothedMACD[-1] - lowestSmoothedMACD) / highestSmoothedMACD * 100 if highestSmoothedMACD > 0 else self.smoothedNormalizedMACD[-1])
# 平滑归一化后MACD得到STC值
self.STCValue.append(self.smoothedNormalizedMACD[-1] if self.STCValue[-1]==1 else self.STCValue[-2] + self.smoothingFactor * (self.smoothedNormalizedMACD[-1] - self.STCValue[-2]))
STCvalue1=round(self.STCValue[-1]-50,0)
STCvalue2=round(self.STCValue[-2]-50,0)
STCvalue3=round(self.STCValue[-3]-50,0)
# 如果当前Bar的计数器值小于self.fastLength
if(self.current_bar<self.length):
return
Highup=max(am.high_array[-self.length:-1])
Lowdown=min(am.low_array[-self.length:-1])
#/*开平仓条件代码-----------------------------------------------------------------------------------------------------------------------------------*/
cond1=STCvalue1>STCvalue2 and STCvalue3>=STCvalue2
cond2=STCvalue1<STCvalue2 and STCvalue3<=STCvalue2
cond3=self.MA1[-1]>self.MA2[-1] and am.close_array[-1]>self.MA2[-1]
cond4=self.MA1[-1]<self.MA2[-1] and am.close_array[-1]<self.MA2[-1]
if cond1:
self.gobro=1
if cond2:
self.gobro=-1
if self.pos!=1 and self.gobro==1 and bar.high_price>=Highup and STCvalue1<0 and cond3==True:
if self.pos == 0:
self.buy(bar.close_price, self.lots)
elif self.pos < 0:
self.cover(bar.close_price, self.lots)
self.buy(bar.close_price, self.lots)
self.liQKA = 1
self.LowerAfterEntry = bar.close_price
if self.pos!=-1 and self.gobro==-1 and bar.low_price<=Lowdown and STCvalue1>0 and cond4==True :
if self.pos == 0:
self.short(bar.close_price, self.lots)
elif self.pos > 0:
self.sell(bar.close_price, self.lots)
self.short(bar.close_price, self.lots)
self.liQKA = 1
self.HigherAfterEntry = bar.close_price
#记录入场后的最高价和最低价
if self.pos>0:
#self.HigherAfterEntry = HigherAfterEntry
self.LowerAfterEntry = max(self.LowerAfterEntry, bar.low_price)
elif self.pos<0:
self.HigherAfterEntry = min(self.HigherAfterEntry, bar.high_price)
#self.LowerAfterEntry = LowerAfterEntry
if self.pos == 0:
self.liQKA = 1
else:
self.liQKA = self.liQKA - 0.1
self.liQKA = max(self.liQKA, 0.5)
if self.pos>0:
DliqPoint = self.LowerAfterEntry - ((bar.open_price)*self.TS/1000)*self.liQKA
#print(bar.datetime,"DliqPoint",DliqPoint,"low_price",bar.low_price,"self.LowerAfterEntry",self.LowerAfterEntry)
if self.pos<0:
KliqPoint = self.HigherAfterEntry + ((bar.open_price)*self.TS/1000)*self.liQKA
#print(bar.datetime,"KliqPoint",KliqPoint,"high_price",bar.high_price,"self.HigherAfterEntry",self.HigherAfterEntry)
if (self.pos>0 and bar.low_price <= DliqPoint and DliqPoint>0):
#多头出场
self.sell(bar.close_price, self.lots)
self.gobro=0
elif (self.pos<0 and bar.high_price > KliqPoint and KliqPoint>0):
#空头出场
self.cover(bar.close_price, self.lots)
self.gobro=0
self.current_bar += 1
self.put_event()
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass