""" #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 该代码的主要目的是处理Tick数据并生成交易信号。代码中定义了一个tickcome函数,它接收到Tick数据后会进行一系列的处理,包括构建Tick字典、更新上一个Tick的成交量、保存Tick数据、生成K线数据等。其中涉及到的一些函数有: on_tick(tick): 处理单个Tick数据,根据Tick数据生成K线数据。 tickdata(df, symbol): 处理Tick数据,生成K线数据。 orderflow_df_new(df_tick, df_min, symbol): 处理Tick和K线数据,生成订单流数据。 GetOrderFlow_dj(kData): 计算订单流的信号指标。 除此之外,代码中还定义了一个MyTrader类,继承自TraderApiBase,用于实现交易相关的功能。 #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 """ from concurrent.futures import ThreadPoolExecutor from multiprocessing import Process, Queue import queue import threading from AlgoPlus.CTP.MdApi import run_tick_engine from AlgoPlus.CTP.FutureAccount import get_simulate_account from AlgoPlus.CTP.FutureAccount import FutureAccount from AlgoPlus.CTP.TraderApiBase import TraderApiBase from AlgoPlus.ta.time_bar import tick_to_bar import pandas as pd from datetime import datetime, timedelta from datetime import time as s_time import operator import time import numpy as np import os import re import talib as tb import akshare as ak # 加入邮件通知 import smtplib from email.mime.text import MIMEText # 导入 MIMEText 类发送纯文本邮件 from email.mime.multipart import ( MIMEMultipart, ) # 导入 MIMEMultipart 类发送带有附件的邮件 from email.mime.application import ( MIMEApplication, ) # 导入 MIMEApplication 类发送二进制附件 # 配置邮件信息 receivers = ["240884432@qq.com"] # 设置邮件接收人地址 subject = "TD_Signal" # 设置邮件主题 订单流策略交易信号 # text = " " # 设置邮件正文 # file_path = "test.txt" # 设置邮件附件文件路径 # 配置邮件服务器信息 smtp_server = "smtp.qq.com" # 设置发送邮件的 SMTP 服务器地址 smtp_port = 465 # 设置发送邮件的 SMTP 服务器端口号,一般为 25 端口 465 sender = "240884432@qq.com" # 设置发送邮件的邮箱地址 username = "240884432@qq.com" # 设置发送邮件的邮箱用户名 password = "osjyjmbqrzxtbjbf" # zrmpcgttataabhjh,设置发送邮件的邮箱密码或授权码 tickdatadict = {} # 存储Tick数据的字典 quotedict = {} # 存储行情数据的字典 ofdatadict = {} # 存储K线数据的字典 trade_dfs = {} # pd.DataFrame({}) # 存储交易数据的DataFrame对象 previous_volume = {} # 上一个Tick的成交量 tsymbollist = {} clearing_time_dict = { "sc": s_time(2, 30), "bc": s_time(1, 0), "lu": s_time(23, 0), "nr": s_time(23, 0), "au": s_time(2, 30), "ag": s_time(2, 30), "ss": s_time(1, 0), "sn": s_time(1, 0), "ni": s_time(1, 0), "pb": s_time(1, 0), "zn": s_time(1, 0), "al": s_time(1, 0), "cu": s_time(1, 0), "ru": s_time(23, 0), "rb": s_time(23, 0), "hc": s_time(23, 0), "fu": s_time(23, 0), "bu": s_time(23, 0), "sp": s_time(23, 0), "PF": s_time(23, 0), "SR": s_time(23, 0), "CF": s_time(23, 0), "CY": s_time(23, 0), "RM": s_time(23, 0), "MA": s_time(23, 0), "TA": s_time(23, 0), "ZC": s_time(23, 0), "FG": s_time(23, 0), "OI": s_time(23, 0), "SA": s_time(23, 0), "p": s_time(23, 0), "j": s_time(23, 0), "jm": s_time(23, 0), "i": s_time(23, 0), "l": s_time(23, 0), "v": s_time(23, 0), "pp": s_time(23, 0), "eg": s_time(23, 0), "c": s_time(23, 0), "cs": s_time(23, 0), "y": s_time(23, 0), "m": s_time(23, 0), "a": s_time(23, 0), "b": s_time(23, 0), "rr": s_time(23, 0), "eb": s_time(23, 0), "pg": s_time(23, 0), } def send_mail(text): msg = MIMEMultipart() msg["From"] = sender msg["To"] = ";".join(receivers) msg["Subject"] = subject msg.attach(MIMEText(text, "plain", "utf-8")) smtp = smtplib.SMTP_SSL(smtp_server, smtp_port) smtp.login(username, password) smtp.sendmail(sender, receivers, msg.as_string()) smtp.quit() def futures_main_day(future_symbol, delta_days): # 获取当前日期的数据 today = datetime.now().strftime("%Y%m%d") # 计算20日前的日期 start_day = (datetime.now() - timedelta(days=delta_days)).strftime("%Y%m%d") futures_main_sina_hist = ak.futures_main_sina( symbol=future_symbol, start_date=start_day, end_date=today ) return futures_main_sina_hist # 交易程序--------------------------------------------------------------------------------------------------------------------------------------------------------------------- class ParamObj: # 策略需要用到的参数,在新建合约对象的时候传入!! # 策略需要用到的参数,在新建合约对象的时候传入!! # 策略需要用到的参数,在新建合约对象的时候传入!! symbol = None # 合约名称 Lots = None # 下单手数 py = None # 设置委托价格的偏移,更加容易促成成交 trailing_stop_percent = None # 跟踪出场参数 fixed_stop_loss_percent = None # 固定出场参数 dj_X = None # 开仓的堆积参数 delta = None # 开仓的delta参数 sum_delta = None # 开仓的delta累积参数 失衡 = None 堆积 = None 周期 = None # 策略需要用到的变量 cont_df = 0 pos = 0 short_trailing_stop_price = 0 long_trailing_stop_price = 0 sl_long_price = 0 sl_shor_price = 0 out_long = 0 out_short = 0 clearing_executed = False kgdata = True def __init__( self, symbol, Lots, py, trailing_stop_percent, fixed_stop_loss_percent, dj_X, delta, sum_delta, 失衡, 堆积, 周期, ): self.symbol = symbol self.Lots = Lots self.py = py self.trailing_stop_percent = trailing_stop_percent self.fixed_stop_loss_percent = fixed_stop_loss_percent self.dj_X = dj_X self.delta = delta self.sum_delta = sum_delta self.失衡 = 失衡 self.堆积 = 堆积 self.周期 = 周期 class MyTrader(TraderApiBase): def __init__( self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir="", private_resume_type=2, public_resume_type=2, ): self.param_dict = {} self.queue_dict = {} self.品种 = " " def tickcome(self, md_queue): global previous_volume data = md_queue instrument_id = data["InstrumentID"].decode() # 品种代码 ActionDay = data["ActionDay"].decode() # 交易日日期 update_time = data["UpdateTime"].decode() # 更新时间 update_millisec = str(data["UpdateMillisec"]) # 更新毫秒数 created_at = ( ActionDay[:4] + "-" + ActionDay[4:6] + "-" + ActionDay[6:] + " " + update_time + "." + update_millisec ) # 创建时间 # 构建tick字典 tick = { "symbol": instrument_id, # 品种代码和交易所ID "created_at": datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S.%f"), #'created_at': created_at, # 创建时间 "price": float(data["LastPrice"]), # 最新价 "last_volume": ( int(data["Volume"]) - previous_volume.get(instrument_id, 0) if previous_volume.get(instrument_id, 0) != 0 else 0 ), # 瞬时成交量 "bid_p": float(data["BidPrice1"]), # 买价 "bid_v": int(data["BidVolume1"]), # 买量 "ask_p": float(data["AskPrice1"]), # 卖价 "ask_v": int(data["AskVolume1"]), # 卖量 "UpperLimitPrice": float(data["UpperLimitPrice"]), # 涨停价 "LowerLimitPrice": float(data["LowerLimitPrice"]), # 跌停价 "TradingDay": data["TradingDay"].decode(), # 交易日日期 "cum_volume": int(data["Volume"]), # 最新总成交量 "cum_amount": float(data["Turnover"]), # 最新总成交额 "cum_position": int(data["OpenInterest"]), # 合约持仓量 } # print('&&&&&&&&',instrument_id, tick['created_at'],'vol:',tick['last_volume']) # 更新上一个Tick的成交量 previous_volume[instrument_id] = int(data["Volume"]) if tick["last_volume"] > 0: # print(tick['created_at'],'vol:',tick['last_volume']) # 处理Tick数据 self.on_tick(tick) def can_time(self, hour, minute): hour = str(hour) minute = str(minute) if len(minute) == 1: minute = "0" + minute return int(hour + minute) def on_tick(self, tick): tm = self.can_time(tick["created_at"].hour, tick["created_at"].minute) # print(tick['symbol']) # print(1) # if tm>1500 and tm<2100 : # return if tick["last_volume"] == 0: return quotes = tick timetick = str(tick["created_at"]).replace("+08:00", "") tsymbol = tick["symbol"] if tsymbol not in tsymbollist.keys(): # 获取tick的买卖价和买卖量 tsymbollist[tsymbol] = tick bid_p = quotes["bid_p"] ask_p = quotes["ask_p"] bid_v = quotes["bid_v"] ask_v = quotes["ask_v"] else: # 获取上一个tick的买卖价和买卖量 rquotes = tsymbollist[tsymbol] bid_p = rquotes["bid_p"] ask_p = rquotes["ask_p"] bid_v = rquotes["bid_v"] ask_v = rquotes["ask_v"] tsymbollist[tsymbol] = tick tick_dt = pd.DataFrame( { "datetime": timetick, "symbol": tick["symbol"], "mainsym": tick["symbol"].rstrip("0123456789").upper(), "lastprice": tick["price"], "vol": tick["last_volume"], "bid_p": bid_p, "ask_p": ask_p, "bid_v": bid_v, "ask_v": ask_v, }, index=[0], ) sym = tick_dt["symbol"][0] # print(tick_dt) self.tickdata(tick_dt, sym) # 公众号:松鼠Quant # 主页:www.quant789.com # 本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! # 版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def data_of(self, symbol, df): global trade_dfs # 将df数据合并到trader_df中 # if symbol not in trade_dfs.keys(): # trade_df = pd.DataFrame({}) # else: # trade_df = trade_dfs[symbol] trade_dfs[symbol] = pd.concat([trade_dfs[symbol], df], ignore_index=True) # print('!!!!!!!!!!!trader_df: ', symbol, df['datetime'].iloc[-1]) # print(trader_df) def process(self, bidDict, askDict, symbol): try: # 尝试从quotedict中获取对应品种的报价数据 dic = quotedict[symbol] bidDictResult = dic["bidDictResult"] askDictResult = dic["askDictResult"] except Exception: # 如果获取失败,则初始化bidDictResult和askDictResult为空字典 bidDictResult, askDictResult = {}, {} # 将所有买盘字典和卖盘字典的key合并,并按升序排序 sList = sorted(set(list(bidDict.keys()) + list(askDict.keys()))) # 遍历所有的key,将相同key的值进行累加 for s in sList: if s in bidDict: if s in bidDictResult: bidDictResult[s] = int(bidDict[s]) + bidDictResult[s] else: bidDictResult[s] = int(bidDict[s]) if s not in askDictResult: askDictResult[s] = 0 else: if s in askDictResult: askDictResult[s] = int(askDict[s]) + askDictResult[s] else: askDictResult[s] = int(askDict[s]) if s not in bidDictResult: bidDictResult[s] = 0 # 构建包含bidDictResult和askDictResult的字典,并存入quotedict中 df = {"bidDictResult": bidDictResult, "askDictResult": askDictResult} quotedict[symbol] = df return bidDictResult, askDictResult # 公众号:松鼠Quant # 主页:www.quant789.com # 本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! # 版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def tickdata(self, df, symbol): tickdata = pd.DataFrame( { "datetime": df["datetime"], "symbol": df["symbol"], "lastprice": df["lastprice"], "volume": df["vol"], "bid_p": df["bid_p"], "bid_v": df["bid_v"], "ask_p": df["ask_p"], "ask_v": df["ask_v"], } ) try: if symbol in tickdatadict.keys(): rdf = tickdatadict[symbol] rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") now = str(tickdata["datetime"][0]) if now > rdftm: try: oo = ofdatadict[symbol] self.data_of(symbol, oo) # print('oo',oo) if symbol in quotedict.keys(): quotedict.pop(symbol) if symbol in tickdatadict.keys(): tickdatadict.pop(symbol) if symbol in ofdatadict.keys(): ofdatadict.pop(symbol) except IOError as e: print("rdftm捕获到异常", e) tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) tickdata["open"] = tickdata["lastprice"] tickdata["high"] = tickdata["lastprice"] tickdata["low"] = tickdata["lastprice"] tickdata["close"] = tickdata["lastprice"] tickdata["starttime"] = tickdata["datetime"] else: tickdata["bartime"] = rdf["bartime"] tickdata["open"] = rdf["open"] tickdata["high"] = max( tickdata["lastprice"].values, rdf["high"].values ) tickdata["low"] = min( tickdata["lastprice"].values, rdf["low"].values ) tickdata["close"] = tickdata["lastprice"] tickdata["volume"] = df["vol"] + rdf["volume"].values tickdata["starttime"] = rdf["starttime"] else: print("新bar的第一个tick进入") tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) tickdata["open"] = tickdata["lastprice"] tickdata["high"] = tickdata["lastprice"] tickdata["low"] = tickdata["lastprice"] tickdata["close"] = tickdata["lastprice"] tickdata["starttime"] = tickdata["datetime"] except IOError as e: print("捕获到异常", e) tickdata["bartime"] = pd.to_datetime(tickdata["bartime"]) param = self.param_dict[self.品种] bardata = ( tickdata.resample( on="bartime", rule=param.周期, label="right", closed="right" ) .agg( { "starttime": "first", "symbol": "last", "open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum", } ) .reset_index(drop=False) ) bardata = bardata.dropna().reset_index(drop=True) bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime( "%Y-%m-%d %H:%M:%S" ) tickdatadict[symbol] = bardata tickdata["volume"] = df["vol"].values # print(bardata['symbol'].values,bardata['bartime'].values) self.orderflow_df_new(tickdata, bardata, symbol) # time.sleep(0.5) # 公众号:松鼠Quant # 主页:www.quant789.com # 本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! # 版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def orderflow_df_new(self, df_tick, df_min, symbol): startArray = pd.to_datetime(df_min["starttime"]).values voluememin = df_min["volume"].values highs = df_min["high"].values lows = df_min["low"].values opens = df_min["open"].values closes = df_min["close"].values # endArray = pd.to_datetime(df_min['bartime']).values endArray = df_min["bartime"].values # print(endArray) deltaArray = np.zeros((len(endArray),)) tTickArray = pd.to_datetime(df_tick["datetime"]).values bp1TickArray = df_tick["bid_p"].values ap1TickArray = df_tick["ask_p"].values lastTickArray = df_tick["lastprice"].values volumeTickArray = df_tick["volume"].values symbolarray = df_tick["symbol"].values indexFinal = 0 for index, tEnd in enumerate(endArray): dt = endArray[index] start = startArray[index] bidDict = {} askDict = {} bar_vol = voluememin[index] bar_close = closes[index] bar_open = opens[index] bar_low = lows[index] bar_high = highs[index] bar_symbol = symbolarray[index] # for indexTick in range(indexFinal,len(df_tick)): # if tTickArray[indexTick] >= tEnd: # break # elif (tTickArray[indexTick] >= start) & (tTickArray[indexTick] < tEnd): Bp = round(bp1TickArray[0], 4) Ap = round(ap1TickArray[0], 4) LastPrice = round(lastTickArray[0], 4) Volume = volumeTickArray[0] if LastPrice >= Ap: if str(LastPrice) in askDict.keys(): askDict[str(LastPrice)] += Volume else: askDict[str(LastPrice)] = Volume if LastPrice <= Bp: if str(LastPrice) in bidDict.keys(): bidDict[str(LastPrice)] += Volume else: bidDict[str(LastPrice)] = Volume # indexFinal = indexTick bidDictResult, askDictResult = self.process(bidDict, askDict, symbol) bidDictResult = dict( sorted(bidDictResult.items(), key=operator.itemgetter(0)) ) askDictResult = dict( sorted(askDictResult.items(), key=operator.itemgetter(0)) ) prinslist = list(bidDictResult.keys()) asklist = list(askDictResult.values()) bidlist = list(bidDictResult.values()) delta = sum(askDictResult.values()) - sum(bidDictResult.values()) # print(prinslist,asklist,bidlist) # print(len(prinslist),len(bidDictResult),len(askDictResult)) df = pd.DataFrame( { "price": pd.Series([prinslist]), "Ask": pd.Series([asklist]), "Bid": pd.Series([bidlist]), } ) # df=pd.DataFrame({'price':pd.Series(bidDictResult.keys()),'Ask':pd.Series(askDictResult.values()),'Bid':pd.Series(bidDictResult.values())}) df["symbol"] = bar_symbol df["datetime"] = dt df["delta"] = str(delta) df["close"] = bar_close df["open"] = bar_open df["high"] = bar_high df["low"] = bar_low df["volume"] = bar_vol # df['ticktime']=tTickArray[0] df["dj"] = self.GetOrderFlow_dj(df) ofdatadict[symbol] = df # 公众号:松鼠Quant # 主页:www.quant789.com # 本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! # 版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def GetOrderFlow_dj(self, kData): param = self.param_dict[self.品种] Config = { "Value1": param.失衡, "Value2": param.堆积, "Value4": True, } aryData = kData djcout = 0 # 遍历kData中的每一行,计算djcout指标 for index, row in aryData.iterrows(): kItem = aryData.iloc[index] high = kItem["high"] low = kItem["low"] close = kItem["close"] open = kItem["open"] dtime = kItem["datetime"] price_s = kItem["price"] Ask_s = kItem["Ask"] Bid_s = kItem["Bid"] delta = kItem["delta"] price_s = price_s Ask_s = Ask_s Bid_s = Bid_s gj = 0 xq = 0 gxx = 0 xxx = 0 # 遍历price_s中的每一个元素,计算相关指标 for i in np.arange(0, len(price_s), 1): duiji = { "price": 0, "time": 0, "longshort": 0, } if i == 0: delta = delta order = { "Price": price_s[i], "Bid": {"Value": Bid_s[i]}, "Ask": {"Value": Ask_s[i]}, } # 空头堆积 if i >= 0 and i < len(price_s) - 1: if order["Bid"]["Value"] > Ask_s[i + 1] * int(Config["Value1"]): gxx += 1 gj += 1 if gj >= int(Config["Value2"]) and Config["Value4"] == True: duiji["price"] = price_s[i] duiji["time"] = dtime duiji["longshort"] = -1 if float(duiji["price"]) > 0: djcout += -1 else: gj = 0 # 多头堆积 if i >= 1 and i < len(price_s) - 1: if order["Ask"]["Value"] > Bid_s[i - 1] * int(Config["Value1"]): xq += 1 xxx += 1 if xq >= int(Config["Value2"]) and Config["Value4"] == True: duiji["price"] = price_s[i] duiji["time"] = dtime duiji["longshort"] = 1 if float(duiji["price"]) > 0: djcout += 1 else: xq = 0 # 返回计算得到的djcout值 return djcout # 读取保存的数据 def read_to_csv(self, symbol): # 文件夹路径和文件路径 # 使用正则表达式提取英文字母并重新赋值给symbol param = self.param_dict[symbol] # symbol = ''.join(re.findall('[a-zA-Z]', str(symbol))) folder_path = "traderdata" file_path = os.path.join(folder_path, f"{str(symbol)}_traderdata.csv") # 如果文件夹不存在则创建 if not os.path.exists(folder_path): os.makedirs(folder_path) # 读取保留的模型数据CSV文件 if os.path.exists(file_path): df = pd.read_csv(file_path) if not df.empty and param.kgdata is True: # 选择最后一行数据 row = df.iloc[-1] # 根据CSV文件的列名将数据赋值给相应的属性 param.pos = int(row["pos"]) param.short_trailing_stop_price = float( row["short_trailing_stop_price"] ) param.long_trailing_stop_price = float(row["long_trailing_stop_price"]) param.sl_long_price = float(row["sl_long_price"]) param.sl_shor_price = float(row["sl_shor_price"]) # param.out_long = int(row['out_long']) # param.out_short = int(row['out_short']) print("找到历史交易数据文件,已经更新持仓,止损止盈数据", df.iloc[-1]) param.kgdata = False else: pass # print("没有找到历史交易数据文件", file_path) # 如果没有找到CSV,则初始化变量 pass # 保存数据 def save_to_csv(self, symbol): param = self.param_dict[symbol] # 使用正则表达式提取英文字母并重新赋值给symbol # symbol = ''.join(re.findall('[a-zA-Z]', str(symbol))) # 创建DataFrame data = { "datetime": [trade_dfs[symbol]["datetime"].iloc[-1]], "pos": [param.pos], "short_trailing_stop_price": [param.short_trailing_stop_price], "long_trailing_stop_price": [param.long_trailing_stop_price], "sl_long_price": [param.sl_long_price], "sl_shor_price": [param.sl_shor_price], # 'out_long': [param.out_long], # 'out_short': [param.out_short] } df = pd.DataFrame(data) # 将DataFrame保存到CSV文件 df.to_csv(f"traderdata/{str(symbol)}_traderdata.csv", index=False) # 每日收盘重置数据 def day_data_reset(self, symbol): param = self.param_dict[symbol] sec = "".join(re.findall("[a-zA-Z]", str(symbol))) # 获取当前时间 current_time = datetime.now().time() # 第一时间范围(日盘收盘) clearing_time1_start = s_time(15, 5) clearing_time1_end = s_time(15, 15) # 创建一个标志变量,用于记录是否已经执行过 param.clearing_executed = False # 检查当前时间第一个操作的时间范围内 if ( clearing_time1_start <= current_time <= clearing_time1_end and not param.clearing_executed ): param.clearing_executed = True # 设置标志变量为已执行 trade_dfs[symbol].drop( trade_dfs[symbol].index, inplace=True ) # 清除当天的行情数据 # 检查当前时间是否在第二个操作的时间范围内(夜盘收盘) elif sec in clearing_time_dict.keys(): clearing_time2_start = clearing_time_dict[sec] clearing_time2_end = s_time( clearing_time2_start.hour, clearing_time2_start.minute + 15 ) if ( clearing_time2_start <= current_time <= clearing_time2_end and not param.clearing_executed ): param.clearing_executed = True # 设置标志变量为已执行 trade_dfs[symbol].drop( trade_dfs[symbol].index, inplace=True ) # 清除当天的行情数据 else: param.clearing_executed = False pass return param.clearing_executed def OnRtnTrade(self, pTrade): print("||成交回报||", pTrade) def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast): print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast) # 订单状态通知 def OnRtnOrder(self, pOrder): print("||订单回报||", pOrder) def cal_sig(self, symbol_queue): while True: try: data = symbol_queue.get( block=True, timeout=5 ) # 如果5秒没收到新的tick行情,则抛出异常 instrument_id = data["InstrumentID"].decode() # 品种代码 size = symbol_queue.qsize() if size > 1: print( f"当前{instrument_id}共享队列长度为{size}, 有点阻塞!!!!!" ) self.read_to_csv(instrument_id) self.day_data_reset(instrument_id) param = self.param_dict[instrument_id] self.品种 = instrument_id self.tickcome(data) trade_df = trade_dfs[instrument_id] # 新K线开始,启动交易程序 and 保存行情数据 self.read_to_csv(instrument_id) # size = symbol_queue.qsize() # if size > 2: # print(f'!!!!!当前{instrument_id}共享队列长度为:',size) if len(trade_df) > param.cont_df: # 检查文件是否存在 csv_file_path = f"traderdata/{instrument_id}_ofdata.csv" if os.path.exists(csv_file_path): # 仅保存最后一行数据 trade_df.tail(1).to_csv( csv_file_path, mode="a", header=False, index=False ) else: # 创建新文件并保存整个DataFrame trade_df.to_csv(csv_file_path, index=False) # 更新跟踪止损价格 if param.long_trailing_stop_price > 0 and param.pos > 0: # print('datetime+sig: ',dt,'旧多头出线',param.long_trailing_stop_price,'low',self.low[0]) param.long_trailing_stop_price = ( trade_df["low"].iloc[-1] if param.long_trailing_stop_price < trade_df["low"].iloc[-1] else param.long_trailing_stop_price ) self.save_to_csv(instrument_id) # print('datetime+sig: ',dt,'多头出线',param.long_trailing_stop_price) if param.short_trailing_stop_price > 0 and param.pos < 0: # print('datetime+sig: ',dt,'旧空头出线',param.short_trailing_stop_price,'high',self.high[0]) param.short_trailing_stop_price = ( trade_df["high"].iloc[-1] if trade_df["high"].iloc[-1] < param.short_trailing_stop_price else param.short_trailing_stop_price ) self.save_to_csv(instrument_id) # print('datetime+sig: ',dt,'空头出线',param.short_trailing_stop_price) param.out_long = param.long_trailing_stop_price * ( 1 - param.trailing_stop_percent ) param.out_short = param.short_trailing_stop_price * ( 1 + param.trailing_stop_percent ) # print('datetime+sig: ',dt,'空头出线',param.out_short) # print('datetime+sig: ',dt,'多头出线',param.out_long) # 跟踪出场 if param.out_long > 0: print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "预设——多头止盈——", "TR", param.out_long, "low", trade_df["low"].iloc[-1], ) if ( trade_df["low"].iloc[-1] < param.out_long and param.pos > 0 and param.sl_long_price > 0 and trade_df["low"].iloc[-1] > param.sl_long_price ): print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "多头止盈", "TR", param.out_long, "low", trade_df["low"].iloc[-1], ) # 平多 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"3", # ) param.long_trailing_stop_price = 0 param.out_long = 0 param.sl_long_price = 0 param.pos = 0 self.save_to_csv(instrument_id) if param.out_short > 0: print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "预设——空头止盈——: ", "TR", param.out_short, "high", trade_df["high"].iloc[-1], ) if ( trade_df["high"].iloc[-1] > param.out_short and param.pos < 0 and param.sl_shor_price > 0 and trade_df["high"].iloc[-1] < param.sl_shor_price ): print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "空头止盈: ", "TR", param.out_short, "high", trade_df["high"].iloc[-1], ) # 平空 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"3", # ) param.short_trailing_stop_price = 0 param.sl_shor_price = 0 self.out_shor = 0 param.pos = 0 self.save_to_csv(instrument_id) # 固定止损 fixed_stop_loss_L = param.sl_long_price * ( 1 - param.fixed_stop_loss_percent ) if param.pos > 0: print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "预设——多头止损", "SL", fixed_stop_loss_L, "close", trade_df["close"].iloc[-1], ) if ( param.sl_long_price > 0 and fixed_stop_loss_L > 0 and param.pos > 0 and trade_df["close"].iloc[-1] < fixed_stop_loss_L ): print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "多头止损", "SL", fixed_stop_loss_L, "close", trade_df["close"].iloc[-1], ) # 平多 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"3", # ) param.long_trailing_stop_price = 0 param.sl_long_price = 0 param.out_long = 0 param.pos = 0 self.save_to_csv(instrument_id) fixed_stop_loss_S = param.sl_shor_price * ( 1 + param.fixed_stop_loss_percent ) if param.pos < 0: print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "预设——空头止损", "SL", fixed_stop_loss_S, "close", trade_df["close"].iloc[-1], ) if ( param.sl_shor_price > 0 and fixed_stop_loss_S > 0 and param.pos < 0 and trade_df["close"].iloc[-1] > fixed_stop_loss_S ): print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "空头止损", "SL", fixed_stop_loss_S, "close", trade_df["close"].iloc[-1], ) # 平空 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"3", # ) param.short_trailing_stop_price = 0 param.sl_shor_price = 0 param.out_short = 0 param.pos = 0 self.save_to_csv(instrument_id) # 日均线 # AROONOSC :https://zhuanlan.zhihu.com/p/645010879 if len(trade_df["close"]) >= 120: trade_df["dayma"] = trade_df["close"][-120:].mean() print("trade_df长度:", len(trade_df["close"])) print("120条之上的dayma的值:", trade_df["dayma"]) else: trade_df["dayma"] = trade_df["close"].mean() print("120条之下的dayma的值:", trade_df["dayma"]) print("trade_df长度:", len(trade_df["close"])) # day_df = {} # day_df = futures_main_day(trade_df["symbol"], 20) # day_df["5day_ma"] = day_df["收盘价"].rolling(window=5).mean() # trade_df["dayma"] = day_df["5day_ma"].iloc[-1] # print("5日均线的值:", trade_df["dayma"]) # trade_df["aroon_osc"] = tb.AROONOSC(trade_df["high"], trade_df["low"], 5) trade_df["rinei_T3"] = tb.T3(np.array(trade_df["dayma"])) # 计算累积的delta值 trade_df["delta"] = trade_df["delta"].astype(float) trade_df["delta累计"] = trade_df["delta"].cumsum() # 获取第三大值和第三小值 abs_delta = max(trade_df["delta"].iloc[-21:-2], default=0) - min( trade_df["delta"].iloc[-21:-2], default=0 ) print("abs_delta:", abs_delta) # third_largest_delta = np.sort(arr_delta)[-2] # third_smallest_delta = np.sort(arr_delta)[2] abs_delta累计 = max( trade_df["delta累计"].iloc[-21:-2], default=0 ) - min(trade_df["delta累计"].iloc[-21:-2], default=0) print("abs_delta累计:", abs_delta累计) # third_largest_delta累计 = np.sort(arr_delta累计)[-2] # third_smallest_delta累计 = np.sort(arr_delta累计)[2] # 大于日均线 # 开多1 = trade_df["dayma"].iloc[-1] > 0 and trade_df["close"].iloc[-1] > trade_df["dayma"].iloc[-1] # 开多1 = trade_df["aroon_osc"].iloc[-1] > 0 开多1 = trade_df["close"].iloc[-1] > trade_df["rinei_T3"].iloc[-1] # 累计多空净量大于X # 开多4 = ( # trade_df["delta累计"].iloc[-1] > param.sum_delta and trade_df["delta"].iloc[-1] > param.delta # ) 开多4 = trade_df["delta累计"].iloc[-1] > ( max(trade_df["delta累计"].iloc[-21:-2], default=0) - 0.1 * abs_delta累计 ) and ( trade_df["delta"].iloc[-1] > max(trade_df["delta"].iloc[-21:-2], default=0) - 0.1 * abs_delta ) # 开多4 = trade_df["delta累计"].iloc[-1] > np.sort(trade_df["delta累计"].iloc[-21:-2], default=0)[-2] and trade_df["delta"].iloc[-1] > np.sort(trade_df["delta"].iloc[-21:-2], default=0)[-2] # 开多4 = trade_df["delta累计"].iloc[-1] > third_largest_delta累计 and trade_df["delta"].iloc[-1] > third_largest_delta # 小于日均线 # 开空1 = trade_df["dayma"].iloc[-1] > 0 and trade_df["close"].iloc[-1] < trade_df["dayma"].iloc[-1] # 开空1 = trade_df["aroon_osc"].iloc[-1] < 0 开空1 = trade_df["close"].iloc[-1] < trade_df["rinei_T3"].iloc[-1] # 累计多空净量小于X 开空4 = trade_df["delta累计"].iloc[-1] < ( min(trade_df["delta累计"].iloc[-21:-2], default=0) + 0.1 * abs_delta累计 ) and ( trade_df["delta"].iloc[-1] < min(trade_df["delta"].iloc[-21:-2], default=0) + 0.1 * abs_delta ) # 开空4 = trade_df["delta累计"].iloc[-1] < np.sort(trade_df["delta累计"].iloc[-21:-2], default=0)[2] and trade_df["delta"].iloc[-1] < np.sort(trade_df["delta"].iloc[-21:-2], default=0)[2] # 开空4 = trade_df["delta累计"].iloc[-1] < third_smallest_delta累计 and trade_df["delta"].iloc[-1] < third_smallest_delta 开多组合 = 开多1 and 开多4 and trade_df["dj"].iloc[-1] > param.dj_X 开空条件 = 开空1 and 开空4 and trade_df["dj"].iloc[-1] < -param.dj_X 平多条件 = trade_df["dj"].iloc[-1] < -param.dj_X 平空条件 = trade_df["dj"].iloc[-1] > param.dj_X # 开仓 # 多头开仓条件 if param.pos < 0 and 平空条件: print( "平空: ", "ExchangeID: ", data["ExchangeID"], "InstrumentID", data["InstrumentID"], "AskPrice1", data["AskPrice1"] + param.py, ) # 平空 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"3", # ) param.pos = 0 param.sl_shor_price = 0 param.short_trailing_stop_price = 0 print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "反手平空:", "平仓价格:", data["AskPrice1"] + param.py, "堆积数:", trade_df["dj"].iloc[-1], ) self.save_to_csv(instrument_id) # 发送邮件 # text = f"平空交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 反手平空的平仓价格为{data['AskPrice1']+param.py}, 交易手数位{param.Lots}" text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, C_S_T_Price:{data['AskPrice1'] + param.py}, T_Lots:{param.Lots}" send_mail(text) if param.pos == 0 and 开多组合: print( "开多: ", "ExchangeID: ", data["ExchangeID"], "InstrumentID", data["InstrumentID"], "AskPrice1", data["AskPrice1"] + param.py, ) # 开多 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["AskPrice1"] + param.py, # param.Lots, # b"0", # b"0", # ) print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "多头开仓", "开仓价格:", data["AskPrice1"] + param.py, "堆积数:", trade_df["dj"].iloc[-1], ) param.pos = 1 param.long_trailing_stop_price = data["AskPrice1"] param.sl_long_price = data["AskPrice1"] self.save_to_csv(instrument_id) # 发送邮件 # text = f"开多交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 多头开仓的开仓价格{data['AskPrice1']+param.py}, 交易手数位{param.Lots}, 上一根K线的delta值为{trade_df['delta'].iloc[-1]}, 近周期内K线的delta值的最大值为{max(trade_df['delta'].iloc[-21:-2])},上一根K线的delta累计值为{trade_df['delta累计'].iloc[-1]},近周期内K线的delta累计值的最大值为{max(trade_df['delta累计'].iloc[-21:-2])}" text = f"O_L_T ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, O_L_T_Price:{data['AskPrice1'] + param.py}, T_Lots:{param.Lots}" send_mail(text) if param.pos > 0 and 平多条件: print( "平多: ", "ExchangeID: ", data["ExchangeID"], "InstrumentID", data["InstrumentID"], "BidPrice1", data["BidPrice1"] - param.py, ) # 平多 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"1", # ) # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"3", # ) param.pos = 0 param.long_trailing_stop_price = 0 param.sl_long_price = 0 print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "反手平多", "平仓价格:", data["BidPrice1"] - param.py, "堆积数:", trade_df["dj"].iloc[-1], ) self.save_to_csv(instrument_id) # 发送邮件 # text = f"平多交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 反手平多的平仓价格{data['BidPrice1']-param.py}, 交易手数位{param.Lots}" text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, C_L_T_Price:{data['BidPrice1'] - param.py}, T_Lots:{param.Lots}" send_mail(text) if param.pos == 0 and 开空条件: print( "开空: ", "ExchangeID: ", data["ExchangeID"], "InstrumentID", data["InstrumentID"], "BidPrice1", data["BidPrice1"], ) # 开空 # self.insert_order( # data["ExchangeID"], # data["InstrumentID"], # data["BidPrice1"] - param.py, # param.Lots, # b"1", # b"0", # ) print( "datetime+sig: ", trade_df["datetime"].iloc[-1], "空头开仓", "开仓价格:", data["BidPrice1"] - param.py, "堆积数:", trade_df["dj"].iloc[-1], ) param.pos = -1 param.short_trailing_stop_price = data["BidPrice1"] param.sl_shor_price = data["BidPrice1"] self.save_to_csv(instrument_id) # 发送邮件 # text = f"开空交易: 交易品种为{data['InstrumentID']}, 交易时间为{trade_df['datetime'].iloc[-1]}, 空头开仓的开仓价格{data['BidPrice1']-param.py}, 交易手数位{param.Lots}, 上一根K线的delta值为{trade_df['delta'].iloc[-1]},近周期内K线的delta值的最小值为{min(trade_df['delta'].iloc[-21:-2])}, 上一根K线的delta累计值为{trade_df['delta累计'].iloc[-1]},近周期内K线的delta累计值的最小值为{min(trade_df['delta累计'].iloc[-21:-2])}" text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, O_S_T_Price:{data['BidPrice1'] - param.py}, T_Lots:{param.Lots}" send_mail(text) print(trade_df) param.cont_df = len(trade_df) except queue.Empty: # print(f"当前合约队列为空,等待新数据插入。") pass # 将CTP推送的行情数据分发给对应线程队列去执行 def distribute_tick(self): while True: if self.status == 0: data = None while not self.md_queue.empty(): data = self.md_queue.get(block=False) instrument_id = data["InstrumentID"].decode() # 品种代码 try: self.queue_dict[instrument_id].put( data, block=False ) # 往对应合约队列中插入行情 # print(f"{instrument_id}合约数据插入。") except queue.Full: # 当某个线程阻塞导致对应队列容量超限时抛出异常,不会影响其他合约的信号计算 print( f"{instrument_id}合约信号计算阻塞导致对应队列已满,请检查对应代码逻辑后重启。" ) else: time.sleep(1) def start(self, param_dict): threads = [] self.param_dict = param_dict for symbol in param_dict.keys(): trade_dfs[symbol] = pd.DataFrame({}) self.queue_dict[symbol] = queue.Queue( 20 ) # 为每个合约创建一个限制数为10的队列,当计算发生阻塞导致队列达到限制数时会抛出异常 t = threading.Thread( target=self.cal_sig, args=(self.queue_dict[symbol],) ) # 为每个合约单独创建一个线程计算开仓逻辑 threads.append(t) t.start() self.distribute_tick() for t in threads: t.join() def run_trader( param_dict, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir="", private_resume_type=2, public_resume_type=2, ): my_trader = MyTrader( broker_id, td_server, investor_id, password, app_id, auth_code, md_queue, page_dir, private_resume_type, public_resume_type, ) my_trader.start(param_dict) if __name__ == "__main__": # global symbol # 公众号:松鼠Quant # 主页:www.quant789.com # 本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! # 版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 # 注意:运行前请先安装好algoplus, # pip install AlgoPlus # http://www.algo.plus/ctp/python/0103001.html # 实盘参数字典,需要实盘交易的合约,新建对应的参数对象即可,以下参数仅供测试使用,不作为实盘参考!!!! # 实盘参数字典,需要实盘交易的合约,新建对应的参数对象即可,以下参数仅供测试使用,不作为实盘参考!!!! # 实盘参数字典,需要实盘交易的合约,新建对应的参数对象即可,以下参数仅供测试使用,不作为实盘参考!!!! param_dict = {} param_dict["IM2503"] = ParamObj( symbol="IM2503", Lots=1, py=5, trailing_stop_percent=0.01, fixed_stop_loss_percent=0.02, dj_X=2, delta=200, sum_delta=200, 失衡=3, 堆积=3, 周期="2T", ) # param_dict["IF2503"] = ParamObj( # symbol="IF2503", # Lots=1, # py=5, # trailing_stop_percent=0.005, # fixed_stop_loss_percent=0.01, # dj_X=1, # delta=200, # sum_delta=200, # 失衡=3, # 堆积=3, # 周期="2T", # ) # param_dict["j2505"] = ParamObj( # symbol="j2505", # Lots=1, # py=5, # trailing_stop_percent=0.02, # fixed_stop_loss_percent=0.01, # dj_X=0, # delta=150, # sum_delta=200, # 失衡=3, # 堆积=3, # 周期="2T", # ) # param_dict["IH2503"] = ParamObj( # symbol="IH2503", # Lots=1, # py=5, # trailing_stop_percent=0.005, # fixed_stop_loss_percent=0.01, # dj_X=1, # delta=200, # sum_delta=200, # 失衡=3, # 堆积=3, # 周期="2T", # ) # param_dict['au2406'] = ParamObj(symbol='au2406', Lots=1, py=5, trailing_stop_percent=0.02, fixed_stop_loss_percent=0.01,dj_X=0,delta=15,sum_delta=20,失衡=3,堆积=3,周期='1T') # param_dict['sc2405'] = ParamObj(symbol='sc2405', Lots=1, py=5, trailing_stop_percent=0.02, fixed_stop_loss_percent=0.01,dj_X=0,delta=15,sum_delta=20,失衡=3,堆积=3,周期='1T') # param_dict['bc2406'] = ParamObj(symbol='bc2406', Lots=1, py=5, trailing_stop_percent=0.02, fixed_stop_loss_percent=0.01,dj_X=0,delta=15,sum_delta=20,失衡=3,堆积=3,周期='1T') # param_dict['lu2406'] = ParamObj(symbol='lu2406', Lots=1, py=5, trailing_stop_percent=0.02, fixed_stop_loss_percent=0.01,dj_X=0,delta=15,sum_delta=20,失衡=3,堆积=3,周期='1T') # 用simnow模拟,不要忘记屏蔽下方实盘的future_account字典 # SIMULATE_SERVER = { # '电信1': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, # '电信2': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10202", 'MDServer': '180.168.146.187:10212', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, # '移动': {'BrokerID': 9999, 'TDServer': "218.202.237.33:10203", 'MDServer': '218.202.237.33:10213', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, # 'TEST': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10130", 'MDServer': '180.168.146.187:10131', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'}, # 'N视界': {'BrokerID': 10010, 'TDServer': "210.14.72.12:4600", 'MDServer': '210.14.72.12:4602', 'AppID': '', 'AuthCode': ''}, # } # BrokerID统一为:9999 # 支持上期所期权、能源中心期权、中金所期权、广期所期权、郑商所期权、大商所期权 # 第一组 # Trade Front:180.168.146.187:10201,Market Front:180.168.146.187:10211;【电信】(看穿式前置,使用监控中心生产秘钥) # 第二组 # Trade Front:180.168.146.187:10202,Market Front:180.168.146.187:10212;【电信】(看穿式前置,使用监控中心生产秘钥) # 第三组 # Trade Front:218.202.237.33:10203,Market Front:218.202.237.33:10213;【移动】(看穿式前置,使用监控中心生产秘钥) # 用户注册后,默认的APPID为simnow_client_test,认证码为0000000000000000(16个0),默认开启终端认证,程序化用户可以选择不开终端认证接入。 future_account = get_simulate_account( investor_id="223828", # simnow账户,注意是登录账户的ID,SIMNOW个人首页查看 password="Zj1234!@#%", # simnow密码 server_name="电信2", # 电信1、电信2、移动、TEST、N视界 subscribe_list=list(param_dict.keys()), # 合约列表 ) # 实盘用这个,不要忘记屏蔽上方simnow的future_account字典 # future_account = FutureAccount( # broker_id='9999', # 期货公司BrokerID # server_dict={'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211'}, # TDServer为交易服务器,MDServer为行情服务器。服务器地址格式为"ip:port。" # reserve_server_dict={}, # 备用服务器地址 # investor_id='223828', # 账户 # password='Zj1234!@#%', # 密码 # app_id='simnow_client_test', # 认证使用AppID # auth_code='0000000000000000', # 认证使用授权码 # subscribe_list=list(param_dict.keys()), # 订阅合约列表 # md_flow_path='./log', # MdApi流文件存储地址,默认MD_LOCATION # td_flow_path='./log', # TraderApi流文件存储地址,默认TD_LOCATION # ) # 实盘用这个,不要忘记屏蔽上方simnow的future_account字典 # future_account = FutureAccount( # broker_id='8888', # 期货公司BrokerID # server_dict={'TDServer': "103.140.14.210:43205", 'MDServer': '103.140.14.210:43173'}, # TDServer为交易服务器,MDServer为行情服务器。服务器地址格式为"ip:port。" # reserve_server_dict={}, # 备用服务器地址 # investor_id='155878', # 账户 # password='Zj82334475', # 密码 # app_id='vntech_vnpy_2.0', # 认证使用AppID # auth_code='N46EKN6TJ9U7V06V', # 认证使用授权码 # subscribe_list=list(param_dict.keys()), # 订阅合约列表 # md_flow_path='./log', # MdApi流文件存储地址,默认MD_LOCATION # td_flow_path='./log', # TraderApi流文件存储地址,默认TD_LOCATION # ) print("开始", len(future_account.subscribe_list)) # 共享队列 share_queue = Queue(maxsize=200) # 行情进程 md_process = Process(target=run_tick_engine, args=(future_account, [share_queue])) # 交易进程 trader_process = Process( target=run_trader, args=( param_dict, future_account.broker_id, future_account.server_dict["TDServer"], future_account.investor_id, future_account.password, future_account.app_id, future_account.auth_code, share_queue, # 队列 future_account.td_flow_path, ), ) md_process.start() trader_process.start() md_process.join() trader_process.join()