From abed7371b3956c6705aac597a9d1aaf55db6f80e Mon Sep 17 00:00:00 2001 From: zhoujie Date: Thu, 28 May 2026 17:28:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=A2=E5=8D=95=E6=B5=81?= =?UTF-8?q?=E7=AD=96=E7=95=A5V1.0.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1.交易策略/4.orderflow/config.py | 143 ++++ 1.交易策略/4.orderflow/run.py | 1189 ++++++++++++++++++++++++++++++ 2 files changed, 1332 insertions(+) create mode 100644 1.交易策略/4.orderflow/config.py create mode 100644 1.交易策略/4.orderflow/run.py diff --git a/1.交易策略/4.orderflow/config.py b/1.交易策略/4.orderflow/config.py new file mode 100644 index 0000000..69a9df2 --- /dev/null +++ b/1.交易策略/4.orderflow/config.py @@ -0,0 +1,143 @@ +""" +订单流交易系统配置 +所有敏感配置和参数都在此文件中管理 +""" +import os + +# ============ SMTP邮件配置 ============ +SMTP_CONFIG = { + "server": "smtp.qq.com", + "port": 465, + "sender": "240884432@qq.com", + "receivers": ["240884432@qq.com"], + "username": "240884432@qq.com", + "password": os.getenv("SMTP_PASSWORD", "osjyjmbqrzxtbjbf"), # 从环境变量读取 +} + +# ============ 飞书通知配置 ============ +FEISHU_CONFIG = { + "enabled": True, # 是否启用飞书通知 + "webhook_url": "https://open.feishu.cn/open-apis/bot/v2/hook/8608dfa4-e599-462a-8dba-6ac72873dd27", + "receivers": ["240884432@qq.com"], # 备用接收人列表 +} + +# ============ 信号通知阈值配置 ============ +SIGNAL_THRESHOLD = { + "time_period": 30, # 平滑周期 + "delta_sum_trend": 0, # delta累计趋势(保留字段) + "delta_trend": 0, # delta趋势(保留字段) + "dj_trend": 0, # dj趋势(保留字段) + "delta_rate": 0.8, # delta信号阈值比例(0.8倍前120根最大值) + "dj_rate": 0.8, # dj信号阈值比例(0.8倍前120根极值) + "min_delta_abs": 350, # delta最小绝对值 + "min_dj_abs": 8, # dj最小绝对值 +} + +# ============ 交易参数配置 ============ +# key: 合约代码, value: 交易参数 +TRADING_PARAMS = { + "IM2606": { + "lots": 1, + "price_offset": 5, + "delta_threshold": 300, + "imbalance_ratio": 3, + "accumulation_threshold": 3, + "period": "3min", + "min_volume": 10, + "merge_price": 5, + "mini_price": 0.2, + }, + "jm2609": { + "lots": 1, + "price_offset": 1, # 黄金波动较小,价格偏移也小 + "delta_threshold": 300, + "imbalance_ratio": 3, + "accumulation_threshold": 3, + "period": "3min", + "min_volume": 5, # 黄金成交量较大,可适当调高 + "merge_price": 2, + "mini_price": 0.5, # 黄金最小变动价位 + }, +} + +# ============ SimNow模拟账户配置 ============ +SIMNOW_CONFIG = { + "investor_id": "223828", + "password": os.getenv("SIMNOW_PASSWORD", "Zj1234!@#%"), + "server_name": "电信1", # 交易服务器(电信1、电信2、移动、TEST、N视界、TEST环境) +} + +# ============ 实盘账户配置(注释备用) ============ +# LIVE_ACCOUNT_CONFIG = { +# "broker_id": "", +# "td_server": "121.37.80.177:20002", +# "md_server": "121.37.80.177:20004", +# "investor_id": "1114", +# "password": os.getenv("LIVE_PASSWORD", ""), +# "app_id": "", +# "auth_code": "", +# } + +# ============ 系统常量 ============ +SYSTEM_CONFIG = { + "queue_timeout": 5, + "queue_max_size": 10, + "queue_share_size": 200, + "queue_warning_size": 5, # 超过此数量才警告 + "json_save_interval": 10, + "json_records_limit": 20, + "stops_load_interval": 60, + "data_dir": "traderdata", +} + +# ============ 夜盘收盘时间字典 ============ +# key: 品种代码(去掉数字), value: 夜盘收盘时间 +NIGHT_CLEARING_TIME = { + "sc": (2, 30), # SC 原油 + "bc": (1, 0), # BC 碳酸锂 + "lu": (23, 0), # LU 低硫燃料油 + "nr": (23, 0), # NR 天然橡胶 + "au": (2, 30), # AU 黄金 + "ag": (2, 30), # AG 白银 + "ss": (1, 0), # SS 不锈钢 + "sn": (1, 0), # SN 锡 + "ni": (1, 0), # NI 镍 + "pb": (1, 0), # PB 铅 + "zn": (1, 0), # ZN 锌 + "al": (1, 0), # AL 铝 + "cu": (1, 0), # CU 铜 + "ru": (23, 0), # RU 橡胶 + "rb": (23, 0), # RB 螺纹钢 + "hc": (23, 0), # HC 热卷 + "fu": (23, 0), # FU 燃料油 + "bu": (23, 0), # BU 沥青 + "sp": (23, 0), # SP 纸浆 + "PF": (23, 0), # PF 短纤维 + "SR": (23, 0), # SR 白糖 + "CF": (23, 0), # CF 棉花 + "CY": (23, 0), # CY 棉纱 + "RM": (23, 0), # RM 菜籽粕 + "MA": (23, 0), # MA 甲醇 + "TA": (23, 0), # TA PTA + "ZC": (23, 0), # ZC 动力煤 + "FG": (23, 0), # FG 玻璃 + "OI": (23, 0), # OI 菜籽油 + "SA": (23, 0), # SA 纯碱 + "p": (23, 0), # P 棕榈油 + "j": (23, 0), # J 焦炭 + "jm": (23, 0), # JM 焦煤 + "i": (23, 0), # I 铁矿石 + "l": (23, 0), # L 塑料 + "v": (23, 0), # V PVC + "pp": (23, 0), # PP 聚丙烯 + "eg": (23, 0), # EG 乙二醇 + "c": (23, 0), # C 玉米 + "cs": (23, 0), # CS 玉米淀粉 + "y": (23, 0), # Y 豆油 + "m": (23, 0), # M 豆粕 + "a": (23, 0), # A 大豆 + "b": (23, 0), # B 豆二 + "rr": (23, 0), # RR 粳米 + "eb": (23, 0), # EB 苯乙烯 + "pg": (23, 0), # PG 液化石油气 +} \ No newline at end of file diff --git a/1.交易策略/4.orderflow/run.py b/1.交易策略/4.orderflow/run.py new file mode 100644 index 0000000..845cd01 --- /dev/null +++ b/1.交易策略/4.orderflow/run.py @@ -0,0 +1,1189 @@ +""" +订单流交易策略 +功能:处理Tick行情数据,计算订单流指标,生成交易信号并执行交易 +""" +import queue # 队列模块,用于线程间通信 +import threading # 线程模块 +import os # 操作系统模块 +import json # JSON模块 +import re # 正则表达式模块 +import time as time_module # 标准库时间模块 +from datetime import datetime, time as datetime_time # datetime时间对象 +from multiprocessing import Process, Queue # 多进程模块 + +import pandas as pd # 数据分析库 +import numpy as np # 数值计算库 +import smtplib # SMTP邮件发送库 +from email.mime.text import MIMEText # 邮件文本格式 +from email.mime.multipart import MIMEMultipart # 邮件多部分格式 + +from CtpPlus.CTP.MdApi import run_tick_engine # 行情引擎 +from CtpPlus.CTP.FutureAccount import get_simulate_account # 获取模拟账户 +from CtpPlus.CTP.TraderApiBase import TraderApiBase # 交易API基类 + +# 导入配置 +from config import ( + SMTP_CONFIG, TRADING_PARAMS, SIMNOW_CONFIG, SYSTEM_CONFIG, NIGHT_CLEARING_TIME, + FEISHU_CONFIG +) + +# ============ 全局数据字典 ============ +class DataStore: + """全局数据存储""" + tickdata = {} # K线数据 {symbol: DataFrame} + quote = {} # 报价数据 {symbol: {bid, ask}} + ofdata = {} # 订单流数据 {symbol: DataFrame} + trade_dfs = {} # 交易数据 {symbol: DataFrame} + prev_volume = {} # 上个Tick成交量 {symbol: volume} + last_tick = {} # 上条Tick数据 {symbol: tick} + + +# ============ 参数配置类 ============ + +class TradingParam: + """交易参数配置""" + def __init__( + self, symbol, lots, price_offset, delta_threshold, + imbalance_ratio, accumulation_threshold, period, + min_volume=0, merge_price=1, mini_price=0.2 + ): + self.symbol = symbol # 合约代码 + self.lots = lots # 交易手数 + self.price_offset = price_offset # 价格偏移量(下单时加减) + self.delta_threshold = delta_threshold # Delta阈值(开仓条件) + self.imbalance_ratio = imbalance_ratio # 失衡比率(堆积判断) + self.accumulation_threshold = accumulation_threshold # 堆积阈值 + self.period = period # K线周期 + self.min_volume = min_volume # 最小成交量过滤 + self.merge_price = merge_price # 价格档位合并数量 + self.mini_price = mini_price # 最小价格变动 + + # 持仓状态 + self.position = 0 # 当前持仓(1多头/-1空头/0无持仓) + self.pending_long_price = 0 # 待执行开多价格 + self.pending_short_price = 0 # 待执行开空价格 + self.sl_long_price = 0 # 多头止损价 + self.sl_short_price = 0 # 空头止损价 + self.clearing_executed = False # 日终清算是否已执行 + self.load_historical_data = True # 是否加载历史数据 + self.processed_rows = 0 # 已处理的行数(用于判断新K线) + + @classmethod + def from_config(cls, symbol, config_dict): + """从配置字典创建参数""" + return cls( + symbol=symbol, # 合约代码 + lots=config_dict["lots"], # 手数 + price_offset=config_dict["price_offset"], # 价格偏移 + delta_threshold=config_dict["delta_threshold"], # Delta阈值 + imbalance_ratio=config_dict["imbalance_ratio"], # 失衡比率 + accumulation_threshold=config_dict["accumulation_threshold"], # 堆积阈值 + period=config_dict["period"], # K线周期 + min_volume=config_dict.get("min_volume", 0), # 最小成交量 + merge_price=config_dict.get("merge_price", 1), # 价格合并 + mini_price=config_dict.get("mini_price", 0.2) # 最小价格 + ) + + +# ============ 邮件通知模块 ============ + +class MailNotifier: + """邮件通知类""" + def __init__(self): + self.config = SMTP_CONFIG # 加载SMTP配置 + + def send(self, text, subject="TD_Simnow_Signal"): + """发送邮件通知""" + if not self.config["password"]: # 未配置密码则跳过 + print("SMTP密码未配置,跳过邮件发送") + return + + try: + msg = MIMEMultipart() # 创建多部分邮件对象 + msg["From"] = self.config["sender"] # 发件人 + msg["To"] = ";".join(self.config["receivers"]) # 收件人(逗号分隔) + msg["Subject"] = subject # 邮件主题 + msg.attach(MIMEText(text, "plain", "utf-8")) # 添加纯文本内容 + + smtp = smtplib.SMTP_SSL(self.config["server"], self.config["port"]) # 连接SSL SMTP + smtp.login(self.config["username"], self.config["password"]) # 登录 + smtp.sendmail(self.config["sender"], self.config["receivers"], msg.as_string()) # 发送 + smtp.quit() # 关闭连接 + except Exception as e: + print(f"邮件发送失败: {e}") # 捕获并打印异常 + + +class FeishuNotifier: + """飞书通知类""" + def __init__(self): + self.config = FEISHU_CONFIG # 加载飞书配置 + + def send(self, text): + """发送飞书消息""" + if not self.config.get("enabled", False): # 未启用则跳过 + return + + try: + import requests # 导入requests库 + headers = {"Content-Type": "application/json"} # 请求头 + data = { + "msg_type": "text", # 消息类型为文本 + "content": {"text": text} # 文本内容 + } + response = requests.post(self.config["webhook_url"], headers=headers, json=data) # POST请求 + if response.status_code != 200: # 检查响应状态 + print(f"飞书消息发送失败,状态码: {response.status_code}") + except Exception as e: + print(f"飞书消息发送失败: {e}") + + +# ============ 交易引擎类 ============ + +class OrderFlowTrader(TraderApiBase): + """订单流交易引擎""" + + def __init__(self, broker_id, td_server, investor_id, password, + app_id, auth_code, md_queue=None, page_dir=""): + self.param_dict = {} # 交易参数字典 {symbol: TradingParam} + self.queue_dict = {} # 信号计算队列字典 {symbol: Queue} + self.current_symbol = " " # 当前处理的合约代码 + self.last_stops_loatime = {} # 上次加载止盈止损的时间 {symbol: timestamp} + + # 线程锁(保证多线程访问共享数据的安全性) + self._lock = threading.RLock() # 综合锁 + self._trade_locks = {} # 交易数据锁 + self._prev_volume_lock = threading.RLock() # 成交量锁 + self._tickdata_lock = threading.RLock() # K线数据锁 + self._quote_lock = threading.RLock() # 报价数据锁 + self._ofdata_lock = threading.RLock() # 订单流数据锁 + self._last_tick_lock = threading.RLock() # 上个Tick锁 + + # 邮件通知器 + self.mail = MailNotifier() + + # 飞书通知器 + self.feishu = FeishuNotifier() + + # 数据存储 + self._json_save_counter = {} # JSON保存计数器 + self.md_queue = md_queue # 行情队列 + + # ============ 行情数据处理 ============ + + def tickcome(self, md_queue): + """接收并处理Tick行情数据""" + data = md_queue # 从队列获取行情数据 + instrument_id = data["InstrumentID"].decode() # 解码合约代码 + action_day = data["ActionDay"].decode() # 解码交易日期 + update_time = data["UpdateTime"].decode() # 解码更新时间 + + # 格式化日期为YYYY-MM-DD格式 + action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}" + + # 组合完整的时间戳(包含毫秒) + created_at = datetime.strptime( + f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}", + "%Y-%m-%d %H:%M:%S.%f" + ) + + # 计算瞬时成交量(当前成交量 - 上个Tick成交量) + with self._prev_volume_lock: + prev_vol = DataStore.prev_volume.get(instrument_id, 0) # 获取上次成交量 + curr_vol = int(data["Volume"]) # 当前总成交量 + last_vol = curr_vol - prev_vol if prev_vol != 0 else 0 # 计算瞬时成交量 + DataStore.prev_volume[instrument_id] = curr_vol # 更新存储的成交量 + + if last_vol <= 0: # 无增量则返回(避免重复处理) + return + + # 构建Tick数据结构 + tick = { + "symbol": instrument_id, # 合约代码 + "created_at": created_at, # 时间戳 + "price": float(data["LastPrice"]), # 最新价 + "last_volume": last_vol, # 瞬时成交量 + "bid_p": float(data["BidPrice1"]), # 买一价 + "bid_v": int(data["BidVolume1"]), # 买一量 + "ask_p": float(data["AskPrice1"]), # 卖一价 + "ask_v": int(data["AskVolume1"]), # 卖一量 + "upper_limit": float(data["UpperLimitPrice"]), # 涨停价 + "lower_limit": float(data["LowerLimitPrice"]), # 跌停价 + "trading_day": data["TradingDay"].decode(), # 交易日 + "cum_volume": curr_vol, # 累计成交量 + "cum_amount": float(data["Turnover"]), # 累计成交额 + "cum_position": int(data["OpenInterest"]), # 累计持仓量 + } + self.on_tick(tick) # 调用Tick处理回调 + + def on_tick(self, tick): + """处理单个Tick数据""" + if tick["last_volume"] == 0: # 无成交量则返回 + return + + tsymbol = tick["symbol"] # 合约代码 + + # 获取上条Tick的买卖盘数据进行对比 + with self._last_tick_lock: + prev_tick = DataStore.last_tick.get(tsymbol) # 获取上条Tick + if prev_tick: # 存在则使用上条数据 + bid_p = prev_tick["bid_p"] + ask_p = prev_tick["ask_p"] + bid_v = prev_tick["bid_v"] + ask_v = prev_tick["ask_v"] + else: # 不存在则使用当前数据初始化 + bid_p = tick["bid_p"] + ask_p = tick["ask_p"] + bid_v = tick["bid_v"] + ask_v = tick["ask_v"] + DataStore.last_tick[tsymbol] = tick # 更新存储的上条Tick + + # 格式化时间字符串(毫秒精度) + timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + # 构建DataFrame格式的Tick数据 + tick_dt = pd.DataFrame({ + "datetime": timetick, # 时间戳字符串 + "symbol": tick["symbol"], # 合约代码 + "mainsym": tick["symbol"].rstrip("0123456789").upper(), # 主合约代码(去除数字) + "lastprice": tick["price"], # 最新价 + "vol": tick["last_volume"], # 瞬时成交量 + "bid_p": bid_p, # 买一价 + "ask_p": ask_p, # 卖一价 + "bid_v": bid_v, # 买一量 + "ask_v": ask_v, # 卖一量 + }, index=[0]) + self.tickdata(tick_dt, tsymbol) # 调用K线聚合处理 + + def data_of(self, symbol, df): + """将订单流数据添加到交易DataFrame""" + with self._lock: + if symbol not in self._trade_locks: # 首次访问则创建锁 + self._trade_locks[symbol] = threading.RLock() + trade_lock = self._trade_locks[symbol] + + with trade_lock: + existing = DataStore.trade_dfs.get(symbol) # 获取现有数据 + if existing is None or existing.empty: # 无数据则直接赋值 + DataStore.trade_dfs[symbol] = df + else: + # 检查是否有重复的 datetime,有则删除旧数据 + if "datetime" in df.columns and "datetime" in existing.columns: + new_dt = df["datetime"].iloc[0] if len(df) > 0 else None + if new_dt is not None: + # 删除已存在的相同 datetime 的旧数据 + existing = existing[existing["datetime"] != new_dt] + DataStore.trade_dfs[symbol] = pd.concat([existing, df], ignore_index=True) # 合并数据 + + def process_quote(self, bid_dict, ask_dict, symbol): + """处理并合并买卖盘数据""" + with self._quote_lock: + dic = DataStore.quote.get(symbol) # 获取该合约的现有报价 + if dic: + bid_result = dic["bid_result"].copy() # 复制买盘数据 + ask_result = dic["ask_result"].copy() # 复制卖盘数据 + else: + bid_result, ask_result = {}, {} # 无数据则初始化空字典 + + # 合并价格档位(买卖盘价格可能有差异) + price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys())) + + for price in price_list: + bid_val = int(bid_dict.get(price, 0)) # 获取买盘量 + ask_val = int(ask_dict.get(price, 0)) # 获取卖盘量 + + if bid_val: # 累加买盘量 + bid_result[price] = bid_result.get(price, 0) + bid_val + ask_result.setdefault(price, 0) # 确保卖盘有该价格档位 + if ask_val: # 累加卖盘量 + ask_result[price] = ask_result.get(price, 0) + ask_val + bid_result.setdefault(price, 0) # 确保买盘有该价格档位 + + DataStore.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result} # 更新存储 + return bid_result, ask_result # 返回合并后的数据 + + def tickdata(self, df, symbol): + """将Tick数据聚合成K线""" + tickdata = pd.DataFrame({ + "datetime": df["datetime"], # 时间 + "symbol": df["symbol"], # 合约代码 + "lastprice": df["lastprice"], # 最新价 + "vol": df["vol"], # 成交量 + "bid_p": df["bid_p"], # 买一价 + "bid_v": df["bid_v"], # 买一量 + "ask_p": df["ask_p"], # 卖一价 + "ask_v": df["ask_v"], # 卖一量 + }) + + try: + with self._tickdata_lock: + rdf = DataStore.tickdata.get(symbol) # 获取已存储的K线数据 + + if rdf is not None: # 已存在K线数据(需要累加) + rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 获取K线时间 + # 获取当前 tick 的 bar 时间(可能还未设置,用 datetime 作为备选) + tick_bartime = tickdata.get("bartime", tickdata["datetime"])[0] + now_bartime = pd.to_datetime(tick_bartime).strftime("%Y-%m-%d %H:%M:%S") + + if now_bartime > rdftm: # 新K线开始(时间大于上一K线) + # 保存旧K线的订单流数据 + with self._ofdata_lock: + of = DataStore.ofdata.pop(symbol, None) + if of is not None: + self.data_of(symbol, of) # 写入交易DataFrame + + # 清空旧K线的报价数据 + with self._quote_lock: + DataStore.quote.pop(symbol, None) + # 清空旧K线的Tick数据 + with self._tickdata_lock: + DataStore.tickdata.pop(symbol, None) + + # 初始化新K线数据 + tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) # K线时间 + tickdata["open"] = tickdata["lastprice"] # 开盘价 + tickdata["high"] = tickdata["lastprice"] # 最高价 + tickdata["low"] = tickdata["lastprice"] # 最低价 + tickdata["close"] = tickdata["lastprice"] # 收盘价 + tickdata["starttime"] = tickdata["datetime"] # 开始时间 + else: # 同一K线内:累加数据 + tickdata["bartime"] = rdf["bartime"] # 继承K线时间 + tickdata["open"] = rdf["open"] # 继承开盘价 + lastprice_val = float(tickdata["lastprice"].values[0]) # 当前价格 + # 更新最高价(取最大值) + tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]) if len(rdf["high"].values) > 0 else 0) + # 更新最低价(取最小值) + tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]) if len(rdf["low"].values) > 0 else 0) + tickdata["close"] = tickdata["lastprice"] # 更新收盘价 + tickdata["vol"] = df["vol"].values + rdf["vol"].values # 累加成交量 + tickdata["starttime"] = rdf["starttime"] # 继承开始时间 + else: # 首条Tick,创建新K线 + print("新bar的第一个tick进入") + tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) + tickdata["open"] = tickdata["lastprice"] + tickdata["high"] = tickdata["lastprice"] + tickdata["low"] = tickdata["lastprice"] + tickdata["close"] = tickdata["lastprice"] + tickdata["starttime"] = tickdata["datetime"] + except Exception as e: + print(f"tickdata异常: {e}") + if "bartime" not in tickdata.columns: # 确保bartime存在 + tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) + + # 确保bartime存在且格式正确 + if "bartime" not in tickdata.columns: + tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) + tickdata["bartime"] = pd.to_datetime(tickdata["bartime"]) + + param = self.param_dict[self.current_symbol] # 获取交易参数 + + # 重采样聚合K线(按指定周期合并) + bardata = ( + tickdata.resample(on="bartime", rule=param.period, label="right", closed="right") + .agg({ + "starttime": "first", # 开始时间取第一个 + "symbol": "last", # 合约代码取最后一个 + "open": "first", # 开盘价取第一个 + "high": "max", # 最高价取最大 + "low": "min", # 最低价取最小 + "close": "last", # 收盘价取最后一个 + "vol": "sum", # 成交量求和 + }) + .reset_index(drop=False) + ) + + bardata = bardata.dropna().reset_index(drop=True) # 删除空值行 + + # 只保留最后一个 bar(避免同一周期产生多行) + if len(bardata) > 1: + bardata = bardata.iloc[[-1]] + + bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 格式化时间 + + with self._tickdata_lock: + DataStore.tickdata[symbol] = bardata # 更新存储的K线数据 + + # 使用单tick成交量进行后续处理 + tickdata["vol"] = df["vol"].values + self.orderflow_df_new(tickdata, bardata, symbol) # 生成订单流数据 + + def merge_price_levels(self, price_dict, merge_count, mini_price): + """合并价格档位""" + if not price_dict or merge_count <= 1 or mini_price <= 0: # 参数无效则返回原数据 + return price_dict + + merged = {} + sorted_prices = sorted([float(p) for p in price_dict.keys()]) # 排序价格 + + if not sorted_prices: # 无价格则返回空 + return {} + + min_price = sorted_prices[0] # 最低价 + merge_interval = merge_count * mini_price # 合并间隔 + + # 按间隔分组合并成交量 + for price_str, vol in price_dict.items(): + price = float(price_str) + group_idx = int((price - min_price) // merge_interval) # 计算组索引 + merged_price = min_price + group_idx * merge_interval # 计算合并后的价格 + key = str(merged_price) + merged[key] = merged.get(key, 0) + vol # 累加成交量 + + return merged # 返回合并后的数据 + + def filter_by_min_vol(self, price_dict, min_vol): + """过滤最小成交量以下的价格档位""" + if min_vol <= 0: # 无阈值则返回原数据 + return price_dict + return {k: v for k, v in price_dict.items() if v >= min_vol} # 过滤低成交量档位 + + def orderflow_df_new(self, df_tick, df_min, symbol): + """生成订单流DataFrame""" + param = self.param_dict[symbol] # 获取交易参数 + min_vol = param.min_volume # 最小成交量 + merge_price = param.merge_price # 价格合并数量 + + # 单tick成交量,用于bid/ask方向判断 + bar_vol = int(df_tick["vol"].values[0]) + # K线周期内成交量之和(resample聚合结果) + period_vol = int(df_min["vol"].values[0]) + bar_close = float(df_min["close"].values[0]) # K线收盘价 + bar_open = float(df_min["open"].values[0]) # K线开盘价 + bar_low = float(df_min["low"].values[0]) # K线最低价 + bar_high = float(df_min["high"].values[0]) # K线最高价 + dt = df_min["bartime"].values[0] # K线时间 + + bp1 = float(df_tick["bid_p"].values[0]) # 买一价 + ap1 = float(df_tick["ask_p"].values[0]) # 卖一价 + last_price = float(df_tick["lastprice"].values[0]) # 最新价 + volume = bar_vol # 成交量 + bar_symbol = df_tick["symbol"].values[0] # 合约代码 + + bid_dict = {} # 买盘字典 + ask_dict = {} # 卖盘字典 + + # 判断价格位置并归类到买卖盘 + if last_price >= ap1: # 价格>=卖一价,记录到卖盘 + ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + volume + if last_price <= bp1: # 价格<=买一价,记录到买盘 + bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + volume + + bid_result, ask_result = self.process_quote(bid_dict, ask_dict, symbol) # 合并买卖盘数据 + + # 应用价格合并 + if merge_price > 1: + bid_result = self.merge_price_levels(bid_result, merge_price, param.mini_price) + ask_result = self.merge_price_levels(ask_result, merge_price, param.mini_price) + + # 价格升序排序 + bid_result = dict(sorted(bid_result.items(), key=lambda x: x[0])) + ask_result = dict(sorted(ask_result.items(), key=lambda x: x[0])) + + price_list = list(bid_result.keys()) # 价格列表 + ask_list = list(ask_result.values()) # 卖盘量列表 + bid_list = list(bid_result.values()) # 买盘量列表 + + delta = sum(ask_list) - sum(bid_list) # Delta = 卖量 - 买量 + + # 构建订单流DataFrame + df = pd.DataFrame({ + "price": pd.Series([price_list]), # 价格列表 + "Ask": pd.Series([ask_list]), # 卖盘量 + "Bid": pd.Series([bid_list]), # 买盘量 + }) + df["symbol"] = bar_symbol # 合约代码 + df["datetime"] = dt # 时间 + df["delta"] = delta # Delta值 + df["close"] = bar_close # 收盘价 + df["open"] = bar_open # 开盘价 + df["high"] = bar_high # 最高价 + df["low"] = bar_low # 最低价 + df["vol"] = period_vol # 周期成交量 + + # 计算堆积指标 + dj_count, dj_high, dj_low = self.calculate_accumulation(df, min_vol) + df["dj"] = dj_count # 堆积数量 + df["dj_price_high"] = dj_high # 堆积区间高价 + df["dj_price_low"] = dj_low # 堆积区间低价 + + with self._ofdata_lock: + DataStore.ofdata[symbol] = df # 更新订单流数据 + + def calculate_accumulation(self, kdata, min_vol=0): + """计算堆积指标(Order Flow Delta) + + Args: + kdata: 订单流DataFrame + min_vol: 最小成交量过滤 + + Returns: + (dj_count, dj_price_high, dj_price_low) + """ + param = self.param_dict[self.current_symbol] # 获取交易参数 + imbalance_ratio = int(param.imbalance_ratio) # 失衡比率 + accumulation_threshold = int(param.accumulation_threshold) # 堆积阈值 + + dj_count = 0 # 堆积计数 + dj_price_high = 0 # 堆积价格区间高 + dj_price_low = 0 # 堆积价格区间低 + bearish_zones = [] # 空头堆积区间 + bullish_zones = [] # 多头堆积区间 + + # 遍历每个价格档位 + for item in kdata.itertuples(index=False): + price_s = item.price # 价格序列 + ask_s = item.Ask # 卖盘序列 + bid_s = item.Bid # 买盘序列 + + bear_count = 0 # 空头连续计数 + bull_count = 0 # 多头连续计数 + bear_start, bear_end = None, None # 空头区间起止 + bull_start, bull_end = None, None # 多头区间起止 + + price_len = len(price_s) # 价格档位数量 + + for i in range(price_len): + # 获取各档位成交量(低于最小值的视为0) + bid_val = bid_s[i] if i < len(bid_s) and bid_s[i] >= min_vol else 0 + ask_val = ask_s[i] if i < len(ask_s) and ask_s[i] >= min_vol else 0 + next_ask = ask_s[i + 1] if i + 1 < len(ask_s) and ask_s[i + 1] >= min_vol else 0 + prev_bid = bid_s[i - 1] if i > 0 and bid_s[i - 1] >= min_vol else 0 + + # 空头堆积: 买盘 > 卖盘 * 失衡比率 + if i < price_len - 1: + if bid_val > next_ask * imbalance_ratio: # 判断空头堆积 + if bear_start is None: + bear_start = price_s[i] # 记录空头区间起点 + bear_end = price_s[i] # 更新空头区间终点 + bear_count += 1 # 连续计数+1 + if bear_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值 + dj_count -= 1 # 堆积计数-1 + else: # 堆积中断 + if bear_count >= accumulation_threshold and bear_start is not None: + bearish_zones.append((bear_start, bear_end)) # 记录空头区间 + bear_count = 0 # 重置计数 + bear_start, bear_end = None, None # 重置区间 + + # 多头堆积: 卖盘 > 买盘 * 失衡比率 + if i >= 1 and i < price_len - 1: + if ask_val > prev_bid * imbalance_ratio: # 判断多头堆积 + if bull_start is None: + bull_start = price_s[i] # 记录多头区间起点 + bull_end = price_s[i] # 更新多头区间终点 + bull_count += 1 # 连续计数+1 + if bull_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值 + dj_count += 1 # 堆积计数+1 + else: # 堆积中断 + if bull_count >= accumulation_threshold and bull_start is not None: + bullish_zones.append((bull_start, bull_end)) # 记录多头区间 + bull_count = 0 # 重置计数 + bull_start, bull_end = None, None # 重置区间 + + # 处理最后堆积(未中断的区间) + if bear_count >= accumulation_threshold and bear_start is not None: + bearish_zones.append((bear_start, bear_end)) + if bull_count >= accumulation_threshold and bull_start is not None: + bullish_zones.append((bull_start, bull_end)) + + # 计算价格范围 + all_zones = bearish_zones + bullish_zones + if all_zones: + all_prices = [float(p) for zone in all_zones for p in zone] + dj_price_high = max(all_prices) # 取最高价 + dj_price_low = min(all_prices) # 取最低价 + + return dj_count, dj_price_high, dj_price_low # 返回堆积结果 + + # ============ 数据持久化 ============ + + def read_from_csv(self, symbol): + """从CSV读取历史交易数据""" + param = self.param_dict[symbol] # 获取交易参数 + data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 + os.makedirs(data_dir, exist_ok=True) # 确保目录存在 + file_path = f"{data_dir}/{symbol}traderdata.csv" # 数据文件路径 + + if os.path.exists(file_path): # 文件存在则读取 + df = pd.read_csv(file_path) + if not df.empty and param.load_historical_data: # 有数据且需加载 + row = df.iloc[-1] # 取最后一行 + param.position = int(row["pos"]) # 恢复持仓 + param.sl_long_price = float(row["sl_long_price"]) # 恢复多头止损价 + param.sl_short_price = float(row["sl_short_price"]) # 恢复空头止损价 + print(f"加载历史数据: {df.iloc[-1]}") + param.load_historical_data = False # 标记已加载 + + def save_to_csv(self, symbol): + """保存交易数据到CSV""" + # 确保 symbol 是字符串类型 + if isinstance(symbol, bytes): + symbol = symbol.decode() + symbol = str(symbol) + + # 从 param_dict 获取参数(需要确保是字符串 key) + if symbol in self.param_dict: + param = self.param_dict[symbol] + else: + # 尝试找到匹配的 key + for key in self.param_dict: + if str(key) == symbol: + param = self.param_dict[key] + break + else: + print(f"警告: 未找到 symbol {symbol} 的参数配置") + return + symbol = str(symbol) + data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 + os.makedirs(data_dir, exist_ok=True) # 确保目录存在 + + # 构建保存数据 + data = { + "datetime": [DataStore.trade_dfs[symbol]["datetime"].iloc[-1]], # 最后时间 + "pos": [param.position], # 持仓 + "sl_long_price": [param.sl_long_price], # 多头止损价 + "sl_short_price": [param.sl_short_price], # 空头止损价 + } + + df = pd.DataFrame(data) + file_path = f"{data_dir}/{symbol}traderdata.csv" # 文件路径 + + if os.path.exists(file_path): # 文件存在则追加 + csv_df = pd.read_csv(file_path) + if df["pos"].iloc[-1] != csv_df["pos"].iloc[-1]: # 持仓变化则保存 + df.to_csv(file_path, mode="a", header=False, index=False) + else: # 文件不存在则新建 + df.to_csv(file_path, index=False) + + self.save_stops_to_json(symbol, DataStore.trade_dfs[symbol]) # 同时保存止盈止损JSON + + def save_stops_to_json(self, symbol, trade_df): + """保存止盈止损数据到JSON""" + # 确保 symbol 是字符串类型 + if isinstance(symbol, bytes): + symbol = symbol.decode() + symbol = str(symbol) + + # 获取参数 + if symbol in self.param_dict: + param = self.param_dict[symbol] + else: + for key in self.param_dict: + if str(key) == symbol: + param = self.param_dict[key] + break + else: + print(f"警告: 未找到 symbol {symbol} 的参数配置") + return + + data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 + os.makedirs(data_dir, exist_ok=True) # 确保目录存在 + file_path = f"{data_dir}/{symbol}_stops.json" # JSON文件路径 + + param = self.param_dict[symbol] + + # 获取堆积价格区间 + if trade_df is not None and len(trade_df) > 0: + dj_low = float(trade_df['dj_price_low'].iloc[-1]) if 'dj_price_low' in trade_df.columns else 0 + dj_high = float(trade_df['dj_price_high'].iloc[-1]) if 'dj_price_high' in trade_df.columns else 0 + else: + dj_low, dj_high = 0, 0 + + # 构建止盈止损数据结构 + stops_data = { + "long": { # 多头止盈止损 + "position": param.position if param.position > 0 else 0, # 持仓 + "entry_price": dj_high if param.position > 0 else 0, # 入场价 + "stop_loss": dj_low if param.position > 0 else 0 # 止损价 + }, + "short": { # 空头止盈止损 + "position": abs(param.position) if param.position < 0 else 0, # 持仓 + "entry_price": dj_high if param.position < 0 else 0, # 入场价 + "stop_loss": dj_low if param.position < 0 else 0 # 止损价 + } + } + + with open(file_path, 'w', encoding='utf-8') as f: # 写入JSON文件 + json.dump(stops_data, f, indent=4, ensure_ascii=False) + + def load_stops_from_json(self, symbol): + """从JSON加载止盈止损数据""" + # 确保 symbol 是字符串类型 + if isinstance(symbol, bytes): + symbol = symbol.decode() + symbol = str(symbol) + + current_time = time_module.time() # 当前时间戳 + interval = SYSTEM_CONFIG["stops_load_interval"] # 加载间隔 + + # 处理 last_stops_loatime 的 key 类型 + last_stops_time = self.last_stops_loatime + normalized_key = symbol + + # 检查是否在加载间隔内(避免频繁读取磁盘) + if symbol in last_stops_time or normalized_key in [str(k) for k in last_stops_time]: + # 找到匹配的 key + for k in last_stops_time: + if str(k) == symbol: + if current_time - last_stops_time.get(k, 0) <= interval: + return False # 在间隔内,跳过 + break + + file_path = f"{SYSTEM_CONFIG['data_dir']}/{symbol}_stops.json" # JSON文件路径 + + if os.path.exists(file_path): # 文件存在则读取 + try: + with open(file_path, 'r', encoding='utf-8') as f: + stops_data = json.load(f) + + # 获取参数 + if symbol in self.param_dict: + param = self.param_dict[symbol] + else: + for key in self.param_dict: + if str(key) == symbol: + param = self.param_dict[key] + break + else: + print(f"警告: 未找到 symbol {symbol} 的参数配置") + return False + + # 恢复多头止盈止损 + if stops_data['long']['position'] > 0: + param.position = stops_data['long']['position'] + param.sl_long_price = stops_data['long']['entry_price'] + # 恢复空头止盈止损 + elif stops_data['short']['position'] > 0: + param.position = -stops_data['short']['position'] + param.sl_short_price = stops_data['short']['entry_price'] + + self.last_stops_loatime[symbol] = current_time # 记录加载时间 + print(f"加载止盈止损信息: {symbol}") + return True + except Exception as e: + print(f"加载止盈止损信息失败: {e}") + + return False + + def daily_reset(self, symbol): + """每日收盘数据重置""" + param = self.param_dict[symbol] # 获取交易参数 + sec = "".join(re.findall("[a-zA-Z]", str(symbol))) # 提取合约字母部分(如ru) + current_time = datetime.now().time() # 当前时间 + + clearing_time1_start = datetime_time(15, 5) # 日盘清算开始(15:05) + clearing_time1_end = datetime_time(15, 10) # 日盘清算结束(15:10) + + param.clearing_executed = False # 重置清算标志 + + # 判断是否在日盘清算时间段 + if clearing_time1_start <= current_time <= clearing_time1_end: + param.clearing_executed = True # 标记已执行清算 + DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] # 清空数据 + # 判断是否在夜盘清算时间段 + elif sec in NIGHT_CLEARING_TIME: + hour, minute = NIGHT_CLEARING_TIME[sec] # 获取夜盘清算时间 + clearing_time2_start = datetime_time(hour, minute) + clearing_time2_end = datetime_time(hour, minute + 15) # 15分钟后结束 + if clearing_time2_start <= current_time <= clearing_time2_end: + param.clearing_executed = True + DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] + + return param.clearing_executed # 返回是否执行了清算 + + # ============ CTP回调 ============ + + def OnRtnTrade(self, pTrade): + """成交回报回调""" + print("||成交回报||", pTrade) + + def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast): + """下单响应回调""" + print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast) + + def OnRtnOrder(self, pOrder): + """订单回报回调""" + print("||订单回报||", pOrder) + + # ============ 交易辅助 ============ + + def place_order(self, exchange_id, instrument_id, price, lots, direction, offset): + """统一下单方法""" + self.insert_order(exchange_id, instrument_id, price, lots, direction, offset) + + def place_close_orders(self, exchange_id, instrument_id, price, lots, direction): + """平仓(下单两次:平昨+平今)""" + self.place_order(exchange_id, instrument_id, price, lots, direction, b"1") # 平昨 + self.place_order(exchange_id, instrument_id, price, lots, direction, b"3") # 平今 + + # ============ 交易信号计算 ============ + + def check_stop_loss(self, trade_df, param, data): + """检查并执行止损""" + # 多头止损条件:有多头持仓 且 收盘价 < 多头止损价 + if param.position > 0 and param.sl_long_price > 0 and \ + trade_df["close"].iloc[-1] < param.sl_long_price: + price = data["BidPrice1"] - param.price_offset # 计算止损价 + print(f"多头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_long_price}") + self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 执行平仓 + param.sl_long_price = 0 # 重置止损价 + param.position = 0 # 清空持仓 + self.save_to_csv(data["InstrumentID"]) # 保存数据 + # 发送邮件通知 + text = f"SL_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, SL:{param.sl_long_price}, Lots:{param.lots}, " \ + f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ + f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" + self.mail.send(text) + return True + + # 空头止损条件:有空头持仓 且 收盘价 > 空头止损价 + if param.position < 0 and param.sl_short_price > 0 and \ + trade_df["close"].iloc[-1] > param.sl_short_price: + price = data["AskPrice1"] + param.price_offset + print(f"空头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_short_price}") + self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 执行平仓 + param.sl_short_price = 0 + param.position = 0 + self.save_to_csv(data["InstrumentID"]) + text = f"SL_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, SL:{param.sl_short_price}, Lots:{param.lots}, " \ + f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ + f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" + self.mail.send(text) + return True + + return False # 无止损触发 + + def calculate_delta_cumulative(self, trade_df, param): + """计算Delta累计""" + if trade_df["delta"].dtype == object: # 类型为对象则转换 + trade_df["delta"] = trade_df["delta"].astype(float) + + trade_df["datetime"] = pd.to_datetime(trade_df["datetime"]) # 转换时间格式 + + # 交易日判断(根据时间确定所属交易日) + last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1]) + hour = last_dt.hour + if hour >= 21: # 夜盘21点后属于下一个交易日 + trading_day = (last_dt + pd.Timedelta(days=1)).date() + elif hour < 15: # 15点前属于当日 + trading_day = last_dt.date() + else: # 15点后也属于当日 + trading_day = last_dt.date() + + trade_df["trading_day"] = str(trading_day) # 添加交易日列 + + # 按交易日分组累计Delta + trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum() + + trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") # 还原时间格式 + + def calculate_signals(self, trade_df, param): + """计算交易信号""" + if len(trade_df) < 2: # 数据不足则返回 + return None, None, None, None + + dj_last = trade_df["dj"].iloc[-1] # 最新堆积值 + delta_last = trade_df["delta"].iloc[-1] # 最新Delta值 + + # 开多条件:堆积值>=阈值 且 Delta>=阈值 + bull_signal = dj_last >= param.accumulation_threshold and \ + delta_last >= param.delta_threshold + + # 开空条件:堆积值<=-阈值 且 Delta<=-阈值 + bear_signal = dj_last <= -param.accumulation_threshold and \ + delta_last <= -param.delta_threshold + + print(f"开多:{bull_signal}, 开空:{bear_signal}, 持仓:{param.position}") + print(f"dj:{dj_last}, delta:{delta_last}") + + return bull_signal, bear_signal, dj_last, delta_last # 返回信号 + + def execute_open_long(self, trade_df, param, data): + """执行开多""" + param.pending_long_price = trade_df['dj_price_high'].iloc[-1] # 记录待买入价格(堆积区间高价) + print(f"开多信号触发,记录待买入价格: {param.pending_long_price}") + + def execute_open_short(self, trade_df, param, data): + """执行开空""" + param.pending_short_price = trade_df['dj_price_low'].iloc[-1] # 记录待卖出价格(堆积区间低价) + print(f"开空信号触发,记录待卖出价格: {param.pending_short_price}") + + def check_pending_orders(self, trade_df, param, data): + """检查并执行待成交订单""" + # 检查待开多订单(价格回落到堆积区间高价时买入) + if param.position == 0 and param.pending_long_price > 0: + current_price = data["AskPrice1"] # 当前卖一价 + if current_price <= param.pending_long_price: # 价格触达 + price = current_price + param.price_offset # 计算下单价 + print(f"执行开多: 价格={price}") + self.place_order(data["ExchangeID"], data["InstrumentID"], + price, param.lots, b"0", b"0") # 开多 + param.position = 1 # 更新持仓 + param.sl_long_price = trade_df['dj_price_low'].iloc[-1] # 设置止损价 + param.pending_long_price = 0 # 清空待执行价格 + self.save_to_csv(data["InstrumentID"]) + text = f"O_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, Lots:{param.lots}, " \ + f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ + f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" + self.mail.send(text) + + # 检查待开空订单(价格反弹到堆积区间低价时卖出) + if param.position == 0 and param.pending_short_price > 0: + current_price = data["BidPrice1"] # 当前买一价 + if current_price >= param.pending_short_price: # 价格触达 + price = current_price - param.price_offset + print(f"执行开空: 价格={price}") + self.place_order(data["ExchangeID"], data["InstrumentID"], + price, param.lots, b"1", b"0") # 开空 + param.position = -1 + param.sl_short_price = trade_df['dj_price_high'].iloc[-1] # 设置止损价 + param.pending_short_price = 0 + self.save_to_csv(data["InstrumentID"]) + text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, Lots:{param.lots}, " \ + f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \ + f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" + self.mail.send(text) + + def execute_close_long(self, trade_df, param, data): + """平多""" + price = data["BidPrice1"] - param.price_offset # 计算平多价格 + print(f"平多: BidPrice1={price}") + self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 平多 + param.position = 0 # 清空持仓 + param.sl_long_price = 0 # 重置止损价 + param.pending_long_price = 0 # 重置待执行价格 + self.save_to_csv(data["InstrumentID"]) + text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, Lots:{param.lots}, " \ + f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}" + self.mail.send(text) + + def execute_close_short(self, trade_df, param, data): + """平空""" + price = data["AskPrice1"] + param.price_offset + print(f"平空: AskPrice1={price}") + self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 平空 + param.position = 0 + param.sl_short_price = 0 + param.pending_short_price = 0 + self.save_to_csv(data["InstrumentID"]) + text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \ + f"Price:{price}, Lots:{param.lots}, dj:{trade_df['dj'].iloc[-1]}" + self.mail.send(text) + + def save_ofdata_json(self, symbol): + """保存订单流数据到JSON(新K线生成时调用)""" + data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 + os.makedirs(data_dir, exist_ok=True) # 确保目录存在 + json_path = f"{data_dir}/{symbol}_ofdata.json" # JSON文件路径 + + trade_df = DataStore.trade_dfs.get(symbol) # 获取交易数据 + if trade_df is None or trade_df.empty: + return # 无数据则返回 + + save_df = trade_df.copy() + if 'datetime' in save_df.columns: + save_df['datetime'] = save_df['datetime'].astype(str) # 转换时间格式 + # 去重:保留每个 datetime 的最后一条记录 + save_df = save_df.drop_duplicates(subset=['datetime'], keep='last') + + try: + save_df.to_json(json_path, orient='records', force_ascii=False, lines=True) # 写入JSON + except Exception as e: + print(f"JSON保存失败: {e}") + + def save_json_periodically(self, instrument_id, trade_df): + """定期保存订单流数据到JSON(已废弃,改为新K线触发保存)""" + # 此方法已废弃,保留用于兼容 + pass + + def cal_sig(self, symbol_queue): + """交易信号计算线程主循环""" + while True: + try: + data = symbol_queue.get(block=True, timeout=SYSTEM_CONFIG["queue_timeout"]) # 从队列获取数据 + instrument_id = data["InstrumentID"].decode() # 解码合约代码 + + # 检查队列阻塞 + size = symbol_queue.qsize() + if size > SYSTEM_CONFIG.get("queue_warning_size", 5): + print(f"{instrument_id}队列长度={size},有点阻塞!") + + # 读取历史数据 + self.read_from_csv(instrument_id) + self.load_stops_from_json(instrument_id) + self.daily_reset(instrument_id) + + param = self.param_dict[instrument_id] # 获取参数 + self.current_symbol = instrument_id # 设置当前合约 + + # 处理Tick数据 + self.tickcome(data) + trade_df = DataStore.trade_dfs[instrument_id] # 获取交易数据 + + # 新K线开始时 + if len(trade_df) > param.processed_rows: + # 止损检查 + if self.check_stop_loss(trade_df, param, data): + param.processed_rows = len(trade_df) + continue + + # 计算Delta累计(必须在保存之前) + self.calculate_delta_cumulative(trade_df, param) + + # 保存 ofdata.json(计算 delta累计 之后) + self.save_ofdata_json(instrument_id) + + # 计算交易信号 + bull_signal, bear_signal, dj_last, delta_last = self.calculate_signals(trade_df, param) + + if bull_signal or bear_signal: + # 平仓信号(与持仓相反) + if param.position < 0 and (bear_signal or bull_signal): + self.execute_close_short(trade_df, param, data) + elif param.position > 0 and (bull_signal or bear_signal): + self.execute_close_long(trade_df, param, data) + + # 开仓信号(无持仓) + if bull_signal and param.position == 0: + self.execute_open_long(trade_df, param, data) + if bear_signal and param.position == 0: + self.execute_open_short(trade_df, param, data) + + # 检查待成交订单 + self.check_pending_orders(trade_df, param, data) + + param.processed_rows = len(trade_df) + + except queue.Empty: # 队列为空则继续循环 + pass + + def distribute_tick(self): + """行情分发线程""" + while True: + if self.status == 0: # 连接正常 + try: + data = self.md_queue.get(block=True, timeout=0.1) # 获取行情数据 + instrument_id = data["InstrumentID"].decode() + + try: + self.queue_dict[instrument_id].put(data, block=True, timeout=1.0) # 分发到各合约队列 + except queue.Full: + print(f"{instrument_id}合约信号计算阻塞导致队列已满") + except queue.Empty: + pass + else: # 连接断开则等待 + time_module.sleep(0.1) + + def start(self, param_dict): + """启动交易引擎""" + self.param_dict = param_dict # 保存参数 + + now = datetime.now() # 当前时间 + current_hour = now.hour + if current_hour >= 21: # 21点后属于下一日 + today_date = (now + pd.Timedelta(days=1)).date() + elif current_hour < 15: # 15点前属于当日 + today_date = now.date() + else: # 15点后属于当日 + today_date = now.date() + + data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录 + for symbol in param_dict.keys(): + # 加载历史订单流数据 + json_path = f"{data_dir}/{symbol}_ofdata.json" + if os.path.exists(json_path): + try: + DataStore.trade_dfs[symbol] = pd.read_json(json_path, lines=True) + if not DataStore.trade_dfs[symbol].empty: + print(f"加载历史订单流: {json_path}, 共{len(DataStore.trade_dfs[symbol])}条") + except Exception as e: + print(f"加载失败: {e}") + DataStore.trade_dfs[symbol] = pd.DataFrame({}) + else: + DataStore.trade_dfs[symbol] = pd.DataFrame({}) + + # 创建品种队列 + self.queue_dict[symbol] = queue.Queue(SYSTEM_CONFIG["queue_max_size"]) + + # 启动信号计算线程 + sig_thread = threading.Thread( + target=self.cal_sig, + args=(self.queue_dict[symbol],), + daemon=True, # 守护线程 + name=f"CalSig_{symbol}" + ) + sig_thread.start() + + # 启动行情分发线程 + distribute_thread = threading.Thread(target=self.distribute_tick, daemon=True) + distribute_thread.start() + + try: + distribute_thread.join() # 等待分发线程结束 + except KeyboardInterrupt: # 捕获中断信号 + print("接收到中断信号,正在关闭...") + raise + + +# ============ 主程序入口 ============ + +def run_trader(param_dict, broker_id, td_server, investor_id, password, + app_id, auth_code, md_queue=None, page_dir=""): + """启动交易进程""" + trader = OrderFlowTrader( # 创建交易引擎实例 + broker_id, td_server, investor_id, password, + app_id, auth_code, md_queue, page_dir + ) + trader.start(param_dict) # 启动交易引擎 + + +if __name__ == "__main__": + # 创建品种参数(自动从配置读取所有合约) + param_dict = {} + for symbol, config in TRADING_PARAMS.items(): + param_dict[symbol] = TradingParam.from_config(symbol, config) + + # 连接SimNow模拟交易 + future_account = get_simulate_account( + investor_id=SIMNOW_CONFIG["investor_id"], + password=SIMNOW_CONFIG["password"], + server_name=SIMNOW_CONFIG["server_name"], + subscribe_list=list(param_dict.keys()), + ) + + # 连接实盘(备用) + # from CtpPlus.CTP.FutureAccount import FutureAccount + # future_account = FutureAccount(...) + + print("开始", len(future_account.subscribe_list)) + + # 创建共享队列 + share_queue = Queue(maxsize=SYSTEM_CONFIG["queue_share_size"]) + + # 启动行情进程 + md_process = Process(target=run_tick_engine, args=(future_account, [share_queue])) + + # 启动交易进程 + trader_process = Process( + target=run_trader, + args=( + param_dict, + future_account.broker_id, + future_account.server_dict["TDServer"], + future_account.investor_id, + future_account.password, + future_account.app_id, + future_account.auth_code, + share_queue, + future_account.td_flow_path, + ), + ) + + md_process.start() # 启动行情进程 + trader_process.start() # 启动交易进程 + + md_process.join() # 等待行情进程结束 + trader_process.join() # 等待交易进程结束 \ No newline at end of file