''' #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 该代码的主要目的是处理Tick数据并生成交易信号。代码中定义了一个tickcome函数,它接收到Tick数据后会进行一系列的处理,包括构建Tick字典、更新上一个Tick的成交量、保存Tick数据、生成K线数据等。其中涉及到的一些函数有: on_tick(tick): 处理单个Tick数据,根据Tick数据生成K线数据。 tickdata(df, symbol): 处理Tick数据,生成K线数据。 orderflow_df_new(df_tick, df_min, symbol): 处理Tick和K线数据,生成订单流数据。 GetOrderFlow_dj(kData): 计算订单流的信号指标。 除此之外,代码中还定义了一个MyTrader类,继承自TraderApiBase,用于实现交易相关的功能。 #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 ''' from concurrent.futures import ThreadPoolExecutor from multiprocessing import Process, Queue import queue import threading from AlgoPlus.CTP.MdApi import run_tick_engine from AlgoPlus.CTP.FutureAccount import get_simulate_account from AlgoPlus.CTP.FutureAccount import FutureAccount from AlgoPlus.CTP.TraderApiBase import TraderApiBase from AlgoPlus.ta.time_bar import tick_to_bar import pandas as pd from datetime import datetime, timedelta from datetime import time as s_time import operator import time import numpy as np import os import csv import re # # jerome:增加tushare接口 # import tushare as ts # ts.set_token('78282dabb315ee578fb73a9b328f493026e97d5af709acb331b7b348') # pro = ts.pro_api() tickdatadict = {} # 存储Tick数据的字典 quotedict = {} # 存储行情数据的字典 ofdatadict = {} # 存储K线数据的字典 trade_dfs = {} #pd.DataFrame({}) # 存储交易数据的DataFrame对象 previous_volume = {} # 上一个Tick的成交量 tsymbollist={} #过渡tick 截面数据_资金流向={} top_two_symbols=[] # Jerome:夜盘商品期货交易品种 commodity_night_dict = {'sc': s_time(2,30), 'bc': s_time(1,0), 'lu': s_time(23,0), 'nr': s_time(23,0),'au': s_time(2,30), 'ag': s_time(2,30), 'ss': s_time(1,0), 'sn': s_time(1,0), 'ni': s_time(1,0), 'pb': s_time(1,0),'zn': s_time(1,0), 'al': s_time(1,0), 'cu': s_time(1,0), 'ru': s_time(23,0), 'rb': s_time(23,0), 'hc': s_time(23,0), 'fu': s_time(23,0), 'bu': s_time(23,0), 'sp': s_time(23,0), 'PF': s_time(23,0), 'SR': s_time(23,0), 'CF': s_time(23,0), 'CY': s_time(23,0), 'RM': s_time(23,0), 'MA': s_time(23,0), 'TA': s_time(23,0), 'ZC': s_time(23,0), 'FG': s_time(23,0), 'OI': s_time(23,0), 'SA': s_time(23,0), 'p': s_time(23,0), 'j': s_time(23,0), 'jm': s_time(23,0), 'i': s_time(23,0), 'l': s_time(23,0), 'v': s_time(23,0), 'pp': s_time(23,0), 'eg': s_time(23,0), 'c': s_time(23,0), 'cs': s_time(23,0), 'y': s_time(23,0), 'm': s_time(23,0), 'a': s_time(23,0), 'b': s_time(23,0), 'rr': s_time(23,0), 'eb': s_time(23,0), 'pg': s_time(23,0), 'SH': s_time(23,00)} 合约信息={ "A": {"合约单位": 10, "保证金": 0.12}, "AG": {"合约单位": 15, "保证金": 0.12}, "AL": {"合约单位": 5, "保证金": 0.12}, "AO": {"合约单位": 20, "保证金": 0.09}, "AP": {"合约单位": 10, "保证金": 0.1}, "AU": {"合约单位": 1000, "保证金": 0.1}, "B": {"合约单位": 10, "保证金": 0.09}, "BC": {"合约单位": 5, "保证金": 0.12}, "BR": {"合约单位": 5, "保证金": 0.12}, "BU": {"合约单位": 10, "保证金": 0.15}, "C": {"合约单位": 10, "保证金": 0.12}, "CF": {"合约单位": 5, "保证金": 0.07}, "CJ": {"合约单位": 5, "保证金": 0.12}, "CS": {"合约单位": 10, "保证金": 0.09}, "CU": {"合约单位": 5, "保证金": 0.12}, "CY": {"合约单位": 5, "保证金": 0.07}, "EB": {"合约单位": 5, "保证金": 0.12}, "EC": {"合约单位": 50, "保证金": 0.12}, "EG": {"合约单位": 10, "保证金": 0.12}, "FB": {"合约单位": 10, "保证金": 0.1}, "FG": {"合约单位": 20, "保证金": 0.09}, "FU": {"合约单位": 10, "保证金": 0.15}, "HC": {"合约单位": 10, "保证金": 0.13}, "I": {"合约单位": 100, "保证金": 0.13}, "IC": {"合约单位": 1, "保证金": 0.14}, "IF": {"合约单位": 1, "保证金": 0.12}, "IH": {"合约单位": 1, "保证金": 0.12}, "IM": {"合约单位": 1, "保证金": 0.15}, "J": {"合约单位": 100, "保证金": 0.2}, "JD": {"合约单位": 10, "保证金": 0.09}, "JM": {"合约单位": 60, "保证金": 0.2}, "L": {"合约单位": 5, "保证金": 0.11}, "LC": {"合约单位": 1, "保证金": 0.09}, "LH": {"合约单位": 16, "保证金": 0.15}, "LU": {"合约单位": 10, "保证金": 0.15}, "M": {"合约单位": 10, "保证金": 0.1}, "MA": {"合约单位": 10, "保证金": 0.08}, "NI": {"合约单位": 1, "保证金": 0.19}, "NR": {"合约单位": 10, "保证金": 0.1}, "OI": {"合约单位": 10, "保证金": 0.09}, "P": {"合约单位": 10, "保证金": 0.12}, "PB": {"合约单位": 5, "保证金": 0.14}, "PF": {"合约单位": 5, "保证金": 0.08}, "PG": {"合约单位": 20, "保证金": 0.13}, "PK": {"合约单位": 5, "保证金": 0.08}, "PP": {"合约单位": 5, "保证金": 0.11}, "PX": {"合约单位": 5, "保证金": 0.12}, "RB": {"合约单位": 10, "保证金": 0.13}, "RM": {"合约单位": 10, "保证金": 0.09}, "RR": {"合约单位": 10, "保证金": 0.06}, "RU": {"合约单位": 10, "保证金": 0.1}, "SA": {"合约单位": 20, "保证金": 0.09}, "SC": {"合约单位": 1000, "保证金": 0.15}, "SF": {"合约单位": 5, "保证金": 0.12}, "SH": {"合约单位": 30, "保证金": 0.09}, "SI": {"合约单位": 5, "保证金": 0.1}, "SM": {"合约单位": 5, "保证金": 0.12}, "SN": {"合约单位": 1, "保证金": 0.14}, "SP": {"合约单位": 10, "保证金": 0.15}, "SR": {"合约单位": 10, "保证金": 0.07}, "SS": {"合约单位": 5, "保证金": 0.14}, "T": {"合约单位": 10000, "保证金": 0.02}, "TA": {"合约单位": 5, "保证金": 0.07}, "TF": {"合约单位": 10000, "保证金": 0.012}, "TL": {"合约单位": 10000, "保证金": 0.035}, "TS": {"合约单位": 20000, "保证金": 0.005}, "UR": {"合约单位": 20, "保证金": 0.08}, "V": {"合约单位": 5, "保证金": 0.11}, "Y": {"合约单位": 10, "保证金": 0.09}, "ZN": {"合约单位": 5, "保证金": 0.14}, } fees_df = pd.read_csv('./futures_fees_info.csv', usecols= [1, 4, 17, 19, 25],names=['合约', '合约乘数', '做多保证金率', '做空保证金率', '品种代码']) contacts_df = pd.read_csv('./main_contacts.csv', usecols= [16, 17],names=['主连代码', '品种代码']) clearing_time_dict = {'sc': s_time(2,30), 'bc': s_time(1,0), 'lu': s_time(23,0), 'nr': s_time(23,0),'au': s_time(2,30), 'ag': s_time(2,30), 'ss': s_time(1,0), 'sn': s_time(1,0), 'ni': s_time(1,0), 'pb': s_time(1,0),'zn': s_time(1,0), 'al': s_time(1,0), 'cu': s_time(1,0), 'ru': s_time(23,0), 'rb': s_time(23,0), 'hc': s_time(23,0), 'fu': s_time(23,0), 'bu': s_time(23,0), 'sp': s_time(23,0), 'PF': s_time(23,0), 'SR': s_time(23,0), 'CF': s_time(23,0), 'CY': s_time(23,0), 'RM': s_time(23,0), 'MA': s_time(23,0), 'TA': s_time(23,0), 'ZC': s_time(23,0), 'FG': s_time(23,0), 'OI': s_time(23,0), 'SA': s_time(23,0), 'p': s_time(23,0), 'j': s_time(23,0), 'jm': s_time(23,0), 'i': s_time(23,0), 'l': s_time(23,0), 'v': s_time(23,0), 'pp': s_time(23,0), 'eg': s_time(23,0), 'c': s_time(23,0), 'cs': s_time(23,0), 'y': s_time(23,0), 'm': s_time(23,0), 'a': s_time(23,0), 'b': s_time(23,0), 'rr': s_time(23,0), 'eb': s_time(23,0), 'pg': s_time(23,0)} #交易程序--------------------------------------------------------------------------------------------------------------------------------------------------------------------- class ParamObj: # 策略需要用到的参数,在新建合约对象的时候传入!! # 策略需要用到的参数,在新建合约对象的时候传入!! # 策略需要用到的参数,在新建合约对象的时候传入!! symbol = None #合约名称 Lots = None #下单手数 py = None #设置委托价格的偏移,更加容易促成成交 trailing_stop_percent = None #跟踪出场参数 fixed_stop_loss_percent = None #固定出场参数 dj_X = None #开仓的堆积参数 delta = None #开仓的delta参数 sum_delta = None #开仓的delta累积参数 失衡=None 堆积=None 周期=None # 策略需要用到的变量 cont_df = 0 pos = 0 clearing_executed = False kgdata = True 多头止损价格 = 0 空头止损价格 = 0 多头成本价格 = 0 空头成本价格 = 0 多头开仓历时=0 空头开仓历时=0 平_多时间=4 平_空时间=4 def __init__(self, symbol, Lots, py,dj_X, delta, sum_delta,失衡,堆积,周期,平_多时间,平_空时间): self.symbol = symbol self.Lots = Lots self.py = py self.dj_X = dj_X self.delta = delta self.sum_delta = sum_delta self.失衡=失衡 self.堆积=堆积 self.周期=周期 self.平_多时间=平_多时间 self.平_空时间=平_空时间 class MyTrader(TraderApiBase): def __init__(self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir='', private_resume_type=2, public_resume_type=2): self.param_dict = {} self.queue_dict = {} self.品种=' ' def tickcome(self,md_queue): global previous_volume data=md_queue instrument_id = data['InstrumentID'].decode() # 品种代码 ActionDay = data['ActionDay'].decode() # 交易日日期 update_time = data['UpdateTime'].decode() # 更新时间 update_millisec = str(data['UpdateMillisec']) # 更新毫秒数 created_at = ActionDay[:4] + '-' + ActionDay[4:6] + '-' + ActionDay[6:] + ' ' + update_time + '.' + update_millisec #创建时间 # 构建tick字典 tick = { 'symbol': instrument_id, # 品种代码和交易所ID 'created_at':datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S.%f"), #'created_at': created_at, # 创建时间 'price': float(data['LastPrice']), # 最新价 'last_volume': int(data['Volume']) - previous_volume.get(instrument_id, 0) if previous_volume.get(instrument_id, 0) != 0 else 0, # 瞬时成交量 'bid_p': float(data['BidPrice1']), # 买价 'bid_v': int(data['BidVolume1']), # 买量 'ask_p': float(data['AskPrice1']), # 卖价 'ask_v': int(data['AskVolume1']), # 卖量 'UpperLimitPrice': float(data['UpperLimitPrice']), # 涨停价 'LowerLimitPrice': float(data['LowerLimitPrice']), # 跌停价 'TradingDay': data['TradingDay'].decode(), # 交易日日期 'cum_volume': int(data['Volume']), # 最新总成交量 'cum_amount': float(data['Turnover']), # 最新总成交额 'cum_position': int(data['OpenInterest']), # 合约持仓量 } # print('&&&&&&&&',instrument_id, tick['created_at'],'vol:',tick['last_volume']) # 更新上一个Tick的成交量 previous_volume[instrument_id] = int(data['Volume']) if tick['last_volume']>0: #print(tick['created_at'],'vol:',tick['last_volume']) # 处理Tick数据 self.on_tick(tick) def can_time(self,hour, minute): hour = str(hour) minute = str(minute) if len(minute) == 1: minute = "0" + minute return int(hour + minute) def on_tick(self,tick): tm=self.can_time(tick['created_at'].hour,tick['created_at'].minute) #print(tick['symbol']) #print(1) #if tm>1500 and tm<2100 : # return if tick['last_volume']==0: return quotes = tick timetick=str(tick['created_at']).replace('+08:00', '') tsymbol=tick['symbol'] if tsymbol not in tsymbollist.keys(): # 获取tick的买卖价和买卖量 tsymbollist[tsymbol]=tick bid_p=quotes['bid_p'] ask_p=quotes['ask_p'] bid_v=quotes['bid_v'] ask_v=quotes['ask_v'] else: # 获取上一个tick的买卖价和买卖量 rquotes =tsymbollist[tsymbol] bid_p=rquotes['bid_p'] ask_p=rquotes['ask_p'] bid_v=rquotes['bid_v'] ask_v=rquotes['ask_v'] tsymbollist[tsymbol]=tick tick_dt=pd.DataFrame({'datetime':timetick,'symbol':tick['symbol'],'mainsym':tick['symbol'].rstrip('0123456789').upper(),'lastprice':tick['price'], 'vol':tick['last_volume'], 'bid_p':bid_p,'ask_p':ask_p,'bid_v':bid_v,'ask_v':ask_v},index=[0]) sym = tick_dt['symbol'][0] #print(tick_dt) self.tickdata(tick_dt,sym) #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def data_of(self,symbol, df): global trade_dfs # 将df数据合并到trader_df中 # if symbol not in trade_dfs.keys(): # trade_df = pd.DataFrame({}) # else: # trade_df = trade_dfs[symbol] trade_dfs[symbol] = pd.concat([trade_dfs[symbol], df], ignore_index=True) # print('!!!!!!!!!!!trader_df: ', symbol, df['datetime'].iloc[-1]) #print(trader_df) def process(self,bidDict, askDict, symbol): try: # 尝试从quotedict中获取对应品种的报价数据 dic = quotedict[symbol] bidDictResult = dic['bidDictResult'] askDictResult = dic['askDictResult'] except: # 如果获取失败,则初始化bidDictResult和askDictResult为空字典 bidDictResult, askDictResult = {}, {} # 将所有买盘字典和卖盘字典的key合并,并按升序排序 sList = sorted(set(list(bidDict.keys()) + list(askDict.keys()))) # 遍历所有的key,将相同key的值进行累加 for s in sList: if s in bidDict: if s in bidDictResult: bidDictResult[s] = int(bidDict[s]) + bidDictResult[s] else: bidDictResult[s] = int(bidDict[s]) if s not in askDictResult: askDictResult[s] = 0 else: if s in askDictResult: askDictResult[s] = int(askDict[s]) + askDictResult[s] else: askDictResult[s] = int(askDict[s]) if s not in bidDictResult: bidDictResult[s] = 0 # 构建包含bidDictResult和askDictResult的字典,并存入quotedict中 df = {'bidDictResult': bidDictResult, 'askDictResult': askDictResult} quotedict[symbol] = df return bidDictResult, askDictResult #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def tickdata(self,df,symbol): tickdata =pd.DataFrame({'datetime':df['datetime'],'symbol':df['symbol'],'lastprice':df['lastprice'], 'volume':df['vol'],'bid_p':df['bid_p'],'bid_v':df['bid_v'],'ask_p':df['ask_p'],'ask_v':df['ask_v']}) try: if symbol in tickdatadict.keys(): rdf=tickdatadict[symbol] rdftm=pd.to_datetime(rdf['bartime'][0]).strftime('%Y-%m-%d %H:%M:%S') now=str(tickdata['datetime'][0]) if now>rdftm: try: oo=ofdatadict[symbol] self.data_of(symbol, oo) #print('oo',oo) if symbol in quotedict.keys(): quotedict.pop(symbol) if symbol in tickdatadict.keys(): tickdatadict.pop(symbol) if symbol in ofdatadict.keys(): ofdatadict.pop(symbol) except IOError as e: print('rdftm捕获到异常',e) tickdata['bartime'] = pd.to_datetime(tickdata['datetime']) tickdata['open'] = tickdata['lastprice'] tickdata['high'] = tickdata['lastprice'] tickdata['low'] = tickdata['lastprice'] tickdata['close'] = tickdata['lastprice'] tickdata['starttime'] = tickdata['datetime'] else: tickdata['bartime'] = rdf['bartime'] tickdata['open'] = rdf['open'] tickdata['high'] = max(tickdata['lastprice'].values,rdf['high'].values) tickdata['low'] = min(tickdata['lastprice'].values,rdf['low'].values) tickdata['close'] = tickdata['lastprice'] tickdata['volume']=df['vol']+rdf['volume'].values tickdata['starttime'] = rdf['starttime'] else : print('新bar的第一个tick进入') tickdata['bartime'] = pd.to_datetime(tickdata['datetime']) tickdata['open'] = tickdata['lastprice'] tickdata['high'] = tickdata['lastprice'] tickdata['low'] = tickdata['lastprice'] tickdata['close'] = tickdata['lastprice'] tickdata['starttime'] = tickdata['datetime'] except IOError as e: print('捕获到异常',e) tickdata['bartime'] = pd.to_datetime(tickdata['bartime']) param = self.param_dict[self.品种] bardata = tickdata.resample(on = 'bartime',rule = param.周期,label = 'right',closed = 'right').agg({'starttime':'first','symbol':'last','open':'first','high':'max','low':'min','close':'last','volume':'sum'}).reset_index(drop = False) bardata =bardata.dropna().reset_index(drop = True) bardata['bartime'] = pd.to_datetime(bardata['bartime'][0]).strftime('%Y-%m-%d %H:%M:%S') tickdatadict[symbol]=bardata tickdata['volume']=df['vol'].values #print(bardata['symbol'].values,bardata['bartime'].values) self.orderflow_df_new(tickdata,bardata,symbol) # time.sleep(0.5) #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def orderflow_df_new(self,df_tick,df_min,symbol): startArray = pd.to_datetime(df_min['starttime']).values voluememin= df_min['volume'].values highs=df_min['high'].values lows=df_min['low'].values opens=df_min['open'].values closes=df_min['close'].values #endArray = pd.to_datetime(df_min['bartime']).values endArray = df_min['bartime'].values #print(endArray) deltaArray = np.zeros((len(endArray),)) tTickArray = pd.to_datetime(df_tick['datetime']).values bp1TickArray = df_tick['bid_p'].values ap1TickArray = df_tick['ask_p'].values lastTickArray = df_tick['lastprice'].values volumeTickArray = df_tick['volume'].values symbolarray = df_tick['symbol'].values indexFinal = 0 for index,tEnd in enumerate(endArray): dt=endArray[index] start = startArray[index] bidDict = {} askDict = {} bar_vol=voluememin[index] bar_close=closes[index] bar_open=opens[index] bar_low=lows[index] bar_high=highs[index] bar_symbol=symbolarray[index] # for indexTick in range(indexFinal,len(df_tick)): # if tTickArray[indexTick] >= tEnd: # break # elif (tTickArray[indexTick] >= start) & (tTickArray[indexTick] < tEnd): Bp = round(bp1TickArray[0],4) Ap = round(ap1TickArray[0],4) LastPrice = round(lastTickArray[0],4) Volume = volumeTickArray[0] if LastPrice >= Ap: if str(LastPrice) in askDict.keys(): askDict[str(LastPrice)] += Volume else: askDict[str(LastPrice)] = Volume if LastPrice <= Bp: if str(LastPrice) in bidDict.keys(): bidDict[str(LastPrice)] += Volume else: bidDict[str(LastPrice)] = Volume # indexFinal = indexTick bidDictResult,askDictResult = self.process(bidDict,askDict,symbol) bidDictResult=dict(sorted(bidDictResult.items(),key=operator.itemgetter(0))) askDictResult=dict(sorted(askDictResult.items(),key=operator.itemgetter(0))) prinslist=list(bidDictResult.keys()) asklist=list(askDictResult.values()) bidlist=list(bidDictResult.values()) delta=(sum(askDictResult.values()) - sum(bidDictResult.values())) #print(prinslist,asklist,bidlist) #print(len(prinslist),len(bidDictResult),len(askDictResult)) df=pd.DataFrame({'price':pd.Series([prinslist]),'Ask':pd.Series([asklist]),'Bid':pd.Series([bidlist])}) #df=pd.DataFrame({'price':pd.Series(bidDictResult.keys()),'Ask':pd.Series(askDictResult.values()),'Bid':pd.Series(bidDictResult.values())}) df['symbol']=bar_symbol df['datetime']=dt df['delta']=str(delta) df['close']=bar_close df['open']=bar_open df['high']=bar_high df['low']=bar_low df['volume']=bar_vol #df['ticktime']=tTickArray[0] df['dj'] = self.GetOrderFlow_dj(df) ofdatadict[symbol]=df #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 def GetOrderFlow_dj(self,kData): param = self.param_dict[self.品种] Config = { 'Value1': param.失衡, 'Value2': param.堆积, 'Value4': True, } aryData = kData djcout = 0 # 遍历kData中的每一行,计算djcout指标 for index, row in aryData.iterrows(): kItem = aryData.iloc[index] high = kItem['high'] low = kItem['low'] close = kItem['close'] open = kItem['open'] dtime = kItem['datetime'] price_s = kItem['price'] Ask_s = kItem['Ask'] Bid_s = kItem['Bid'] delta = kItem['delta'] price_s = price_s Ask_s = Ask_s Bid_s = Bid_s gj = 0 xq = 0 gxx = 0 xxx = 0 # 遍历price_s中的每一个元素,计算相关指标 for i in np.arange(0, len(price_s), 1): duiji = { 'price': 0, 'time': 0, 'longshort': 0, } if i == 0: delta = delta order= { "Price":price_s[i], "Bid":{ "Value":Bid_s[i]}, "Ask":{ "Value":Ask_s[i]} } #空头堆积 if i >= 0 and i < len(price_s) - 1: if (order["Bid"]["Value"] > Ask_s[i + 1] * int(Config['Value1'])): gxx += 1 gj += 1 if gj >= int(Config['Value2']) and Config['Value4'] == True: duiji['price'] = price_s[i] duiji['time'] = dtime duiji['longshort'] = -1 if float(duiji['price']) > 0: djcout += -1 else: gj = 0 #多头堆积 if i >= 1 and i <= len(price_s) - 1: if (order["Ask"]["Value"] > Bid_s[i - 1] * int(Config['Value1'])): xq += 1 xxx += 1 if xq >= int(Config['Value2']) and Config['Value4'] == True: duiji['price'] = price_s[i] duiji['time'] = dtime duiji['longshort'] = 1 if float(duiji['price']) > 0: djcout += 1 else: xq = 0 # 返回计算得到的djcout值 return djcout # def OnRtnTrade(self, pTrade): # print("||成交回报||", pTrade) # def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast): # print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast) # 订单状态通知 # def OnRtnOrder(self, pOrder): # print("||订单回报||", pOrder) #保存OF数据 def save_trade_data(self,df,instrument_id): # 定义文件路径 csv_file_path = f"ofdata/{instrument_id}_ofdata.csv" directory = os.path.dirname(csv_file_path) # 检查目录是否存在,如果不存在则创建它 if not os.path.exists(directory): os.makedirs(directory) # 如果文件夹存在,则追加数据;否则,创建新文件并保存整个 DataFrame if os.path.exists(csv_file_path): # 仅保存最后一行数据 df.tail(1).to_csv(csv_file_path, mode='a', header=False, index=False) else: # 创建新文件并保存整个 DataFrame df.to_csv(csv_file_path, index=False) def w_log(self,symbol,log_message): # 创建文件夹(如果不存在) log_dir = 'tradelogs' if not os.path.exists(log_dir): os.makedirs(log_dir) # 写入日志到 CSV 文件 now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_file = os.path.join(log_dir, f'{symbol}log.csv') # 打印日志到控制台 print(f'{now}: {log_message}') with open(log_file, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # 如果文件为空,写入表头 if csvfile.tell() == 0: writer.writerow(['时间', '日志内容']) writer.writerow([now, log_message]) # 计算单品种资金流向数据 def 资金流向计算(self,df): # 将 'delta' 和 'close' 转换为数值类型,强制转换错误 df['delta'] = pd.to_numeric(df['delta'], errors='coerce') df['close'] = pd.to_numeric(df['close'], errors='coerce') # 删除 'delta' 或 'close' 中的 NaN 值 df = df.dropna(subset=['delta', 'close']) symbol=df['symbol'].iloc[-1] # symbol = ''.join(filter(str.isalpha, symbol)).upper() # 资金净流向 = abs(sum(df['delta']*df['close']))*合约信息[symbol]['合约单位']*合约信息[symbol]['保证金'] hycs = int(fees_df[fees_df['合约'] == symbol]['合约乘数'].iloc[0]) bzj = (float(fees_df[fees_df['合约'] == symbol]['做多保证金率'].iloc[0]) + float(fees_df[fees_df['合约'] == symbol]['做空保证金率'].iloc[0]))/2 print("%s品种的合约乘数:%s,保证金率:%s"%(symbol,hycs,bzj)) 资金净流向 = abs(sum(df['delta']*df['close']))*hycs*bzj symbol = ''.join(filter(str.isalpha, symbol)).upper() 更新状态=True # 将结果放入 DataFrame result_df = pd.DataFrame({ '资金净流向': [资金净流向], '更新状态': [更新状态] }) return result_df # 收盘清仓 def 收盘清仓(self,symbol,data): param = self.param_dict[symbol] # 获取当前时间 current_time = datetime.now().time() # 设置清仓操作的时间范围1:14:55到15:00 clearing_time1_start = s_time(14, 55) clearing_time1_end = s_time(15, 0) # 设置清仓操作的时间范围2:00:55到01:00 clearing_time2_start = s_time(22, 55) clearing_time2_end = s_time(23, 0) clearing_time3_start = s_time(0, 55) clearing_time3_end = s_time(1, 0) clearing_time4_start = s_time(2, 25) clearing_time4_end = s_time(2, 30) now = datetime.now() # 创建一个标志变量,用于记录是否已经执行过清仓操作 #param.clearing_executed = False #jerome:增加修改合约代码识别 alpha_chars = "" numeric_chars = "" for char in symbol: if char.isalpha(): alpha_chars = char elif char.isdigit(): numeric_chars = char # 检查当前时间是否在第一个清仓操作的时间范围内,并且清仓操作未执行过 if clearing_time1_start <= current_time <= clearing_time1_end and not param.clearing_executed : #trade_dfs.drop(trade_dfs.index,inplace=True)#清除当天的行情数据 if param.pos>0: #平多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 self.w_log(f"{symbol}多头清仓操作") pass elif param.pos<0: #平空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 self.w_log(f"{symbol}空头清仓操作") pass param.clearing_executed = True # 设置标志变量为已执行 # 检查当前时间是否在第二个清仓操作的时间范围内,并且清仓操作未执行过 elif clearing_time2_start <= current_time <= clearing_time2_end and not param.clearing_executed and commodity_night_dict[alpha_chars] == s_time(23,00): #trade_dfs.drop(trade_dfs.index,inplace=True) #清除当天的行情数据 if param.pos>0: #平多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 self.w_log(f"{symbol}多头清仓操作") pass elif param.pos<0: #平空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 self.w_log(f"{symbol}空头清仓操作") pass # 检查当前时间是否在第三个清仓操作的时间范围内,并且清仓操作未执行过 elif clearing_time3_start <= current_time <= clearing_time3_end and not param.clearing_executed and commodity_night_dict[alpha_chars] == s_time(1,00): #trade_dfs.drop(trade_dfs.index,inplace=True) #清除当天的行情数据 if param.pos>0: #平多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 self.w_log(f"{symbol}多头清仓操作") pass elif param.pos<0: #平空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 self.w_log(f"{symbol}空头清仓操作") pass elif clearing_time4_start <= current_time <= clearing_time4_end and not param.clearing_executed and commodity_night_dict[alpha_chars] == s_time(2,30): #trade_dfs.drop(trade_dfs.index,inplace=True) #清除当天的行情数据 if param.pos>0: #平多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 self.w_log(f"{symbol}多头清仓操作") pass elif param.pos<0: #平空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 self.w_log(f"{symbol}空头清仓操作") pass param.clearing_executed = True # 设置标志变量为已执行 # 如果不在任何清仓操作的时间范围内,可以执行其他操作或不执行任何操作 else: param.clearing_executed = False pass #print("不在清仓操作时间范围内") return param.clearing_executed def cal_sig(self, symbol_queue): while True: try: now=datetime.now() data = symbol_queue.get(block=True, timeout=5) # 如果5秒没收到新的tick行情,则抛出异常 instrument_id = data['InstrumentID'].decode() # 品种代码 size = symbol_queue.qsize() if size > 1: print(f'当前{instrument_id}共享队列长度为{size}, 有点阻塞!!!!!') param = self.param_dict[instrument_id] self.品种=instrument_id #OF计算数据开始 self.tickcome(data) trade_df = trade_dfs[instrument_id] #清仓开关 run_kg=self.收盘清仓(instrument_id,data) 最新价=data['LastPrice'] # #多头时间出场-----------------------暂时用不到------------------------ # if param.pos>0 and param.多头开仓历时>param.平_多时间 : # #平多 # self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') # self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') # param.pos=0 # param.多头止损价格=0 # param.多头开仓历时=0 # param.多头成本价格=0 # self.w_log(f'历时{param.平_多时间}***多头***{self.品种}时间出场') #多头价格止损 if param.pos>0 and 最新价0 and len(trade_df)>1 and 最新价param.多头成本价格 : #平多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 param.多头止损价格=0 param.多头开仓历时=0 param.多头成本价格=0 self.w_log(instrument_id,f'多头跟踪止盈触发: {最新价} ***{instrument_id}多头止盈***') # # #空头时间出场-----------------------暂时用不到------------------------ # if param.pos<0 and param.空头开仓历时>param.平_空时间 : # #平空 # self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') # self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') # param.pos=0 # param.空头止损价格=0 # param.空头开仓历时=0 # param.空头成本价格=0 # self.w_log(f'历时{param.平_空时间}***空头***{self.品种}时间出场') #空头价格止损 if param.pos<0 and 最新价>param.空头止损价格 : #平空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 param.空头止损价格=0 param.空头开仓历时=0 param.空头成本价格=0 self.w_log(instrument_id,f'空头止损触发: {最新价},空头止损价格:{param.空头止损价格} ***{instrument_id}空头止损***') # #空头跟踪止盈 elif param.pos<0 and len(trade_df)>1 and 最新价>trade_df['10_MA'].iloc[-1] and 最新价param.cont_df and run_kg==False: #开仓历时更新: if param.pos>0: param.多头开仓历时+=1 elif param.pos<0: param.空头开仓历时+=1 #计算截面数据 截面数据_资金流向[instrument_id]=self.资金流向计算(trade_df) #日均线 trade_df['dayma']=trade_df['close'].mean() # 计算5日均线和10日均线,即使数据不足5日或10日 trade_df['5_MA'] = trade_df['close'].rolling(window=5, min_periods=1).mean() trade_df['10_MA'] = trade_df['close'].rolling(window=10, min_periods=1).mean() # 计算累积的delta值 trade_df['delta'] = trade_df['delta'].astype(float) trade_df['delta累计'] = trade_df['delta'].cumsum() #大于日均线 开多1=trade_df['dayma'].iloc[-1] > 0 and trade_df['close'].iloc[-1] > trade_df['dayma'].iloc[-1] #累计多空净量大于X 开多4=trade_df['delta累计'].iloc[-1] > param.sum_delta and trade_df['delta'].iloc[-1] > param.delta #小于日均线 开空1=trade_df['dayma'].iloc[-1]>0 and trade_df['close'].iloc[-1] < trade_df['dayma'].iloc[-1] #累计多空净量小于X 开空4=trade_df['delta累计'].iloc[-1] < -param.sum_delta and trade_df['delta'].iloc[-1] < -param.delta 开多组合= 开多1 and 开多4 and trade_df['dj'].iloc[-1]>param.dj_X and trade_df['5_MA'].iloc[-1] > trade_df['10_MA'].iloc[-1] 开空条件= 开空1 and 开空4 and trade_df['dj'].iloc[-1]<-param.dj_X and trade_df['5_MA'].iloc[-1] < trade_df['10_MA'].iloc[-1] #开平仓 #换仓平多 if param.pos>0 and instrument_id not in top_two_symbols: self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'3') param.pos=0 param.多头止损价格=0 param.多头开仓历时=0 param.多头成本价格=0 self.w_log(instrument_id,f'当前截面品种{top_two_symbols},{instrument_id}不在范围,***多平仓***') #多头开仓 if param.pos==0 and 开多组合 and instrument_id in top_two_symbols : #开多 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'0') param.pos=1 param.多头止损价格=trade_df['low'].iloc[-2] param.多头成本价格=data['AskPrice1'] param.多头开仓历时=1 self.w_log(instrument_id,f'品种:{instrument_id},委托价格: {param.多头成本价格}***开多***') #换仓平空 if param.pos<0 and instrument_id not in top_two_symbols : self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'1') self.insert_order(data['ExchangeID'], data['InstrumentID'], data['AskPrice1']+param.py,param.Lots,b'0',b'3') param.pos=0 param.空头止损价格=0 param.空头开仓历时=0 param.空头成本价格=0 self.w_log(instrument_id,f'当前截面品种{top_two_symbols},{instrument_id}不在范围,***空平仓***') #空头开仓 if param.pos==0 and 开空条件 and instrument_id in top_two_symbols : #开空 self.insert_order(data['ExchangeID'], data['InstrumentID'], data['BidPrice1']-param.py,param.Lots,b'1',b'0') param.pos=-1 param.空头止损价格=trade_df['high'].iloc[-2] param.空头成本价格=data['BidPrice1'] param.空头开仓历时=1 self.w_log(instrument_id,f'品种:{instrument_id},委托价格: {param.空头成本价格}***开空***') #print(trade_df) #保存OF数据 self.save_trade_data(trade_df,instrument_id) #保存bar计数 param.cont_df=len(trade_df) except queue.Empty: # print(f"当前合约队列为空,等待新数据插入。") pass # 将CTP推送的行情数据分发给对应线程队列去执行 def distribute_tick(self): while True: if self.status == 0: data = None while not self.md_queue.empty(): data = self.md_queue.get(block=False) instrument_id = data['InstrumentID'].decode() # 品种代码 try: self.queue_dict[instrument_id].put(data, block=False) # 往对应合约队列中插入行情 # print(f"{instrument_id}合约数据插入。") except queue.Full: # 当某个线程阻塞导致对应队列容量超限时抛出异常,不会影响其他合约的信号计算 print(f"{instrument_id}合约信号计算阻塞导致对应队列已满,请检查对应代码逻辑后重启。") else: time.sleep(1) def start(self, param_dict): threads = [] self.param_dict = param_dict thread1 = threading.Thread(target=横截面计算) for symbol in param_dict.keys(): trade_dfs[symbol] = pd.DataFrame({}) self.queue_dict[symbol] = queue.Queue(10) #为每个合约创建一个限制数为10的队列,当计算发生阻塞导致队列达到限制数时会抛出异常 t = threading.Thread(target=self.cal_sig, args=(self.queue_dict[symbol],)) # 为每个合约单独创建一个线程计算开仓逻辑 threads.append(t) t.start() thread1.start() self.distribute_tick() for t in threads: t.join() thread1.join() # 定义横截面计算函数 def 横截面计算(): global top_two_symbols while True: if len(截面数据_资金流向)>1: goss=1 # 检查是否有任何品种的更新状态为 False,如果有则直接返回 for symbol, data in 截面数据_资金流向.items(): if not data['更新状态'].iloc[-1]: #print(f"品种 {symbol} 的更新状态为 False,跳过计算。") goss=0 break if goss==1: # 继续计算资金净流向 sorted_items = sorted(截面数据_资金流向.items(), key=lambda item: item[1]['资金净流向'].iloc[-1], reverse=True) max_symbol = sorted_items[0][0] second_max_symbol = sorted_items[1][0] top_two_symbols = [max_symbol, second_max_symbol] print(f'最大值和第二大值对应的键: {top_two_symbols}') print(f'截面数据_资金流向: {截面数据_资金流向}') # 将所有品种的更新状态设置为 False for symbol in 截面数据_资金流向: 截面数据_资金流向[symbol]['更新状态'] = False time.sleep(1) # Jerome:tushare库版本 # def get_main_contact_on_time(main_symbol_code): # data_str = '' # alpha_chars = '' # numeric_chars = '' # main_code = '' # now = datetime.now() # if now.hour < 15: # data_str = (now - timedelta(days=1)).date().strftime('%Y%m%d') # else: # data_str = now.date().strftime('%Y%m%d') # main_symbol = pro.fut_mapping(ts_code=main_symbol_code, trade_date = data_str).loc[0,'mapping_ts_code'].split('.')[0] # exchange_id = pro.fut_mapping(ts_code=main_symbol_code, trade_date = data_str).loc[0,'mapping_ts_code'].split('.')[1] # for char in main_symbol: # if char.isalpha(): # alpha_chars += char # elif char.isdigit(): # numeric_chars += char # # print("未修改的主连代码:",main_symbol) # # print("未修改的主连代码的字母部分:",alpha_chars) # # print("未修改的主连代码的数字部分:",numeric_chars) # # print("未修改的主连代码对应的交易所:",exchange_id) # if exchange_id == 'CFX': # main_code = main_symbol # elif exchange_id == 'SHF' or exchange_id == 'DCE' or exchange_id == 'GFE' or exchange_id == 'INE': # lower_alpha_chars = str.lower(alpha_chars) # # print("上期所、大商所、广交所和能源中心主连代码修改为小写的字母部分:",lower_alpha_chars) # main_code = lower_alpha_chars + numeric_chars # elif exchange_id == 'ZCE': # true_numeric_chars = numeric_chars[1:] # # print("郑商所主连代码删减数字后数字部分:",true_numeric_chars) # main_code = alpha_chars + true_numeric_chars # print("最终使用的主连代码:",main_code) # return main_code def get_main_contact_on_time(main_symbol_code,contacts_df): main_symbol = contacts_df[contacts_df['品种代码'] == main_symbol_code]['主连代码'].iloc[0] print("最终使用的主连代码:",main_symbol) return main_symbol def run_trader(param_dict, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir='', private_resume_type=2, public_resume_type=2): my_trader = MyTrader(broker_id, td_server, investor_id, password, app_id, auth_code, md_queue, page_dir, private_resume_type, public_resume_type) my_trader.start(param_dict) if __name__ == '__main__': #global symbol #公众号:松鼠Quant #主页:www.quant789.com #本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!! #版权归松鼠Quant所有,禁止转发、转卖源码违者必究。 sb_1 = get_main_contact_on_time('IH', contacts_df) sb_2 = get_main_contact_on_time('ag', contacts_df) sb_3 = get_main_contact_on_time('eb', contacts_df) sb_4 = get_main_contact_on_time('si', contacts_df) sb_5 = get_main_contact_on_time('sc', contacts_df) sb_6 = get_main_contact_on_time('SA', contacts_df) # print(sb_3) # print(sb_4) # print(sb_5) # print(sb_6) #注意:运行前请先安装好algoplus, # pip install AlgoPlus #http://www.algo.plus/ctp/python/0103001.html # 实盘参数字典,需要实盘交易的合约,新建对应的参数对象即可,以下参数仅供测试使用,不作为实盘参考!!!! 历时平仓=4 param_dict = {} # param_dict['UR409'] = ParamObj(symbol='UR409', Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # param_dict['SR409'] = ParamObj(symbol='SR409', Lots=1, py=6,dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # param_dict['eb2407'] = ParamObj(symbol='eb2407', Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # param_dict['SA409'] = ParamObj(symbol='SA409', Lots=1, py=6,dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # param_dict['pp409'] = ParamObj(symbol='pp409', Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # param_dict['OI409'] = ParamObj(symbol='OI409', Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_1] = ParamObj(symbol=sb_1, Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_2] = ParamObj(symbol=sb_2, Lots=1, py=6,dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_3] = ParamObj(symbol=sb_3, Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_4] = ParamObj(symbol=sb_4, Lots=1, py=6,dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_5] = ParamObj(symbol=sb_5, Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) param_dict[sb_6] = ParamObj(symbol=sb_6, Lots=1, py=6, dj_X=0,delta=500,sum_delta=2000,失衡=3,堆积=3,周期='1T',平_多时间=历时平仓,平_空时间=历时平仓) # #用simnow模拟,不要忘记屏蔽下方实盘的future_account字典 future_account = get_simulate_account( investor_id='223828', # simnow账户,注意是登录账户的ID,SIMNOW个人首页查看 password='Zj1234!@#%', # simnow密码 server_name='电信1', # 电信1、电信2、移动、TEST、N视界 subscribe_list=list(param_dict.keys()), # 合约列表 ) # #实盘用这个,不要忘记屏蔽上方simnow的future_account字典 # future_account = FutureAccount( # broker_id='', # 期货公司BrokerID # server_dict={'TDServer': "ip:port", 'MDServer': 'ip:port'}, # TDServer为交易服务器,MDServer为行情服务器。服务器地址格式为"ip:port。" # reserve_server_dict={}, # 备用服务器地址 # investor_id='', # 账户 # password='', # 密码 # app_id='simnow_client_test', # 认证使用AppID # auth_code='0000000000000000', # 认证使用授权码 # subscribe_list=list(param_dict.keys()), # 订阅合约列表 # md_flow_path='./log', # MdApi流文件存储地址,默认MD_LOCATION # td_flow_path='./log', # TraderApi流文件存储地址,默认TD_LOCATION # ) print('开始',len(future_account.subscribe_list)) # 共享队列 share_queue = Queue(maxsize=200) # 行情进程 md_process = Process(target=run_tick_engine, args=(future_account, [share_queue])) # 交易进程 trader_process = Process(target=run_trader, args=( param_dict, future_account.broker_id, future_account.server_dict['TDServer'], future_account.investor_id, future_account.password, future_account.app_id, future_account.auth_code, share_queue, # 队列 future_account.td_flow_path )) md_process.start() trader_process.start() md_process.join() trader_process.join()