"""
该代码的主要目的是处理Tick数据并生成交易信号。代码中定义了一个tickcome函数,它接收到Tick数据后会进行一系列的处理,包括构建Tick字典、更新上一个Tick的成交量、保存Tick数据、生成K线数据等。其中涉及到的一些函数有:
on_tick(tick): 处理单个Tick数据,根据Tick数据生成K线数据。
tickdata(df, symbol): 处理Tick数据,生成K线数据。
orderflow_df_new(df_tick, df_min, symbol): 处理Tick和K线数据,生成订单流数据。
GetOrderFlow_dj(kData): 计算订单流的信号指标。
除此之外,代码中还定义了一个MyTrader类,继承自TraderApiBase,用于实现交易相关的功能。
"""
# from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue
import queue
import threading
from AlgoPlus.CTP.MdApi import run_tick_engine
from AlgoPlus.CTP.FutureAccount import get_simulate_account
from AlgoPlus.CTP.FutureAccount import FutureAccount
from AlgoPlus.CTP.TraderApiBase import TraderApiBase
# from AlgoPlus.ta.time_bar import tick_to_bar
import pandas as pd
from datetime import datetime, timedelta
from datetime import time as s_time
import operator
import time
import numpy as np
import os
import re
# import talib as tb
import akshare as ak
import ast
# 加入邮件通知
import smtplib
from email.mime.text import MIMEText # 导入 MIMEText 类发送纯文本邮件
from email.mime.multipart import (
MIMEMultipart,
)
# Timestamp (time.time() seconds) that send_mail compares against to throttle notifications.
last_sent_time = 0
# Period constant; in the visible part of the file it is referenced only from commented-out code.
time_period = 48
# from email.mime.application import MIMEApplication
# E-mail message settings
receivers = ["240884432@qq.com"] # recipient addresses
subject = "TD_Simnow_Signal" # mail subject (order-flow strategy trading signal)
# SMTP server settings
smtp_server = "smtp.qq.com" # SMTP server host
smtp_port = 465 # SMTP server port; send_mail connects with smtplib.SMTP_SSL
sender = "240884432@qq.com" # sender address
username = "240884432@qq.com" # SMTP login user name
# NOTE(review): a credential is hard-coded in the source here; consider loading it
# from the environment or a secrets store and rotating the exposed value.
password = "osjyjmbqrzxtbjbf" # SMTP password or authorization code
tickdatadict = {} # symbol -> in-progress bar DataFrame (written by tickdata)
quotedict = {} # symbol -> {"bidDictResult": ..., "askDictResult": ...} accumulated by process
ofdatadict = {} # symbol -> one-row order-flow DataFrame of the current bar (written by orderflow_df_new)
trade_dfs = {} # symbol -> DataFrame of finished order-flow bars (filled by start and data_of)
previous_volume = {} # volume of the previous tick (not referenced in the visible part of the file)
tsymbollist = {} # symbol -> most recent tick stored by on_tick
# Night-session closing time per product code (the alphabetic part of a contract
# symbol). day_data_reset looks products up here to find the post-close window
# in which the intraday bar history is cleared.
_NIGHT_CLOSE_2300 = s_time(23, 0)
_NIGHT_CLOSE_0100 = s_time(1, 0)
_NIGHT_CLOSE_0230 = s_time(2, 30)
clearing_time_dict = {
    "sc": _NIGHT_CLOSE_0230,
    "bc": _NIGHT_CLOSE_0100,
    "lu": _NIGHT_CLOSE_2300,
    "nr": _NIGHT_CLOSE_2300,
    "au": _NIGHT_CLOSE_0230,
    "ag": _NIGHT_CLOSE_0230,
    "ss": _NIGHT_CLOSE_0100,
    "sn": _NIGHT_CLOSE_0100,
    "ni": _NIGHT_CLOSE_0100,
    "pb": _NIGHT_CLOSE_0100,
    "zn": _NIGHT_CLOSE_0100,
    "al": _NIGHT_CLOSE_0100,
    "cu": _NIGHT_CLOSE_0100,
    "ru": _NIGHT_CLOSE_2300,
    "rb": _NIGHT_CLOSE_2300,
    "hc": _NIGHT_CLOSE_2300,
    "fu": _NIGHT_CLOSE_2300,
    "bu": _NIGHT_CLOSE_2300,
    "sp": _NIGHT_CLOSE_2300,
    "PF": _NIGHT_CLOSE_2300,
    "SR": _NIGHT_CLOSE_2300,
    "CF": _NIGHT_CLOSE_2300,
    "CY": _NIGHT_CLOSE_2300,
    "RM": _NIGHT_CLOSE_2300,
    "MA": _NIGHT_CLOSE_2300,
    "TA": _NIGHT_CLOSE_2300,
    "ZC": _NIGHT_CLOSE_2300,
    "FG": _NIGHT_CLOSE_2300,
    "OI": _NIGHT_CLOSE_2300,
    "SA": _NIGHT_CLOSE_2300,
    "p": _NIGHT_CLOSE_2300,
    "j": _NIGHT_CLOSE_2300,
    "jm": _NIGHT_CLOSE_2300,
    "i": _NIGHT_CLOSE_2300,
    "l": _NIGHT_CLOSE_2300,
    "v": _NIGHT_CLOSE_2300,
    "pp": _NIGHT_CLOSE_2300,
    "eg": _NIGHT_CLOSE_2300,
    "c": _NIGHT_CLOSE_2300,
    "cs": _NIGHT_CLOSE_2300,
    "y": _NIGHT_CLOSE_2300,
    "m": _NIGHT_CLOSE_2300,
    "a": _NIGHT_CLOSE_2300,
    "b": _NIGHT_CLOSE_2300,
    "rr": _NIGHT_CLOSE_2300,
    "eb": _NIGHT_CLOSE_2300,
    "pg": _NIGHT_CLOSE_2300,
}
def send_mail(text):
    """Send *text* as an HTML notification e-mail, at most once per minute.

    The message is built from the module-level mail settings (sender,
    receivers, subject, SMTP host/port and credentials).

    Args:
        text: Content embedded in the HTML body of the message.

    Returns:
        None. The call is skipped when a mail was sent successfully less than
        60 seconds ago, and a delivery failure is reported on stdout instead
        of being raised, so a mail problem cannot stop the trading thread
        that called this function.
    """
    global last_sent_time
    # Throttle: compare against the time of the last successful send.
    current_time = time.time()
    if current_time - last_sent_time < 60:
        print("current_time:",current_time)
        print("last_sent_time:",last_sent_time)
        print("一分钟内已发送过邮件,本次跳过")
        return
    msg = MIMEMultipart()
    msg["From"] = sender
    msg["To"] = ";".join(receivers)
    msg["Subject"] = subject
    html_content = f"\n以下是数据的最后一列:\n{text}\n"
    msg.attach(MIMEText(html_content, 'html'))
    try:
        # The context manager closes the connection even if login/sendmail fails.
        with smtplib.SMTP_SSL(smtp_server, smtp_port) as smtp:
            smtp.login(username, password)
            smtp.sendmail(sender, receivers, msg.as_string())
    except (smtplib.SMTPException, OSError) as exc:
        print("邮件发送失败:", exc)
        return
    # Record the send time; without this the 60-second check above never triggers.
    last_sent_time = current_time
def futures_main_day(future_symbol, delta_days):
    """Fetch history for *future_symbol* covering the last *delta_days* days.

    Args:
        future_symbol: Symbol passed through to ak.futures_main_sina.
        delta_days: Number of calendar days to look back from now.

    Returns:
        Whatever ak.futures_main_sina returns for the window
        [now - delta_days, now], with both dates formatted as YYYYMMDD.
    """
    date_format = "%Y%m%d"
    end_date = datetime.now().strftime(date_format)
    window_start = datetime.now() - timedelta(days=delta_days)
    return ak.futures_main_sina(
        symbol=future_symbol,
        start_date=window_start.strftime(date_format),
        end_date=end_date,
    )
def safe_literal_eval(x):
    """Parse *x* as a Python literal, returning [] when it cannot be parsed.

    Args:
        x: Usually the text of a list literal read from CSV; any other value
            (None, NaN, malformed text) is treated as unparsable.

    Returns:
        The evaluated literal, or an empty list as a placeholder on failure.
    """
    try:
        return ast.literal_eval(x)
    # Only the errors literal_eval raises for malformed input are handled, so
    # KeyboardInterrupt/SystemExit are no longer swallowed.
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return []
def add_poc_column(df):
    """Add a 'POC' column holding, per row, the price with the largest Ask+Bid volume.

    The 'price', 'Ask' and 'Bid' columns are parsed in place from their text
    form into lists (Ask/Bid elements are converted to int). Rows whose three
    lists are empty or differ in length get the marker string '缺值' instead
    of a price.

    Args:
        df: DataFrame with 'price', 'Ask' and 'Bid' columns; modified in place.

    Returns:
        The new df['POC'] Series.
    """
    missing_marker = '缺值'
    df['price'] = df['price'].apply(safe_literal_eval)
    df['Ask'] = df['Ask'].apply(lambda x: list(map(int, safe_literal_eval(x))))
    df['Bid'] = df['Bid'].apply(lambda x: list(map(int, safe_literal_eval(x))))

    def find_poc(row):
        prices, asks, bids = row['price'], row['Ask'], row['Bid']
        # All three lists must be non-empty and aligned level by level.
        if not (len(prices) == len(asks) == len(bids) > 0):
            return missing_marker
        totals = [a + b for a, b in zip(asks, bids)]
        return prices[totals.index(max(totals))]

    df['POC'] = df.apply(find_poc, axis=1)
    # Count the rows actually marked as bad; the marker is a string, so an
    # isnull() check would never find them.
    error_count = int((df['POC'] == missing_marker).sum())
    if error_count > 0:
        print(f"警告:发现 {error_count} 行异常数据(已标记为{missing_marker})")
    return df['POC']
def ultimate_smoother(price, period):
# 初始化变量(修正角度单位为弧度)
a1 = np.exp(-1.414 * np.pi / period)
b1 = 2 * a1 * np.cos(1.414 * np.pi / period) # 将180改为np.pi
c2 = b1
c3 = -a1 ** 2
c1 = (1 + c2 - c3) / 4
# 准备输出序列
us = np.zeros(len(price))
us_new = np.zeros(len(price))
trend = [None]*(len(price))
ma_close = np.zeros(len(price))
# 前4个点用原始价格初始化
for i in range(len(price)):
if i < 4:
us[i] = price[i]
else:
# 应用递归公式
us[i] = (1 - c1) * price[i] + (2 * c1 - c2) * price[i-1] \
- (c1 + c3) * price[i-2] + c2 * us[i-1] + c3 * us[i-2]
us_new = np.around(us, decimals=2)
ma_close = price.rolling(window=5*period).mean()
if us_new[i]>price[i] and ma_close[i]>price[i]:
trend[i] = '空头趋势'
elif us_new[i] 0:
self.on_tick(tick)
def can_time(self, hour, minute):
hour = str(hour)
minute = str(minute)
if len(minute) == 1:
minute = "0" + minute
return int(hour + minute)
def on_tick(self, tick):
    """Turn one tick dict into a one-row DataFrame and pass it to tickdata.

    Ticks whose "last_volume" is 0 are ignored. The bid/ask price and volume
    put into the row come from the previous tick stored for the same symbol
    (or from this tick when it is the first one seen); the current tick is
    then remembered in tsymbollist for the next call.

    Args:
        tick: Mapping with the keys "last_volume", "created_at", "symbol",
            "price", "bid_p", "ask_p", "bid_v" and "ask_v".
    """
    if tick["last_volume"] == 0:
        return
    stamp = str(tick["created_at"]).replace("+08:00", "")
    symbol_code = tick["symbol"]
    # Quotes are taken from the previously stored tick when there is one.
    quote_source = tsymbollist[symbol_code] if symbol_code in tsymbollist else tick
    bid_p, ask_p, bid_v, ask_v = (
        quote_source[field] for field in ("bid_p", "ask_p", "bid_v", "ask_v")
    )
    tsymbollist[symbol_code] = tick
    row = {
        "datetime": stamp,
        "symbol": tick["symbol"],
        "mainsym": tick["symbol"].rstrip("0123456789").upper(),
        "lastprice": tick["price"],
        "vol": tick["last_volume"],
        "bid_p": bid_p,
        "ask_p": ask_p,
        "bid_v": bid_v,
        "ask_v": ask_v,
    }
    tick_frame = pd.DataFrame(row, index=[0])
    self.tickdata(tick_frame, tick_frame["symbol"][0])
def data_of(self, symbol, df):
    """Append the row(s) in *df* to the bar history kept in trade_dfs[symbol].

    The history entry is replaced by a new, re-indexed DataFrame.
    """
    history = trade_dfs[symbol]
    trade_dfs[symbol] = pd.concat([history, df], ignore_index=True)
def process(self, bidDict, askDict, symbol):
    """Accumulate one tick's bid/ask volumes into the running totals for *symbol*.

    Args:
        bidDict: {price key: volume} traded at or below the bid for this tick.
        askDict: {price key: volume} traded at or above the ask for this tick.
        symbol: Key under which the running totals are cached in quotedict.

    Returns:
        (bidDictResult, askDictResult): the updated running totals. Both
        dicts always contain the same set of price keys; a side with no
        volume at a level holds 0.
    """
    try:
        cached = quotedict[symbol]
        bidDictResult = cached["bidDictResult"]
        askDictResult = cached["askDictResult"]
    except KeyError:
        # First tick of a bar for this symbol: start from empty totals.
        bidDictResult, askDictResult = {}, {}
    for level in sorted(set(bidDict) | set(askDict)):
        # Each side is accumulated independently, so a level present in both
        # inputs (last price equal to both bid and ask) keeps its ask volume.
        bidDictResult[level] = bidDictResult.get(level, 0) + int(bidDict.get(level, 0))
        askDictResult[level] = askDictResult.get(level, 0) + int(askDict.get(level, 0))
    quotedict[symbol] = {
        "bidDictResult": bidDictResult,
        "askDictResult": askDictResult,
    }
    return bidDictResult, askDictResult
def tickdata(self, df, symbol):
# Fold one tick (the one-row DataFrame `df` built by on_tick) into the
# in-progress bar for `symbol`, cache that bar in tickdatadict and pass the
# tick plus the bar to orderflow_df_new.
# NOTE(review): this copy of the file has lost its indentation; the nesting
# described in the comments below is inferred from the try/except and if/else
# keywords and should be confirmed against an indented copy.
tickdata = pd.DataFrame(
{
"datetime": df["datetime"],
"symbol": df["symbol"],
"lastprice": df["lastprice"],
"volume": df["vol"],
"bid_p": df["bid_p"],
"bid_v": df["bid_v"],
"ask_p": df["ask_p"],
"ask_v": df["ask_v"],
}
)
try:
if symbol in tickdatadict.keys():
# A bar is already in progress: compare this tick's time with the bar time.
rdf = tickdatadict[symbol]
rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S")
now = str(tickdata["datetime"][0])
# String comparison of the two timestamps.
# NOTE(review): presumably both are "YYYY-MM-DD HH:MM:SS..." text so that
# lexical order equals time order - confirm the tick datetime format.
if now > rdftm:
try:
# The tick is past the cached bar time: move the order-flow row of the
# cached bar into the history and clear the per-bar caches.
oo = ofdatadict[symbol]
self.data_of(symbol, oo)
if symbol in quotedict.keys():
quotedict.pop(symbol)
if symbol in tickdatadict.keys():
tickdatadict.pop(symbol)
if symbol in ofdatadict.keys():
ofdatadict.pop(symbol)
# NOTE(review): a missing ofdatadict[symbol] raises KeyError, which this
# IOError handler does not catch.
except IOError as e:
print("rdftm捕获到异常", e)
# Seed OHLC fields from this tick's last price.
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
tickdata["open"] = tickdata["lastprice"]
tickdata["high"] = tickdata["lastprice"]
tickdata["low"] = tickdata["lastprice"]
tickdata["close"] = tickdata["lastprice"]
tickdata["starttime"] = tickdata["datetime"]
else:
# Carry bar time/open/start time over from the cached bar, extend high/low,
# and add this tick's volume to the cached bar volume.
tickdata["bartime"] = rdf["bartime"]
tickdata["open"] = rdf["open"]
# Built-in max/min applied to two value arrays.
# NOTE(review): this only works while both arrays hold a single element - confirm.
tickdata["high"] = max(
tickdata["lastprice"].values, rdf["high"].values
)
tickdata["low"] = min(
tickdata["lastprice"].values, rdf["low"].values
)
tickdata["close"] = tickdata["lastprice"]
tickdata["volume"] = df["vol"] + rdf["volume"].values
tickdata["starttime"] = rdf["starttime"]
else:
# No cached bar for this symbol: seed OHLC fields from this tick.
print("新bar的第一个tick进入")
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
tickdata["open"] = tickdata["lastprice"]
tickdata["high"] = tickdata["lastprice"]
tickdata["low"] = tickdata["lastprice"]
tickdata["close"] = tickdata["lastprice"]
tickdata["starttime"] = tickdata["datetime"]
except IOError as e:
print("捕获到异常", e)
tickdata["bartime"] = pd.to_datetime(tickdata["bartime"])
# Bar settings of the instrument currently being processed (self.品种).
param = self.param_dict[self.品种]
# Resample on bartime with the configured period; label="right" and
# closed="right" label the resulting bar with the right edge of its interval.
bardata = (
tickdata.resample(
on="bartime", rule=param.周期, label="right", closed="right"
)
.agg(
{
"starttime": "first",
"symbol": "last",
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}
)
.reset_index(drop=False)
)
bardata = bardata.dropna().reset_index(drop=True)
# Store the bar time as text; the next call parses it again as rdftm.
bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime(
"%Y-%m-%d %H:%M:%S"
)
tickdatadict[symbol] = bardata
# Put this tick's own volume back (the branch above may have replaced it with
# the accumulated bar volume) before computing order flow.
tickdata["volume"] = df["vol"].values
self.orderflow_df_new(tickdata, bardata, symbol)
def orderflow_df_new(self, df_tick, df_min, symbol):
    """Update the order-flow (footprint) row of the current bar with one tick.

    The tick's volume is attributed to the ask side when the last price is at
    or above the ask, and to the bid side when it is at or below the bid. The
    per-price totals are accumulated through process(), and the resulting
    one-row DataFrame (price levels, Ask/Bid volumes, delta, OHLCV, dj) is
    stored in ofdatadict[symbol].

    Args:
        df_tick: One-row DataFrame of the tick ("bid_p", "ask_p",
            "lastprice", "volume", "symbol").
        df_min: DataFrame of the bar ("bartime", "open", "high", "low",
            "close", "volume"); tickdata passes a single row.
        symbol: Instrument key for the caches.
    """
    endArray = df_min["bartime"].values
    if len(endArray) == 0:
        # Nothing to attribute the tick to.
        return
    voluememin = df_min["volume"].values
    highs = df_min["high"].values
    lows = df_min["low"].values
    opens = df_min["open"].values
    closes = df_min["close"].values
    symbolarray = df_tick["symbol"].values

    # Only the first tick row is used; on_tick supplies one tick per call.
    Bp = round(df_tick["bid_p"].values[0], 4)
    Ap = round(df_tick["ask_p"].values[0], 4)
    LastPrice = round(df_tick["lastprice"].values[0], 4)
    Volume = df_tick["volume"].values[0]
    price_key = str(LastPrice)

    for index, dt in enumerate(endArray):
        bidDict = {}
        askDict = {}
        bar_vol = voluememin[index]
        bar_close = closes[index]
        bar_open = opens[index]
        bar_low = lows[index]
        bar_high = highs[index]
        bar_symbol = symbolarray[index]
        if LastPrice >= Ap:
            askDict[price_key] = askDict.get(price_key, 0) + Volume
        if LastPrice <= Bp:
            bidDict[price_key] = bidDict.get(price_key, 0) + Volume

    bidDictResult, askDictResult = self.process(bidDict, askDict, symbol)

    # Order the levels by numeric price. The keys are strings, and a plain
    # string sort would place e.g. "10000.0" before "9999.0", which breaks
    # the adjacent-level comparisons made by GetOrderFlow_dj.
    def by_price(item):
        return float(item[0])

    bidDictResult = dict(sorted(bidDictResult.items(), key=by_price))
    askDictResult = dict(sorted(askDictResult.items(), key=by_price))
    prinslist = list(bidDictResult.keys())
    asklist = list(askDictResult.values())
    bidlist = list(bidDictResult.values())
    delta = sum(askDictResult.values()) - sum(bidDictResult.values())
    df = pd.DataFrame(
        {
            "price": pd.Series([prinslist]),
            "Ask": pd.Series([asklist]),
            "Bid": pd.Series([bidlist]),
        }
    )
    df["symbol"] = bar_symbol
    df["datetime"] = dt
    df["delta"] = str(delta)
    df["close"] = bar_close
    df["open"] = bar_open
    df["high"] = bar_high
    df["low"] = bar_low
    df["volume"] = bar_vol
    df["dj"] = self.GetOrderFlow_dj(df)
    ofdatadict[symbol] = df
def GetOrderFlow_dj(self, kData):
    """Return the net count of stacked imbalances found in *kData*.

    For every row, the price levels are walked in order. A bid imbalance at
    level i means Bid[i] exceeds Ask[i + 1] times the configured ratio
    (param.失衡); an ask imbalance means Ask[i] exceeds Bid[i - 1] times that
    ratio. Each level that extends a run of consecutive imbalances to at
    least param.堆积 subtracts one (bid side) or adds one (ask side),
    provided the level's price is positive.

    Args:
        kData: DataFrame whose rows carry aligned "price", "Ask" and "Bid"
            lists.

    Returns:
        int: ask-side stack count minus bid-side stack count.
    """
    settings = self.param_dict[self.品种]
    imbalance_ratio = int(settings.失衡)
    stack_threshold = int(settings.堆积)
    net_stacks = 0
    for label, _row in kData.iterrows():
        bar = kData.iloc[label]
        levels, asks, bids = bar["price"], bar["Ask"], bar["Bid"]
        last_index = len(levels) - 1
        bid_run = 0
        ask_run = 0
        for i in range(len(levels)):
            # Bid-side (short) stacking: compare with the ask one level up.
            if i < last_index:
                if bids[i] > asks[i + 1] * imbalance_ratio:
                    bid_run += 1
                    if bid_run >= stack_threshold and float(levels[i]) > 0:
                        net_stacks -= 1
                else:
                    bid_run = 0
            # Ask-side (long) stacking: compare with the bid one level down.
            # The top level is excluded here, exactly as on the bid side.
            if 1 <= i < last_index:
                if asks[i] > bids[i - 1] * imbalance_ratio:
                    ask_run += 1
                    if ask_run >= stack_threshold and float(levels[i]) > 0:
                        net_stacks += 1
                else:
                    ask_run = 0
    return net_stacks
# 读取保存的数据
def read_to_csv(self, symbol):
# Restore position and stop levels for `symbol` from the last row of
# traderdata/<symbol>_traderdata.csv, creating the folder when it is missing.
# NOTE(review): this copy of the file has lost its indentation, so it cannot be
# told from here whether `param.kgdata = False` runs only after a successful
# restore or on every call in which the file exists - confirm against an
# indented copy before changing this method.
# Folder and file paths
param = self.param_dict[symbol]
# symbol = ''.join(re.findall('[a-zA-Z]', str(symbol)))
folder_path = "traderdata"
file_path = os.path.join(folder_path, f"{str(symbol)}_traderdata.csv")
# Create the folder if it does not exist
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# Read the saved state CSV file
if os.path.exists(file_path):
df = pd.read_csv(file_path)
# Restore only while param.kgdata is still True.
if not df.empty and param.kgdata is True:
# Take the last row
# df = df._append(df.iloc[-1], ignore_index=True)
row = df.iloc[-1]
# Copy the CSV columns onto the matching param attributes
param.pos = int(row["pos"])
param.short_trailing_stop_price = float(
row["short_trailing_stop_price"]
)
param.long_trailing_stop_price = float(row["long_trailing_stop_price"])
param.sl_long_price = float(row["sl_long_price"])
param.sl_shor_price = float(row["sl_shor_price"])
# param.out_long = int(row['out_long'])
# param.out_short = int(row['out_short'])
print("找到历史交易数据文件,已经更新持仓,止损止盈数据", df.iloc[-1])
param.kgdata = False
else:
pass
# print("没有找到历史交易数据文件", file_path)
# Nothing is initialised here when no CSV is found
pass
# 保存数据
def save_to_csv(self, symbol):
    """Persist the current position and stop levels of *symbol* to CSV.

    One row (datetime of the latest bar, pos, trailing-stop and stop-loss
    prices) is written to traderdata/<symbol>_traderdata.csv. When the file
    already holds data, the row is appended only if ``pos`` differs from the
    last stored ``pos``.

    NOTE(review): because of that rule, trailing-stop updates made while the
    position is unchanged are not written - confirm this is intended.

    Args:
        symbol: Instrument key into self.param_dict and trade_dfs.
    """
    param = self.param_dict[symbol]
    snapshot = pd.DataFrame(
        {
            "datetime": [trade_dfs[symbol]["datetime"].iloc[-1]],
            "pos": [param.pos],
            "short_trailing_stop_price": [param.short_trailing_stop_price],
            "long_trailing_stop_price": [param.long_trailing_stop_price],
            "sl_long_price": [param.sl_long_price],
            "sl_shor_price": [param.sl_shor_price],
        }
    )
    traderdata_file_path = f"traderdata/{str(symbol)}_traderdata.csv"
    # The target folder may not exist yet when this runs before read_to_csv.
    os.makedirs("traderdata", exist_ok=True)
    try:
        stored = pd.read_csv(traderdata_file_path)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # No file yet, or a zero-byte file: write a fresh file with a header.
        snapshot.to_csv(traderdata_file_path, index=False)
        return
    # A header-only file has no last row to compare with, so just append.
    if stored.empty or snapshot["pos"].iloc[-1] != stored["pos"].iloc[-1]:
        snapshot.to_csv(traderdata_file_path, mode="a", header=False, index=False)
# 每日收盘重置数据
def day_data_reset(self, symbol):
    """Clear the bar history of *symbol* inside the post-close windows.

    The history in trade_dfs[symbol] is emptied in place when the local clock
    is between 15:05 and 15:10, or - for products listed in
    clearing_time_dict - between the listed closing time and 15 minutes
    after it.

    NOTE(review): param.clearing_executed is reset to False on every call, so
    it only reports the outcome of this call and does not prevent repeated
    clearing within a window. The window end is built with ``minute + 15``,
    which would raise for a closing minute of 45 or later.

    Returns:
        bool: True when the history was cleared by this call, else False.
    """
    param = self.param_dict[symbol]
    product = "".join(re.findall("[a-zA-Z]", str(symbol)))
    now = datetime.now().time()
    param.clearing_executed = False
    if s_time(15, 5) <= now <= s_time(15, 10):
        # Day-session close window.
        in_window = True
    elif product in clearing_time_dict:
        # Night-session close window for this product.
        window_start = clearing_time_dict[product]
        window_end = s_time(window_start.hour, window_start.minute + 15)
        in_window = window_start <= now <= window_end
    else:
        in_window = False
    if in_window:
        param.clearing_executed = True
        history = trade_dfs[symbol]
        history.drop(history.index, inplace=True)
    return param.clearing_executed
def OnRtnTrade(self, pTrade):
    """Trade callback: print the trade record that was passed in."""
    print("||成交回报||", pTrade)
def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast):
    """Order-insert response callback: print all four arguments it receives."""
    print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast)
# 订单状态通知
def OnRtnOrder(self, pOrder):
    """Order-status callback: print the order record that was passed in."""
    print("||订单回报||", pOrder)
def cal_sig(self, symbol_queue):
# Worker loop for one instrument: take ticks from `symbol_queue`, feed them
# through tickcome, and whenever the bar history in trade_dfs has grown past
# param.cont_df, persist the newest bar and run the exit/entry rules
# (trailing exit, fixed stop loss, signal-based close and open).
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# of the if-blocks below cannot be confirmed from here; the comments describe
# each statement group and do not assert which block encloses it.
while True:
try:
data = symbol_queue.get(
block=True, timeout=5
) # raises queue.Empty if no tick arrives within 5 seconds
instrument_id = data["InstrumentID"].decode() # instrument code
size = symbol_queue.qsize()
if size > 1:
print(
f"当前{instrument_id}共享队列长度为{size}, 有点阻塞!!!!!"
)
self.read_to_csv(instrument_id)
self.day_data_reset(instrument_id)
param = self.param_dict[instrument_id]
# Remember which instrument is being processed; tickdata and
# GetOrderFlow_dj read self.品种 to pick their parameters.
self.品种 = instrument_id
self.tickcome(data)
trade_df = trade_dfs[instrument_id]
# A new bar has started: run the trading logic and save the bar data
# trade_df = trade_df.drop_duplicates(subset='datetime', keep='first').reset_index(drop=True)
# trade_df = trade_df[trade_df["high"] != trade_df["low"]]
# NOTE(review): read_to_csv is called a second time for the same tick.
self.read_to_csv(instrument_id)
if len(trade_df) > param.cont_df:
# Check whether the file exists
csv_file_path = f"traderdata/{instrument_id}_ofdata.csv"
if os.path.exists(csv_file_path):
#jerome: the saved data additionally gets 'delta累计', POC, the ultimate-smoother value and the trend direction
# Append only the last row
# NOTE(review): the 'delta累计' column is added to trade_df further down,
# so an appended row may carry more fields than the header row of the
# file - confirm the resulting CSV stays readable by start().
trade_df.tail(1).to_csv(
csv_file_path, mode="a", header=False, index=False
)
else:
# Create a new file and save the whole DataFrame
trade_df.to_csv(csv_file_path, index=False)
# Update the trailing-stop prices
# Long: the reference only moves up, to the latest bar low.
if param.long_trailing_stop_price > 0 and param.pos > 0:
param.long_trailing_stop_price = (
trade_df["low"].iloc[-1]
if param.long_trailing_stop_price < trade_df["low"].iloc[-1]
else param.long_trailing_stop_price
)
self.save_to_csv(instrument_id)
# Short: the reference only moves down, to the latest bar high.
if param.short_trailing_stop_price > 0 and param.pos < 0:
param.short_trailing_stop_price = (
trade_df["high"].iloc[-1]
if trade_df["high"].iloc[-1]
< param.short_trailing_stop_price
else param.short_trailing_stop_price
)
self.save_to_csv(instrument_id)
# Trailing exit levels: a fixed percentage away from the references.
param.out_long = param.long_trailing_stop_price * (
1 - param.trailing_stop_percent
)
param.out_short = param.short_trailing_stop_price * (
1 + param.trailing_stop_percent
)
# Trailing exit
if param.out_long > 0:
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"预设——多头止盈——",
"TR",
param.out_long,
"low",
trade_df["low"].iloc[-1],
)
if (
trade_df["low"].iloc[-1] < param.out_long
and param.pos > 0
and param.sl_long_price > 0
and trade_df["low"].iloc[-1] > param.sl_long_price
):
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"多头止盈",
"TR",
param.out_long,
"low",
trade_df["low"].iloc[-1],
)
# Close long: two sell orders are sent with the same price and volume;
# they differ only in the last argument (b"1" and b"3").
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"3",
)
param.long_trailing_stop_price = 0
param.out_long = 0
param.sl_long_price = 0
param.pos = 0
self.save_to_csv(instrument_id)
if param.out_short > 0:
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"预设——空头止盈——: ",
"TR",
param.out_short,
"high",
trade_df["high"].iloc[-1],
)
if (
trade_df["high"].iloc[-1] > param.out_short
and param.pos < 0
and param.sl_shor_price > 0
and trade_df["high"].iloc[-1] < param.sl_shor_price
):
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"空头止盈: ",
"TR",
param.out_short,
"high",
trade_df["high"].iloc[-1],
)
# Close short: two buy orders, differing only in the last argument.
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"3",
)
param.short_trailing_stop_price = 0
param.sl_shor_price = 0
# NOTE(review): this sets an attribute named out_shor on self, while the
# other exit branches reset param.out_long / param.out_short - looks like
# a typo for `param.out_short = 0`; confirm.
self.out_shor = 0
param.pos = 0
self.save_to_csv(instrument_id)
# Fixed stop loss
fixed_stop_loss_L = param.sl_long_price * (
1 - param.fixed_stop_loss_percent
)
if param.pos > 0:
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"预设——多头止损",
"SL",
fixed_stop_loss_L,
"close",
trade_df["close"].iloc[-1],
)
if (
param.sl_long_price > 0
and fixed_stop_loss_L > 0
and param.pos > 0
and trade_df["close"].iloc[-1] < fixed_stop_loss_L
):
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"多头止损",
"SL",
fixed_stop_loss_L,
"close",
trade_df["close"].iloc[-1],
)
# Close long
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"3",
)
param.long_trailing_stop_price = 0
param.sl_long_price = 0
param.out_long = 0
param.pos = 0
self.save_to_csv(instrument_id)
fixed_stop_loss_S = param.sl_shor_price * (
1 + param.fixed_stop_loss_percent
)
if param.pos < 0:
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"预设——空头止损",
"SL",
fixed_stop_loss_S,
"close",
trade_df["close"].iloc[-1],
)
if (
param.sl_shor_price > 0
and fixed_stop_loss_S > 0
and param.pos < 0
and trade_df["close"].iloc[-1] > fixed_stop_loss_S
):
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"空头止损",
"SL",
fixed_stop_loss_S,
"close",
trade_df["close"].iloc[-1],
)
# Close short
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"3",
)
param.short_trailing_stop_price = 0
param.sl_shor_price = 0
param.out_short = 0
param.pos = 0
self.save_to_csv(instrument_id)
# Daily moving average
# AROONOSC :https://zhuanlan.zhihu.com/p/645010879
# if len(trade_df["close"]) >= 120:
# trade_df["dayma"] = trade_df["close"][-120:].mean()
# print("trade_df长度:", len(trade_df["close"]))
# print("120条之上的dayma的值:", trade_df["dayma"])
# else:
# trade_df["dayma"] = trade_df["close"].mean()
# print("120条之下的dayma的值:", trade_df["dayma"])
# print("trade_df长度:", len(trade_df["close"]))
# Daily history of the last 20 days, fetched through futures_main_day
# (ak.futures_main_sina) each time this statement runs.
day_df = {}
day_df = futures_main_day(
instrument_id, 20
) # futures_main_day(trade_df["symbol"], 20)
day_df["5day_ma"] = day_df["收盘价"].rolling(window=5).mean()
# NOTE(review): the next line is an expression statement with no effect.
day_df["5day_ma"].iloc[-1]
# trade_df["aroon_osc"] = tb.AROONOSC(trade_df["high"], trade_df["low"], 5)
# trade_df["rinei_T3"] = tb.T3(np.array(trade_df["dayma"]))
# print("交易品种为:", instrument_id)
# print("昨日5日均线:", day_df["5day_ma"].iloc[-1])
# print("昨日收盘价:", day_df["收盘价"].iloc[-1])
# Compute the cumulative delta per calendar date   datetime.strptime(str_time, "%Y-%m-%d %H:%M:%S")
trade_df["delta"] = trade_df["delta"].astype(float)
trade_df['datetime'] = pd.to_datetime(trade_df['datetime'], format='mixed')
trade_df['delta累计'] = trade_df.groupby(trade_df['datetime'].dt.date)['delta'].cumsum()
# if len(trade_df['datetime']) == 1:
# trade_df["delta累计"].iloc[-1] = trade_df["delta"].iloc[-1]
# else:
# trade_time_1 = datetime.strptime(trade_df['datetime'].iloc[-1], "%Y-%m-%d %H:%M:%S")
# trade_time_2 = datetime.strptime(trade_df['datetime'].iloc[-2], "%Y-%m-%d %H:%M:%S")
# if trade_time_1.date() != trade_time_2.date():
# trade_df["delta累计"].iloc[-1] = trade_df["delta"].iloc[-1]
# else:
# trade_df["delta累计"].iloc[-1] = trade_df["delta"].iloc[-1]+trade_df["delta"].iloc[-2]
# trade_df["delta累计"] = trade_df["delta"].cumsum()
# Original note: "get the third-largest and third-smallest values". The code
# below computes max minus min over rows -120..-2 and only prints the result.
abs_delta = max(trade_df["delta"].iloc[-120:-1], default=0) - min(
trade_df["delta"].iloc[-120:-1], default=0
)
print("abs_delta:", abs_delta)
# third_largest_delta = np.sort(arr_delta)[-2]
# third_smallest_delta = np.sort(arr_delta)[2]
abs_delta累计 = max(
trade_df["delta累计"].iloc[-120:-1], default=0
) - min(trade_df["delta累计"].iloc[-120:-1], default=0)
print("abs_delta累计:", abs_delta累计)
# third_largest_delta累计 = np.sort(arr_delta累计)[-2]
# third_smallest_delta累计 = np.sort(arr_delta累计)[2]
# Above the daily moving average
# 开多1 = trade_df["dayma"].iloc[-1] > 0 and trade_df["close"].iloc[-1] > trade_df["dayma"].iloc[-1]
# 开多1 = trade_df["aroon_osc"].iloc[-1] > 0
# 开多1 = trade_df["close"].iloc[-1] > trade_df[
# "rinei_T3"].iloc[-1]
# NOTE(review): 开多1 / 开空1 are computed but commented out of the entry
# conditions below, so the daily data does not affect any order here.
开多1 = day_df["收盘价"].iloc[-1] > day_df["5day_ma"].iloc[-1]
# Bar delta and cumulative delta both above their thresholds
# 开多4 = (
# trade_df["delta累计"].iloc[-1] > param.sum_delta and trade_df["delta"].iloc[-1] > param.delta
# )
开多4 = trade_df["delta"].iloc[-1] > param.delta and trade_df["delta累计"].iloc[-1] > param.sum_delta
# 开多4 = trade_df["delta累计"].iloc[-1] > np.sort(trade_df["delta累计"].iloc[-120:-1], default=0)[-2] and trade_df["delta"].iloc[-1] > np.sort(trade_df["delta"].iloc[-120:-1], default=0)[-2]
# 开多4 = trade_df["delta累计"].iloc[-1] > third_largest_delta累计 and trade_df["delta"f].iloc[-1] > third_largest_delta
# Below the daily moving average
# 开空1 = trade_df["dayma"].iloc[-1] > 0 and trade_df["close"].iloc[-1] < trade_df["dayma"].iloc[-1]
# 开空1 = trade_df["aroon_osc"].iloc[-1] < 0
# 开空1 = trade_df["close"].iloc[-1] < trade_df[
# "rinei_T3"].iloc[-1]
开空1 = day_df["收盘价"].iloc[-1] < day_df["5day_ma"].iloc[-1]
# Bar delta and cumulative delta both below the negated thresholds
开空4 = trade_df["delta"].iloc[-1] < -param.delta and trade_df["delta累计"].iloc[-1] <- param.sum_delta
# 开空4 = trade_df["delta累计"].iloc[-1] < np.sort(trade_df["delta累计"].iloc[-120:-1], default=0)[2] and trade_df["delta"].iloc[-1] < np.sort(trade_df["delta"].iloc[-120:-1], default=0)[2]
# 开空4 = trade_df["delta累计"].iloc[-1] < third_smallest_delta累计 and trade_df["delta"].iloc[-1] < third_smallest_delta
# Long entry: delta filter, dj above param.dj_X, local time before 14:55.
开多组合 = (
# 开多1
开多4
and trade_df["dj"].iloc[-1] > param.dj_X
and datetime.now().time() < s_time(14,55)
# and len(trade_df) > 120
)
# Short entry: mirror image of the long entry.
开空条件 = (
# 开空1
开空4
and trade_df["dj"].iloc[-1] < -param.dj_X
and datetime.now().time() < s_time(14,55)
# and len(trade_df) > 120
)
# Close conditions: opposite dj signal, or local time at/after 14:55.
平多条件 = (trade_df["dj"].iloc[-1] < -param.dj_X) or (datetime.now().time() >= s_time(14,55))
平空条件 = (trade_df["dj"].iloc[-1] > param.dj_X) or (datetime.now().time() >= s_time(14,55))
# Position handling
# Close a short first, then evaluate the long entry
if param.pos < 0 and 平空条件:
print(
"平空: ",
"ExchangeID: ",
data["ExchangeID"],
"InstrumentID",
data["InstrumentID"],
"AskPrice1",
data["AskPrice1"] + param.py,
)
# Close short
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"3",
)
param.pos = 0
param.sl_shor_price = 0
param.short_trailing_stop_price = 0
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"反手平空:",
"平仓价格:",
data["AskPrice1"] + param.py,
"堆积数:",
trade_df["dj"].iloc[-1],
)
self.save_to_csv(instrument_id)
# Send e-mail
# text = f"平空交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 反手平空的平仓价格为{data['AskPrice1']+param.py}, 交易手数位{param.Lots}"
text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, C_S_T_Price:{data['AskPrice1'] + param.py}, T_Lots:{param.Lots}"
send_mail(text)
if param.pos == 0 and 开多组合:
print(
"开多: ",
"ExchangeID: ",
data["ExchangeID"],
"InstrumentID",
data["InstrumentID"],
"AskPrice1",
data["AskPrice1"] + param.py,
)
# Open long
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["AskPrice1"] + param.py,
param.Lots,
b"0",
b"0",
)
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"多头开仓",
"开仓价格:",
data["AskPrice1"] + param.py,
"堆积数:",
trade_df["dj"].iloc[-1],
)
# Position state is set right after the order is sent; the ask price
# (without the param.py offset) seeds the trailing and stop references.
param.pos = 1
param.long_trailing_stop_price = data["AskPrice1"]
param.sl_long_price = data["AskPrice1"]
self.save_to_csv(instrument_id)
# Send e-mail
text = f"O_L_T ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, O_L_T_Price:{data['AskPrice1'] + param.py}, T_Lots:{param.Lots}"
send_mail(text)
if param.pos > 0 and 平多条件:
print(
"平多: ",
"ExchangeID: ",
data["ExchangeID"],
"InstrumentID",
data["InstrumentID"],
"BidPrice1",
data["BidPrice1"] - param.py,
)
# Close long
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"1",
)
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"3",
)
param.pos = 0
param.long_trailing_stop_price = 0
param.sl_long_price = 0
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"反手平多",
"平仓价格:",
data["BidPrice1"] - param.py,
"堆积数:",
trade_df["dj"].iloc[-1],
)
self.save_to_csv(instrument_id)
# Send e-mail
# text = f"平多交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 反手平多的平仓价格{data['BidPrice1']-param.py}, 交易手数位{param.Lots}"
text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, C_L_T_Price:{data['BidPrice1'] - param.py}, T_Lots:{param.Lots}"
send_mail(text)
if param.pos == 0 and 开空条件:
print(
"开空: ",
"ExchangeID: ",
data["ExchangeID"],
"InstrumentID",
data["InstrumentID"],
"BidPrice1",
data["BidPrice1"],
)
# Open short
self.insert_order(
data["ExchangeID"],
data["InstrumentID"],
data["BidPrice1"] - param.py,
param.Lots,
b"1",
b"0",
)
print(
"datetime+sig: ",
trade_df["datetime"].iloc[-1],
"空头开仓",
"开仓价格:",
data["BidPrice1"] - param.py,
"堆积数:",
trade_df["dj"].iloc[-1],
)
param.pos = -1
param.short_trailing_stop_price = data["BidPrice1"]
param.sl_shor_price = data["BidPrice1"]
self.save_to_csv(instrument_id)
# Send e-mail
text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, O_S_T_Price:{data['BidPrice1'] - param.py}, T_Lots:{param.Lots}"
send_mail(text)
print(trade_df)
print("------------------------------------------------")
# print(trade_df.iloc[0])
# print(trade_df.iloc[-1])
# Remember the history length that the `len(trade_df) > param.cont_df`
# check above compares against.
param.cont_df = len(trade_df)
except queue.Empty:
# print(f"当前合约队列为空,等待新数据插入。")
pass
# 将CTP推送的行情数据分发给对应线程队列去执行
def distribute_tick(self):
    """Forward ticks from self.md_queue to the per-instrument worker queues.

    Runs forever. While self.status is not 0 the loop sleeps one second
    between checks. Otherwise it waits (up to one second at a time) for the
    next tick and puts it, without blocking, on the queue registered for the
    tick's instrument in self.queue_dict. A full worker queue drops the tick
    with a printed warning so other instruments are not held up.
    """
    while True:
        if self.status != 0:
            time.sleep(1)
            continue
        try:
            # A blocking get with a timeout replaces polling empty() in a
            # tight loop, which kept a thread spinning while no data arrived.
            data = self.md_queue.get(block=True, timeout=1)
        except queue.Empty:
            continue
        instrument_id = data["InstrumentID"].decode()  # instrument code
        try:
            self.queue_dict[instrument_id].put(data, block=False)
        except queue.Full:
            # One stalled worker must not affect the other instruments.
            print(
                f"{instrument_id}合约信号计算阻塞导致对应队列已满,请检查对应代码逻辑后重启。"
            )
def start(self, param_dict):
    """Load saved bar history, start one worker thread per symbol, then dispatch ticks.

    For every symbol in *param_dict*:
      * trade_dfs[symbol] is loaded from traderdata/<symbol>_ofdata.csv
        (twelve fixed columns) when that file exists, otherwise it starts as
        an empty DataFrame;
      * a queue with capacity 20 is registered in self.queue_dict;
      * a thread running cal_sig on that queue is started.

    distribute_tick() is then run on the calling thread, and the worker
    threads are joined once it returns.

    Args:
        param_dict: Mapping of symbol -> parameter object; stored on self.
    """
    self.param_dict = param_dict
    history_columns = [
        "price",
        "Ask",
        "Bid",
        "symbol",
        "datetime",
        "delta",
        "close",
        "open",
        "high",
        "low",
        "volume",
        "dj",
    ]
    workers = []
    for symbol in param_dict:
        history_path = f"traderdata/{symbol}_ofdata.csv"
        if os.path.exists(history_path):
            trade_dfs[symbol] = pd.read_csv(history_path, usecols=history_columns)
        else:
            trade_dfs[symbol] = pd.DataFrame({})
        # Bounded queue: a stalled worker surfaces as queue.Full in distribute_tick.
        self.queue_dict[symbol] = queue.Queue(20)
        worker = threading.Thread(
            target=self.cal_sig, args=(self.queue_dict[symbol],)
        )
        workers.append(worker)
        worker.start()
    self.distribute_tick()
    for worker in workers:
        worker.join()
def run_trader(
    param_dict,
    broker_id,
    td_server,
    investor_id,
    password,
    app_id,
    auth_code,
    md_queue=None,
    page_dir="",
    private_resume_type=2,
    public_resume_type=2,
):
    """Build a ``MyTrader`` and start it; used as the trader process target.

    Every argument except ``param_dict`` is forwarded positionally to the
    ``MyTrader`` constructor, in the order declared in this signature.
    ``param_dict`` is then passed to ``MyTrader.start``.

    Args:
        param_dict: Handed unchanged to ``MyTrader.start``. The ``__main__``
            block passes a dict mapping contract symbol to ``ParamObj``.
        broker_id: Forwarded to ``MyTrader``.
        td_server: Forwarded to ``MyTrader``.
        investor_id: Forwarded to ``MyTrader``.
        password: Forwarded to ``MyTrader``.
        app_id: Forwarded to ``MyTrader``.
        auth_code: Forwarded to ``MyTrader``.
        md_queue: Forwarded to ``MyTrader``. The ``__main__`` block passes the
            queue shared with the market-data process.
        page_dir: Forwarded to ``MyTrader``. The ``__main__`` block passes
            ``future_account.td_flow_path`` here.
        private_resume_type: Forwarded to ``MyTrader``.
        public_resume_type: Forwarded to ``MyTrader``.
    """
    # Positional order must match the MyTrader constructor signature.
    trader_init_args = (
        broker_id,
        td_server,
        investor_id,
        password,
        app_id,
        auth_code,
        md_queue,
        page_dir,
        private_resume_type,
        public_resume_type,
    )
    MyTrader(*trader_init_args).start(param_dict)
if __name__ == "__main__":
    # Prerequisite: AlgoPlus must be installed before running this script.
    #   pip install AlgoPlus
    #   http://www.algo.plus/ctp/python/0103001.html

    # Per-contract strategy settings. Only dj_X, delta and sum_delta differ
    # between contracts; every other ParamObj argument is shared.
    # Comment a row in or out to enable or disable a contract.
    contract_settings = (
        # (symbol, dj_X, delta, sum_delta)
        ("IM2503", 8, 500, 800),
        # ("IF2503", 5, 300, 300),
        # ("IH2503", 5, 300, 300),
        ("rb2505", 8, 1500, 2000),
        ("ag2504", 8, 1500, 2000),
    )
    param_dict = {}
    for contract, dj_x_value, delta_value, sum_delta_value in contract_settings:
        param_dict[contract] = ParamObj(
            symbol=contract,
            Lots=1,
            py=5,
            trailing_stop_percent=0.01,
            fixed_stop_loss_percent=0.02,
            dj_X=dj_x_value,
            delta=delta_value,
            sum_delta=sum_delta_value,
            失衡=3,
            堆积=3,
            周期="5min",
        )

    # NOTE(review): account credentials are hard-coded below; consider loading
    # them from the environment or an untracked config file instead.

    # --- SimNow simulation account -------------------------------------
    # When using SimNow, keep the live-trading future_account blocks further
    # down commented out.
    # SIMULATE_SERVER = {
    # '电信1': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
    # '电信2': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10202", 'MDServer': '180.168.146.187:10212', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
    # '移动': {'BrokerID': 9999, 'TDServer': "218.202.237.33:10203", 'MDServer': '218.202.237.33:10213', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
    # 'TEST': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10130", 'MDServer': '180.168.146.187:10131', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
    # 'N视界': {'BrokerID': 10010, 'TDServer': "210.14.72.12:4600", 'MDServer': '210.14.72.12:4602', 'AppID': '', 'AuthCode': ''},
    # }
    # The BrokerID is 9999 for all SimNow fronts listed below.
    # Supported: SHFE, INE, CFFEX, GFEX, CZCE and DCE options.
    # Group 1 - Trade Front: 180.168.146.187:10201, Market Front: 180.168.146.187:10211
    #   [China Telecom] (see-through front, uses the monitoring-center production key)
    # Group 2 - Trade Front: 180.168.146.187:10202, Market Front: 180.168.146.187:10212
    #   [China Telecom] (see-through front, uses the monitoring-center production key)
    # Group 3 - Trade Front: 218.202.237.33:10203, Market Front: 218.202.237.33:10213
    #   [China Mobile] (see-through front, uses the monitoring-center production key)
    # After registration the default AppID is simnow_client_test and the auth
    # code is 0000000000000000 (16 zeros). Terminal authentication is enabled
    # by default; programmatic users may choose to connect without it.
    future_account = get_simulate_account(
        # SimNow login ID (shown on the SimNow personal home page).
        investor_id="223828",
        # SimNow password.
        password="Zj1234!@#%",
        # One of: 电信1, 电信2, 移动, TEST, N视界.
        server_name="电信1",
        # Contracts to subscribe to: every symbol configured above.
        subscribe_list=list(param_dict),
    )

    # --- Live trading (alternative 1) ----------------------------------
    # Use this instead of the SimNow future_account above; remember to
    # comment the SimNow one out.
    # future_account = FutureAccount(
    # broker_id='9999', # futures company BrokerID
    # server_dict={'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211'}, # TDServer = trade server, MDServer = market-data server; address format "ip:port"
    # reserve_server_dict={}, # backup server addresses
    # investor_id='223828', # account
    # password='Zj1234!@#%', # password
    # app_id='simnow_client_test', # AppID used for authentication
    # auth_code='0000000000000000', # auth code used for authentication
    # subscribe_list=list(param_dict.keys()), # contracts to subscribe to
    # md_flow_path='./log', # MdApi flow-file directory, default MD_LOCATION
    # td_flow_path='./log', # TraderApi flow-file directory, default TD_LOCATION
    # )

    # --- Live trading (alternative 2) ----------------------------------
    # Use this instead of the SimNow future_account above; remember to
    # comment the SimNow one out.
    # future_account = FutureAccount(
    # broker_id='8888', # futures company BrokerID
    # server_dict={'TDServer': "103.140.14.210:43205", 'MDServer': '103.140.14.210:43173'}, # TDServer = trade server, MDServer = market-data server; address format "ip:port"
    # reserve_server_dict={}, # backup server addresses
    # investor_id='155878', # account
    # password='Zj82334475', # password
    # app_id='vntech_vnpy_2.0', # AppID used for authentication
    # auth_code='N46EKN6TJ9U7V06V', # auth code used for authentication
    # subscribe_list=list(param_dict.keys()), # contracts to subscribe to
    # md_flow_path='./log', # MdApi flow-file directory, default MD_LOCATION
    # td_flow_path='./log', # TraderApi flow-file directory, default TD_LOCATION
    # )

    print("开始", len(future_account.subscribe_list))

    # Bounded queue shared between the market-data and trader processes.
    share_queue = Queue(maxsize=200)

    # Market-data process: run_tick_engine receives the account and a list
    # containing the shared queue.
    md_process = Process(target=run_tick_engine, args=(future_account, [share_queue]))

    # Trader process: arguments follow run_trader's positional order
    # (param_dict, broker_id, td_server, investor_id, password, app_id,
    # auth_code, md_queue, page_dir).
    trader_args = (
        param_dict,
        future_account.broker_id,
        future_account.server_dict["TDServer"],
        future_account.investor_id,
        future_account.password,
        future_account.app_id,
        future_account.auth_code,
        share_queue,
        future_account.td_flow_path,
    )
    trader_process = Process(target=run_trader, args=trader_args)

    # Start both processes (market data first), then wait for both to exit.
    for proc in (md_process, trader_process):
        proc.start()
    for proc in (md_process, trader_process):
        proc.join()