Files

1237 lines
61 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
订单流交易策略
功能:处理Tick行情数据,计算订单流指标,生成交易信号并执行交易
"""
import queue # 队列模块,用于线程间通信
import threading # 线程模块
import os # 操作系统模块
import json # JSON模块
import re # 正则表达式模块
import csv # CSV模块
import time as time_module # 标准库时间模块
from datetime import datetime, time as datetime_time # datetime时间对象
from multiprocessing import Process, Queue # 多进程模块
import pandas as pd # 数据分析库
import numpy as np # 数值计算库
import smtplib # SMTP邮件发送库
from email.mime.text import MIMEText # 邮件文本格式
from email.mime.multipart import MIMEMultipart # 邮件多部分格式
from CtpPlus.CTP.MdApi import run_tick_engine # 行情引擎
from CtpPlus.CTP.FutureAccount import get_simulate_account # 获取模拟账户
from CtpPlus.CTP.TraderApiBase import TraderApiBase # 交易API基类
# 导入配置
from config import (
SMTP_CONFIG, TRADING_PARAMS, SIMNOW_CONFIG, SYSTEM_CONFIG, NIGHT_CLEARING_TIME,
FEISHU_CONFIG
)
# ============ 全局数据字典 ============
class DataStore:
"""全局数据存储"""
tickdata = {} # K线数据 {symbol: DataFrame}
quote = {} # 报价数据 {symbol: {bid, ask}}
ofdata = {} # 订单流数据 {symbol: DataFrame}
trade_dfs = {} # 交易数据 {symbol: DataFrame}
prev_volume = {} # 上个Tick成交量 {symbol: volume}
last_tick = {} # 上条Tick数据 {symbol: tick}
# ============ 参数配置类 ============
class TradingParam:
"""交易参数配置"""
def __init__(
self, symbol, lots, price_offset, delta_threshold,
imbalance_ratio, accumulation_threshold, period,
min_volume=0, merge_price=1, mini_price=0.2
):
self.symbol = symbol # 合约代码
self.lots = lots # 交易手数
self.price_offset = price_offset # 价格偏移量(下单时加减)
self.delta_threshold = delta_threshold # Delta阈值(开仓条件)
self.imbalance_ratio = imbalance_ratio # 失衡比率(堆积判断)
self.accumulation_threshold = accumulation_threshold # 堆积阈值
self.period = period # K线周期
self.min_volume = min_volume # 最小成交量过滤
self.merge_price = merge_price # 价格档位合并数量
self.mini_price = mini_price # 最小价格变动
# 持仓状态
self.position = 0 # 当前持仓(1多头/-1空头/0无持仓)
self.pending_long_price = 0 # 待执行开多价格
self.pending_short_price = 0 # 待执行开空价格
self.sl_long_price = 0 # 多头止损价
self.sl_short_price = 0 # 空头止损价
self.clearing_executed = False # 日终清算是否已执行
self.load_historical_data = True # 是否加载历史数据
self.processed_rows = 0 # 已处理的行数(用于判断新K线)
@classmethod
def from_config(cls, symbol, config_dict):
"""从配置字典创建参数"""
return cls(
symbol=symbol, # 合约代码
lots=config_dict["lots"], # 手数
price_offset=config_dict["price_offset"], # 价格偏移
delta_threshold=config_dict["delta_threshold"], # Delta阈值
imbalance_ratio=config_dict["imbalance_ratio"], # 失衡比率
accumulation_threshold=config_dict["accumulation_threshold"], # 堆积阈值
period=config_dict["period"], # K线周期
min_volume=config_dict.get("min_volume", 0), # 最小成交量
merge_price=config_dict.get("merge_price", 1), # 价格合并
mini_price=config_dict.get("mini_price", 0.2) # 最小价格
)
# ============ 邮件通知模块 ============
class MailNotifier:
"""邮件通知类"""
def __init__(self):
self.config = SMTP_CONFIG # 加载SMTP配置
def send(self, text, subject="TD_Simnow_Signal"):
"""发送邮件通知"""
if not self.config["password"]: # 未配置密码则跳过
print("SMTP密码未配置,跳过邮件发送")
return
try:
msg = MIMEMultipart() # 创建多部分邮件对象
msg["From"] = self.config["sender"] # 发件人
msg["To"] = ";".join(self.config["receivers"]) # 收件人(逗号分隔)
msg["Subject"] = subject # 邮件主题
msg.attach(MIMEText(text, "plain", "utf-8")) # 添加纯文本内容
smtp = smtplib.SMTP_SSL(self.config["server"], self.config["port"]) # 连接SSL SMTP
smtp.login(self.config["username"], self.config["password"]) # 登录
smtp.sendmail(self.config["sender"], self.config["receivers"], msg.as_string()) # 发送
smtp.quit() # 关闭连接
except Exception as e:
print(f"邮件发送失败: {e}") # 捕获并打印异常
class FeishuNotifier:
"""飞书通知类"""
def __init__(self):
self.config = FEISHU_CONFIG # 加载飞书配置
def send(self, text):
"""发送飞书消息"""
if not self.config.get("enabled", False): # 未启用则跳过
return
try:
import requests # 导入requests库
headers = {"Content-Type": "application/json"} # 请求头
data = {
"msg_type": "text", # 消息类型为文本
"content": {"text": text} # 文本内容
}
response = requests.post(self.config["webhook_url"], headers=headers, json=data) # POST请求
if response.status_code != 200: # 检查响应状态
print(f"飞书消息发送失败,状态码: {response.status_code}")
except Exception as e:
print(f"飞书消息发送失败: {e}")
# ============ 交易引擎类 ============
class OrderFlowTrader(TraderApiBase):
"""订单流交易引擎"""
def __init__(self, broker_id, td_server, investor_id, password,
app_id, auth_code, md_queue=None, page_dir=""):
self.param_dict = {} # 交易参数字典 {symbol: TradingParam}
self.queue_dict = {} # 信号计算队列字典 {symbol: Queue}
self.current_symbol = " " # 当前处理的合约代码
self.last_stops_loatime = {} # 上次加载止盈止损的时间 {symbol: timestamp}
# 线程锁(保证多线程访问共享数据的安全性)
self._lock = threading.RLock() # 综合锁
self._trade_locks = {} # 交易数据锁
self._prev_volume_lock = threading.RLock() # 成交量锁
self._tickdata_lock = threading.RLock() # K线数据锁
self._quote_lock = threading.RLock() # 报价数据锁
self._ofdata_lock = threading.RLock() # 订单流数据锁
self._last_tick_lock = threading.RLock() # 上个Tick锁
# 邮件通知器
self.mail = MailNotifier()
# 飞书通知器
self.feishu = FeishuNotifier()
# 数据存储
self._json_save_counter = {} # JSON保存计数器
self.md_queue = md_queue # 行情队列
self._tick_record_counter = {} # tick录制计数器 {symbol: count}
# ============ 行情数据处理 ============
def tickcome(self, md_queue):
"""接收并处理Tick行情数据"""
data = md_queue # 从队列获取行情数据
# === tick录制(实盘模式) ===
if SYSTEM_CONFIG.get("record_tick", False):
instrument_id_raw = data["InstrumentID"]
symbol = instrument_id_raw.decode() if isinstance(instrument_id_raw, bytes) else instrument_id_raw
self._tick_record_counter[symbol] = self._tick_record_counter.get(symbol, 0) + 1
interval = SYSTEM_CONFIG.get("tick_record_interval", 1)
if self._tick_record_counter[symbol] % interval == 0:
self._record_tick_to_csv(data, symbol)
instrument_id = data["InstrumentID"].decode() # 解码合约代码
action_day = data["ActionDay"].decode() # 解码交易日期
update_time = data["UpdateTime"].decode() # 解码更新时间
# 格式化日期为YYYY-MM-DD格式
action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}"
# 组合完整的时间戳(包含毫秒)
created_at = datetime.strptime(
f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}",
"%Y-%m-%d %H:%M:%S.%f"
)
# 计算瞬时成交量(当前成交量 - 上个Tick成交量)
with self._prev_volume_lock:
prev_vol = DataStore.prev_volume.get(instrument_id, 0) # 获取上次成交量
curr_vol = int(data["Volume"]) # 当前总成交量
last_vol = curr_vol - prev_vol if prev_vol != 0 else 0 # 计算瞬时成交量
DataStore.prev_volume[instrument_id] = curr_vol # 更新存储的成交量
if last_vol <= 0: # 无增量则返回(避免重复处理)
return
# 构建Tick数据结构
tick = {
"symbol": instrument_id, # 合约代码
"created_at": created_at, # 时间戳
"price": float(data["LastPrice"]), # 最新价
"last_volume": last_vol, # 瞬时成交量
"bid_p": float(data["BidPrice1"]), # 买一价
"bid_v": int(data["BidVolume1"]), # 买一量
"ask_p": float(data["AskPrice1"]), # 卖一价
"ask_v": int(data["AskVolume1"]), # 卖一量
"upper_limit": float(data["UpperLimitPrice"]), # 涨停价
"lower_limit": float(data["LowerLimitPrice"]), # 跌停价
"trading_day": data["TradingDay"].decode(), # 交易日
"cum_volume": curr_vol, # 累计成交量
"cum_amount": float(data["Turnover"]), # 累计成交额
"cum_position": int(data["OpenInterest"]), # 累计持仓量
}
self.on_tick(tick) # 调用Tick处理回调
def _record_tick_to_csv(self, data: dict, symbol: str):
"""录制 tick 数据到 CSV"""
data_dir = SYSTEM_CONFIG["data_dir"]
os.makedirs(data_dir, exist_ok=True)
csv_path = f"{data_dir}/{symbol}_tick.csv"
# CSV 字段
fields = [
"InstrumentID", "ActionDay", "UpdateTime", "UpdateMillisec",
"LastPrice", "Volume", "BidPrice1", "BidVolume1",
"AskPrice1", "AskVolume1", "UpperLimitPrice", "LowerLimitPrice",
"TradingDay", "Turnover", "OpenInterest"
]
# 写入字段(bytes 解码)
row = {}
for field in fields:
val = data.get(field)
if isinstance(val, bytes):
row[field] = val.decode()
else:
row[field] = val
file_exists = os.path.exists(csv_path)
with open(csv_path, 'a', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fields)
if not file_exists:
writer.writeheader()
writer.writerow(row)
def on_tick(self, tick):
"""处理单个Tick数据"""
if tick["last_volume"] == 0: # 无成交量则返回
return
tsymbol = tick["symbol"] # 合约代码
# 获取上条Tick的买卖盘数据进行对比
with self._last_tick_lock:
prev_tick = DataStore.last_tick.get(tsymbol) # 获取上条Tick
if prev_tick: # 存在则使用上条数据
bid_p = prev_tick["bid_p"]
ask_p = prev_tick["ask_p"]
bid_v = prev_tick["bid_v"]
ask_v = prev_tick["ask_v"]
else: # 不存在则使用当前数据初始化
bid_p = tick["bid_p"]
ask_p = tick["ask_p"]
bid_v = tick["bid_v"]
ask_v = tick["ask_v"]
DataStore.last_tick[tsymbol] = tick # 更新存储的上条Tick
# 格式化时间字符串(毫秒精度)
timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 构建DataFrame格式的Tick数据
tick_dt = pd.DataFrame({
"datetime": timetick, # 时间戳字符串
"symbol": tick["symbol"], # 合约代码
"mainsym": tick["symbol"].rstrip("0123456789").upper(), # 主合约代码(去除数字)
"lastprice": tick["price"], # 最新价
"vol": tick["last_volume"], # 瞬时成交量
"bid_p": bid_p, # 买一价
"ask_p": ask_p, # 卖一价
"bid_v": bid_v, # 买一量
"ask_v": ask_v, # 卖一量
}, index=[0])
self.tickdata(tick_dt, tsymbol) # 调用K线聚合处理
def data_of(self, symbol, df):
"""将订单流数据添加到交易DataFrame"""
with self._lock:
if symbol not in self._trade_locks: # 首次访问则创建锁
self._trade_locks[symbol] = threading.RLock()
trade_lock = self._trade_locks[symbol]
with trade_lock:
existing = DataStore.trade_dfs.get(symbol) # 获取现有数据
if existing is None or existing.empty: # 无数据则直接赋值
DataStore.trade_dfs[symbol] = df
else:
# 检查是否有重复的 datetime,有则删除旧数据
if "datetime" in df.columns and "datetime" in existing.columns:
new_dt = df["datetime"].iloc[0] if len(df) > 0 else None
if new_dt is not None:
# 删除已存在的相同 datetime 的旧数据
existing = existing[existing["datetime"] != new_dt]
DataStore.trade_dfs[symbol] = pd.concat([existing, df], ignore_index=True) # 合并数据
def process_quote(self, bid_dict, ask_dict, symbol):
"""处理并合并买卖盘数据"""
with self._quote_lock:
dic = DataStore.quote.get(symbol) # 获取该合约的现有报价
if dic:
bid_result = dic["bid_result"].copy() # 复制买盘数据
ask_result = dic["ask_result"].copy() # 复制卖盘数据
else:
bid_result, ask_result = {}, {} # 无数据则初始化空字典
# 合并价格档位(买卖盘价格可能有差异)
price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys()))
for price in price_list:
bid_val = int(bid_dict.get(price, 0)) # 获取买盘量
ask_val = int(ask_dict.get(price, 0)) # 获取卖盘量
if bid_val: # 累加买盘量
bid_result[price] = bid_result.get(price, 0) + bid_val
ask_result.setdefault(price, 0) # 确保卖盘有该价格档位
if ask_val: # 累加卖盘量
ask_result[price] = ask_result.get(price, 0) + ask_val
bid_result.setdefault(price, 0) # 确保买盘有该价格档位
DataStore.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result} # 更新存储
return bid_result, ask_result # 返回合并后的数据
def tickdata(self, df, symbol):
"""将Tick数据聚合成K线"""
tickdata = pd.DataFrame({
"datetime": df["datetime"], # 时间
"symbol": df["symbol"], # 合约代码
"lastprice": df["lastprice"], # 最新价
"vol": df["vol"], # 成交量
"bid_p": df["bid_p"], # 买一价
"bid_v": df["bid_v"], # 买一量
"ask_p": df["ask_p"], # 卖一价
"ask_v": df["ask_v"], # 卖一量
})
try:
with self._tickdata_lock:
rdf = DataStore.tickdata.get(symbol) # 获取已存储的K线数据
if rdf is not None: # 已存在K线数据(需要累加)
rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 获取K线时间
# 获取当前 tick 的 bar 时间(可能还未设置,用 datetime 作为备选)
tick_bartime = tickdata.get("bartime", tickdata["datetime"])[0]
now_bartime = pd.to_datetime(tick_bartime).strftime("%Y-%m-%d %H:%M:%S")
if now_bartime > rdftm: # 新K线开始(时间大于上一K线)
# 保存旧K线的订单流数据
with self._ofdata_lock:
of = DataStore.ofdata.pop(symbol, None)
if of is not None:
self.data_of(symbol, of) # 写入交易DataFrame
# 保存 ofdata.json(改善时间戳对齐:旧K线完成时即保存)
self.save_ofdata_json(symbol)
# 清空旧K线的报价数据
with self._quote_lock:
DataStore.quote.pop(symbol, None)
# 清空旧K线的Tick数据
with self._tickdata_lock:
DataStore.tickdata.pop(symbol, None)
# 初始化新K线数据
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) # K线时间
tickdata["open"] = tickdata["lastprice"] # 开盘价
tickdata["high"] = tickdata["lastprice"] # 最高价
tickdata["low"] = tickdata["lastprice"] # 最低价
tickdata["close"] = tickdata["lastprice"] # 收盘价
tickdata["starttime"] = tickdata["datetime"] # 开始时间
else: # 同一K线内:累加数据
tickdata["bartime"] = rdf["bartime"] # 继承K线时间
tickdata["open"] = rdf["open"] # 继承开盘价
lastprice_val = float(tickdata["lastprice"].values[0]) # 当前价格
# 更新最高价(取最大值)
tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]) if len(rdf["high"].values) > 0 else 0)
# 更新最低价(取最小值)
tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]) if len(rdf["low"].values) > 0 else 0)
tickdata["close"] = tickdata["lastprice"] # 更新收盘价
tickdata["vol"] = df["vol"].values + rdf["vol"].values # 累加成交量
tickdata["starttime"] = rdf["starttime"] # 继承开始时间
else: # 首条Tick,创建新K线
print("新bar的第一个tick进入")
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
tickdata["open"] = tickdata["lastprice"]
tickdata["high"] = tickdata["lastprice"]
tickdata["low"] = tickdata["lastprice"]
tickdata["close"] = tickdata["lastprice"]
tickdata["starttime"] = tickdata["datetime"]
except Exception as e:
print(f"tickdata异常: {e}")
if "bartime" not in tickdata.columns: # 确保bartime存在
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
# 确保bartime存在且格式正确
if "bartime" not in tickdata.columns:
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
tickdata["bartime"] = pd.to_datetime(tickdata["bartime"])
param = self.param_dict[self.current_symbol] # 获取交易参数
# 重采样聚合K线(按指定周期合并)
bardata = (
tickdata.resample(on="bartime", rule=param.period, label="right", closed="right")
.agg({
"starttime": "first", # 开始时间取第一个
"symbol": "last", # 合约代码取最后一个
"open": "first", # 开盘价取第一个
"high": "max", # 最高价取最大
"low": "min", # 最低价取最小
"close": "last", # 收盘价取最后一个
"vol": "sum", # 成交量求和
})
.reset_index(drop=False)
)
bardata = bardata.dropna().reset_index(drop=True) # 删除空值行
# 只保留最后一个 bar(避免同一周期产生多行)
if len(bardata) > 1:
bardata = bardata.iloc[[-1]]
bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 格式化时间
with self._tickdata_lock:
DataStore.tickdata[symbol] = bardata # 更新存储的K线数据
# 使用单tick成交量进行后续处理
tickdata["vol"] = df["vol"].values
self.orderflow_df_new(tickdata, bardata, symbol) # 生成订单流数据
def merge_price_levels(self, price_dict, merge_count, mini_price):
"""合并价格档位"""
if not price_dict or merge_count <= 1 or mini_price <= 0: # 参数无效则返回原数据
return price_dict
merged = {}
sorted_prices = sorted([float(p) for p in price_dict.keys()]) # 排序价格
if not sorted_prices: # 无价格则返回空
return {}
min_price = sorted_prices[0] # 最低价
merge_interval = merge_count * mini_price # 合并间隔
# 按间隔分组合并成交量
for price_str, vol in price_dict.items():
price = float(price_str)
group_idx = int((price - min_price) // merge_interval) # 计算组索引
merged_price = min_price + group_idx * merge_interval # 计算合并后的价格
key = str(merged_price)
merged[key] = merged.get(key, 0) + vol # 累加成交量
return merged # 返回合并后的数据
def filter_by_min_vol(self, price_dict, min_vol):
"""过滤最小成交量以下的价格档位"""
if min_vol <= 0: # 无阈值则返回原数据
return price_dict
return {k: v for k, v in price_dict.items() if v >= min_vol} # 过滤低成交量档位
def orderflow_df_new(self, df_tick, df_min, symbol):
"""生成订单流DataFrame"""
param = self.param_dict[symbol] # 获取交易参数
min_vol = param.min_volume # 最小成交量
merge_price = param.merge_price # 价格合并数量
# 单tick成交量,用于bid/ask方向判断
bar_vol = int(df_tick["vol"].values[0])
# K线周期内成交量之和(resample聚合结果)
period_vol = int(df_min["vol"].values[0])
bar_close = float(df_min["close"].values[0]) # K线收盘价
bar_open = float(df_min["open"].values[0]) # K线开盘价
bar_low = float(df_min["low"].values[0]) # K线最低价
bar_high = float(df_min["high"].values[0]) # K线最高价
dt = df_min["bartime"].values[0] # K线时间
bp1 = float(df_tick["bid_p"].values[0]) # 买一价
ap1 = float(df_tick["ask_p"].values[0]) # 卖一价
last_price = float(df_tick["lastprice"].values[0]) # 最新价
volume = bar_vol # 成交量
bar_symbol = df_tick["symbol"].values[0] # 合约代码
bid_dict = {} # 买盘字典
ask_dict = {} # 卖盘字典
# 判断价格位置并归类到买卖盘
if last_price >= ap1: # 价格>=卖一价,记录到卖盘
ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + volume
if last_price <= bp1: # 价格<=买一价,记录到买盘
bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + volume
bid_result, ask_result = self.process_quote(bid_dict, ask_dict, symbol) # 合并买卖盘数据
# 应用价格合并
if merge_price > 1:
bid_result = self.merge_price_levels(bid_result, merge_price, param.mini_price)
ask_result = self.merge_price_levels(ask_result, merge_price, param.mini_price)
# 价格升序排序
bid_result = dict(sorted(bid_result.items(), key=lambda x: x[0]))
ask_result = dict(sorted(ask_result.items(), key=lambda x: x[0]))
price_list = list(bid_result.keys()) # 价格列表
ask_list = list(ask_result.values()) # 卖盘量列表
bid_list = list(bid_result.values()) # 买盘量列表
delta = sum(ask_list) - sum(bid_list) # Delta = 卖量 - 买量
# 构建订单流DataFrame
df = pd.DataFrame({
"price": pd.Series([price_list]), # 价格列表
"Ask": pd.Series([ask_list]), # 卖盘量
"Bid": pd.Series([bid_list]), # 买盘量
})
df["symbol"] = bar_symbol # 合约代码
df["datetime"] = dt # 时间
df["delta"] = delta # Delta值
df["close"] = bar_close # 收盘价
df["open"] = bar_open # 开盘价
df["high"] = bar_high # 最高价
df["low"] = bar_low # 最低价
df["vol"] = period_vol # 周期成交量
# 计算堆积指标
dj_count, dj_high, dj_low = self.calculate_accumulation(df, min_vol)
df["dj"] = dj_count # 堆积数量
df["dj_price_high"] = dj_high # 堆积区间高价
df["dj_price_low"] = dj_low # 堆积区间低价
with self._ofdata_lock:
DataStore.ofdata[symbol] = df # 更新订单流数据
def calculate_accumulation(self, kdata, min_vol=0):
"""计算堆积指标(Order Flow Delta)
Args:
kdata: 订单流DataFrame
min_vol: 最小成交量过滤
Returns:
(dj_count, dj_price_high, dj_price_low)
"""
param = self.param_dict[self.current_symbol] # 获取交易参数
imbalance_ratio = int(param.imbalance_ratio) # 失衡比率
accumulation_threshold = int(param.accumulation_threshold) # 堆积阈值
dj_count = 0 # 堆积计数
dj_price_high = 0 # 堆积价格区间高
dj_price_low = 0 # 堆积价格区间低
bearish_zones = [] # 空头堆积区间
bullish_zones = [] # 多头堆积区间
# 遍历每个价格档位
for item in kdata.itertuples(index=False):
price_s = item.price # 价格序列
ask_s = item.Ask # 卖盘序列
bid_s = item.Bid # 买盘序列
bear_count = 0 # 空头连续计数
bull_count = 0 # 多头连续计数
bear_start, bear_end = None, None # 空头区间起止
bull_start, bull_end = None, None # 多头区间起止
price_len = len(price_s) # 价格档位数量
for i in range(price_len):
# 获取各档位成交量(低于最小值的视为0)
bid_val = bid_s[i] if i < len(bid_s) and bid_s[i] >= min_vol else 0
ask_val = ask_s[i] if i < len(ask_s) and ask_s[i] >= min_vol else 0
next_ask = ask_s[i + 1] if i + 1 < len(ask_s) and ask_s[i + 1] >= min_vol else 0
prev_bid = bid_s[i - 1] if i > 0 and bid_s[i - 1] >= min_vol else 0
# 空头堆积: 买盘 > 卖盘 * 失衡比率
if i < price_len - 1:
if bid_val > next_ask * imbalance_ratio: # 判断空头堆积
if bear_start is None:
bear_start = price_s[i] # 记录空头区间起点
bear_end = price_s[i] # 更新空头区间终点
bear_count += 1 # 连续计数+1
if bear_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值
dj_count -= 1 # 堆积计数-1
else: # 堆积中断
if bear_count >= accumulation_threshold and bear_start is not None:
bearish_zones.append((bear_start, bear_end)) # 记录空头区间
bear_count = 0 # 重置计数
bear_start, bear_end = None, None # 重置区间
# 多头堆积: 卖盘 > 买盘 * 失衡比率
if i >= 1 and i < price_len - 1:
if ask_val > prev_bid * imbalance_ratio: # 判断多头堆积
if bull_start is None:
bull_start = price_s[i] # 记录多头区间起点
bull_end = price_s[i] # 更新多头区间终点
bull_count += 1 # 连续计数+1
if bull_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值
dj_count += 1 # 堆积计数+1
else: # 堆积中断
if bull_count >= accumulation_threshold and bull_start is not None:
bullish_zones.append((bull_start, bull_end)) # 记录多头区间
bull_count = 0 # 重置计数
bull_start, bull_end = None, None # 重置区间
# 处理最后堆积(未中断的区间)
if bear_count >= accumulation_threshold and bear_start is not None:
bearish_zones.append((bear_start, bear_end))
if bull_count >= accumulation_threshold and bull_start is not None:
bullish_zones.append((bull_start, bull_end))
# 计算价格范围
all_zones = bearish_zones + bullish_zones
if all_zones:
all_prices = [float(p) for zone in all_zones for p in zone]
dj_price_high = max(all_prices) # 取最高价
dj_price_low = min(all_prices) # 取最低价
return dj_count, dj_price_high, dj_price_low # 返回堆积结果
# ============ 数据持久化 ============
def read_from_csv(self, symbol):
"""从CSV读取历史交易数据"""
param = self.param_dict[symbol] # 获取交易参数
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
file_path = f"{data_dir}/{symbol}traderdata.csv" # 数据文件路径
if os.path.exists(file_path): # 文件存在则读取
df = pd.read_csv(file_path)
if not df.empty and param.load_historical_data: # 有数据且需加载
row = df.iloc[-1] # 取最后一行
param.position = int(row["pos"]) # 恢复持仓
param.sl_long_price = float(row["sl_long_price"]) # 恢复多头止损价
param.sl_short_price = float(row["sl_short_price"]) # 恢复空头止损价
print(f"加载历史数据: {df.iloc[-1]}")
param.load_historical_data = False # 标记已加载
def save_to_csv(self, symbol):
"""保存交易数据到CSV"""
# 确保 symbol 是字符串类型
if isinstance(symbol, bytes):
symbol = symbol.decode()
symbol = str(symbol)
# 从 param_dict 获取参数(需要确保是字符串 key)
if symbol in self.param_dict:
param = self.param_dict[symbol]
else:
# 尝试找到匹配的 key
for key in self.param_dict:
if str(key) == symbol:
param = self.param_dict[key]
break
else:
print(f"警告: 未找到 symbol {symbol} 的参数配置")
return
symbol = str(symbol)
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
# 构建保存数据
data = {
"datetime": [DataStore.trade_dfs[symbol]["datetime"].iloc[-1]], # 最后时间
"pos": [param.position], # 持仓
"sl_long_price": [param.sl_long_price], # 多头止损价
"sl_short_price": [param.sl_short_price], # 空头止损价
}
df = pd.DataFrame(data)
file_path = f"{data_dir}/{symbol}traderdata.csv" # 文件路径
if os.path.exists(file_path): # 文件存在则追加
csv_df = pd.read_csv(file_path)
if df["pos"].iloc[-1] != csv_df["pos"].iloc[-1]: # 持仓变化则保存
df.to_csv(file_path, mode="a", header=False, index=False)
else: # 文件不存在则新建
df.to_csv(file_path, index=False)
self.save_stops_to_json(symbol, DataStore.trade_dfs[symbol]) # 同时保存止盈止损JSON
def save_stops_to_json(self, symbol, trade_df):
"""保存止盈止损数据到JSON"""
# 确保 symbol 是字符串类型
if isinstance(symbol, bytes):
symbol = symbol.decode()
symbol = str(symbol)
# 获取参数
if symbol in self.param_dict:
param = self.param_dict[symbol]
else:
for key in self.param_dict:
if str(key) == symbol:
param = self.param_dict[key]
break
else:
print(f"警告: 未找到 symbol {symbol} 的参数配置")
return
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
file_path = f"{data_dir}/{symbol}_stops.json" # JSON文件路径
param = self.param_dict[symbol]
# 获取堆积价格区间
if trade_df is not None and len(trade_df) > 0:
dj_low = float(trade_df['dj_price_low'].iloc[-1]) if 'dj_price_low' in trade_df.columns else 0
dj_high = float(trade_df['dj_price_high'].iloc[-1]) if 'dj_price_high' in trade_df.columns else 0
else:
dj_low, dj_high = 0, 0
# 构建止盈止损数据结构
stops_data = {
"long": { # 多头止盈止损
"position": param.position if param.position > 0 else 0, # 持仓
"entry_price": dj_high if param.position > 0 else 0, # 入场价
"stop_loss": dj_low if param.position > 0 else 0 # 止损价
},
"short": { # 空头止盈止损
"position": abs(param.position) if param.position < 0 else 0, # 持仓
"entry_price": dj_high if param.position < 0 else 0, # 入场价
"stop_loss": dj_low if param.position < 0 else 0 # 止损价
}
}
with open(file_path, 'w', encoding='utf-8') as f: # 写入JSON文件
json.dump(stops_data, f, indent=4, ensure_ascii=False)
def load_stops_from_json(self, symbol):
"""从JSON加载止盈止损数据"""
# 确保 symbol 是字符串类型
if isinstance(symbol, bytes):
symbol = symbol.decode()
symbol = str(symbol)
current_time = time_module.time() # 当前时间戳
interval = SYSTEM_CONFIG["stops_load_interval"] # 加载间隔
# 处理 last_stops_loatime 的 key 类型
last_stops_time = self.last_stops_loatime
normalized_key = symbol
# 检查是否在加载间隔内(避免频繁读取磁盘)
if symbol in last_stops_time or normalized_key in [str(k) for k in last_stops_time]:
# 找到匹配的 key
for k in last_stops_time:
if str(k) == symbol:
if current_time - last_stops_time.get(k, 0) <= interval:
return False # 在间隔内,跳过
break
file_path = f"{SYSTEM_CONFIG['data_dir']}/{symbol}_stops.json" # JSON文件路径
if os.path.exists(file_path): # 文件存在则读取
try:
with open(file_path, 'r', encoding='utf-8') as f:
stops_data = json.load(f)
# 获取参数
if symbol in self.param_dict:
param = self.param_dict[symbol]
else:
for key in self.param_dict:
if str(key) == symbol:
param = self.param_dict[key]
break
else:
print(f"警告: 未找到 symbol {symbol} 的参数配置")
return False
# 恢复多头止盈止损
if stops_data['long']['position'] > 0:
param.position = stops_data['long']['position']
param.sl_long_price = stops_data['long']['entry_price']
# 恢复空头止盈止损
elif stops_data['short']['position'] > 0:
param.position = -stops_data['short']['position']
param.sl_short_price = stops_data['short']['entry_price']
self.last_stops_loatime[symbol] = current_time # 记录加载时间
print(f"加载止盈止损信息: {symbol}")
return True
except Exception as e:
print(f"加载止盈止损信息失败: {e}")
return False
def daily_reset(self, symbol):
"""每日收盘数据重置"""
param = self.param_dict[symbol] # 获取交易参数
sec = "".join(re.findall("[a-zA-Z]", str(symbol))) # 提取合约字母部分(如ru
current_time = datetime.now().time() # 当前时间
clearing_time1_start = datetime_time(15, 5) # 日盘清算开始(15:05
clearing_time1_end = datetime_time(15, 10) # 日盘清算结束(15:10
param.clearing_executed = False # 重置清算标志
# 判断是否在日盘清算时间段
if clearing_time1_start <= current_time <= clearing_time1_end:
param.clearing_executed = True # 标记已执行清算
DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] # 清空数据
# 判断是否在夜盘清算时间段
elif sec in NIGHT_CLEARING_TIME:
hour, minute = NIGHT_CLEARING_TIME[sec] # 获取夜盘清算时间
clearing_time2_start = datetime_time(hour, minute)
clearing_time2_end = datetime_time(hour, minute + 15) # 15分钟后结束
if clearing_time2_start <= current_time <= clearing_time2_end:
param.clearing_executed = True
DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0]
return param.clearing_executed # 返回是否执行了清算
# ============ CTP回调 ============
def OnRtnTrade(self, pTrade):
"""成交回报回调"""
print("||成交回报||", pTrade)
def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast):
"""下单响应回调"""
print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast)
def OnRtnOrder(self, pOrder):
"""订单回报回调"""
print("||订单回报||", pOrder)
# ============ 交易辅助 ============
def place_order(self, exchange_id, instrument_id, price, lots, direction, offset):
"""统一下单方法"""
self.insert_order(exchange_id, instrument_id, price, lots, direction, offset)
def place_close_orders(self, exchange_id, instrument_id, price, lots, direction):
"""平仓(下单两次:平昨+平今)"""
self.place_order(exchange_id, instrument_id, price, lots, direction, b"1") # 平昨
self.place_order(exchange_id, instrument_id, price, lots, direction, b"3") # 平今
# ============ 交易信号计算 ============
def check_stop_loss(self, trade_df, param, data):
"""检查并执行止损"""
# 多头止损条件:有多头持仓 且 收盘价 < 多头止损价
if param.position > 0 and param.sl_long_price > 0 and \
trade_df["close"].iloc[-1] < param.sl_long_price:
price = data["BidPrice1"] - param.price_offset # 计算止损价
print(f"多头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_long_price}")
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 执行平仓
param.sl_long_price = 0 # 重置止损价
param.position = 0 # 清空持仓
self.save_to_csv(data["InstrumentID"]) # 保存数据
# 发送邮件通知
text = f"SL_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, SL:{param.sl_long_price}, Lots:{param.lots}, " \
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
self.mail.send(text)
return True
# 空头止损条件:有空头持仓 且 收盘价 > 空头止损价
if param.position < 0 and param.sl_short_price > 0 and \
trade_df["close"].iloc[-1] > param.sl_short_price:
price = data["AskPrice1"] + param.price_offset
print(f"空头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_short_price}")
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 执行平仓
param.sl_short_price = 0
param.position = 0
self.save_to_csv(data["InstrumentID"])
text = f"SL_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, SL:{param.sl_short_price}, Lots:{param.lots}, " \
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
self.mail.send(text)
return True
return False # 无止损触发
def calculate_delta_cumulative(self, trade_df, param):
"""计算Delta累计"""
if trade_df["delta"].dtype == object: # 类型为对象则转换
trade_df["delta"] = trade_df["delta"].astype(float)
trade_df["datetime"] = pd.to_datetime(trade_df["datetime"]) # 转换时间格式
# 交易日判断(根据时间确定所属交易日)
last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1])
hour = last_dt.hour
if hour >= 21: # 夜盘21点后属于下一个交易日
trading_day = (last_dt + pd.Timedelta(days=1)).date()
elif hour < 15: # 15点前属于当日
trading_day = last_dt.date()
else: # 15点后也属于当日
trading_day = last_dt.date()
trade_df["trading_day"] = str(trading_day) # 添加交易日列
# 按交易日分组累计Delta
trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum()
trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") # 还原时间格式
def calculate_signals(self, trade_df, param):
"""计算交易信号"""
if len(trade_df) < 2: # 数据不足则返回
return None, None, None, None
dj_last = trade_df["dj"].iloc[-1] # 最新堆积值
delta_last = trade_df["delta"].iloc[-1] # 最新Delta值
# 开多条件:堆积值>=阈值 且 Delta>=阈值
bull_signal = dj_last >= param.accumulation_threshold and \
delta_last >= param.delta_threshold
# 开空条件:堆积值<=-阈值 且 Delta<=-阈值
bear_signal = dj_last <= -param.accumulation_threshold and \
delta_last <= -param.delta_threshold
print(f"开多:{bull_signal}, 开空:{bear_signal}, 持仓:{param.position}")
print(f"dj:{dj_last}, delta:{delta_last}")
return bull_signal, bear_signal, dj_last, delta_last # 返回信号
def execute_open_long(self, trade_df, param, data):
"""执行开多"""
param.pending_long_price = trade_df['dj_price_high'].iloc[-1] # 记录待买入价格(堆积区间高价)
print(f"开多信号触发,记录待买入价格: {param.pending_long_price}")
def execute_open_short(self, trade_df, param, data):
"""执行开空"""
param.pending_short_price = trade_df['dj_price_low'].iloc[-1] # 记录待卖出价格(堆积区间低价)
print(f"开空信号触发,记录待卖出价格: {param.pending_short_price}")
def check_pending_orders(self, trade_df, param, data):
"""检查并执行待成交订单"""
# 检查待开多订单(价格回落到堆积区间高价时买入)
if param.position == 0 and param.pending_long_price > 0:
current_price = data["AskPrice1"] # 当前卖一价
if current_price <= param.pending_long_price: # 价格触达
price = current_price + param.price_offset # 计算下单价
print(f"执行开多: 价格={price}")
self.place_order(data["ExchangeID"], data["InstrumentID"],
price, param.lots, b"0", b"0") # 开多
param.position = 1 # 更新持仓
param.sl_long_price = trade_df['dj_price_low'].iloc[-1] # 设置止损价
param.pending_long_price = 0 # 清空待执行价格
self.save_to_csv(data["InstrumentID"])
text = f"O_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, Lots:{param.lots}, " \
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
self.mail.send(text)
# 检查待开空订单(价格反弹到堆积区间低价时卖出)
if param.position == 0 and param.pending_short_price > 0:
current_price = data["BidPrice1"] # 当前买一价
if current_price >= param.pending_short_price: # 价格触达
price = current_price - param.price_offset
print(f"执行开空: 价格={price}")
self.place_order(data["ExchangeID"], data["InstrumentID"],
price, param.lots, b"1", b"0") # 开空
param.position = -1
param.sl_short_price = trade_df['dj_price_high'].iloc[-1] # 设置止损价
param.pending_short_price = 0
self.save_to_csv(data["InstrumentID"])
text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, Lots:{param.lots}, " \
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
self.mail.send(text)
def execute_close_long(self, trade_df, param, data):
"""平多"""
price = data["BidPrice1"] - param.price_offset # 计算平多价格
print(f"平多: BidPrice1={price}")
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 平多
param.position = 0 # 清空持仓
param.sl_long_price = 0 # 重置止损价
param.pending_long_price = 0 # 重置待执行价格
self.save_to_csv(data["InstrumentID"])
text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, Lots:{param.lots}, " \
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
self.mail.send(text)
def execute_close_short(self, trade_df, param, data):
"""平空"""
price = data["AskPrice1"] + param.price_offset
print(f"平空: AskPrice1={price}")
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 平空
param.position = 0
param.sl_short_price = 0
param.pending_short_price = 0
self.save_to_csv(data["InstrumentID"])
text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
f"Price:{price}, Lots:{param.lots}, dj:{trade_df['dj'].iloc[-1]}"
self.mail.send(text)
def save_ofdata_json(self, symbol):
"""保存订单流数据到JSON(新K线生成时调用)"""
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
json_path = f"{data_dir}/{symbol}_ofdata.json" # JSON文件路径
trade_df = DataStore.trade_dfs.get(symbol) # 获取交易数据
if trade_df is None or trade_df.empty:
return # 无数据则返回
save_df = trade_df.copy()
if 'datetime' in save_df.columns:
save_df['datetime'] = save_df['datetime'].astype(str) # 转换时间格式
# 去重:保留每个 datetime 的最后一条记录
save_df = save_df.drop_duplicates(subset=['datetime'], keep='last')
try:
save_df.to_json(json_path, orient='records', force_ascii=False, lines=True) # 写入JSON
except Exception as e:
print(f"JSON保存失败: {e}")
def save_json_periodically(self, instrument_id, trade_df):
"""定期保存订单流数据到JSON(已废弃,改为新K线触发保存)"""
# 此方法已废弃,保留用于兼容
pass
def cal_sig(self, symbol_queue):
"""交易信号计算线程主循环"""
while True:
try:
data = symbol_queue.get(block=True, timeout=SYSTEM_CONFIG["queue_timeout"]) # 从队列获取数据
instrument_id = data["InstrumentID"].decode() # 解码合约代码
# 检查队列阻塞
size = symbol_queue.qsize()
if size > SYSTEM_CONFIG.get("queue_warning_size", 5):
print(f"{instrument_id}队列长度={size},有点阻塞!")
# 读取历史数据
self.read_from_csv(instrument_id)
self.load_stops_from_json(instrument_id)
self.daily_reset(instrument_id)
param = self.param_dict[instrument_id] # 获取参数
self.current_symbol = instrument_id # 设置当前合约
# 处理Tick数据
self.tickcome(data)
trade_df = DataStore.trade_dfs[instrument_id] # 获取交易数据
# 新K线开始时
if len(trade_df) > param.processed_rows:
# 止损检查
if self.check_stop_loss(trade_df, param, data):
param.processed_rows = len(trade_df)
continue
# 计算Delta累计(必须在保存之前)
self.calculate_delta_cumulative(trade_df, param)
# 计算交易信号
bull_signal, bear_signal, dj_last, delta_last = self.calculate_signals(trade_df, param)
if bull_signal or bear_signal:
# 平仓信号(与持仓相反)
if param.position < 0 and (bear_signal or bull_signal):
self.execute_close_short(trade_df, param, data)
elif param.position > 0 and (bull_signal or bear_signal):
self.execute_close_long(trade_df, param, data)
# 开仓信号(无持仓)
if bull_signal and param.position == 0:
self.execute_open_long(trade_df, param, data)
if bear_signal and param.position == 0:
self.execute_open_short(trade_df, param, data)
# 检查待成交订单
self.check_pending_orders(trade_df, param, data)
param.processed_rows = len(trade_df)
except queue.Empty: # 队列为空则继续循环
pass
def distribute_tick(self):
"""行情分发线程"""
while True:
if self.status == 0: # 连接正常
try:
data = self.md_queue.get(block=True, timeout=0.1) # 获取行情数据
instrument_id = data["InstrumentID"].decode()
try:
self.queue_dict[instrument_id].put(data, block=True, timeout=1.0) # 分发到各合约队列
except queue.Full:
print(f"{instrument_id}合约信号计算阻塞导致队列已满")
except queue.Empty:
pass
else: # 连接断开则等待
time_module.sleep(0.1)
def start(self, param_dict):
"""启动交易引擎"""
self.param_dict = param_dict # 保存参数
now = datetime.now() # 当前时间
current_hour = now.hour
if current_hour >= 21: # 21点后属于下一日
today_date = (now + pd.Timedelta(days=1)).date()
elif current_hour < 15: # 15点前属于当日
today_date = now.date()
else: # 15点后属于当日
today_date = now.date()
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
for symbol in param_dict.keys():
# 加载历史订单流数据
json_path = f"{data_dir}/{symbol}_ofdata.json"
if os.path.exists(json_path):
try:
DataStore.trade_dfs[symbol] = pd.read_json(json_path, lines=True)
if not DataStore.trade_dfs[symbol].empty:
print(f"加载历史订单流: {json_path}, 共{len(DataStore.trade_dfs[symbol])}")
except Exception as e:
print(f"加载失败: {e}")
DataStore.trade_dfs[symbol] = pd.DataFrame({})
else:
DataStore.trade_dfs[symbol] = pd.DataFrame({})
# 创建品种队列
self.queue_dict[symbol] = queue.Queue(SYSTEM_CONFIG["queue_max_size"])
# 启动信号计算线程
sig_thread = threading.Thread(
target=self.cal_sig,
args=(self.queue_dict[symbol],),
daemon=True, # 守护线程
name=f"CalSig_{symbol}"
)
sig_thread.start()
# 启动行情分发线程
distribute_thread = threading.Thread(target=self.distribute_tick, daemon=True)
distribute_thread.start()
try:
distribute_thread.join() # 等待分发线程结束
except KeyboardInterrupt: # 捕获中断信号
print("接收到中断信号,正在关闭...")
raise
# ============ 主程序入口 ============
def run_trader(param_dict, broker_id, td_server, investor_id, password,
app_id, auth_code, md_queue=None, page_dir=""):
"""启动交易进程"""
trader = OrderFlowTrader( # 创建交易引擎实例
broker_id, td_server, investor_id, password,
app_id, auth_code, md_queue, page_dir
)
trader.start(param_dict) # 启动交易引擎
if __name__ == "__main__":
# 创建品种参数(自动从配置读取所有合约)
param_dict = {}
for symbol, config in TRADING_PARAMS.items():
param_dict[symbol] = TradingParam.from_config(symbol, config)
# ===模式切换:回测模式 ===
if SYSTEM_CONFIG.get("mode", "live") == "backtest":
from backtest import Backtester
backtester = Backtester(param_dict)
backtester.run()
else:
# === 实盘/模拟模式 ===
# 连接SimNow模拟交易
future_account = get_simulate_account(
investor_id=SIMNOW_CONFIG["investor_id"],
password=SIMNOW_CONFIG["password"],
server_name=SIMNOW_CONFIG["server_name"],
subscribe_list=list(param_dict.keys()),
)
# 连接实盘(备用)
# from CtpPlus.CTP.FutureAccount import FutureAccount
# future_account = FutureAccount(...)
print("开始", len(future_account.subscribe_list))
# 创建共享队列
share_queue = Queue(maxsize=SYSTEM_CONFIG["queue_share_size"])
# 启动行情进程
md_process = Process(target=run_tick_engine, args=(future_account, [share_queue]))
# 启动交易进程
trader_process = Process(
target=run_trader,
args=(
param_dict,
future_account.broker_id,
future_account.server_dict["TDServer"],
future_account.investor_id,
future_account.password,
future_account.app_id,
future_account.auth_code,
share_queue,
future_account.td_flow_path,
),
)
md_process.start() # 启动行情进程
trader_process.start() # 启动交易进程
md_process.join() # 等待行情进程结束
trader_process.join() # 等待交易进程结束