Files

780 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
使用说明:使用前需要调整的相关参数如下
1.确定python到csv文件夹下运行,修改csv文件为需要运行的csv
2.配置邮件信息和参数。
3.tickdata函数中一、修改时间冲采样resample中rule周期5T为交易周期
4.GetOrderFlow_dj函数一、堆积函数config参数暂时均为3
5.MyTrader类:
1) init函数初始化:委托价格的偏移、跟踪出场、固定出差参数、交易手数的设置;
2) day_data_reset函数、每日收盘重置数据按照交易品种设置。
3Join函数修改“开多组合”和“开空组合”
6. __main__函数设置交易账户变量
该代码的主要目的是处理Tick数据并生成交易信号。代码中定义了一个tickcome函数它接收到Tick数据后会进行一系列的处理包括构建Tick字典、更新上一个Tick的成交量、保存Tick数据、生成K线数据等。其中涉及到的一些函数有
on_tick(tick): 处理单个Tick数据根据Tick数据生成K线数据。
tickdata(df, symbol): 处理Tick数据生成K线数据。
orderflow_df_new(df_tick, df_min, symbol): 处理Tick和K线数据生成订单流数据。
GetOrderFlow_dj(kData): 计算订单流的信号指标。
除此之外代码中还定义了一个MyTrader类继承自TraderApiBase用于实现交易相关的功能。
'''
# 需要完善__main__函数中手动设置subscribe_list变量通过时间判断是否需要进行换月修改并发送邮件通知
from multiprocessing import Process, Queue
from AlgoPlus.CTP.MdApi import run_tick_engine
from AlgoPlus.CTP.FutureAccount import get_simulate_account
from AlgoPlus.CTP.FutureAccount import FutureAccount
from AlgoPlus.CTP.TraderApiBase import TraderApiBase
from AlgoPlus.ta.time_bar import tick_to_bar
import pandas as pd
from datetime import datetime
from datetime import time as s_time
import operator
import time
import numpy as np
import os
import re
# 加入邮件通知
import smtplib
from email.mime.text import MIMEText # 导入 MIMEText 类发送纯文本邮件
from email.mime.multipart import MIMEMultipart # 导入 MIMEMultipart 类发送带有附件的邮件
from email.mime.application import MIMEApplication # 导入 MIMEApplication 类发送二进制附件
## 配置邮件信息
receivers = ["***@qq.com"] # 设置邮件接收人地址
subject = "订单流策略交易信号" # 设置邮件主题
#text = " " # 设置邮件正文
# file_path = "test.txt" # 设置邮件附件文件路径
## 配置邮件服务器信息
smtp_server = "smtp.qq.com" # 设置发送邮件的 SMTP 服务器地址
smtp_port = 465 # 设置发送邮件的 SMTP 服务器端口号,一般为 25 端口 465
sender = "***@qq.com" # 设置发送邮件的邮箱地址
username = "***@qq.com" # 设置发送邮件的邮箱用户名
password = "zrmpcgttataabhjh" #zrmpcgttataabhjh设置发送邮件的邮箱密码或授权码
tickdatadict = {} # 存储Tick数据的字典
quotedict = {} # 存储行情数据的字典
ofdatadict = {} # 存储K线数据的字典
trader_df = pd.DataFrame({}) # 存储交易数据的DataFrame对象
previous_volume = {} # 上一个Tick的成交量
tsymbollist={}
# 邮件通知模块
def send_mail(text):
msg = MIMEMultipart()
msg["From"] = sender
msg["To"] = ";".join(receivers)
msg["Subject"] = subject
msg.attach(MIMEText(text, "plain", "utf-8"))
smtp = smtplib.SMTP_SSL(smtp_server, smtp_port)
smtp.login(username, password)
smtp.sendmail(sender, receivers, msg.as_string())
smtp.quit()
def tickcome(md_queue):
global previous_volume
data=md_queue
instrument_id = data['InstrumentID'].decode() # 品种代码
ActionDay = data['ActionDay'].decode() # 交易日日期
update_time = data['UpdateTime'].decode() # 更新时间
update_millisec = str(data['UpdateMillisec']) # 更新毫秒数
created_at = ActionDay[:4] + '-' + ActionDay[4:6] + '-' + ActionDay[6:] + ' ' + update_time + '.' + update_millisec #创建时间
# 构建tick字典
tick = {
'symbol': instrument_id, # 品种代码和交易所ID
'created_at':datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S.%f"),
#'created_at': created_at, # 创建时间
'price': float(data['LastPrice']), # 最新价
'last_volume': int(data['Volume']) - previous_volume.get(instrument_id, 0) if previous_volume.get(instrument_id, 0) != 0 else 0, # 瞬时成交量
'bid_p': float(data['BidPrice1']), # 买价
'bid_v': int(data['BidVolume1']), # 买量
'ask_p': float(data['AskPrice1']), # 卖价
'ask_v': int(data['AskVolume1']), # 卖量
'UpperLimitPrice': float(data['UpperLimitPrice']), # 涨停价
'LowerLimitPrice': float(data['LowerLimitPrice']), # 跌停价
'TradingDay': data['TradingDay'].decode(), # 交易日日期
'cum_volume': int(data['Volume']), # 最新总成交量
'cum_amount': float(data['Turnover']), # 最新总成交额
'cum_position': int(data['OpenInterest']), # 合约持仓量
}
# 更新上一个Tick的成交量
previous_volume[instrument_id] = int(data['Volume'])
if tick['last_volume']>0:
#print(tick['created_at'],'vol:',tick['last_volume'])
# 处理Tick数据
on_tick(tick)
def can_time(hour, minute):
hour = str(hour)
minute = str(minute)
if len(minute) == 1:
minute = "0" + minute
return int(hour + minute)
def on_tick(tick):
tm=can_time(tick['created_at'].hour,tick['created_at'].minute)
#print(tick['symbol'])
#print(1)
#if tm>1500 and tm<2100 :
# return
if tick['last_volume']==0:
return
quotes = tick
timetick=str(tick['created_at']).replace('+08:00', '')
tsymbol=tick['symbol']
if tsymbol not in tsymbollist.keys():
# 获取tick的买卖价和买卖量
tsymbollist[tsymbol]=tick
bid_p=quotes['bid_p']
ask_p=quotes['ask_p']
bid_v=quotes['bid_v']
ask_v=quotes['ask_v']
else:
# 获取上一个tick的买卖价和买卖量
rquotes =tsymbollist[tsymbol]
bid_p=rquotes['bid_p']
ask_p=rquotes['ask_p']
bid_v=rquotes['bid_v']
ask_v=rquotes['ask_v']
tsymbollist[tsymbol]=tick
tick_dt=pd.DataFrame({'datetime':timetick,'symbol':tick['symbol'],'mainsym':tick['symbol'].rstrip('0123456789').upper(),'lastprice':tick['price'],
'vol':tick['last_volume'],
'bid_p':bid_p,'ask_p':ask_p,'bid_v':bid_v,'ask_v':ask_v},index=[0])
sym = tick_dt['symbol'][0]
#print(tick_dt)
tickdata(tick_dt,sym)
# 这个函数的主要目的是将输入的买盘和卖盘字典合并、排序、累加并将处理后的结果存储在一个全局字典quotedict中同时返回这个结果。
def data_of(df):
global trader_df
# 将df数据合并到trader_df中
trader_df = pd.concat([trader_df, df], ignore_index=True)
#print('trader_df: ', len(trader_df))
#print(trader_df)
def process(bidDict, askDict, symbol):
try:
# 尝试从quotedict中获取对应品种的报价数据
dic = quotedict[symbol]
bidDictResult = dic['bidDictResult']
askDictResult = dic['askDictResult']
except:
# 如果获取失败则初始化bidDictResult和askDictResult为空字典
bidDictResult, askDictResult = {}, {}
# 将所有买盘字典和卖盘字典的key合并并按升序排序
sList = sorted(set(list(bidDict.keys()) + list(askDict.keys())))
# 遍历所有的key将相同key的值进行累加
for s in sList:
if s in bidDict:
if s in bidDictResult:
bidDictResult[s] = int(bidDict[s]) + bidDictResult[s]
else:
bidDictResult[s] = int(bidDict[s])
if s not in askDictResult:
askDictResult[s] = 0
else:
if s in askDictResult:
askDictResult[s] = int(askDict[s]) + askDictResult[s]
else:
askDictResult[s] = int(askDict[s])
if s not in bidDictResult:
bidDictResult[s] = 0
# 构建包含bidDictResult和askDictResult的字典并存入quotedict中
df = {'bidDictResult': bidDictResult, 'askDictResult': askDictResult}
quotedict[symbol] = df
return bidDictResult, askDictResult
def tickdata(df,symbol):
tickdata =pd.DataFrame({'datetime':df['datetime'],'symbol':df['symbol'],'lastprice':df['lastprice'],
'volume':df['vol'],'bid_p':df['bid_p'],'bid_v':df['bid_v'],'ask_p':df['ask_p'],'ask_v':df['ask_v']})
try:
if symbol in tickdatadict.keys():
rdf=tickdatadict[symbol]
rdftm=pd.to_datetime(rdf['bartime'][0]).strftime('%Y-%m-%d %H:%M:%S')
now=str(tickdata['datetime'][0])
if now>rdftm:
try:
oo=ofdatadict[symbol]
data_of(oo)
#print('oo',oo)
if symbol in quotedict.keys():
quotedict.pop(symbol)
if symbol in tickdatadict.keys():
tickdatadict.pop(symbol)
if symbol in ofdatadict.keys():
ofdatadict.pop(symbol)
except IOError as e:
print('rdftm捕获到异常',e)
tickdata['bartime'] = pd.to_datetime(tickdata['datetime'])
tickdata['open'] = tickdata['lastprice']
tickdata['high'] = tickdata['lastprice']
tickdata['low'] = tickdata['lastprice']
tickdata['close'] = tickdata['lastprice']
tickdata['starttime'] = tickdata['datetime']
else:
tickdata['bartime'] = rdf['bartime']
tickdata['open'] = rdf['open']
tickdata['high'] = max(tickdata['lastprice'].values,rdf['high'].values)
tickdata['low'] = min(tickdata['lastprice'].values,rdf['low'].values)
tickdata['close'] = tickdata['lastprice']
tickdata['volume']=df['vol']+rdf['volume'].values
tickdata['starttime'] = rdf['starttime']
else :
print('新bar的第一个tick进入')
tickdata['bartime'] = pd.to_datetime(tickdata['datetime'])
tickdata['open'] = tickdata['lastprice']
tickdata['high'] = tickdata['lastprice']
tickdata['low'] = tickdata['lastprice']
tickdata['close'] = tickdata['lastprice']
tickdata['starttime'] = tickdata['datetime']
except IOError as e:
print('捕获到异常',e)
tickdata['bartime'] = pd.to_datetime(tickdata['bartime'])
bardata = tickdata.resample(on = 'bartime',rule = '1T',label = 'right',closed = 'right').agg({'starttime':'first','symbol':'last','open':'first','high':'max','low':'min','close':'last','volume':'sum'}).reset_index(drop = False)
bardata =bardata.dropna().reset_index(drop = True)
bardata['bartime'] = pd.to_datetime(bardata['bartime'][0]).strftime('%Y-%m-%d %H:%M:%S')
tickdatadict[symbol]=bardata
tickdata['volume']=df['vol'].values
#print(bardata['symbol'].values,bardata['bartime'].values)
orderflow_df_new(tickdata,bardata,symbol)
# time.sleep(0.5)
def orderflow_df_new(df_tick,df_min,symbol):
startArray = pd.to_datetime(df_min['starttime']).values
voluememin= df_min['volume'].values
highs=df_min['high'].values
lows=df_min['low'].values
opens=df_min['open'].values
closes=df_min['close'].values
#endArray = pd.to_datetime(df_min['bartime']).values
endArray = df_min['bartime'].values
#print(endArray)
deltaArray = np.zeros((len(endArray),))
tTickArray = pd.to_datetime(df_tick['datetime']).values
bp1TickArray = df_tick['bid_p'].values
ap1TickArray = df_tick['ask_p'].values
lastTickArray = df_tick['lastprice'].values
volumeTickArray = df_tick['volume'].values
symbolarray = df_tick['symbol'].values
indexFinal = 0
for index,tEnd in enumerate(endArray):
dt=endArray[index]
start = startArray[index]
bidDict = {}
askDict = {}
bar_vol=voluememin[index]
bar_close=closes[index]
bar_open=opens[index]
bar_low=lows[index]
bar_high=highs[index]
bar_symbol=symbolarray[index]
# for indexTick in range(indexFinal,len(df_tick)):
# if tTickArray[indexTick] >= tEnd:
# break
# elif (tTickArray[indexTick] >= start) & (tTickArray[indexTick] < tEnd):
Bp = round(bp1TickArray[0],4)
Ap = round(ap1TickArray[0],4)
LastPrice = round(lastTickArray[0],4)
Volume = volumeTickArray[0]
if LastPrice >= Ap:
if str(LastPrice) in askDict.keys():
askDict[str(LastPrice)] += Volume
else:
askDict[str(LastPrice)] = Volume
if LastPrice <= Bp:
if str(LastPrice) in bidDict.keys():
bidDict[str(LastPrice)] += Volume
else:
bidDict[str(LastPrice)] = Volume
# indexFinal = indexTick
bidDictResult,askDictResult = process(bidDict,askDict,symbol)
bidDictResult=dict(sorted(bidDictResult.items(),key=operator.itemgetter(0)))
askDictResult=dict(sorted(askDictResult.items(),key=operator.itemgetter(0)))
prinslist=list(bidDictResult.keys())
asklist=list(askDictResult.values())
bidlist=list(bidDictResult.values())
delta=(sum(askDictResult.values()) - sum(bidDictResult.values()))
#print(prinslist,asklist,bidlist)
#print(len(prinslist),len(bidDictResult),len(askDictResult))
df=pd.DataFrame({'price':pd.Series([prinslist]),'Ask':pd.Series([asklist]),'Bid':pd.Series([bidlist])})
#df=pd.DataFrame({'price':pd.Series(bidDictResult.keys()),'Ask':pd.Series(askDictResult.values()),'Bid':pd.Series(bidDictResult.values())})
df['symbol']=bar_symbol
df['datetime']=dt
df['delta']=str(delta)
df['close']=bar_close
df['open']=bar_open
df['high']=bar_high
df['low']=bar_low
df['volume']=bar_vol
#df['ticktime']=tTickArray[0]
df['dj'] = GetOrderFlow_dj(df)
ofdatadict[symbol]=df
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!
#版权归松鼠Quant所有禁止转发、转卖源码违者必究。
def GetOrderFlow_dj(kData):
Config = {
'Value1': 3,
'Value2': 3,
'Value3': 3,
'Value4': True,
}
aryData = kData
djcout = 0
# 遍历kData中的每一行计算djcout指标
for index, row in aryData.iterrows():
kItem = aryData.iloc[index]
high = kItem['high']
low = kItem['low']
close = kItem['close']
open = kItem['open']
dtime = kItem['datetime']
price_s = kItem['price']
Ask_s = kItem['Ask']
Bid_s = kItem['Bid']
delta = kItem['delta']
price_s = price_s
Ask_s = Ask_s
Bid_s = Bid_s
gj = 0
xq = 0
gxx = 0
xxx = 0
# 遍历price_s中的每一个元素计算相关指标
for i in np.arange(0, len(price_s), 1):
duiji = {
'price': 0,
'time': 0,
'longshort': 0,
}
if i == 0:
delta = delta
order = {
"Price": price_s[i],
"Bid": { "Value":Bid_s[i]},
"Ask": { "Value":Ask_s[i]}
}
#空头堆积
if i >= 0 and i < len(price_s) - 1:
if (order["Bid"]["Value"] > Ask_s[i + 1] * int(Config['Value1'])):
gxx += 1
gj += 1
if gj >= int(Config['Value2']) and Config['Value4'] == True:
duiji['price'] = price_s[i]
duiji['time'] = dtime
duiji['longshort'] = -1
if float(duiji['price']) > 0:
djcout += -1
else:
gj = 0
#多头堆积
if i >= 1 and i < len(price_s) - 1:
if (order["Ask"]["Value"] > Bid_s[i - 1] * int(Config['Value1'])):
xq += 1
xxx += 1
if xq >= int(Config['Value2']) and Config['Value4'] == True:
duiji['price'] = price_s[i]
duiji['time'] = dtime
duiji['longshort'] = 1
if float(duiji['price']) > 0:
djcout += 1
else:
xq = 0
# 返回计算得到的djcout值
return djcout
#交易程序---------------------------------------------------------------------------------------------------------------------------------------------------------------------
class MyTrader(TraderApiBase):
def __init__(self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir='', private_resume_type=2, public_resume_type=2):
self.py=5 #设置委托价格的偏移,更加容易促成成交。仅螺纹,其他品种根据最小点波动,自己设置
self.cont_df=0
self.trailing_stop_percent = 0.02 #跟踪出场参数
self.fixed_stop_loss_percent = 0.01 #固定出场参数
self.dj_X=1 #开仓的堆积参数
self.pos=0
self.Lots=1
self.short_trailing_stop_price = 0
self.long_trailing_stop_price = 0
self.sl_long_price=0
self.sl_shor_price=0
self.out_long=0
self.out_short=0
self.clearing_executed=False
self.kgdata=True
#读取保存的数据
def read_to_csv(self,symbol):
# 文件夹路径和文件路径
# 使用正则表达式提取英文字母并重新赋值给symbol
symbol = ''.join(re.findall('[a-zA-Z]', str(symbol)))
folder_path = "traderdata"
file_path = os.path.join(folder_path, f"{str(symbol)}traderdata.csv")
# 如果文件夹不存在则创建
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# 读取保留的模型数据CSV文件
if os.path.exists(file_path):
df = pd.read_csv(file_path)
if not df.empty and self.kgdata==True:
# 选择最后一行数据
row = df.iloc[-1]
# 根据CSV文件的列名将数据赋值给相应的属性
self.pos = int(row['pos'])
self.short_trailing_stop_price = float(row['short_trailing_stop_price'])
self.long_trailing_stop_price = float(row['long_trailing_stop_price'])
self.sl_long_price = float(row['sl_long_price'])
self.sl_shor_price = float(row['sl_shor_price'])
# self.out_long = int(row['out_long'])
# self.out_short = int(row['out_short'])
print("找到历史交易数据文件,已经更新持仓,止损止盈数据", df.iloc[-1])
self.kgdata=False
else:
pass
#print("没有找到历史交易数据文件", file_path)
#如果没有找到CSV则初始化变量
pass
#保存数据
def save_to_csv(self,symbol):
# 使用正则表达式提取英文字母并重新赋值给symbol
symbol = ''.join(re.findall('[a-zA-Z]', str(symbol)))
# 创建DataFrame
data = {
'datetime': [trader_df['datetime'].iloc[-1]],
'pos': [self.pos],
'short_trailing_stop_price': [self.short_trailing_stop_price],
'long_trailing_stop_price': [self.long_trailing_stop_price],
'sl_long_price': [self.sl_long_price],
'sl_shor_price': [self.sl_shor_price],
# 'out_long': [self.out_long],
# 'out_short': [self.out_short]
}
df = pd.DataFrame(data)
# 将DataFrame保存到CSV文件
df.to_csv(f"traderdata/{str(symbol)}traderdata.csv", index=False)
#每日收盘重置数据
def day_data_reset(self):
# 获取当前时间
current_time = datetime.now().time()
# 第一时间范围
clearing_time1_start = s_time(15,00)
clearing_time1_end = s_time(15,15)
# 第二时间范围
clearing_time2_start = s_time(23,0)
clearing_time2_end = s_time(23,15)
# 创建一个标志变量,用于记录是否已经执行过
self.clearing_executed = False
# 检查当前时间第一个操作的时间范围内
if clearing_time1_start <= current_time <= clearing_time1_end and not self.clearing_executed :
self.clearing_executed = True # 设置标志变量为已执行
trader_df.drop(trader_df.index,inplace=True)#清除当天的行情数据
# 检查当前时间是否在第二个操作的时间范围内
elif clearing_time2_start <= current_time <= clearing_time2_end and not self.clearing_executed :
self.clearing_executed = True # 设置标志变量为已执行
trader_df.drop(trader_df.index,inplace=True) #清除当天的行情数据
else:
self.clearing_executed = False
pass
return self.clearing_executed
def OnRtnTrade(self, pTrade):
print("||成交回报||", pTrade)
def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast):
print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast)
# 订单状态通知
def OnRtnOrder(self, pOrder):
print("||订单回报||", pOrder)
def Join(self):
data = None
while True:
if self.status == 0:
while not self.md_queue.empty():
data = self.md_queue.get(block=False)
instrument_id = data['InstrumentID'].decode() # 品种代码
self.read_to_csv(instrument_id)
self.day_data_reset()
tickcome(data)
#新K线开始启动交易程序 and 保存行情数据
if len(trader_df)>self.cont_df:
# 检查文件是否存在
csv_file_path = f"traderdata/{instrument_id}_ofdata.csv"
if os.path.exists(csv_file_path):
# 仅保存最后一行数据
trader_df.tail(1).to_csv(csv_file_path, mode='a', header=False, index=False)
else:
# 创建新文件并保存整个DataFrame
trader_df.to_csv(csv_file_path, index=False)
# 更新跟踪止损价格
if self.long_trailing_stop_price >0 and self.pos>0:
#print('datetime+sig: ',dt,'旧多头出线',self.long_trailing_stop_price,'low',self.low[0])
self.long_trailing_stop_price = trader_df['low'].iloc[-1] if self.long_trailing_stop_price<trader_df['low'].iloc[-1] else self.long_trailing_stop_price
self.save_to_csv(instrument_id)
#print('datetime+sig: ',dt,'多头出线',self.long_trailing_stop_price)
if self.short_trailing_stop_price >0 and self.pos<0:
#print('datetime+sig: ',dt,'旧空头出线',self.short_trailing_stop_price,'high',self.high[0])
self.short_trailing_stop_price = trader_df['high'].iloc[-1] if trader_df['high'].iloc[-1] <self.short_trailing_stop_price else self.short_trailing_stop_price
self.save_to_csv(instrument_id)
#print('datetime+sig: ',dt,'空头出线',self.short_trailing_stop_price)
self.out_long=self.long_trailing_stop_price * (1 - self.trailing_stop_percent)
self.out_short=self.short_trailing_stop_price*(1 + self.trailing_stop_percent)
#print('datetime+sig: ',dt,'空头出线',self.out_short)
#print('datetime+sig: ',dt,'多头出线',self.out_long)
# 跟踪出场
if self.out_long >0:
print('datetime+sig: ',trader_df['datetime'].iloc[-1],'预设——多头止盈——','TR',self.out_long,'low', trader_df['low'].iloc[-1])
if trader_df['low'].iloc[-1] < self.out_long and self.pos>0 and self.sl_long_price>0 and trader_df['low'].iloc[-1]>self.sl_long_price:
print('datetime+sig: ',trader_df['datetime'].iloc[-1],'多头止盈','TR',self.out_long,'low', trader_df['low'].iloc[-1])
#平多
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'3')
self.long_trailing_stop_price = 0
self.out_long=0
self.sl_long_price=0
self.pos = 0
self.save_to_csv(instrument_id)
if self.out_short>0:
print('datetime+sig: ',trader_df['datetime'].iloc[-1],'预设——空头止盈——: ','TR',self.out_short,'high', trader_df['high'].iloc[-1])
if trader_df['high'].iloc[-1] > self.out_short and self.pos<0 and self.sl_shor_price>0 and trader_df['high'].iloc[-1]<self.sl_shor_price:
print('datetime+sig: ',trader_df['datetime'].iloc[-1],'空头止盈: ','TR',self.out_short,'high', trader_df['high'].iloc[-1])
#平空
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'3')
self.short_trailing_stop_price = 0
self.sl_shor_price=0
self.out_shor=0
self.pos = 0
self.save_to_csv(instrument_id)
# 固定止损
self.fixed_stop_loss_L = self.sl_long_price * (1 - self.fixed_stop_loss_percent)
if self.pos>0:
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '预设——多头止损', 'SL', self.fixed_stop_loss_L, 'close', trader_df['close'].iloc[-1])
if self.sl_long_price>0 and self.fixed_stop_loss_L>0 and self.pos > 0 and trader_df['close'].iloc[-1] < self.fixed_stop_loss_L:
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '多头止损', 'SL', self.fixed_stop_loss_L, 'close', trader_df['close'].iloc[-1])
#平多
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'3')
self.long_trailing_stop_price = 0
self.sl_long_price=0
self.out_long = 0
self.pos = 0
self.save_to_csv(instrument_id)
self.fixed_stop_loss_S = self.sl_shor_price * (1 + self.fixed_stop_loss_percent)
if self.pos<0:
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '预设——空头止损', 'SL', self.fixed_stop_loss_S, 'close', trader_df['close'].iloc[-1])
if self.sl_shor_price>0 and self.fixed_stop_loss_S>0 and self.pos < 0 and trader_df['close'].iloc[-1] > self.fixed_stop_loss_S:
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '空头止损', 'SL', self.fixed_stop_loss_S, 'close', trader_df['close'].iloc[-1])
#平空
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'3')
self.short_trailing_stop_price = 0
self.sl_shor_price=0
self.out_short = 0
self.pos = 0
self.save_to_csv(instrument_id)
#日均线
trader_df['dayma']=trader_df['close'].mean()
# 计算累积的delta值
trader_df['delta'] = trader_df['delta'].astype(float)
trader_df['delta累计'] = trader_df['delta'].cumsum()
#大于日均线
开多1=trader_df['dayma'].iloc[-1] > 0 and trader_df['close'].iloc[-1] > trader_df['dayma'].iloc[-1]
#累计多空净量大于X
开多4=trader_df['delta累计'].iloc[-1] > 2000 and trader_df['delta'].iloc[-1] > 1500
#小于日均线
开空1=trader_df['dayma'].iloc[-1]>0 and trader_df['close'].iloc[-1] < trader_df['dayma'].iloc[-1]
#累计多空净量小于X
开空4=trader_df['delta累计'].iloc[-1] < -2000 and trader_df['delta'].iloc[-1] < -1500
开多组合= 开多1 and 开多4 and trader_df['dj'].iloc[-1]>self.dj_X
开空条件= 开空1 and 开空4 and trader_df['dj'].iloc[-1]<-self.dj_X
平多条件=trader_df['dj'].iloc[-1]<-self.dj_X
平空条件=trader_df['dj'].iloc[-1]>self.dj_X
#开仓
#多头开仓条件
if self.pos<0 and 平空条件 :
print('平空: ','ExchangeID: ',data['ExchangeID'],'InstrumentID',data['InstrumentID'],'AskPrice1',data['AskPrice1']+self.py)
#insert_order:买卖方向开仓0平仓1强平2平今3平昨4强减5本地强平6
#平空
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'3')
self.pos=0
self.sl_shor_price=0
self.short_trailing_stop_price=0
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '反手平空:', '平仓价格:', data['AskPrice1']+self.py,'堆积数:', trader_df['dj'].iloc[-1])
self.save_to_csv(instrument_id)
#发送邮件
text = f"平空交易: 交易品种为{data['InstrumentID']}, 交易时间为{trader_df['datetime'].iloc[-1]}, 反手平空的平仓价格{data['AskPrice1']+self.py}"
send_mail(text)
if self.pos==0 and 开多组合:
print('开多: ','ExchangeID: ',data['ExchangeID'],'InstrumentID',data['InstrumentID'],'AskPrice1',data['AskPrice1']+self.py)
#开多
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+self.py,self.Lots,b'0',b'0')
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '多头开仓', '开仓价格:', data['AskPrice1']+self.py,'堆积数:', trader_df['dj'].iloc[-1])
self.pos=1
self.long_trailing_stop_price=data['AskPrice1']
self.sl_long_price=data['AskPrice1']
self.save_to_csv(instrument_id)
#发送邮件
text = f"开多交易: 交易品种为{data['InstrumentID']}, 交易时间为{trader_df['datetime'].iloc[-1]}, 多头开仓的开仓价格{data['AskPrice1']+self.py}预设——多头止盈——TR{self.out_long},多头止损SL{self.fixed_stop_loss_L}"
send_mail(text)
if self.pos>0 and 平多条件 :
print('平多: ','ExchangeID: ',data['ExchangeID'],'InstrumentID',data['InstrumentID'],'BidPrice1',data['BidPrice1']-self.py)
#平多
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'1')
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'3')
self.pos=0
self.long_trailing_stop_price=0
self.sl_long_price=0
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '反手平多', '平仓价格:', data['BidPrice1']-self.py,'堆积数:', trader_df['dj'].iloc[-1])
self.save_to_csv(instrument_id)
#发送邮件
text = f"平多交易: 交易品种为{data['InstrumentID']}, 交易时间为{trader_df['datetime'].iloc[-1]}, 反手平多的平仓价格{data['BidPrice1']-self.py}"
send_mail(text)
if self.pos==0 and 开空条件 :
print('开空: ','ExchangeID: ',data['ExchangeID'],'InstrumentID',data['InstrumentID'],'BidPrice1',data['BidPrice1'])
#开空
self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-self.py,self.Lots,b'1',b'0')
print('datetime+sig: ', trader_df['datetime'].iloc[-1], '空头开仓', '开仓价格:', data['BidPrice1']-self.py,'堆积数:', trader_df['dj'].iloc[-1])
self.pos=-1
self.short_trailing_stop_price=data['BidPrice1']
self.sl_shor_price=data['BidPrice1']
self.save_to_csv(instrument_id)
# 发送邮件
text = f"开空交易: 交易品种为{data['InstrumentID']}, 交易时间为{trader_df['datetime'].iloc[-1]}, 空头开仓的开仓价格{data['BidPrice1']-self.py},预设——空头止盈——TR{self.out_short},空头止损{self.fixed_stop_loss_S}"
send_mail(text)
print(trader_df)
self.cont_df=len(trader_df)
else:
time.sleep(1)
def run_trader(broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir='', private_resume_type=2, public_resume_type=2):
my_trader = MyTrader(broker_id, td_server, investor_id, password, app_id, auth_code, md_queue, page_dir, private_resume_type, public_resume_type)
my_trader.Join()
if __name__ == '__main__':
#global symbol
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!
#版权归松鼠Quant所有禁止转发、转卖源码违者必究。
#注意运行前请先安装好algoplus,
# pip install AlgoPlus
#http://www.algo.plus/ctp/python/0103001.html
#用simnow模拟不要忘记屏蔽下方实盘的future_account字典
future_account = get_simulate_account(
investor_id='135858', # simnow账户注意是登录账户的IDSIMNOW个人首页查看
password='Zj82334475', # simnow密码
server_name='TEST', # 电信1、电信2、移动、TEST、N视界
subscribe_list=[b'rb2405'], # 合约列表
)
#实盘用这个不要忘记屏蔽上方simnow的future_account字典
# future_account = FutureAccount(
# broker_id='', # 期货公司BrokerID
# server_dict={'TDServer': "ip:port", 'MDServer': 'ip:port'}, # TDServer为交易服务器MDServer为行情服务器。服务器地址格式为"ip:port。"
# reserve_server_dict={}, # 备用服务器地址
# investor_id='', # 账户
# password='', # 密码
# app_id='simnow_client_test', # 认证使用AppID
# auth_code='0000000000000000', # 认证使用授权码
# subscribe_list=[b'rb2405'], # 订阅合约列表
# md_flow_path='./log', # MdApi流文件存储地址默认MD_LOCATION
# td_flow_path='./log', # TraderApi流文件存储地址默认TD_LOCATION
# )
print('开始',len(future_account.subscribe_list))
# 共享队列
share_queue = Queue(maxsize=200)
# 行情进程
md_process = Process(target=run_tick_engine, args=(future_account, [share_queue]))
# 交易进程
trader_process = Process(target=run_trader, args=(
future_account.broker_id,
future_account.server_dict['TDServer'],
future_account.investor_id,
future_account.password,
future_account.app_id,
future_account.auth_code,
share_queue, # 队列
future_account.td_flow_path
))
md_process.start()
trader_process.start()
# success = f"行情和交易启动成功!{future_account.subscribe_list}"
# send_mail(success)
md_process.join()
trader_process.join()