1189 lines
59 KiB
Python
1189 lines
59 KiB
Python
"""
|
||
订单流交易策略
|
||
功能:处理Tick行情数据,计算订单流指标,生成交易信号并执行交易
|
||
"""
|
||
import queue # 队列模块,用于线程间通信
|
||
import threading # 线程模块
|
||
import os # 操作系统模块
|
||
import json # JSON模块
|
||
import re # 正则表达式模块
|
||
import time as time_module # 标准库时间模块
|
||
from datetime import datetime, time as datetime_time # datetime时间对象
|
||
from multiprocessing import Process, Queue # 多进程模块
|
||
|
||
import pandas as pd # 数据分析库
|
||
import numpy as np # 数值计算库
|
||
import smtplib # SMTP邮件发送库
|
||
from email.mime.text import MIMEText # 邮件文本格式
|
||
from email.mime.multipart import MIMEMultipart # 邮件多部分格式
|
||
|
||
from CtpPlus.CTP.MdApi import run_tick_engine # 行情引擎
|
||
from CtpPlus.CTP.FutureAccount import get_simulate_account # 获取模拟账户
|
||
from CtpPlus.CTP.TraderApiBase import TraderApiBase # 交易API基类
|
||
|
||
# 导入配置
|
||
from config import (
|
||
SMTP_CONFIG, TRADING_PARAMS, SIMNOW_CONFIG, SYSTEM_CONFIG, NIGHT_CLEARING_TIME,
|
||
FEISHU_CONFIG
|
||
)
|
||
|
||
# ============ 全局数据字典 ============
|
||
class DataStore:
|
||
"""全局数据存储"""
|
||
tickdata = {} # K线数据 {symbol: DataFrame}
|
||
quote = {} # 报价数据 {symbol: {bid, ask}}
|
||
ofdata = {} # 订单流数据 {symbol: DataFrame}
|
||
trade_dfs = {} # 交易数据 {symbol: DataFrame}
|
||
prev_volume = {} # 上个Tick成交量 {symbol: volume}
|
||
last_tick = {} # 上条Tick数据 {symbol: tick}
|
||
|
||
|
||
# ============ 参数配置类 ============
|
||
|
||
class TradingParam:
|
||
"""交易参数配置"""
|
||
def __init__(
|
||
self, symbol, lots, price_offset, delta_threshold,
|
||
imbalance_ratio, accumulation_threshold, period,
|
||
min_volume=0, merge_price=1, mini_price=0.2
|
||
):
|
||
self.symbol = symbol # 合约代码
|
||
self.lots = lots # 交易手数
|
||
self.price_offset = price_offset # 价格偏移量(下单时加减)
|
||
self.delta_threshold = delta_threshold # Delta阈值(开仓条件)
|
||
self.imbalance_ratio = imbalance_ratio # 失衡比率(堆积判断)
|
||
self.accumulation_threshold = accumulation_threshold # 堆积阈值
|
||
self.period = period # K线周期
|
||
self.min_volume = min_volume # 最小成交量过滤
|
||
self.merge_price = merge_price # 价格档位合并数量
|
||
self.mini_price = mini_price # 最小价格变动
|
||
|
||
# 持仓状态
|
||
self.position = 0 # 当前持仓(1多头/-1空头/0无持仓)
|
||
self.pending_long_price = 0 # 待执行开多价格
|
||
self.pending_short_price = 0 # 待执行开空价格
|
||
self.sl_long_price = 0 # 多头止损价
|
||
self.sl_short_price = 0 # 空头止损价
|
||
self.clearing_executed = False # 日终清算是否已执行
|
||
self.load_historical_data = True # 是否加载历史数据
|
||
self.processed_rows = 0 # 已处理的行数(用于判断新K线)
|
||
|
||
@classmethod
|
||
def from_config(cls, symbol, config_dict):
|
||
"""从配置字典创建参数"""
|
||
return cls(
|
||
symbol=symbol, # 合约代码
|
||
lots=config_dict["lots"], # 手数
|
||
price_offset=config_dict["price_offset"], # 价格偏移
|
||
delta_threshold=config_dict["delta_threshold"], # Delta阈值
|
||
imbalance_ratio=config_dict["imbalance_ratio"], # 失衡比率
|
||
accumulation_threshold=config_dict["accumulation_threshold"], # 堆积阈值
|
||
period=config_dict["period"], # K线周期
|
||
min_volume=config_dict.get("min_volume", 0), # 最小成交量
|
||
merge_price=config_dict.get("merge_price", 1), # 价格合并
|
||
mini_price=config_dict.get("mini_price", 0.2) # 最小价格
|
||
)
|
||
|
||
|
||
# ============ 邮件通知模块 ============
|
||
|
||
class MailNotifier:
|
||
"""邮件通知类"""
|
||
def __init__(self):
|
||
self.config = SMTP_CONFIG # 加载SMTP配置
|
||
|
||
def send(self, text, subject="TD_Simnow_Signal"):
|
||
"""发送邮件通知"""
|
||
if not self.config["password"]: # 未配置密码则跳过
|
||
print("SMTP密码未配置,跳过邮件发送")
|
||
return
|
||
|
||
try:
|
||
msg = MIMEMultipart() # 创建多部分邮件对象
|
||
msg["From"] = self.config["sender"] # 发件人
|
||
msg["To"] = ";".join(self.config["receivers"]) # 收件人(逗号分隔)
|
||
msg["Subject"] = subject # 邮件主题
|
||
msg.attach(MIMEText(text, "plain", "utf-8")) # 添加纯文本内容
|
||
|
||
smtp = smtplib.SMTP_SSL(self.config["server"], self.config["port"]) # 连接SSL SMTP
|
||
smtp.login(self.config["username"], self.config["password"]) # 登录
|
||
smtp.sendmail(self.config["sender"], self.config["receivers"], msg.as_string()) # 发送
|
||
smtp.quit() # 关闭连接
|
||
except Exception as e:
|
||
print(f"邮件发送失败: {e}") # 捕获并打印异常
|
||
|
||
|
||
class FeishuNotifier:
|
||
"""飞书通知类"""
|
||
def __init__(self):
|
||
self.config = FEISHU_CONFIG # 加载飞书配置
|
||
|
||
def send(self, text):
|
||
"""发送飞书消息"""
|
||
if not self.config.get("enabled", False): # 未启用则跳过
|
||
return
|
||
|
||
try:
|
||
import requests # 导入requests库
|
||
headers = {"Content-Type": "application/json"} # 请求头
|
||
data = {
|
||
"msg_type": "text", # 消息类型为文本
|
||
"content": {"text": text} # 文本内容
|
||
}
|
||
response = requests.post(self.config["webhook_url"], headers=headers, json=data) # POST请求
|
||
if response.status_code != 200: # 检查响应状态
|
||
print(f"飞书消息发送失败,状态码: {response.status_code}")
|
||
except Exception as e:
|
||
print(f"飞书消息发送失败: {e}")
|
||
|
||
|
||
# ============ 交易引擎类 ============
|
||
|
||
class OrderFlowTrader(TraderApiBase):
|
||
"""订单流交易引擎"""
|
||
|
||
def __init__(self, broker_id, td_server, investor_id, password,
|
||
app_id, auth_code, md_queue=None, page_dir=""):
|
||
self.param_dict = {} # 交易参数字典 {symbol: TradingParam}
|
||
self.queue_dict = {} # 信号计算队列字典 {symbol: Queue}
|
||
self.current_symbol = " " # 当前处理的合约代码
|
||
self.last_stops_loatime = {} # 上次加载止盈止损的时间 {symbol: timestamp}
|
||
|
||
# 线程锁(保证多线程访问共享数据的安全性)
|
||
self._lock = threading.RLock() # 综合锁
|
||
self._trade_locks = {} # 交易数据锁
|
||
self._prev_volume_lock = threading.RLock() # 成交量锁
|
||
self._tickdata_lock = threading.RLock() # K线数据锁
|
||
self._quote_lock = threading.RLock() # 报价数据锁
|
||
self._ofdata_lock = threading.RLock() # 订单流数据锁
|
||
self._last_tick_lock = threading.RLock() # 上个Tick锁
|
||
|
||
# 邮件通知器
|
||
self.mail = MailNotifier()
|
||
|
||
# 飞书通知器
|
||
self.feishu = FeishuNotifier()
|
||
|
||
# 数据存储
|
||
self._json_save_counter = {} # JSON保存计数器
|
||
self.md_queue = md_queue # 行情队列
|
||
|
||
# ============ 行情数据处理 ============
|
||
|
||
def tickcome(self, md_queue):
|
||
"""接收并处理Tick行情数据"""
|
||
data = md_queue # 从队列获取行情数据
|
||
instrument_id = data["InstrumentID"].decode() # 解码合约代码
|
||
action_day = data["ActionDay"].decode() # 解码交易日期
|
||
update_time = data["UpdateTime"].decode() # 解码更新时间
|
||
|
||
# 格式化日期为YYYY-MM-DD格式
|
||
action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}"
|
||
|
||
# 组合完整的时间戳(包含毫秒)
|
||
created_at = datetime.strptime(
|
||
f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}",
|
||
"%Y-%m-%d %H:%M:%S.%f"
|
||
)
|
||
|
||
# 计算瞬时成交量(当前成交量 - 上个Tick成交量)
|
||
with self._prev_volume_lock:
|
||
prev_vol = DataStore.prev_volume.get(instrument_id, 0) # 获取上次成交量
|
||
curr_vol = int(data["Volume"]) # 当前总成交量
|
||
last_vol = curr_vol - prev_vol if prev_vol != 0 else 0 # 计算瞬时成交量
|
||
DataStore.prev_volume[instrument_id] = curr_vol # 更新存储的成交量
|
||
|
||
if last_vol <= 0: # 无增量则返回(避免重复处理)
|
||
return
|
||
|
||
# 构建Tick数据结构
|
||
tick = {
|
||
"symbol": instrument_id, # 合约代码
|
||
"created_at": created_at, # 时间戳
|
||
"price": float(data["LastPrice"]), # 最新价
|
||
"last_volume": last_vol, # 瞬时成交量
|
||
"bid_p": float(data["BidPrice1"]), # 买一价
|
||
"bid_v": int(data["BidVolume1"]), # 买一量
|
||
"ask_p": float(data["AskPrice1"]), # 卖一价
|
||
"ask_v": int(data["AskVolume1"]), # 卖一量
|
||
"upper_limit": float(data["UpperLimitPrice"]), # 涨停价
|
||
"lower_limit": float(data["LowerLimitPrice"]), # 跌停价
|
||
"trading_day": data["TradingDay"].decode(), # 交易日
|
||
"cum_volume": curr_vol, # 累计成交量
|
||
"cum_amount": float(data["Turnover"]), # 累计成交额
|
||
"cum_position": int(data["OpenInterest"]), # 累计持仓量
|
||
}
|
||
self.on_tick(tick) # 调用Tick处理回调
|
||
|
||
def on_tick(self, tick):
|
||
"""处理单个Tick数据"""
|
||
if tick["last_volume"] == 0: # 无成交量则返回
|
||
return
|
||
|
||
tsymbol = tick["symbol"] # 合约代码
|
||
|
||
# 获取上条Tick的买卖盘数据进行对比
|
||
with self._last_tick_lock:
|
||
prev_tick = DataStore.last_tick.get(tsymbol) # 获取上条Tick
|
||
if prev_tick: # 存在则使用上条数据
|
||
bid_p = prev_tick["bid_p"]
|
||
ask_p = prev_tick["ask_p"]
|
||
bid_v = prev_tick["bid_v"]
|
||
ask_v = prev_tick["ask_v"]
|
||
else: # 不存在则使用当前数据初始化
|
||
bid_p = tick["bid_p"]
|
||
ask_p = tick["ask_p"]
|
||
bid_v = tick["bid_v"]
|
||
ask_v = tick["ask_v"]
|
||
DataStore.last_tick[tsymbol] = tick # 更新存储的上条Tick
|
||
|
||
# 格式化时间字符串(毫秒精度)
|
||
timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||
|
||
# 构建DataFrame格式的Tick数据
|
||
tick_dt = pd.DataFrame({
|
||
"datetime": timetick, # 时间戳字符串
|
||
"symbol": tick["symbol"], # 合约代码
|
||
"mainsym": tick["symbol"].rstrip("0123456789").upper(), # 主合约代码(去除数字)
|
||
"lastprice": tick["price"], # 最新价
|
||
"vol": tick["last_volume"], # 瞬时成交量
|
||
"bid_p": bid_p, # 买一价
|
||
"ask_p": ask_p, # 卖一价
|
||
"bid_v": bid_v, # 买一量
|
||
"ask_v": ask_v, # 卖一量
|
||
}, index=[0])
|
||
self.tickdata(tick_dt, tsymbol) # 调用K线聚合处理
|
||
|
||
def data_of(self, symbol, df):
|
||
"""将订单流数据添加到交易DataFrame"""
|
||
with self._lock:
|
||
if symbol not in self._trade_locks: # 首次访问则创建锁
|
||
self._trade_locks[symbol] = threading.RLock()
|
||
trade_lock = self._trade_locks[symbol]
|
||
|
||
with trade_lock:
|
||
existing = DataStore.trade_dfs.get(symbol) # 获取现有数据
|
||
if existing is None or existing.empty: # 无数据则直接赋值
|
||
DataStore.trade_dfs[symbol] = df
|
||
else:
|
||
# 检查是否有重复的 datetime,有则删除旧数据
|
||
if "datetime" in df.columns and "datetime" in existing.columns:
|
||
new_dt = df["datetime"].iloc[0] if len(df) > 0 else None
|
||
if new_dt is not None:
|
||
# 删除已存在的相同 datetime 的旧数据
|
||
existing = existing[existing["datetime"] != new_dt]
|
||
DataStore.trade_dfs[symbol] = pd.concat([existing, df], ignore_index=True) # 合并数据
|
||
|
||
def process_quote(self, bid_dict, ask_dict, symbol):
|
||
"""处理并合并买卖盘数据"""
|
||
with self._quote_lock:
|
||
dic = DataStore.quote.get(symbol) # 获取该合约的现有报价
|
||
if dic:
|
||
bid_result = dic["bid_result"].copy() # 复制买盘数据
|
||
ask_result = dic["ask_result"].copy() # 复制卖盘数据
|
||
else:
|
||
bid_result, ask_result = {}, {} # 无数据则初始化空字典
|
||
|
||
# 合并价格档位(买卖盘价格可能有差异)
|
||
price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys()))
|
||
|
||
for price in price_list:
|
||
bid_val = int(bid_dict.get(price, 0)) # 获取买盘量
|
||
ask_val = int(ask_dict.get(price, 0)) # 获取卖盘量
|
||
|
||
if bid_val: # 累加买盘量
|
||
bid_result[price] = bid_result.get(price, 0) + bid_val
|
||
ask_result.setdefault(price, 0) # 确保卖盘有该价格档位
|
||
if ask_val: # 累加卖盘量
|
||
ask_result[price] = ask_result.get(price, 0) + ask_val
|
||
bid_result.setdefault(price, 0) # 确保买盘有该价格档位
|
||
|
||
DataStore.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result} # 更新存储
|
||
return bid_result, ask_result # 返回合并后的数据
|
||
|
||
def tickdata(self, df, symbol):
|
||
"""将Tick数据聚合成K线"""
|
||
tickdata = pd.DataFrame({
|
||
"datetime": df["datetime"], # 时间
|
||
"symbol": df["symbol"], # 合约代码
|
||
"lastprice": df["lastprice"], # 最新价
|
||
"vol": df["vol"], # 成交量
|
||
"bid_p": df["bid_p"], # 买一价
|
||
"bid_v": df["bid_v"], # 买一量
|
||
"ask_p": df["ask_p"], # 卖一价
|
||
"ask_v": df["ask_v"], # 卖一量
|
||
})
|
||
|
||
try:
|
||
with self._tickdata_lock:
|
||
rdf = DataStore.tickdata.get(symbol) # 获取已存储的K线数据
|
||
|
||
if rdf is not None: # 已存在K线数据(需要累加)
|
||
rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 获取K线时间
|
||
# 获取当前 tick 的 bar 时间(可能还未设置,用 datetime 作为备选)
|
||
tick_bartime = tickdata.get("bartime", tickdata["datetime"])[0]
|
||
now_bartime = pd.to_datetime(tick_bartime).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
if now_bartime > rdftm: # 新K线开始(时间大于上一K线)
|
||
# 保存旧K线的订单流数据
|
||
with self._ofdata_lock:
|
||
of = DataStore.ofdata.pop(symbol, None)
|
||
if of is not None:
|
||
self.data_of(symbol, of) # 写入交易DataFrame
|
||
|
||
# 清空旧K线的报价数据
|
||
with self._quote_lock:
|
||
DataStore.quote.pop(symbol, None)
|
||
# 清空旧K线的Tick数据
|
||
with self._tickdata_lock:
|
||
DataStore.tickdata.pop(symbol, None)
|
||
|
||
# 初始化新K线数据
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"]) # K线时间
|
||
tickdata["open"] = tickdata["lastprice"] # 开盘价
|
||
tickdata["high"] = tickdata["lastprice"] # 最高价
|
||
tickdata["low"] = tickdata["lastprice"] # 最低价
|
||
tickdata["close"] = tickdata["lastprice"] # 收盘价
|
||
tickdata["starttime"] = tickdata["datetime"] # 开始时间
|
||
else: # 同一K线内:累加数据
|
||
tickdata["bartime"] = rdf["bartime"] # 继承K线时间
|
||
tickdata["open"] = rdf["open"] # 继承开盘价
|
||
lastprice_val = float(tickdata["lastprice"].values[0]) # 当前价格
|
||
# 更新最高价(取最大值)
|
||
tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]) if len(rdf["high"].values) > 0 else 0)
|
||
# 更新最低价(取最小值)
|
||
tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]) if len(rdf["low"].values) > 0 else 0)
|
||
tickdata["close"] = tickdata["lastprice"] # 更新收盘价
|
||
tickdata["vol"] = df["vol"].values + rdf["vol"].values # 累加成交量
|
||
tickdata["starttime"] = rdf["starttime"] # 继承开始时间
|
||
else: # 首条Tick,创建新K线
|
||
print("新bar的第一个tick进入")
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
|
||
tickdata["open"] = tickdata["lastprice"]
|
||
tickdata["high"] = tickdata["lastprice"]
|
||
tickdata["low"] = tickdata["lastprice"]
|
||
tickdata["close"] = tickdata["lastprice"]
|
||
tickdata["starttime"] = tickdata["datetime"]
|
||
except Exception as e:
|
||
print(f"tickdata异常: {e}")
|
||
if "bartime" not in tickdata.columns: # 确保bartime存在
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
|
||
|
||
# 确保bartime存在且格式正确
|
||
if "bartime" not in tickdata.columns:
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["bartime"])
|
||
|
||
param = self.param_dict[self.current_symbol] # 获取交易参数
|
||
|
||
# 重采样聚合K线(按指定周期合并)
|
||
bardata = (
|
||
tickdata.resample(on="bartime", rule=param.period, label="right", closed="right")
|
||
.agg({
|
||
"starttime": "first", # 开始时间取第一个
|
||
"symbol": "last", # 合约代码取最后一个
|
||
"open": "first", # 开盘价取第一个
|
||
"high": "max", # 最高价取最大
|
||
"low": "min", # 最低价取最小
|
||
"close": "last", # 收盘价取最后一个
|
||
"vol": "sum", # 成交量求和
|
||
})
|
||
.reset_index(drop=False)
|
||
)
|
||
|
||
bardata = bardata.dropna().reset_index(drop=True) # 删除空值行
|
||
|
||
# 只保留最后一个 bar(避免同一周期产生多行)
|
||
if len(bardata) > 1:
|
||
bardata = bardata.iloc[[-1]]
|
||
|
||
bardata["bartime"] = pd.to_datetime(bardata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S") # 格式化时间
|
||
|
||
with self._tickdata_lock:
|
||
DataStore.tickdata[symbol] = bardata # 更新存储的K线数据
|
||
|
||
# 使用单tick成交量进行后续处理
|
||
tickdata["vol"] = df["vol"].values
|
||
self.orderflow_df_new(tickdata, bardata, symbol) # 生成订单流数据
|
||
|
||
def merge_price_levels(self, price_dict, merge_count, mini_price):
|
||
"""合并价格档位"""
|
||
if not price_dict or merge_count <= 1 or mini_price <= 0: # 参数无效则返回原数据
|
||
return price_dict
|
||
|
||
merged = {}
|
||
sorted_prices = sorted([float(p) for p in price_dict.keys()]) # 排序价格
|
||
|
||
if not sorted_prices: # 无价格则返回空
|
||
return {}
|
||
|
||
min_price = sorted_prices[0] # 最低价
|
||
merge_interval = merge_count * mini_price # 合并间隔
|
||
|
||
# 按间隔分组合并成交量
|
||
for price_str, vol in price_dict.items():
|
||
price = float(price_str)
|
||
group_idx = int((price - min_price) // merge_interval) # 计算组索引
|
||
merged_price = min_price + group_idx * merge_interval # 计算合并后的价格
|
||
key = str(merged_price)
|
||
merged[key] = merged.get(key, 0) + vol # 累加成交量
|
||
|
||
return merged # 返回合并后的数据
|
||
|
||
def filter_by_min_vol(self, price_dict, min_vol):
|
||
"""过滤最小成交量以下的价格档位"""
|
||
if min_vol <= 0: # 无阈值则返回原数据
|
||
return price_dict
|
||
return {k: v for k, v in price_dict.items() if v >= min_vol} # 过滤低成交量档位
|
||
|
||
def orderflow_df_new(self, df_tick, df_min, symbol):
|
||
"""生成订单流DataFrame"""
|
||
param = self.param_dict[symbol] # 获取交易参数
|
||
min_vol = param.min_volume # 最小成交量
|
||
merge_price = param.merge_price # 价格合并数量
|
||
|
||
# 单tick成交量,用于bid/ask方向判断
|
||
bar_vol = int(df_tick["vol"].values[0])
|
||
# K线周期内成交量之和(resample聚合结果)
|
||
period_vol = int(df_min["vol"].values[0])
|
||
bar_close = float(df_min["close"].values[0]) # K线收盘价
|
||
bar_open = float(df_min["open"].values[0]) # K线开盘价
|
||
bar_low = float(df_min["low"].values[0]) # K线最低价
|
||
bar_high = float(df_min["high"].values[0]) # K线最高价
|
||
dt = df_min["bartime"].values[0] # K线时间
|
||
|
||
bp1 = float(df_tick["bid_p"].values[0]) # 买一价
|
||
ap1 = float(df_tick["ask_p"].values[0]) # 卖一价
|
||
last_price = float(df_tick["lastprice"].values[0]) # 最新价
|
||
volume = bar_vol # 成交量
|
||
bar_symbol = df_tick["symbol"].values[0] # 合约代码
|
||
|
||
bid_dict = {} # 买盘字典
|
||
ask_dict = {} # 卖盘字典
|
||
|
||
# 判断价格位置并归类到买卖盘
|
||
if last_price >= ap1: # 价格>=卖一价,记录到卖盘
|
||
ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + volume
|
||
if last_price <= bp1: # 价格<=买一价,记录到买盘
|
||
bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + volume
|
||
|
||
bid_result, ask_result = self.process_quote(bid_dict, ask_dict, symbol) # 合并买卖盘数据
|
||
|
||
# 应用价格合并
|
||
if merge_price > 1:
|
||
bid_result = self.merge_price_levels(bid_result, merge_price, param.mini_price)
|
||
ask_result = self.merge_price_levels(ask_result, merge_price, param.mini_price)
|
||
|
||
# 价格升序排序
|
||
bid_result = dict(sorted(bid_result.items(), key=lambda x: x[0]))
|
||
ask_result = dict(sorted(ask_result.items(), key=lambda x: x[0]))
|
||
|
||
price_list = list(bid_result.keys()) # 价格列表
|
||
ask_list = list(ask_result.values()) # 卖盘量列表
|
||
bid_list = list(bid_result.values()) # 买盘量列表
|
||
|
||
delta = sum(ask_list) - sum(bid_list) # Delta = 卖量 - 买量
|
||
|
||
# 构建订单流DataFrame
|
||
df = pd.DataFrame({
|
||
"price": pd.Series([price_list]), # 价格列表
|
||
"Ask": pd.Series([ask_list]), # 卖盘量
|
||
"Bid": pd.Series([bid_list]), # 买盘量
|
||
})
|
||
df["symbol"] = bar_symbol # 合约代码
|
||
df["datetime"] = dt # 时间
|
||
df["delta"] = delta # Delta值
|
||
df["close"] = bar_close # 收盘价
|
||
df["open"] = bar_open # 开盘价
|
||
df["high"] = bar_high # 最高价
|
||
df["low"] = bar_low # 最低价
|
||
df["vol"] = period_vol # 周期成交量
|
||
|
||
# 计算堆积指标
|
||
dj_count, dj_high, dj_low = self.calculate_accumulation(df, min_vol)
|
||
df["dj"] = dj_count # 堆积数量
|
||
df["dj_price_high"] = dj_high # 堆积区间高价
|
||
df["dj_price_low"] = dj_low # 堆积区间低价
|
||
|
||
with self._ofdata_lock:
|
||
DataStore.ofdata[symbol] = df # 更新订单流数据
|
||
|
||
def calculate_accumulation(self, kdata, min_vol=0):
|
||
"""计算堆积指标(Order Flow Delta)
|
||
|
||
Args:
|
||
kdata: 订单流DataFrame
|
||
min_vol: 最小成交量过滤
|
||
|
||
Returns:
|
||
(dj_count, dj_price_high, dj_price_low)
|
||
"""
|
||
param = self.param_dict[self.current_symbol] # 获取交易参数
|
||
imbalance_ratio = int(param.imbalance_ratio) # 失衡比率
|
||
accumulation_threshold = int(param.accumulation_threshold) # 堆积阈值
|
||
|
||
dj_count = 0 # 堆积计数
|
||
dj_price_high = 0 # 堆积价格区间高
|
||
dj_price_low = 0 # 堆积价格区间低
|
||
bearish_zones = [] # 空头堆积区间
|
||
bullish_zones = [] # 多头堆积区间
|
||
|
||
# 遍历每个价格档位
|
||
for item in kdata.itertuples(index=False):
|
||
price_s = item.price # 价格序列
|
||
ask_s = item.Ask # 卖盘序列
|
||
bid_s = item.Bid # 买盘序列
|
||
|
||
bear_count = 0 # 空头连续计数
|
||
bull_count = 0 # 多头连续计数
|
||
bear_start, bear_end = None, None # 空头区间起止
|
||
bull_start, bull_end = None, None # 多头区间起止
|
||
|
||
price_len = len(price_s) # 价格档位数量
|
||
|
||
for i in range(price_len):
|
||
# 获取各档位成交量(低于最小值的视为0)
|
||
bid_val = bid_s[i] if i < len(bid_s) and bid_s[i] >= min_vol else 0
|
||
ask_val = ask_s[i] if i < len(ask_s) and ask_s[i] >= min_vol else 0
|
||
next_ask = ask_s[i + 1] if i + 1 < len(ask_s) and ask_s[i + 1] >= min_vol else 0
|
||
prev_bid = bid_s[i - 1] if i > 0 and bid_s[i - 1] >= min_vol else 0
|
||
|
||
# 空头堆积: 买盘 > 卖盘 * 失衡比率
|
||
if i < price_len - 1:
|
||
if bid_val > next_ask * imbalance_ratio: # 判断空头堆积
|
||
if bear_start is None:
|
||
bear_start = price_s[i] # 记录空头区间起点
|
||
bear_end = price_s[i] # 更新空头区间终点
|
||
bear_count += 1 # 连续计数+1
|
||
if bear_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值
|
||
dj_count -= 1 # 堆积计数-1
|
||
else: # 堆积中断
|
||
if bear_count >= accumulation_threshold and bear_start is not None:
|
||
bearish_zones.append((bear_start, bear_end)) # 记录空头区间
|
||
bear_count = 0 # 重置计数
|
||
bear_start, bear_end = None, None # 重置区间
|
||
|
||
# 多头堆积: 卖盘 > 买盘 * 失衡比率
|
||
if i >= 1 and i < price_len - 1:
|
||
if ask_val > prev_bid * imbalance_ratio: # 判断多头堆积
|
||
if bull_start is None:
|
||
bull_start = price_s[i] # 记录多头区间起点
|
||
bull_end = price_s[i] # 更新多头区间终点
|
||
bull_count += 1 # 连续计数+1
|
||
if bull_count >= accumulation_threshold and float(price_s[i]) > 0: # 达到阈值
|
||
dj_count += 1 # 堆积计数+1
|
||
else: # 堆积中断
|
||
if bull_count >= accumulation_threshold and bull_start is not None:
|
||
bullish_zones.append((bull_start, bull_end)) # 记录多头区间
|
||
bull_count = 0 # 重置计数
|
||
bull_start, bull_end = None, None # 重置区间
|
||
|
||
# 处理最后堆积(未中断的区间)
|
||
if bear_count >= accumulation_threshold and bear_start is not None:
|
||
bearish_zones.append((bear_start, bear_end))
|
||
if bull_count >= accumulation_threshold and bull_start is not None:
|
||
bullish_zones.append((bull_start, bull_end))
|
||
|
||
# 计算价格范围
|
||
all_zones = bearish_zones + bullish_zones
|
||
if all_zones:
|
||
all_prices = [float(p) for zone in all_zones for p in zone]
|
||
dj_price_high = max(all_prices) # 取最高价
|
||
dj_price_low = min(all_prices) # 取最低价
|
||
|
||
return dj_count, dj_price_high, dj_price_low # 返回堆积结果
|
||
|
||
# ============ 数据持久化 ============
|
||
|
||
def read_from_csv(self, symbol):
|
||
"""从CSV读取历史交易数据"""
|
||
param = self.param_dict[symbol] # 获取交易参数
|
||
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
|
||
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
|
||
file_path = f"{data_dir}/{symbol}traderdata.csv" # 数据文件路径
|
||
|
||
if os.path.exists(file_path): # 文件存在则读取
|
||
df = pd.read_csv(file_path)
|
||
if not df.empty and param.load_historical_data: # 有数据且需加载
|
||
row = df.iloc[-1] # 取最后一行
|
||
param.position = int(row["pos"]) # 恢复持仓
|
||
param.sl_long_price = float(row["sl_long_price"]) # 恢复多头止损价
|
||
param.sl_short_price = float(row["sl_short_price"]) # 恢复空头止损价
|
||
print(f"加载历史数据: {df.iloc[-1]}")
|
||
param.load_historical_data = False # 标记已加载
|
||
|
||
def save_to_csv(self, symbol):
|
||
"""保存交易数据到CSV"""
|
||
# 确保 symbol 是字符串类型
|
||
if isinstance(symbol, bytes):
|
||
symbol = symbol.decode()
|
||
symbol = str(symbol)
|
||
|
||
# 从 param_dict 获取参数(需要确保是字符串 key)
|
||
if symbol in self.param_dict:
|
||
param = self.param_dict[symbol]
|
||
else:
|
||
# 尝试找到匹配的 key
|
||
for key in self.param_dict:
|
||
if str(key) == symbol:
|
||
param = self.param_dict[key]
|
||
break
|
||
else:
|
||
print(f"警告: 未找到 symbol {symbol} 的参数配置")
|
||
return
|
||
symbol = str(symbol)
|
||
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
|
||
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
|
||
|
||
# 构建保存数据
|
||
data = {
|
||
"datetime": [DataStore.trade_dfs[symbol]["datetime"].iloc[-1]], # 最后时间
|
||
"pos": [param.position], # 持仓
|
||
"sl_long_price": [param.sl_long_price], # 多头止损价
|
||
"sl_short_price": [param.sl_short_price], # 空头止损价
|
||
}
|
||
|
||
df = pd.DataFrame(data)
|
||
file_path = f"{data_dir}/{symbol}traderdata.csv" # 文件路径
|
||
|
||
if os.path.exists(file_path): # 文件存在则追加
|
||
csv_df = pd.read_csv(file_path)
|
||
if df["pos"].iloc[-1] != csv_df["pos"].iloc[-1]: # 持仓变化则保存
|
||
df.to_csv(file_path, mode="a", header=False, index=False)
|
||
else: # 文件不存在则新建
|
||
df.to_csv(file_path, index=False)
|
||
|
||
self.save_stops_to_json(symbol, DataStore.trade_dfs[symbol]) # 同时保存止盈止损JSON
|
||
|
||
def save_stops_to_json(self, symbol, trade_df):
|
||
"""保存止盈止损数据到JSON"""
|
||
# 确保 symbol 是字符串类型
|
||
if isinstance(symbol, bytes):
|
||
symbol = symbol.decode()
|
||
symbol = str(symbol)
|
||
|
||
# 获取参数
|
||
if symbol in self.param_dict:
|
||
param = self.param_dict[symbol]
|
||
else:
|
||
for key in self.param_dict:
|
||
if str(key) == symbol:
|
||
param = self.param_dict[key]
|
||
break
|
||
else:
|
||
print(f"警告: 未找到 symbol {symbol} 的参数配置")
|
||
return
|
||
|
||
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
|
||
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
|
||
file_path = f"{data_dir}/{symbol}_stops.json" # JSON文件路径
|
||
|
||
param = self.param_dict[symbol]
|
||
|
||
# 获取堆积价格区间
|
||
if trade_df is not None and len(trade_df) > 0:
|
||
dj_low = float(trade_df['dj_price_low'].iloc[-1]) if 'dj_price_low' in trade_df.columns else 0
|
||
dj_high = float(trade_df['dj_price_high'].iloc[-1]) if 'dj_price_high' in trade_df.columns else 0
|
||
else:
|
||
dj_low, dj_high = 0, 0
|
||
|
||
# 构建止盈止损数据结构
|
||
stops_data = {
|
||
"long": { # 多头止盈止损
|
||
"position": param.position if param.position > 0 else 0, # 持仓
|
||
"entry_price": dj_high if param.position > 0 else 0, # 入场价
|
||
"stop_loss": dj_low if param.position > 0 else 0 # 止损价
|
||
},
|
||
"short": { # 空头止盈止损
|
||
"position": abs(param.position) if param.position < 0 else 0, # 持仓
|
||
"entry_price": dj_high if param.position < 0 else 0, # 入场价
|
||
"stop_loss": dj_low if param.position < 0 else 0 # 止损价
|
||
}
|
||
}
|
||
|
||
with open(file_path, 'w', encoding='utf-8') as f: # 写入JSON文件
|
||
json.dump(stops_data, f, indent=4, ensure_ascii=False)
|
||
|
||
def load_stops_from_json(self, symbol):
|
||
"""从JSON加载止盈止损数据"""
|
||
# 确保 symbol 是字符串类型
|
||
if isinstance(symbol, bytes):
|
||
symbol = symbol.decode()
|
||
symbol = str(symbol)
|
||
|
||
current_time = time_module.time() # 当前时间戳
|
||
interval = SYSTEM_CONFIG["stops_load_interval"] # 加载间隔
|
||
|
||
# 处理 last_stops_loatime 的 key 类型
|
||
last_stops_time = self.last_stops_loatime
|
||
normalized_key = symbol
|
||
|
||
# 检查是否在加载间隔内(避免频繁读取磁盘)
|
||
if symbol in last_stops_time or normalized_key in [str(k) for k in last_stops_time]:
|
||
# 找到匹配的 key
|
||
for k in last_stops_time:
|
||
if str(k) == symbol:
|
||
if current_time - last_stops_time.get(k, 0) <= interval:
|
||
return False # 在间隔内,跳过
|
||
break
|
||
|
||
file_path = f"{SYSTEM_CONFIG['data_dir']}/{symbol}_stops.json" # JSON文件路径
|
||
|
||
if os.path.exists(file_path): # 文件存在则读取
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
stops_data = json.load(f)
|
||
|
||
# 获取参数
|
||
if symbol in self.param_dict:
|
||
param = self.param_dict[symbol]
|
||
else:
|
||
for key in self.param_dict:
|
||
if str(key) == symbol:
|
||
param = self.param_dict[key]
|
||
break
|
||
else:
|
||
print(f"警告: 未找到 symbol {symbol} 的参数配置")
|
||
return False
|
||
|
||
# 恢复多头止盈止损
|
||
if stops_data['long']['position'] > 0:
|
||
param.position = stops_data['long']['position']
|
||
param.sl_long_price = stops_data['long']['entry_price']
|
||
# 恢复空头止盈止损
|
||
elif stops_data['short']['position'] > 0:
|
||
param.position = -stops_data['short']['position']
|
||
param.sl_short_price = stops_data['short']['entry_price']
|
||
|
||
self.last_stops_loatime[symbol] = current_time # 记录加载时间
|
||
print(f"加载止盈止损信息: {symbol}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"加载止盈止损信息失败: {e}")
|
||
|
||
return False
|
||
|
||
def daily_reset(self, symbol):
|
||
"""每日收盘数据重置"""
|
||
param = self.param_dict[symbol] # 获取交易参数
|
||
sec = "".join(re.findall("[a-zA-Z]", str(symbol))) # 提取合约字母部分(如ru)
|
||
current_time = datetime.now().time() # 当前时间
|
||
|
||
clearing_time1_start = datetime_time(15, 5) # 日盘清算开始(15:05)
|
||
clearing_time1_end = datetime_time(15, 10) # 日盘清算结束(15:10)
|
||
|
||
param.clearing_executed = False # 重置清算标志
|
||
|
||
# 判断是否在日盘清算时间段
|
||
if clearing_time1_start <= current_time <= clearing_time1_end:
|
||
param.clearing_executed = True # 标记已执行清算
|
||
DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0] # 清空数据
|
||
# 判断是否在夜盘清算时间段
|
||
elif sec in NIGHT_CLEARING_TIME:
|
||
hour, minute = NIGHT_CLEARING_TIME[sec] # 获取夜盘清算时间
|
||
clearing_time2_start = datetime_time(hour, minute)
|
||
clearing_time2_end = datetime_time(hour, minute + 15) # 15分钟后结束
|
||
if clearing_time2_start <= current_time <= clearing_time2_end:
|
||
param.clearing_executed = True
|
||
DataStore.trade_dfs[symbol] = DataStore.trade_dfs[symbol].iloc[0:0]
|
||
|
||
return param.clearing_executed # 返回是否执行了清算
|
||
|
||
# ============ CTP回调 ============
|
||
|
||
def OnRtnTrade(self, pTrade):
|
||
"""成交回报回调"""
|
||
print("||成交回报||", pTrade)
|
||
|
||
def OnRspOrderInsert(self, pInputOrder, pRspInfo, nRequestID, bIsLast):
|
||
"""下单响应回调"""
|
||
print("||OnRspOrderInsert||", pInputOrder, pRspInfo, nRequestID, bIsLast)
|
||
|
||
def OnRtnOrder(self, pOrder):
|
||
"""订单回报回调"""
|
||
print("||订单回报||", pOrder)
|
||
|
||
# ============ 交易辅助 ============
|
||
|
||
def place_order(self, exchange_id, instrument_id, price, lots, direction, offset):
|
||
"""统一下单方法"""
|
||
self.insert_order(exchange_id, instrument_id, price, lots, direction, offset)
|
||
|
||
def place_close_orders(self, exchange_id, instrument_id, price, lots, direction):
|
||
"""平仓(下单两次:平昨+平今)"""
|
||
self.place_order(exchange_id, instrument_id, price, lots, direction, b"1") # 平昨
|
||
self.place_order(exchange_id, instrument_id, price, lots, direction, b"3") # 平今
|
||
|
||
# ============ 交易信号计算 ============
|
||
|
||
def check_stop_loss(self, trade_df, param, data):
|
||
"""检查并执行止损"""
|
||
# 多头止损条件:有多头持仓 且 收盘价 < 多头止损价
|
||
if param.position > 0 and param.sl_long_price > 0 and \
|
||
trade_df["close"].iloc[-1] < param.sl_long_price:
|
||
price = data["BidPrice1"] - param.price_offset # 计算止损价
|
||
print(f"多头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_long_price}")
|
||
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 执行平仓
|
||
param.sl_long_price = 0 # 重置止损价
|
||
param.position = 0 # 清空持仓
|
||
self.save_to_csv(data["InstrumentID"]) # 保存数据
|
||
# 发送邮件通知
|
||
text = f"SL_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, SL:{param.sl_long_price}, Lots:{param.lots}, " \
|
||
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
|
||
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
return True
|
||
|
||
# 空头止损条件:有空头持仓 且 收盘价 > 空头止损价
|
||
if param.position < 0 and param.sl_short_price > 0 and \
|
||
trade_df["close"].iloc[-1] > param.sl_short_price:
|
||
price = data["AskPrice1"] + param.price_offset
|
||
print(f"空头止损: close={trade_df['close'].iloc[-1]}, SL={param.sl_short_price}")
|
||
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 执行平仓
|
||
param.sl_short_price = 0
|
||
param.position = 0
|
||
self.save_to_csv(data["InstrumentID"])
|
||
text = f"SL_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, SL:{param.sl_short_price}, Lots:{param.lots}, " \
|
||
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
|
||
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
return True
|
||
|
||
return False # 无止损触发
|
||
|
||
def calculate_delta_cumulative(self, trade_df, param):
|
||
"""计算Delta累计"""
|
||
if trade_df["delta"].dtype == object: # 类型为对象则转换
|
||
trade_df["delta"] = trade_df["delta"].astype(float)
|
||
|
||
trade_df["datetime"] = pd.to_datetime(trade_df["datetime"]) # 转换时间格式
|
||
|
||
# 交易日判断(根据时间确定所属交易日)
|
||
last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1])
|
||
hour = last_dt.hour
|
||
if hour >= 21: # 夜盘21点后属于下一个交易日
|
||
trading_day = (last_dt + pd.Timedelta(days=1)).date()
|
||
elif hour < 15: # 15点前属于当日
|
||
trading_day = last_dt.date()
|
||
else: # 15点后也属于当日
|
||
trading_day = last_dt.date()
|
||
|
||
trade_df["trading_day"] = str(trading_day) # 添加交易日列
|
||
|
||
# 按交易日分组累计Delta
|
||
trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum()
|
||
|
||
trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") # 还原时间格式
|
||
|
||
def calculate_signals(self, trade_df, param):
|
||
"""计算交易信号"""
|
||
if len(trade_df) < 2: # 数据不足则返回
|
||
return None, None, None, None
|
||
|
||
dj_last = trade_df["dj"].iloc[-1] # 最新堆积值
|
||
delta_last = trade_df["delta"].iloc[-1] # 最新Delta值
|
||
|
||
# 开多条件:堆积值>=阈值 且 Delta>=阈值
|
||
bull_signal = dj_last >= param.accumulation_threshold and \
|
||
delta_last >= param.delta_threshold
|
||
|
||
# 开空条件:堆积值<=-阈值 且 Delta<=-阈值
|
||
bear_signal = dj_last <= -param.accumulation_threshold and \
|
||
delta_last <= -param.delta_threshold
|
||
|
||
print(f"开多:{bull_signal}, 开空:{bear_signal}, 持仓:{param.position}")
|
||
print(f"dj:{dj_last}, delta:{delta_last}")
|
||
|
||
return bull_signal, bear_signal, dj_last, delta_last # 返回信号
|
||
|
||
def execute_open_long(self, trade_df, param, data):
|
||
"""执行开多"""
|
||
param.pending_long_price = trade_df['dj_price_high'].iloc[-1] # 记录待买入价格(堆积区间高价)
|
||
print(f"开多信号触发,记录待买入价格: {param.pending_long_price}")
|
||
|
||
def execute_open_short(self, trade_df, param, data):
|
||
"""执行开空"""
|
||
param.pending_short_price = trade_df['dj_price_low'].iloc[-1] # 记录待卖出价格(堆积区间低价)
|
||
print(f"开空信号触发,记录待卖出价格: {param.pending_short_price}")
|
||
|
||
def check_pending_orders(self, trade_df, param, data):
|
||
"""检查并执行待成交订单"""
|
||
# 检查待开多订单(价格回落到堆积区间高价时买入)
|
||
if param.position == 0 and param.pending_long_price > 0:
|
||
current_price = data["AskPrice1"] # 当前卖一价
|
||
if current_price <= param.pending_long_price: # 价格触达
|
||
price = current_price + param.price_offset # 计算下单价
|
||
print(f"执行开多: 价格={price}")
|
||
self.place_order(data["ExchangeID"], data["InstrumentID"],
|
||
price, param.lots, b"0", b"0") # 开多
|
||
param.position = 1 # 更新持仓
|
||
param.sl_long_price = trade_df['dj_price_low'].iloc[-1] # 设置止损价
|
||
param.pending_long_price = 0 # 清空待执行价格
|
||
self.save_to_csv(data["InstrumentID"])
|
||
text = f"O_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, Lots:{param.lots}, " \
|
||
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
|
||
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
|
||
# 检查待开空订单(价格反弹到堆积区间低价时卖出)
|
||
if param.position == 0 and param.pending_short_price > 0:
|
||
current_price = data["BidPrice1"] # 当前买一价
|
||
if current_price >= param.pending_short_price: # 价格触达
|
||
price = current_price - param.price_offset
|
||
print(f"执行开空: 价格={price}")
|
||
self.place_order(data["ExchangeID"], data["InstrumentID"],
|
||
price, param.lots, b"1", b"0") # 开空
|
||
param.position = -1
|
||
param.sl_short_price = trade_df['dj_price_high'].iloc[-1] # 设置止损价
|
||
param.pending_short_price = 0
|
||
self.save_to_csv(data["InstrumentID"])
|
||
text = f"O_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, Lots:{param.lots}, " \
|
||
f"dj区:{trade_df['dj_price_high'].iloc[-1]}-{trade_df['dj_price_low'].iloc[-1]}, " \
|
||
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
|
||
def execute_close_long(self, trade_df, param, data):
|
||
"""平多"""
|
||
price = data["BidPrice1"] - param.price_offset # 计算平多价格
|
||
print(f"平多: BidPrice1={price}")
|
||
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"1") # 平多
|
||
param.position = 0 # 清空持仓
|
||
param.sl_long_price = 0 # 重置止损价
|
||
param.pending_long_price = 0 # 重置待执行价格
|
||
self.save_to_csv(data["InstrumentID"])
|
||
text = f"C_L_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, Lots:{param.lots}, " \
|
||
f"dj:{trade_df['dj'].iloc[-1]}, delta:{trade_df['delta累计'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
|
||
def execute_close_short(self, trade_df, param, data):
|
||
"""平空"""
|
||
price = data["AskPrice1"] + param.price_offset
|
||
print(f"平空: AskPrice1={price}")
|
||
self.place_close_orders(data["ExchangeID"], data["InstrumentID"], price, param.lots, b"0") # 平空
|
||
param.position = 0
|
||
param.sl_short_price = 0
|
||
param.pending_short_price = 0
|
||
self.save_to_csv(data["InstrumentID"])
|
||
text = f"C_S_T: ID:{data['InstrumentID']}, datetime:{trade_df['datetime'].iloc[-1]}, " \
|
||
f"Price:{price}, Lots:{param.lots}, dj:{trade_df['dj'].iloc[-1]}"
|
||
self.mail.send(text)
|
||
|
||
def save_ofdata_json(self, symbol):
|
||
"""保存订单流数据到JSON(新K线生成时调用)"""
|
||
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
|
||
os.makedirs(data_dir, exist_ok=True) # 确保目录存在
|
||
json_path = f"{data_dir}/{symbol}_ofdata.json" # JSON文件路径
|
||
|
||
trade_df = DataStore.trade_dfs.get(symbol) # 获取交易数据
|
||
if trade_df is None or trade_df.empty:
|
||
return # 无数据则返回
|
||
|
||
save_df = trade_df.copy()
|
||
if 'datetime' in save_df.columns:
|
||
save_df['datetime'] = save_df['datetime'].astype(str) # 转换时间格式
|
||
# 去重:保留每个 datetime 的最后一条记录
|
||
save_df = save_df.drop_duplicates(subset=['datetime'], keep='last')
|
||
|
||
try:
|
||
save_df.to_json(json_path, orient='records', force_ascii=False, lines=True) # 写入JSON
|
||
except Exception as e:
|
||
print(f"JSON保存失败: {e}")
|
||
|
||
def save_json_periodically(self, instrument_id, trade_df):
|
||
"""定期保存订单流数据到JSON(已废弃,改为新K线触发保存)"""
|
||
# 此方法已废弃,保留用于兼容
|
||
pass
|
||
|
||
def cal_sig(self, symbol_queue):
|
||
"""交易信号计算线程主循环"""
|
||
while True:
|
||
try:
|
||
data = symbol_queue.get(block=True, timeout=SYSTEM_CONFIG["queue_timeout"]) # 从队列获取数据
|
||
instrument_id = data["InstrumentID"].decode() # 解码合约代码
|
||
|
||
# 检查队列阻塞
|
||
size = symbol_queue.qsize()
|
||
if size > SYSTEM_CONFIG.get("queue_warning_size", 5):
|
||
print(f"{instrument_id}队列长度={size},有点阻塞!")
|
||
|
||
# 读取历史数据
|
||
self.read_from_csv(instrument_id)
|
||
self.load_stops_from_json(instrument_id)
|
||
self.daily_reset(instrument_id)
|
||
|
||
param = self.param_dict[instrument_id] # 获取参数
|
||
self.current_symbol = instrument_id # 设置当前合约
|
||
|
||
# 处理Tick数据
|
||
self.tickcome(data)
|
||
trade_df = DataStore.trade_dfs[instrument_id] # 获取交易数据
|
||
|
||
# 新K线开始时
|
||
if len(trade_df) > param.processed_rows:
|
||
# 止损检查
|
||
if self.check_stop_loss(trade_df, param, data):
|
||
param.processed_rows = len(trade_df)
|
||
continue
|
||
|
||
# 计算Delta累计(必须在保存之前)
|
||
self.calculate_delta_cumulative(trade_df, param)
|
||
|
||
# 保存 ofdata.json(计算 delta累计 之后)
|
||
self.save_ofdata_json(instrument_id)
|
||
|
||
# 计算交易信号
|
||
bull_signal, bear_signal, dj_last, delta_last = self.calculate_signals(trade_df, param)
|
||
|
||
if bull_signal or bear_signal:
|
||
# 平仓信号(与持仓相反)
|
||
if param.position < 0 and (bear_signal or bull_signal):
|
||
self.execute_close_short(trade_df, param, data)
|
||
elif param.position > 0 and (bull_signal or bear_signal):
|
||
self.execute_close_long(trade_df, param, data)
|
||
|
||
# 开仓信号(无持仓)
|
||
if bull_signal and param.position == 0:
|
||
self.execute_open_long(trade_df, param, data)
|
||
if bear_signal and param.position == 0:
|
||
self.execute_open_short(trade_df, param, data)
|
||
|
||
# 检查待成交订单
|
||
self.check_pending_orders(trade_df, param, data)
|
||
|
||
param.processed_rows = len(trade_df)
|
||
|
||
except queue.Empty: # 队列为空则继续循环
|
||
pass
|
||
|
||
def distribute_tick(self):
|
||
"""行情分发线程"""
|
||
while True:
|
||
if self.status == 0: # 连接正常
|
||
try:
|
||
data = self.md_queue.get(block=True, timeout=0.1) # 获取行情数据
|
||
instrument_id = data["InstrumentID"].decode()
|
||
|
||
try:
|
||
self.queue_dict[instrument_id].put(data, block=True, timeout=1.0) # 分发到各合约队列
|
||
except queue.Full:
|
||
print(f"{instrument_id}合约信号计算阻塞导致队列已满")
|
||
except queue.Empty:
|
||
pass
|
||
else: # 连接断开则等待
|
||
time_module.sleep(0.1)
|
||
|
||
def start(self, param_dict):
|
||
"""启动交易引擎"""
|
||
self.param_dict = param_dict # 保存参数
|
||
|
||
now = datetime.now() # 当前时间
|
||
current_hour = now.hour
|
||
if current_hour >= 21: # 21点后属于下一日
|
||
today_date = (now + pd.Timedelta(days=1)).date()
|
||
elif current_hour < 15: # 15点前属于当日
|
||
today_date = now.date()
|
||
else: # 15点后属于当日
|
||
today_date = now.date()
|
||
|
||
data_dir = SYSTEM_CONFIG["data_dir"] # 数据目录
|
||
for symbol in param_dict.keys():
|
||
# 加载历史订单流数据
|
||
json_path = f"{data_dir}/{symbol}_ofdata.json"
|
||
if os.path.exists(json_path):
|
||
try:
|
||
DataStore.trade_dfs[symbol] = pd.read_json(json_path, lines=True)
|
||
if not DataStore.trade_dfs[symbol].empty:
|
||
print(f"加载历史订单流: {json_path}, 共{len(DataStore.trade_dfs[symbol])}条")
|
||
except Exception as e:
|
||
print(f"加载失败: {e}")
|
||
DataStore.trade_dfs[symbol] = pd.DataFrame({})
|
||
else:
|
||
DataStore.trade_dfs[symbol] = pd.DataFrame({})
|
||
|
||
# 创建品种队列
|
||
self.queue_dict[symbol] = queue.Queue(SYSTEM_CONFIG["queue_max_size"])
|
||
|
||
# 启动信号计算线程
|
||
sig_thread = threading.Thread(
|
||
target=self.cal_sig,
|
||
args=(self.queue_dict[symbol],),
|
||
daemon=True, # 守护线程
|
||
name=f"CalSig_{symbol}"
|
||
)
|
||
sig_thread.start()
|
||
|
||
# 启动行情分发线程
|
||
distribute_thread = threading.Thread(target=self.distribute_tick, daemon=True)
|
||
distribute_thread.start()
|
||
|
||
try:
|
||
distribute_thread.join() # 等待分发线程结束
|
||
except KeyboardInterrupt: # 捕获中断信号
|
||
print("接收到中断信号,正在关闭...")
|
||
raise
|
||
|
||
|
||
# ============ 主程序入口 ============
|
||
|
||
def run_trader(param_dict, broker_id, td_server, investor_id, password,
|
||
app_id, auth_code, md_queue=None, page_dir=""):
|
||
"""启动交易进程"""
|
||
trader = OrderFlowTrader( # 创建交易引擎实例
|
||
broker_id, td_server, investor_id, password,
|
||
app_id, auth_code, md_queue, page_dir
|
||
)
|
||
trader.start(param_dict) # 启动交易引擎
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 创建品种参数(自动从配置读取所有合约)
|
||
param_dict = {}
|
||
for symbol, config in TRADING_PARAMS.items():
|
||
param_dict[symbol] = TradingParam.from_config(symbol, config)
|
||
|
||
# 连接SimNow模拟交易
|
||
future_account = get_simulate_account(
|
||
investor_id=SIMNOW_CONFIG["investor_id"],
|
||
password=SIMNOW_CONFIG["password"],
|
||
server_name=SIMNOW_CONFIG["server_name"],
|
||
subscribe_list=list(param_dict.keys()),
|
||
)
|
||
|
||
# 连接实盘(备用)
|
||
# from CtpPlus.CTP.FutureAccount import FutureAccount
|
||
# future_account = FutureAccount(...)
|
||
|
||
print("开始", len(future_account.subscribe_list))
|
||
|
||
# 创建共享队列
|
||
share_queue = Queue(maxsize=SYSTEM_CONFIG["queue_share_size"])
|
||
|
||
# 启动行情进程
|
||
md_process = Process(target=run_tick_engine, args=(future_account, [share_queue]))
|
||
|
||
# 启动交易进程
|
||
trader_process = Process(
|
||
target=run_trader,
|
||
args=(
|
||
param_dict,
|
||
future_account.broker_id,
|
||
future_account.server_dict["TDServer"],
|
||
future_account.investor_id,
|
||
future_account.password,
|
||
future_account.app_id,
|
||
future_account.auth_code,
|
||
share_queue,
|
||
future_account.td_flow_path,
|
||
),
|
||
)
|
||
|
||
md_process.start() # 启动行情进程
|
||
trader_process.start() # 启动交易进程
|
||
|
||
md_process.join() # 等待行情进程结束
|
||
trader_process.join() # 等待交易进程结束 |