""" 订单流交易策略 功能:处理Tick行情数据,计算订单流指标,生成交易信号并执行交易 """ import queue # 队列模块,用于线程间通信 import threading # 线程模块 import os # 操作系统模块 import json # JSON模块 import re # 正则表达式模块 import csv # CSV模块 import time as time_module # 标准库时间模块 from datetime import datetime, time as datetime_time # datetime时间对象 from multiprocessing import Process, Queue # 多进程模块 import pandas as pd # 数据分析库 import numpy as np # 数值计算库 import smtplib # SMTP邮件发送库 from email.mime.text import MIMEText # 邮件文本格式 from email.mime.multipart import MIMEMultipart # 邮件多部分格式 from CtpPlus.CTP.MdApi import run_tick_engine # 行情引擎 from CtpPlus.CTP.FutureAccount import get_simulate_account # 获取模拟账户 from CtpPlus.CTP.TraderApiBase import TraderApiBase # 交易API基类 # 导入配置 from config import ( SMTP_CONFIG, TRADING_PARAMS, SIMNOW_CONFIG, SYSTEM_CONFIG, NIGHT_CLEARING_TIME, FEISHU_CONFIG ) # ============ 全局数据字典 ============ class DataStore: """全局数据存储""" tickdata = {} # K线数据 {symbol: DataFrame} quote = {} # 报价数据 {symbol: {bid, ask}} ofdata = {} # 订单流数据 {symbol: DataFrame} trade_dfs = {} # 交易数据 {symbol: DataFrame} prev_volume = {} # 上个Tick成交量 {symbol: volume} last_tick = {} # 上条Tick数据 {symbol: tick} # ============ 参数配置类 ============ class TradingParam: """交易参数配置""" def __init__( self, symbol, lots, price_offset, delta_threshold, imbalance_ratio, accumulation_threshold, period, min_volume=0, merge_price=1, mini_price=0.2 ): self.symbol = symbol # 合约代码 self.lots = lots # 交易手数 self.price_offset = price_offset # 价格偏移量(下单时加减) self.delta_threshold = delta_threshold # Delta阈值(开仓条件) self.imbalance_ratio = imbalance_ratio # 失衡比率(堆积判断) self.accumulation_threshold = accumulation_threshold # 堆积阈值 self.period = period # K线周期 self.min_volume = min_volume # 最小成交量过滤 self.merge_price = merge_price # 价格档位合并数量 self.mini_price = mini_price # 最小价格变动 # 持仓状态 self.position = 0 # 当前持仓(1多头/-1空头/0无持仓) self.pending_long_price = 0 # 待执行开多价格 self.pending_short_price = 0 # 待执行开空价格 self.sl_long_price = 0 # 多头止损价 self.sl_short_price = 0 # 空头止损价 self.clearing_executed = False # 日终清算是否已执行 self.load_historical_data = True # 是否加载历史数据 self.processed_rows = 0 # 已处理的行数(用于判断新K线) @classmethod def from_config(cls, symbol, config_dict): """从配置字典创建参数""" return cls( symbol=symbol, # 合约代码 lots=config_dict["lots"], # 手数 price_offset=config_dict["price_offset"], # 价格偏移 delta_threshold=config_dict["delta_threshold"], # Delta阈值 imbalance_ratio=config_dict["imbalance_ratio"], # 失衡比率 accumulation_threshold=config_dict["accumulation_threshold"], # 堆积阈值 period=config_dict["period"], # K线周期 min_volume=config_dict.get("min_volume", 0), # 最小成交量 merge_price=config_dict.get("merge_price", 1), # 价格合并 mini_price=config_dict.get("mini_price", 0.2) # 最小价格 ) # ============ 邮件通知模块 ============ class MailNotifier: """邮件通知类""" def __init__(self): self.config = SMTP_CONFIG # 加载SMTP配置 def send(self, text, subject="TD_Simnow_Signal"): """发送邮件通知""" if not self.config["password"]: # 未配置密码则跳过 print("SMTP密码未配置,跳过邮件发送") return try: msg = MIMEMultipart() # 创建多部分邮件对象 msg["From"] = self.config["sender"] # 发件人 msg["To"] = ";".join(self.config["receivers"]) # 收件人(逗号分隔) msg["Subject"] = subject # 邮件主题 msg.attach(MIMEText(text, "plain", "utf-8")) # 添加纯文本内容 smtp = smtplib.SMTP_SSL(self.config["server"], self.config["port"]) # 连接SSL SMTP smtp.login(self.config["username"], self.config["password"]) # 登录 smtp.sendmail(self.config["sender"], self.config["receivers"], msg.as_string()) # 发送 smtp.quit() # 关闭连接 except Exception as e: print(f"邮件发送失败: {e}") # 捕获并打印异常 class FeishuNotifier: """飞书通知类""" def __init__(self): self.config = FEISHU_CONFIG # 加载飞书配置 def send(self, text): """发送飞书消息""" if not self.config.get("enabled", False): # 未启用则跳过 return try: import requests # 导入requests库 headers = {"Content-Type": "application/json"} # 请求头 data = { "msg_type": "text", # 消息类型为文本 "content": {"text": text} # 文本内容 } response = requests.post(self.config["webhook_url"], headers=headers, json=data) # POST请求 if response.status_code != 200: # 检查响应状态 print(f"飞书消息发送失败,状态码: {response.status_code}") except Exception as e: print(f"飞书消息发送失败: {e}") # ============ 交易引擎类 ============ class OrderFlowTrader(TraderApiBase): """订单流交易引擎""" def __init__(self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir=""): self.param_dict = {} # 交易参数字典 {symbol: TradingParam} self.queue_dict = {} # 信号计算队列字典 {symbol: Queue} self.current_symbol = " " # 当前处理的合约代码 self.last_stops_loatime = {} # 上次加载止盈止损的时间 {symbol: timestamp} # 线程锁(保证多线程访问共享数据的安全性) self._lock = threading.RLock() # 综合锁 self._trade_locks = {} # 交易数据锁 self._prev_volume_lock = threading.RLock() # 成交量锁 self._tickdata_lock = threading.RLock() # K线数据锁 self._quote_lock = threading.RLock() # 报价数据锁 self._ofdata_lock = threading.RLock() # 订单流数据锁 self._last_tick_lock = threading.RLock() # 上个Tick锁 # 邮件通知器 self.mail = MailNotifier() # 飞书通知器 self.feishu = FeishuNotifier() # 数据存储 self._json_save_counter = {} # JSON保存计数器 self.md_queue = md_queue # 行情队列 self._tick_record_counter = {} # tick录制计数器 {symbol: count} # ============ 行情数据处理 ============ def tickcome(self, md_queue): """接收并处理Tick行情数据""" data = md_queue # 从队列获取行情数据 # === tick录制(实盘模式) === if SYSTEM_CONFIG.get("record_tick", False): instrument_id_raw = data["InstrumentID"] symbol = instrument_id_raw.decode() if isinstance(instrument_id_raw, bytes) else instrument_id_raw self._tick_record_counter[symbol] = self._tick_record_counter.get(symbol, 0) + 1 interval = SYSTEM_CONFIG.get("tick_record_interval", 1) if self._tick_record_counter[symbol] % interval == 0: self._record_tick_to_csv(data, symbol) instrument_id = data["InstrumentID"].decode() # 解码合约代码 action_day = data["ActionDay"].decode() # 解码交易日期 update_time = data["UpdateTime"].decode() # 解码更新时间 # 格式化日期为YYYY-MM-DD格式 action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}" # 组合完整的时间戳(包含毫秒) created_at = datetime.strptime( f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}", "%Y-%m-%d %H:%M:%S.%f" ) # 计算瞬时成交量(当前成交量 - 上个Tick成交量) with self._prev_volume_lock: prev_vol = DataStore.prev_volume.get(instrument_id, 0) # 获取上次成交量 curr_vol = int(data["Volume"]) # 当前总成交量 last_vol = curr_vol - prev_vol if prev_vol != 0 else 0 # 计算瞬时成交量 DataStore.prev_volume[instrument_id] = curr_vol # 更新存储的成交量 if last_vol <= 0: # 无增量则返回(避免重复处理) return # 构建Tick数据结构 tick = { "symbol": instrument_id, # 合约代码 "created_at": created_at, # 时间戳 "price": float(data["LastPrice"]), # 最新价 "last_volume": last_vol, # 瞬时成交量 "bid_p": float(data["BidPrice1"]), # 买一价 "bid_v": int(data["BidVolume1"]), # 买一量 "ask_p": float(data["AskPrice1"]), # 卖一价 "ask_v": int(data["AskVolume1"]), # 卖一量 "upper_limit": float(data["UpperLimitPrice"]), # 涨停价 "lower_limit": float(data["LowerLimitPrice"]), # 跌停价 "trading_day": data["TradingDay"].decode(), # 交易日 "cum_volume": curr_vol, # 累计成交量 "cum_amount": float(data["Turnover"]), # 累计成交额 "cum_position": int(data["OpenInterest"]), # 累计持仓量 } self.on_tick(tick) # 调用Tick处理回调 def _record_tick_to_csv(self, data: dict, symbol: str): """录制 tick 数据到 CSV""" data_dir = SYSTEM_CONFIG["data_dir"] os.makedirs(data_dir, exist_ok=True) csv_path = f"{data_dir}/{symbol}_tick.csv" # CSV 字段 fields = [ "InstrumentID", "ActionDay", "UpdateTime", "UpdateMillisec", "LastPrice", "Volume", "BidPrice1", "BidVolume1", "AskPrice1", "AskVolume1", "UpperLimitPrice", "LowerLimitPrice", "TradingDay", "Turnover", "OpenInterest" ] # 写入字段(bytes 解码) row = {} for field in fields: val = data.get(field) if isinstance(val, bytes): row[field] = val.decode() else: row[field] = val file_exists = os.path.exists(csv_path) with open(csv_path, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fields) if not file_exists: writer.writeheader() writer.writerow(row) def on_tick(self, tick): """处理单个Tick数据""" if tick["last_volume"] == 0: # 无成交量则返回 return tsymbol = tick["symbol"] # 合约代码 # 获取上条Tick的买卖盘数据进行对比 with self._last_tick_lock: prev_tick = DataStore.last_tick.get(tsymbol) # 获取上条Tick if prev_tick: # 存在则使用上条数据 bid_p = prev_tick["bid_p"] ask_p = prev_tick["ask_p"] bid_v = prev_tick["bid_v"] ask_v = prev_tick["ask_v"] else: # 不存在则使用当前数据初始化 bid_p = tick["bid_p"] ask_p = tick["ask_p"] bid_v = tick["bid_v"] ask_v = tick["ask_v"] DataStore.last_tick[tsymbol] = tick # 更新存储的上条Tick # 格式化时间字符串(毫秒精度) timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 构建DataFrame格式的Tick数据 tick_dt = pd.DataFrame({ "datetime": timetick, # 时间戳字符串 "symbol": tick["symbol"], # 合约代码 "mainsym": tick["symbol"].rstrip("0123456789").upper(), # 主合约代码(去除数字) "lastprice": tick["price"], # 最新价 "vol": tick["last_volume"], # 瞬时成交量 "bid_p": bid_p, # 买一价 "ask_p": ask_p, # 卖一价 "bid_v": bid_v, # 买一量 "ask_v": ask_v, # 卖一量 }, index=[0]) self.tickdata(tick_dt, tsymbol) # 调用K线聚合处理 def data_of(self, symbol, df): """将订单流数据添加到交易DataFrame""" with self._lock: if symbol not in self._trade_locks: # 首次访问则创建锁 self._trade_locks[symbol] = threading.RLock() trade_lock = self._trade_locks[symbol] with trade_lock: existing = DataStore.trade_dfs.get(symbol) # 获取现有数据 if existing is None or existing.empty: # 无数据则直接赋值 DataStore.trade_dfs[symbol] = df else: # 检查是否有重复的 datetime,有则删除旧数据 if "datetime" in df.columns and "datetime" in existing.columns: new_dt = df["datetime"].iloc[0] if len(df) > 0 else None if new_dt is not None: # 删除已存在的相同 datetime 的旧数据 existing = existing[existing["datetime"] != new_dt] DataStore.trade_dfs[symbol] = pd.concat([existing, df], ignore_index=True) # 合并数据 def process_quote(self, bid_dict, ask_dict, symbol): """处理并合并买卖盘数据""" with self._quote_lock: dic = DataStore.quote.get(symbol) # 获取该合约的现有报价 if dic: bid_result = dic["bid_result"].copy() # 复制买盘数据 ask_result = dic["ask_result"].copy() # 复制卖盘数据 else: bid_result, ask_result = {}, {} # 无数据则初始化空字典 # 合并价格档位(买卖盘价格可能有差异) price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys())) for price in price_list: bid_val = int(bid_dict.get(price, 0)) # 获取买盘量 ask_val = int(ask_dict.get(price, 0)) # 获取卖盘量 if bid_val: # 累加买盘量 bid_result[price] = bid_result.get(price, 0) + bid_val ask_result.setdefault(price, 0) # 确保卖盘有该价格档位 if ask_val: # 累加卖盘量 ask_result[price] = ask_result.get(price, 0) + ask_val bid_result.setdefault(price, 0) # 确保买盘有该价格档位 DataStore.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result} # 更新存储 return bid_result, ask_result # 返回合并后的数据 def tickdata(self, df, symbol): """将Tick数据聚合成K线""" tickdata = pd.DataFrame({ "datetime": df["datetime"], # 时间 "symbol": df["symbol"], # 合约代码 "lastprice": df["lastprice"], # 最新价 "vol": df["vol"], # 成交量 "bid_p": df["bid_p"], # 买一价 "bid_v": df["bid_v"], # 买一量 "ask_p": df["ask_p"], # 卖一价 "ask_v": df["ask_v"], # 卖一量 }) try: with self._tickdata_lock: rdf = DataStore.tickdata.get(symbol) # 获取已存储的K线数据 if rdf is not None: # 已存在K线数据(需要累加) rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 获取K线时间 # 获取当前 tick 的 bar 时间(可能还未设置,用 datetime 作为备选) tick_bartime = tickdata.get("bartime", tickdata["datetime"])[0] now_bartime = pd.to_datetime(tick_bartime).strftime("%Y-%m-%d %H:%M:%S") if now_bartime > rdftm: # 新K线开始(时间大于上一K线) # 保存旧K线的订单流数据 with self._ofdata_lock: of = DataStore.ofdata.pop(symbol, None) if of is not None: self.data_of(symbol, of) # 写入交易DataFrame # 保存 ofdata.json(改善时间戳对齐:旧K线完成时即保存) self.save_ofdata_json(symbol) # 清空旧K线的报价数据 with self._quote_lock: DataStore.quote.pop(symbol, None) # 清空旧K线的Tick数据 with self._tickdata_lock: DataStore.tickdata.pop(symbol, None) # 初始化新K线数据 tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) # K线时间 tickdata["open"] = tickdata["lastprice"] # 开盘价 tickdata["high"] = tickdata["lastprice"] # 最高价 tickdata["low"] = tickdata["lastprice"] # 最低价 tickdata["close"] = tickdata["lastprice"] # 收盘价 tickdata["starttime"] = tickdata["datetime"] # 开始时间 else: # 同一K线内:累加数据 tickdata["bartime"] = rdf["bartime"] # 继承K线时间 tickdata["open"] = rdf["open"] # 继承开盘价 lastprice_val = float(tickdata["lastprice"].values[0]) # 当前价格 # 更新最高价(取最大值) tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]) if len(rdf["high"].values) > 0 else 0) # 更新最低价(取最小值) tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]) if len(rdf["low"].values) > 0 else 0) tickdata["close"] = tickdata["lastprice"] # 更新收盘价 tickdata["vol"] = df["vol"].values + rdf["vol"].values # 累加成交量 tickdata["starttime"] = rdf["starttime"] # 继承开始时间 else: # 首条Tick,创建新K线 print("新bar的第一个tick进入") tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) tickdata["open"] = tickdata["lastprice"] tickdata["high"] = tickdata["lastprice"] tickdata["low"] = tickdata["lastprice"] tickdata["close"] = tickdata["lastprice"] tickdata["starttime"] = tickdata["datetime"] except Exception as e: print(f"tickdata异常: {e}") if "bartime" not in tickdata.columns: # 确保bartime存在 tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) # 确保bartime存在且格式正确 if "bartime" not in tickdata.columns: tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) tickdata["bartime"] = pd.to_datetime(tickdata["bartime"]) param = self.param_dict[self.current_symbol] # 获取交易参数 # 重采样聚合K线(按指定周期合并) bardata = ( tickdata.resample(on="bartime", rule=param.period, label="right", closed="right") .agg({ "starttime": "first", # 开始时间取第一个 "symbol": "last", # 合约代码取最后一个 "open": "first", # 开盘价取第一个 "high": "max", # 最高价取最大 "low": "min", # 最低价取最小 "close": "last", # 收盘价取最后一个 "vol": "sum", # 成交量求和 }) .reset_index(drop=False) ) bardata = bardata.dropna().reset_index(drop=True) # 删除空值行 # 只保留最后一个 bar(避免同一周期产生多行) if len(bardata) > 1: bardata = bardata.iloc[[-1]] bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 格式化时间 with self._tickdata_lock: DataStore.tickdata[symbol] = bardata # 更新存储的K线数据 # 使用单tick成交量进行后续处理 tickdata["vol"] = df["vol"].values self.orderflow_df_new(tickdata, bardata, symbol) # 生成订单流数据 def merge_price_levels(self, price_dict, merge_count, mini_price): """合并价格档位""" if not price_dict or merge_count <= 1 or mini_price <= 0: # 参数无效则返回原数据 return price_dict merged = {} sorted_prices = sorted([float(p) for p in price_dict.keys()]) # 排序价格 if not sorted_prices: # 无价格则返回空 return {} min_price = sorted_prices[0] # 最低价 merge_interval = merge_count * mini_price # 合并间隔 # 按间隔分组合并成交量 for price_str, vol in price_dict.items(): price = float(price_str) group_idx = int((price - min_price) // merge_interval) # 计算组索引 merged_price = min_price + group_idx * merge_interval # 计算合并后的价格 key = str(merged_price) merged[key] = merged.get(key, 0) + vol # 累加成交量 return merged # 返回合并后的数据 def filter_by_min_vol(self, price_dict, min_vol): """过滤最小成交量以下的价格档位""" if min_vol <= 0: # 无阈值则返回原数据 return price_dict return {k: v for k, v in price_dict.items() if v >= min_vol} # 过滤低成交量档位 def orderflow_df_new(self, df_tick, df_min, symbol): """生成订单流DataFrame""" param = self.param_dict[symbol] # 获取交易参数 min_vol = param.min_volume # 最小成交量 merge_price = param.merge_price # 价格合并数量 # 单tick成交量,用于bid/ask方向判断 bar_vol = int(df_tick["vol"].values[0]) # K线周期内成交量之和(resample聚合结果) period_vol = int(df_min["vol"].values[0]) bar_close = float(df_min["close"].values[0]) # K线收盘价 bar_open = float(df_min["open"].values[0]) # K线开盘价 bar_low = float(df_min["low"].values[0]) # K线最低价 bar_high = float(df_min["high"].values[0]) # K线最高价 dt = df_min["bartime"].values[0] # K线时间 bp1 = float(df_tick["bid_p"].values[0]) # 买一价 ap1 = float(df_tick["ask_p"].values[0]) # 卖一价 last_price = float(df_tick["lastprice"].values[0]) # 最新价 volume = bar_vol # 成交量 bar_symbol = df_tick["symbol"].values[0] # 合约代码 bid_dict = {} # 买盘字典 ask_dict = {} # 卖盘字典 # 判断价格位置并归类到买卖盘 if last_price >= ap1: # 价格>=卖一价,记录到卖盘 ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + volume if last_price <= bp1: # 价格<=买一价,记录到买盘 bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + volume bid_result, ask_result = self.process_quote(bid_dict, ask_dict, symbol) # 合并买卖盘数据 # 应用价格合并 if merge_price > 1: bid_result = self.merge_price_levels(bid_result, merge_price, param.mini_price) ask_result = self.merge_price_levels(ask_result, merge_price, param.mini_price) # 价格升序排序 bid_result = dict(sorted(bid_result.items(), key=lambda x: x[0])) ask_result = dict(sorted(ask_result.items(), key=lambda x: x[0])) price_list = list(bid_result.keys()) # 价格列表 ask_list = list(ask_result.values()) # 卖盘量列表 bid_list = list(bid_result.values()) # 买盘量列表 delta = sum(ask_list) - sum(bid_list) # Delta = 卖量 - 买量 # 构建订单流DataFrame df = pd.DataFrame({ "price": pd.Series([price_list]), # 价格列表 "Ask": pd.Series([ask_list]), # 卖盘量 "Bid": pd.Series([bid_list]), # 买盘量 }) df["symbol"] = bar_symbol # 合约代码 df["datetime"] = dt # 时间 df["delta"] = delta # Delta值 df["close"] = bar_close # 收盘价 df["open"] = bar_open # 开盘价 df["high"] = bar_high # 最高价 df["low"] = bar_low # 最低价 df["vol"] = period_vol # 周期成交量 # 计算堆积指标 dj_count, dj_high, dj_low = self.calculate_accumulation(df, min_vol) df["dj"] = dj_count # 堆积数量 df["dj_price_high"] = dj_high # 堆积区间高价 df["dj_price_low"] = dj_low # 堆积区间低价 with self._ofdata_lock: DataStore.ofdata[symbol] = df # 更新订单流数据 def calculate_accumulation(self, kdata, min_vol=0): """计算堆积指标(Order Flow Delta) Args: kdata: 订单流DataFrame min_vol: 最小成交量过滤 Returns: (dj_count, dj_price_high, dj_price_low) """ param = self.param_dict[self.current_symbol] # 获取交易参数 imbalance_ratio = int(param.imbalance_ratio) # 失衡比率 accumulation_threshold = int(param.accumulation_threshold) # 堆积阈值 dj_count = 0 # 堆积计数 dj_price_high = 0 # 堆积价格区间高 dj_price_low = 0 # 堆积价格区间低 bearish_zones = [] # 空头堆积区间 bullish_zones = [] # 多头堆积区间 # 遍历每个价格档位 for item in kdata.itertuples(index=False): price_s = item.price # 价格序列 ask_s = item.Ask # 卖盘序列 bid_s = item.Bid # 买盘序列 bear_count = 0 # 空头连续计数 bull_count = 0 # 多头连续计数 bear_start, bear_end = None, None # 空头区间起止 bull_start, bull_end = None, None # 多头区间起止 price_len = len(price_s) # 价格档位数量 for i in range(price_len): # 获取各档位成交量(低于最小值的视为0) bid_val = bid_s[i] if i < len(bid_s) and bid_s[i] >= min_vol else 0 ask_val = ask_s[i] if i < len(ask_s) and ask_s[i] >= min_vol else 0 next_ask = ask_s[i + 1] if i + 1 < len(ask_s) and ask_s[i + 1] >= min_vol else 0 prev_bid = bid_s[i - 1] if i > 0 and bid_s[i - 1] >= min_vol else 0 # 空头堆积: 买盘 > 卖盘 * 失衡比率 if i < price_len - 1: if bid_val > next_ask * imbalance_ratio: # 判断空头堆积 if bear_start is None: bear_start = price_s[i] # 记录空头区间起点 bear_end = price_s[i] # 更新空头区间终点 bear_count += 1 # 连续计数+1 if bear_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值 dj_count -= 1 # 堆积计数-1 else: # 堆积中断 if bear_count >= accumulation_threshold and bear_start is not None: bearish_zones.append((bear_start, bear_end)) # 记录空头区间 bear_count = 0 # 重置计数 bear_start, bear_end = None, None # 重置区间 # 多头堆积: 卖盘 > 买盘 * 失衡比率 if i >= 1 and i < price_len - 1: if ask_val > prev_bid * imbalance_ratio: # 判断多头堆积 if bull_start is None: bull_start = price_s[i] # 记录多头区间起点 bull_end = price_s[i] # 更新多头区间终点 bull_count += 1 # 连续计数+1 if bull_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值 dj_count += 1 # 堆积计数+1 else: # 堆积中断 if bull_count >= accumulation_threshold and bull_start is not None: bullish_zones.append((bull_start, bull_end)) # 记录多头区间 bull_count = 0 # 重置计数 bull_start, bull_end = None, None # 重置区间 # 处理最后堆积(未中断的区间) if bear_count >= accumulation_threshold and bear_start is not None: bearish_zones.append((bear_start, bear_end)) if bull_count >= accumulation_threshold and bull_start is not None: bullish_zones.append((bull_start, bull_end)) # 计算价格范围 all_zones = bearish_zones + bullish_zones if all_zones: all_prices = [float(p) for zone in all_zones for p in zone] dj_price_high = max(all_prices) # 取最高价 dj_price_low = min(all_prices) # 取最低价 return dj_count, dj_price_high, dj_price_low # 返回堆积结果 # ============ 数据持久化 ============ def read_from_csv(self, symbol): """从CSV读取历史交易数据""" param = self.param_dict[symbol] # 获取交易参数 data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 os.makedirs(data_dir, exist_ok=True) # 确保目录存在 file_path = f"{data_dir}/{symbol}traderdata.csv" # 数据文件路径 if os.path.exists(file_path): # 文件存在则读取 df = pd.read_csv(file_path) if not df.empty and param.load_historical_data: # 有数据且需加载 row = df.iloc[-1] # 取最后一行 param.position = int(row["pos"]) # 恢复持仓 param.sl_long_price = float(row["sl_long_price"]) # 恢复多头止损价 param.sl_short_price = float(row["sl_short_price"]) # 恢复空头止损价 print(f"加载历史数据: {df.iloc[-1]}") param.load_historical_data = False # 标记已加载 def save_to_csv(self, symbol): """保存交易数据到CSV""" # 确保 symbol 是字符串类型 if isinstance(symbol, bytes): symbol = symbol.decode() symbol = str(symbol) # 从 param_dict 获取参数(需要确保是字符串 key) if symbol in self.param_dict: param = self.param_dict[symbol] else: # 尝试找到匹配的 key for key in self.param_dict: if str(key) == symbol: param = self.param_dict[key] break else: print(f"警告: 未找到 symbol {symbol} 的参数配置") return symbol = str(symbol) data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 os.makedirs(data_dir, exist_ok=True) # 确保目录存在 # 构建保存数据 data = { "datetime": [DataStore.trade_dfs[symbol]["datetime"].iloc[-1]], # 最后时间 "pos": [param.position], # 持仓 "sl_long_price": [param.sl_long_price], # 多头止损价 "sl_short_price": [param.sl_short_price], # 空头止损价 } df = pd.DataFrame(data) file_path = f"{data_dir}/{symbol}traderdata.csv" # 文件路径 if os.path.exists(file_path): # 文件存在则追加 csv_df = pd.read_csv(file_path) if df["pos"].iloc[-1] != csv_df["pos"].iloc[-1]: # 持仓变化则保存 df.to_csv(file_path, mode="a", header=False, index=False) else: # 文件不存在则新建 df.to_csv(file_path, index=False) self.save_stops_to_json(symbol, DataStore.trade_dfs[symbol]) # 同时保存止盈止损JSON def save_stops_to_json(self, symbol, trade_df): """保存止盈止损数据到JSON""" # 确保 symbol 是字符串类型 if isinstance(symbol, bytes): symbol = symbol.decode() symbol = str(symbol) # 获取参数 if symbol in self.param_dict: param = self.param_dict[symbol] else: for key in self.param_dict: if str(key) == symbol: param = self.param_dict[key] break else: print(f"警告: 未找到 symbol {symbol} 的参数配置") return data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 os.makedirs(data_dir, exist_ok=True) # 确保目录存在 file_path = f"{data_dir}/{symbol}_stops.json" # JSON文件路径 param = self.param_dict[symbol] # 获取堆积价格区间 if trade_df is not None and len(trade_df) > 0: dj_low = float(trade_df['dj_price_low'].iloc[-1]) if 'dj_price_low' in trade_df.columns else 0 dj_high = float(trade_df['dj_price_high'].iloc[-1]) if 'dj_price_high' in trade_df.columns else 0 else: dj_low, dj_high = 0, 0 # 构建止盈止损数据结构 stops_data = { "long": { # 多头止盈止损 "position": param.position if param.position > 0 else 0, # 持仓 "entry_price": dj_high if param.position > 0 else 0, # 入场价 "stop_loss": dj_low if param.position > 0 else 0 # 止损价 }, "short": { # 空头止盈止损 "position": abs(param.position) if param.position < 0 else 0, # 持仓 "entry_price": dj_high if param.position < 0 else 0, # 入场价 "stop_loss": dj_low if param.position < 0 else 0 # 止损价 } } with open(file_path, 'w', encoding='utf-8') as f: # 写入JSON文件 json.dump(stops_data, f, indent=4, ensure_ascii=False) def load_stops_from_json(self, symbol): """从JSON加载止盈止损数据""" # 确保 symbol 是字符串类型 if isinstance(symbol, bytes): symbol = symbol.decode() symbol = str(symbol) current_time = time_module.time() # 当前时间戳 interval = SYSTEM_CONFIG["stops_load_interval"] # 加载间隔 # 处理 last_stops_loatime 的 key 类型 last_stops_time = self.last_stops_loatime normalized_key = symbol # 检查是否在加载间隔内(避免频繁读取磁盘) if symbol in last_stops_time or normalized_key in [str(k) for k in last_stops_time]: # 找到匹配的 key for k in last_stops_time: if str(k) == symbol: if current_time - last_stops_time.get(k, 0) <= interval: return False # 在间隔内,跳过 break file_path = f"{SYSTEM_CONFIG['data_dir']}/{symbol}_stops.json" # JSON文件路径 if os.path.exists(file_path): # 文件存在则读取 try: with open(file_path, 'r', encoding='utf-8') as f: stops_data = json.load(f) # 获取参数 if symbol in self.param_dict: param = self.param_dict[symbol] else: for key in self.param_dict: if str(key) == symbol: param = self.param_dict[key] break else: print(f"警告: 未找到 symbol {symbol} 的参数配置") return False # 恢复多头止盈止损 if stops_data['long']['position'] > 0: param.position = stops_data['long']['position'] param.sl_long_price = stops_data['long']['entry_price'] # 恢复空头止盈止损 elif stops_data['short']['position'] > 0: param.position = -stops_data['short']['position'] param.sl_short_price = stops_data['short']['entry_price'] self.last_stops_loatime[symbol] = current_time # 记录加载时间 print(f"加载止盈止损信息: {symbol}") return True except Exception as e: print(f"加载止盈止损信息失败: {e}") return False def daily_reset(self, symbol): """每日收盘数据重置""" param = self.param_dict[symbol] # 获取交易参数 sec = "".join(re.findall("[a-zA-Z]", str(symbol))) # 提取合约字母部分(如ru) current_time = datetime.now().time() # 当前时间 clearing_time1_start = datetime_time(15, 5) # 日盘清算开始(15:05) clearing_time1_end = datetime_time(15, 10) # 日盘清算结束(15:10) param.clearing_executed = False # 重置清算标志 # 判断是否在日盘清算时间段 if clearing_time1_start <= current_time <= clearing_time1_end: param.clearing_executed = True # 标记已执行清算 DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] # 清空数据 # 判断是否在夜盘清算时间段 elif sec in NIGHT_CLEARING_TIME: hour, minute = NIGHT_CLEARING_TIME[sec] # 获取夜盘清算时间 clearing_time2_start = datetime_time(hour, minute) clearing_time2_end = datetime_time(hour, minute + 15) # 15分钟后结束 if clearing_time2_start <= current_time <= clearing_time2_end: param.clearing_executed = True DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] return param.clearing_executed # 返回是否执行了清算 # ============ CTP回调 ============ def OnRtnTrade(self, pTrade): """成交回报回调""" print("||成交回报||", pTrade) def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast): """下单响应回调""" print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast) def OnRtnOrder(self, pOrder): """订单回报回调""" print("||订单回报||", pOrder) # ============ 交易辅助 ============ def place_order(self, exchange_id, instrument_id, price, lots, direction, offset): """统一下单方法""" self.insert_order(exchange_id, instrument_id, price, lots, direction, offset) def place_close_orders(self, exchange_id, instrument_id, price, lots, direction): """平仓(下单两次:平昨+平今)""" self.place_order(exchange_id, instrument_id, price, lots, direction, b"1") # 平昨 self.place_order(exchange_id, instrument_id, price, lots, direction, b"3") # 平今 # ============ 交易信号计算 ============ def check_stop_loss(self, trade_df, param, data): """检查并执行止损""" # 多头止损条件:有多头持仓 且 收盘价 < 多头止损价 if param.position > 0 and param.sl_long_price > 0 and \ trade_df["close"].iloc[-1] < param.sl_long_price: price = data["BidPrice1"] - param.price_offset # 计算止损价 print(f"多头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_long_price}") self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 执行平仓 param.sl_long_price = 0 # 重置止损价 param.position = 0 # 清空持仓 self.save_to_csv(data["InstrumentID"]) # 保存数据 # 发送邮件通知 text = f"SL_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, SL:{param.sl_long_price}, Lots:{param.lots}, " \ f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" self.mail.send(text) return True # 空头止损条件:有空头持仓 且 收盘价 > 空头止损价 if param.position < 0 and param.sl_short_price > 0 and \ trade_df["close"].iloc[-1] > param.sl_short_price: price = data["AskPrice1"] + param.price_offset print(f"空头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_short_price}") self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 执行平仓 param.sl_short_price = 0 param.position = 0 self.save_to_csv(data["InstrumentID"]) text = f"SL_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, SL:{param.sl_short_price}, Lots:{param.lots}, " \ f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" self.mail.send(text) return True return False # 无止损触发 def calculate_delta_cumulative(self, trade_df, param): """计算Delta累计""" if trade_df["delta"].dtype == object: # 类型为对象则转换 trade_df["delta"] = trade_df["delta"].astype(float) trade_df["datetime"] = pd.to_datetime(trade_df["datetime"]) # 转换时间格式 # 交易日判断(根据时间确定所属交易日) last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1]) hour = last_dt.hour if hour >= 21: # 夜盘21点后属于下一个交易日 trading_day = (last_dt + pd.Timedelta(days=1)).date() elif hour < 15: # 15点前属于当日 trading_day = last_dt.date() else: # 15点后也属于当日 trading_day = last_dt.date() trade_df["trading_day"] = str(trading_day) # 添加交易日列 # 按交易日分组累计Delta trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum() trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") # 还原时间格式 def calculate_signals(self, trade_df, param): """计算交易信号""" if len(trade_df) < 2: # 数据不足则返回 return None, None, None, None dj_last = trade_df["dj"].iloc[-1] # 最新堆积值 delta_last = trade_df["delta"].iloc[-1] # 最新Delta值 # 开多条件:堆积值>=阈值 且 Delta>=阈值 bull_signal = dj_last >= param.accumulation_threshold and \ delta_last >= param.delta_threshold # 开空条件:堆积值<=-阈值 且 Delta<=-阈值 bear_signal = dj_last <= -param.accumulation_threshold and \ delta_last <= -param.delta_threshold print(f"开多:{bull_signal}, 开空:{bear_signal}, 持仓:{param.position}") print(f"dj:{dj_last}, delta:{delta_last}") return bull_signal, bear_signal, dj_last, delta_last # 返回信号 def execute_open_long(self, trade_df, param, data): """执行开多""" param.pending_long_price = trade_df['dj_price_high'].iloc[-1] # 记录待买入价格(堆积区间高价) print(f"开多信号触发,记录待买入价格: {param.pending_long_price}") def execute_open_short(self, trade_df, param, data): """执行开空""" param.pending_short_price = trade_df['dj_price_low'].iloc[-1] # 记录待卖出价格(堆积区间低价) print(f"开空信号触发,记录待卖出价格: {param.pending_short_price}") def check_pending_orders(self, trade_df, param, data): """检查并执行待成交订单""" # 检查待开多订单(价格回落到堆积区间高价时买入) if param.position == 0 and param.pending_long_price > 0: current_price = data["AskPrice1"] # 当前卖一价 if current_price <= param.pending_long_price: # 价格触达 price = current_price + param.price_offset # 计算下单价 print(f"执行开多: 价格={price}") self.place_order(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0", b"0") # 开多 param.position = 1 # 更新持仓 param.sl_long_price = trade_df['dj_price_low'].iloc[-1] # 设置止损价 param.pending_long_price = 0 # 清空待执行价格 self.save_to_csv(data["InstrumentID"]) text = f"O_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, Lots:{param.lots}, " \ f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" self.mail.send(text) # 检查待开空订单(价格反弹到堆积区间低价时卖出) if param.position == 0 and param.pending_short_price > 0: current_price = data["BidPrice1"] # 当前买一价 if current_price >= param.pending_short_price: # 价格触达 price = current_price - param.price_offset print(f"执行开空: 价格={price}") self.place_order(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1", b"0") # 开空 param.position = -1 param.sl_short_price = trade_df['dj_price_high'].iloc[-1] # 设置止损价 param.pending_short_price = 0 self.save_to_csv(data["InstrumentID"]) text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, Lots:{param.lots}, " \ f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" self.mail.send(text) def execute_close_long(self, trade_df, param, data): """平多""" price = data["BidPrice1"] - param.price_offset # 计算平多价格 print(f"平多: BidPrice1={price}") self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 平多 param.position = 0 # 清空持仓 param.sl_long_price = 0 # 重置止损价 param.pending_long_price = 0 # 重置待执行价格 self.save_to_csv(data["InstrumentID"]) text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, Lots:{param.lots}, " \ f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" self.mail.send(text) def execute_close_short(self, trade_df, param, data): """平空""" price = data["AskPrice1"] + param.price_offset print(f"平空: AskPrice1={price}") self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 平空 param.position = 0 param.sl_short_price = 0 param.pending_short_price = 0 self.save_to_csv(data["InstrumentID"]) text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ f"Price:{price}, Lots:{param.lots}, dj:{trade_df['dj'].iloc[-1]}" self.mail.send(text) def save_ofdata_json(self, symbol): """保存订单流数据到JSON(新K线生成时调用)""" data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 os.makedirs(data_dir, exist_ok=True) # 确保目录存在 json_path = f"{data_dir}/{symbol}_ofdata.json" # JSON文件路径 trade_df = DataStore.trade_dfs.get(symbol) # 获取交易数据 if trade_df is None or trade_df.empty: return # 无数据则返回 save_df = trade_df.copy() if 'datetime' in save_df.columns: save_df['datetime'] = save_df['datetime'].astype(str) # 转换时间格式 # 去重:保留每个 datetime 的最后一条记录 save_df = save_df.drop_duplicates(subset=['datetime'], keep='last') try: save_df.to_json(json_path, orient='records', force_ascii=False, lines=True) # 写入JSON except Exception as e: print(f"JSON保存失败: {e}") def save_json_periodically(self, instrument_id, trade_df): """定期保存订单流数据到JSON(已废弃,改为新K线触发保存)""" # 此方法已废弃,保留用于兼容 pass def cal_sig(self, symbol_queue): """交易信号计算线程主循环""" while True: try: data = symbol_queue.get(block=True, timeout=SYSTEM_CONFIG["queue_timeout"]) # 从队列获取数据 instrument_id = data["InstrumentID"].decode() # 解码合约代码 # 检查队列阻塞 size = symbol_queue.qsize() if size > SYSTEM_CONFIG.get("queue_warning_size", 5): print(f"{instrument_id}队列长度={size},有点阻塞!") # 读取历史数据 self.read_from_csv(instrument_id) self.load_stops_from_json(instrument_id) self.daily_reset(instrument_id) param = self.param_dict[instrument_id] # 获取参数 self.current_symbol = instrument_id # 设置当前合约 # 处理Tick数据 self.tickcome(data) trade_df = DataStore.trade_dfs[instrument_id] # 获取交易数据 # 新K线开始时 if len(trade_df) > param.processed_rows: # 止损检查 if self.check_stop_loss(trade_df, param, data): param.processed_rows = len(trade_df) continue # 计算Delta累计(必须在保存之前) self.calculate_delta_cumulative(trade_df, param) # 计算交易信号 bull_signal, bear_signal, dj_last, delta_last = self.calculate_signals(trade_df, param) if bull_signal or bear_signal: # 平仓信号(与持仓相反) if param.position < 0 and (bear_signal or bull_signal): self.execute_close_short(trade_df, param, data) elif param.position > 0 and (bull_signal or bear_signal): self.execute_close_long(trade_df, param, data) # 开仓信号(无持仓) if bull_signal and param.position == 0: self.execute_open_long(trade_df, param, data) if bear_signal and param.position == 0: self.execute_open_short(trade_df, param, data) # 检查待成交订单 self.check_pending_orders(trade_df, param, data) param.processed_rows = len(trade_df) except queue.Empty: # 队列为空则继续循环 pass def distribute_tick(self): """行情分发线程""" while True: if self.status == 0: # 连接正常 try: data = self.md_queue.get(block=True, timeout=0.1) # 获取行情数据 instrument_id = data["InstrumentID"].decode() try: self.queue_dict[instrument_id].put(data, block=True, timeout=1.0) # 分发到各合约队列 except queue.Full: print(f"{instrument_id}合约信号计算阻塞导致队列已满") except queue.Empty: pass else: # 连接断开则等待 time_module.sleep(0.1) def start(self, param_dict): """启动交易引擎""" self.param_dict = param_dict # 保存参数 now = datetime.now() # 当前时间 current_hour = now.hour if current_hour >= 21: # 21点后属于下一日 today_date = (now + pd.Timedelta(days=1)).date() elif current_hour < 15: # 15点前属于当日 today_date = now.date() else: # 15点后属于当日 today_date = now.date() data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 for symbol in param_dict.keys(): # 加载历史订单流数据 json_path = f"{data_dir}/{symbol}_ofdata.json" if os.path.exists(json_path): try: DataStore.trade_dfs[symbol] = pd.read_json(json_path, lines=True) if not DataStore.trade_dfs[symbol].empty: print(f"加载历史订单流: {json_path}, 共{len(DataStore.trade_dfs[symbol])}条") except Exception as e: print(f"加载失败: {e}") DataStore.trade_dfs[symbol] = pd.DataFrame({}) else: DataStore.trade_dfs[symbol] = pd.DataFrame({}) # 创建品种队列 self.queue_dict[symbol] = queue.Queue(SYSTEM_CONFIG["queue_max_size"]) # 启动信号计算线程 sig_thread = threading.Thread( target=self.cal_sig, args=(self.queue_dict[symbol],), daemon=True, # 守护线程 name=f"CalSig_{symbol}" ) sig_thread.start() # 启动行情分发线程 distribute_thread = threading.Thread(target=self.distribute_tick, daemon=True) distribute_thread.start() try: distribute_thread.join() # 等待分发线程结束 except KeyboardInterrupt: # 捕获中断信号 print("接收到中断信号,正在关闭...") raise # ============ 主程序入口 ============ def run_trader(param_dict, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, page_dir=""): """启动交易进程""" trader = OrderFlowTrader( # 创建交易引擎实例 broker_id, td_server, investor_id, password, app_id, auth_code, md_queue, page_dir ) trader.start(param_dict) # 启动交易引擎 if __name__ == "__main__": # 创建品种参数(自动从配置读取所有合约) param_dict = {} for symbol, config in TRADING_PARAMS.items(): param_dict[symbol] = TradingParam.from_config(symbol, config) # ===模式切换:回测模式 === if SYSTEM_CONFIG.get("mode", "live") == "backtest": from backtest import Backtester backtester = Backtester(param_dict) backtester.run() else: # === 实盘/模拟模式 === # 连接SimNow模拟交易 future_account = get_simulate_account( investor_id=SIMNOW_CONFIG["investor_id"], password=SIMNOW_CONFIG["password"], server_name=SIMNOW_CONFIG["server_name"], subscribe_list=list(param_dict.keys()), ) # 连接实盘(备用) # from CtpPlus.CTP.FutureAccount import FutureAccount # future_account = FutureAccount(...) print("开始", len(future_account.subscribe_list)) # 创建共享队列 share_queue = Queue(maxsize=SYSTEM_CONFIG["queue_share_size"]) # 启动行情进程 md_process = Process(target=run_tick_engine, args=(future_account, [share_queue])) # 启动交易进程 trader_process = Process( target=run_trader, args=( param_dict, future_account.broker_id, future_account.server_dict["TDServer"], future_account.investor_id, future_account.password, future_account.app_id, future_account.auth_code, share_queue, future_account.td_flow_path, ), ) md_process.start() # 启动行情进程 trader_process.start() # 启动交易进程 md_process.join() # 等待行情进程结束 trader_process.join() # 等待交易进程结束