836 lines
33 KiB
Python
836 lines
33 KiB
Python
"""
|
||
订单流回测模块
|
||
支持两种回测模式:
|
||
1. Mode ofdata:读取 ofdata.json 历史数据重放
|
||
2. Mode tick: 读取原始 tick CSV 重放完整流程
|
||
"""
|
||
import os
|
||
import json
|
||
import csv
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime, timedelta
|
||
from dataclasses import dataclass, field
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
from config import SYSTEM_CONFIG, TRADING_PARAMS
|
||
|
||
|
||
# ============ 隔离数据存储 ============
|
||
|
||
class BacktestDataStore:
|
||
"""回测数据存储(隔离实例,避免与实盘共享状态)"""
|
||
tickdata = {}
|
||
quote = {}
|
||
ofdata = {}
|
||
trade_dfs = {}
|
||
prev_volume = {}
|
||
last_tick = {}
|
||
|
||
|
||
# ============ 交易记录数据结构 ============
|
||
|
||
@dataclass
|
||
class TradeRecord:
|
||
"""交易记录"""
|
||
trade_id: int
|
||
entry_datetime: str
|
||
entry_price: float
|
||
direction: str # "long" / "short"
|
||
exit_datetime: Optional[str] = None
|
||
exit_price: Optional[float] = None
|
||
lots: int = 1
|
||
pnl: float = 0.0
|
||
pnl_pct: float = 0.0
|
||
status: str = "open" # "open" / "closed"
|
||
|
||
|
||
# ============ 权益跟踪器 ============
|
||
|
||
class EquityTracker:
|
||
"""跟踪权益曲线和回撤"""
|
||
|
||
def __init__(self, initial_equity: float = 100000.0):
|
||
self.initial_equity = initial_equity
|
||
self.current_equity = initial_equity
|
||
self.peak_equity = initial_equity
|
||
self.max_drawdown = 0.0
|
||
self.max_drawdown_pct = 0.0
|
||
|
||
# 权益曲线记录
|
||
self.equity_curve = [] # [(datetime, equity, drawdown, drawdown_pct), ...]
|
||
self.trades = [] # TradeRecord list
|
||
self.trade_id_counter = 0
|
||
|
||
def update_equity(self, datetime_str: str, price: float, position: int):
|
||
"""更新权益曲线(每个 bar 更新一次)"""
|
||
if position != 0:
|
||
# 有持仓时不计入权益(按结算价计算 unrealized P&L)
|
||
return
|
||
|
||
# 无持仓时权益不变
|
||
drawdown = self.current_equity - self.peak_equity
|
||
drawdown_pct = (drawdown / self.peak_equity * 100) if self.peak_equity > 0 else 0
|
||
|
||
self.equity_curve.append({
|
||
"datetime": datetime_str,
|
||
"equity": self.current_equity,
|
||
"drawdown": drawdown,
|
||
"drawdown_pct": round(drawdown_pct, 2)
|
||
})
|
||
|
||
def record_trade(self, trade: TradeRecord):
|
||
"""记录交易"""
|
||
self.trades.append(trade)
|
||
|
||
def close_trade(self, trade_id: int, exit_datetime: str, exit_price: float,
|
||
multiplier: float, lots: int):
|
||
"""平仓并计算 P&L"""
|
||
for trade in self.trades:
|
||
if trade.trade_id == trade_id and trade.status == "open":
|
||
trade.exit_datetime = exit_datetime
|
||
trade.exit_price = exit_price
|
||
trade.status = "closed"
|
||
|
||
if trade.direction == "long":
|
||
trade.pnl = (exit_price - trade.entry_price) * multiplier * lots
|
||
else: # short
|
||
trade.pnl = (trade.entry_price - exit_price) * multiplier * lots
|
||
|
||
trade.pnl_pct = (trade.pnl / self.initial_equity * 100)
|
||
|
||
# 更新权益
|
||
self.current_equity += trade.pnl
|
||
if self.current_equity > self.peak_equity:
|
||
self.peak_equity = self.current_equity
|
||
|
||
return trade
|
||
return None
|
||
|
||
def get_metrics(self) -> dict:
|
||
"""计算性能指标"""
|
||
closed_trades = [t for t in self.trades if t.status == "closed"]
|
||
winning_trades = [t for t in closed_trades if t.pnl > 0]
|
||
losing_trades = [t for t in closed_trades if t.pnl <= 0]
|
||
|
||
total_return = self.current_equity - self.initial_equity
|
||
total_return_pct = (total_return / self.initial_equity * 100)
|
||
|
||
# 计算最大回撤
|
||
equity_df = pd.DataFrame(self.equity_curve)
|
||
if not equity_df.empty and "equity" in equity_df.columns:
|
||
rolling_max = equity_df["equity"].cummax()
|
||
drawdowns = equity_df["equity"] - rolling_max
|
||
self.max_drawdown = drawdowns.min()
|
||
self.max_drawdown_pct = (drawdowns.min() / rolling_max.max() * 100) if rolling_max.max() > 0 else 0
|
||
|
||
avg_win = np.mean([t.pnl for t in winning_trades]) if winning_trades else 0
|
||
avg_loss = np.mean([t.pnl for t in losing_trades]) if losing_trades else 0
|
||
total_win = sum([t.pnl for t in winning_trades])
|
||
total_loss = abs(sum([t.pnl for t in losing_trades]))
|
||
|
||
return {
|
||
"total_return": round(total_return, 2),
|
||
"total_return_pct": round(total_return_pct, 2),
|
||
"max_drawdown": round(self.max_drawdown, 2),
|
||
"max_drawdown_pct": round(self.max_drawdown_pct, 2),
|
||
"win_rate": round(len(winning_trades) / len(closed_trades), 4) if closed_trades else 0,
|
||
"total_trades": len(closed_trades),
|
||
"winning_trades": len(winning_trades),
|
||
"losing_trades": len(losing_trades),
|
||
"avg_win": round(avg_win, 2),
|
||
"avg_loss": round(avg_loss, 2),
|
||
"profit_factor": round(total_win / total_loss, 2) if total_loss > 0 else 0,
|
||
"final_equity": round(self.current_equity, 2),
|
||
}
|
||
|
||
def save_results(self, symbol: str):
|
||
"""保存回测结果到文件"""
|
||
results_dir = SYSTEM_CONFIG["backtest_results_dir"]
|
||
os.makedirs(results_dir, exist_ok=True)
|
||
|
||
prefix = f"{results_dir}/{symbol}"
|
||
|
||
# 保存权益曲线 CSV
|
||
if self.equity_curve:
|
||
equity_df = pd.DataFrame(self.equity_curve)
|
||
equity_df.to_csv(f"{prefix}_equity.csv", index=False)
|
||
|
||
# 保存交易记录 CSV
|
||
if self.trades:
|
||
trades_data = []
|
||
for t in self.trades:
|
||
trades_data.append({
|
||
"trade_id": t.trade_id,
|
||
"entry_datetime": t.entry_datetime,
|
||
"entry_price": t.entry_price,
|
||
"direction": t.direction,
|
||
"exit_datetime": t.exit_datetime or "",
|
||
"exit_price": t.exit_price or "",
|
||
"lots": t.lots,
|
||
"pnl": round(t.pnl, 2),
|
||
"pnl_pct": round(t.pnl_pct, 2),
|
||
"status": t.status
|
||
})
|
||
trades_df = pd.DataFrame(trades_data)
|
||
trades_df.to_csv(f"{prefix}_trades.csv", index=False)
|
||
|
||
# 保存性能指标 JSON
|
||
metrics = self.get_metrics()
|
||
with open(f"{prefix}_metrics.json", 'w', encoding='utf-8') as f:
|
||
json.dump(metrics, f, indent=2, ensure_ascii=False)
|
||
|
||
|
||
# ============ 回测交易执行器 ============
|
||
|
||
class BacktestTrader:
|
||
"""回测交易执行器(复用信号计算逻辑)"""
|
||
|
||
def __init__(self, param_dict: dict, equity_tracker: EquityTracker):
|
||
self.param_dict = param_dict
|
||
self.equity_tracker = equity_tracker
|
||
self.current_symbol = " "
|
||
|
||
def set_symbol(self, symbol: str):
|
||
"""设置当前合约"""
|
||
self.current_symbol = symbol
|
||
|
||
def execute_open_long(self, trade_df: pd.DataFrame, param, current_price: float):
|
||
"""开多信号 — 记录待买入价格和对应的止损价(模拟实盘 pending order模式)"""
|
||
if param.position != 0:
|
||
return None
|
||
|
||
param.pending_long_price = trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else current_price
|
||
param.pending_sl_long_price = trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else 0
|
||
print(f"开多信号: 记录待买入价={param.pending_long_price}, datetime={trade_df['datetime'].iloc[-1]}")
|
||
|
||
def execute_open_short(self, trade_df: pd.DataFrame, param, current_price: float):
|
||
"""开空信号 — 记录待卖出价格和对应的止损价(模拟实盘 pending order 模式)"""
|
||
if param.position != 0:
|
||
return None
|
||
|
||
param.pending_short_price = trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else current_price
|
||
param.pending_sl_short_price = trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else 0
|
||
print(f"开空信号: 记录待卖出价={param.pending_short_price}, datetime={trade_df['datetime'].iloc[-1]}")
|
||
|
||
def check_pending_orders(self, trade_df: pd.DataFrame, param, current_price: float):
|
||
"""检查并执行待成交订单(模拟实盘 check_pending_orders)"""
|
||
# 检查待开多订单(价格回落到堆积区间高价时买入)
|
||
if param.position == 0 and param.pending_long_price > 0:
|
||
if current_price <= param.pending_long_price:
|
||
price = current_price + param.price_offset
|
||
self._do_open_long(trade_df, param, price)
|
||
param.pending_long_price = 0
|
||
param.pending_sl_long_price = 0
|
||
|
||
# 检查待开空订单(价格反弹到堆积区间低价时卖出)
|
||
if param.position == 0 and param.pending_short_price > 0:
|
||
if current_price >= param.pending_short_price:
|
||
price = current_price - param.price_offset
|
||
self._do_open_short(trade_df, param, price)
|
||
param.pending_short_price = 0
|
||
param.pending_sl_short_price = 0
|
||
|
||
def _do_open_long(self, trade_df: pd.DataFrame, param, price: float):
|
||
"""执行开多(内部方法)"""
|
||
trade_id = self.equity_tracker.trade_id_counter + 1
|
||
self.equity_tracker.trade_id_counter += 1
|
||
|
||
trade = TradeRecord(
|
||
trade_id=trade_id,
|
||
entry_datetime=trade_df["datetime"].iloc[-1],
|
||
entry_price=price,
|
||
direction="long",
|
||
lots=param.lots,
|
||
status="open"
|
||
)
|
||
self.equity_tracker.record_trade(trade)
|
||
|
||
param.position = 1
|
||
param.sl_long_price = param.pending_sl_long_price if param.pending_sl_long_price > 0 else trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else 0
|
||
print(f"执行开多: price={price}, sl={param.sl_long_price}, datetime={trade_df['datetime'].iloc[-1]}")
|
||
|
||
return trade
|
||
|
||
def _do_open_short(self, trade_df: pd.DataFrame, param, price: float):
|
||
"""执行开空(内部方法)"""
|
||
trade_id = self.equity_tracker.trade_id_counter + 1
|
||
self.equity_tracker.trade_id_counter += 1
|
||
|
||
trade = TradeRecord(
|
||
trade_id=trade_id,
|
||
entry_datetime=trade_df["datetime"].iloc[-1],
|
||
entry_price=price,
|
||
direction="short",
|
||
lots=param.lots,
|
||
status="open"
|
||
)
|
||
self.equity_tracker.record_trade(trade)
|
||
|
||
param.position = -1
|
||
param.sl_short_price = param.pending_sl_short_price if param.pending_sl_short_price > 0 else trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else 0
|
||
print(f"执行开空: price={price}, sl={param.sl_short_price}, datetime={trade_df['datetime'].iloc[-1]}")
|
||
|
||
return trade
|
||
|
||
def execute_close_long(self, trade_df: pd.DataFrame, param, current_price: float):
|
||
"""平多"""
|
||
if param.position != 1:
|
||
return None
|
||
|
||
#找到对应的 open trade
|
||
for trade in self.equity_tracker.trades:
|
||
if trade.direction == "long" and trade.status == "open":
|
||
trade.exit_datetime = trade_df["datetime"].iloc[-1]
|
||
trade.exit_price = current_price
|
||
trade.status = "closed"
|
||
|
||
multiplier = SYSTEM_CONFIG["contract_multiplier"].get(
|
||
param.symbol, 1
|
||
)
|
||
trade.pnl = (current_price - trade.entry_price) * multiplier * param.lots
|
||
trade.pnl_pct = (trade.pnl / self.equity_tracker.initial_equity * 100)
|
||
|
||
self.equity_tracker.current_equity += trade.pnl
|
||
if self.equity_tracker.current_equity > self.equity_tracker.peak_equity:
|
||
self.equity_tracker.peak_equity = self.equity_tracker.current_equity
|
||
|
||
param.position = 0
|
||
param.sl_long_price = 0
|
||
param.pending_long_price = 0
|
||
|
||
return trade
|
||
return None
|
||
|
||
def execute_close_short(self, trade_df: pd.DataFrame, param, current_price: float):
|
||
"""平空"""
|
||
if param.position != -1:
|
||
return None
|
||
|
||
for trade in self.equity_tracker.trades:
|
||
if trade.direction == "short" and trade.status == "open":
|
||
trade.exit_datetime = trade_df["datetime"].iloc[-1]
|
||
trade.exit_price = current_price
|
||
trade.status = "closed"
|
||
|
||
multiplier = SYSTEM_CONFIG["contract_multiplier"].get(
|
||
param.symbol, 1
|
||
)
|
||
trade.pnl = (trade.entry_price - current_price) * multiplier * param.lots
|
||
trade.pnl_pct = (trade.pnl / self.equity_tracker.initial_equity * 100)
|
||
|
||
self.equity_tracker.current_equity += trade.pnl
|
||
if self.equity_tracker.current_equity > self.equity_tracker.peak_equity:
|
||
self.equity_tracker.peak_equity = self.equity_tracker.current_equity
|
||
|
||
param.position = 0
|
||
param.sl_short_price = 0
|
||
param.pending_short_price = 0
|
||
|
||
return trade
|
||
return None
|
||
|
||
def check_stop_loss(self, trade_df: pd.DataFrame, param, current_price: float) -> bool:
|
||
"""检查止损"""
|
||
if param.position > 0 and param.sl_long_price > 0:
|
||
if current_price < param.sl_long_price:
|
||
self.execute_close_long(trade_df, param, current_price)
|
||
return True
|
||
elif param.position < 0 and param.sl_short_price > 0:
|
||
if current_price > param.sl_short_price:
|
||
self.execute_close_short(trade_df, param, current_price)
|
||
return True
|
||
return False
|
||
|
||
def calculate_signals(self, trade_df: pd.DataFrame, param) -> Tuple[bool, bool, float, float]:
|
||
"""计算交易信号(复用 OrderFlowTrader 逻辑)"""
|
||
if len(trade_df) < 2:
|
||
return False, False, 0, 0
|
||
|
||
dj_last = trade_df["dj"].iloc[-1]
|
||
delta_last = trade_df["delta"].iloc[-1]
|
||
|
||
bull_signal = dj_last >= param.accumulation_threshold and \
|
||
delta_last >= param.delta_threshold
|
||
bear_signal = dj_last <= -param.accumulation_threshold and \
|
||
delta_last <= -param.delta_threshold
|
||
|
||
return bull_signal, bear_signal, dj_last, delta_last
|
||
|
||
def calculate_delta_cumulative(self, trade_df: pd.DataFrame, param):
|
||
"""计算 Delta 累计"""
|
||
if trade_df["delta"].dtype == object:
|
||
trade_df["delta"] = trade_df["delta"].astype(float)
|
||
|
||
trade_df["datetime"] = pd.to_datetime(trade_df["datetime"])
|
||
|
||
last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1])
|
||
hour = last_dt.hour
|
||
if hour >= 21:
|
||
trading_day = (last_dt + pd.Timedelta(days=1)).date()
|
||
elif hour < 15:
|
||
trading_day = last_dt.date()
|
||
else:
|
||
trading_day = last_dt.date()
|
||
|
||
trade_df["trading_day"] = str(trading_day)
|
||
trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum()
|
||
|
||
trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
# ============ ofdata 回放器 (Mode 1) ============
|
||
|
||
class OfdataReplayer:
|
||
"""Mode 1: 回放 ofdata.json 数据"""
|
||
|
||
def __init__(self, backtester: 'Backtester', symbol: str):
|
||
self.backtester = backtester
|
||
self.symbol = symbol
|
||
self.param = backtester.param_dict[symbol]
|
||
self.trader = backtester.trader
|
||
self.equity_tracker = backtester.equity_tracker
|
||
self.data_store = backtester.data_store
|
||
|
||
def replay(self):
|
||
"""执行 ofdata 回放"""
|
||
data_dir = SYSTEM_CONFIG["data_dir"]
|
||
json_path = f"{data_dir}/{self.symbol}_ofdata.json"
|
||
|
||
if not os.path.exists(json_path):
|
||
print(f"ofdata 文件不存在: {json_path}")
|
||
return
|
||
|
||
# 加载 ofdata.json
|
||
trade_df = pd.read_json(json_path, lines=True)
|
||
if trade_df.empty:
|
||
print("ofdata 数据为空")
|
||
return
|
||
|
||
print(f"加载 ofdata: {json_path}, 共 {len(trade_df)} 条")
|
||
|
||
self.data_store.trade_dfs[self.symbol] = trade_df.copy()
|
||
self.trader.set_symbol(self.symbol)
|
||
|
||
param = self.param
|
||
param.position = 0
|
||
param.processed_rows = 0
|
||
|
||
# 逐行处理
|
||
for idx in range(len(trade_df)):
|
||
# 构建单行 DataFrame
|
||
row_df = trade_df.iloc[[idx]].copy()
|
||
current_dt = row_df["datetime"].iloc[0]
|
||
current_price = row_df["close"].iloc[0]
|
||
|
||
# 计算 delta 累计
|
||
if idx > 0:
|
||
# 重新计算到当前行
|
||
temp_df = trade_df.iloc[:idx+1].copy()
|
||
self.trader.calculate_delta_cumulative(temp_df, param)
|
||
signal_df = temp_df
|
||
else:
|
||
signal_df = trade_df.iloc[[idx]].copy()
|
||
|
||
# 更新 DataStore
|
||
self.data_store.trade_dfs[self.symbol] = trade_df.iloc[[idx]].copy()
|
||
|
||
# 检查止损(已有持仓的情况下)
|
||
self.trader.check_stop_loss(signal_df, param, current_price)
|
||
|
||
# 检查待成交订单(价格触达 pending 则开仓)
|
||
self.trader.check_pending_orders(signal_df, param, current_price)
|
||
|
||
# 计算信号(仅在无持仓时触发 pending order)
|
||
bull_signal, bear_signal, dj_last, delta_last = self.trader.calculate_signals(signal_df, param)
|
||
|
||
if bull_signal or bear_signal:
|
||
# 平仓信号
|
||
if param.position < 0 and (bear_signal or bull_signal):
|
||
self.trader.execute_close_short(row_df, param, current_price)
|
||
elif param.position > 0 and (bull_signal or bear_signal):
|
||
self.trader.execute_close_long(row_df, param, current_price)
|
||
|
||
# 开仓信号(设置 pending price,不立即开仓)
|
||
if bull_signal and param.position == 0:
|
||
self.trader.execute_open_long(row_df, param, current_price)
|
||
if bear_signal and param.position == 0:
|
||
self.trader.execute_open_short(row_df, param, current_price)
|
||
|
||
# 更新权益曲线(每行)
|
||
self.equity_tracker.update_equity(current_dt, current_price, param.position)
|
||
|
||
param.processed_rows = idx + 1
|
||
|
||
print(f"回放完成: {len(trade_df)} 条, 最终持仓: {param.position}")
|
||
|
||
|
||
# ============ tick 回放器 (Mode 2) ============
|
||
|
||
class TickReplayer:
|
||
"""Mode 2: 回放原始 tick CSV 数据"""
|
||
|
||
def __init__(self, backtester: 'Backtester', symbol: str):
|
||
self.backtester = backtester
|
||
self.symbol = symbol
|
||
self.param = backtester.param_dict[symbol]
|
||
self.trader = backtester.trader
|
||
self.equity_tracker = backtester.equity_tracker
|
||
self.data_store = backtester.data_store
|
||
|
||
# tick CSV 路径
|
||
data_dir = SYSTEM_CONFIG["data_dir"]
|
||
self.tick_csv_path = f"{data_dir}/{symbol}_tick.csv"
|
||
|
||
def replay(self):
|
||
"""执行 tick 回放"""
|
||
if not os.path.exists(self.tick_csv_path):
|
||
print(f"tick CSV 不存在: {self.tick_csv_path}")
|
||
return
|
||
|
||
# 读取 tick CSV
|
||
tick_df = pd.read_csv(self.tick_csv_path)
|
||
if tick_df.empty:
|
||
print("tick 数据为空")
|
||
return
|
||
|
||
print(f"加载 tick: {self.tick_csv_path}, 共 {len(tick_df)} 条")
|
||
|
||
self.trader.set_symbol(self.symbol)
|
||
param = self.param
|
||
param.position = 0
|
||
param.processed_rows = 0
|
||
|
||
prev_bartime = None
|
||
|
||
for idx, row in tick_df.iterrows():
|
||
# 构建 tick 数据
|
||
tick_data = {
|
||
"InstrumentID": row["InstrumentID"].encode() if isinstance(row["InstrumentID"], str) else row["InstrumentID"],
|
||
"ActionDay": str(row["ActionDay"]).encode() if isinstance(row["ActionDay"], (int, str)) else row["ActionDay"],
|
||
"UpdateTime": str(row["UpdateTime"]).encode() if isinstance(row["UpdateTime"], str) else row["UpdateTime"],
|
||
"UpdateMillisec": int(row["UpdateMillisec"]) if "UpdateMillisec" in row else 0,
|
||
"LastPrice": float(row["LastPrice"]),
|
||
"Volume": int(row["Volume"]),
|
||
"BidPrice1": float(row["BidPrice1"]),
|
||
"BidVolume1": int(row["BidVolume1"]),
|
||
"AskPrice1": float(row["AskPrice1"]),
|
||
"AskVolume1": int(row["AskVolume1"]),
|
||
"UpperLimitPrice": float(row["UpperLimitPrice"]) if "UpperLimitPrice" in row else 0,
|
||
"LowerLimitPrice": float(row["LowerLimitPrice"]) if "LowerLimitPrice" in row else 0,
|
||
"TradingDay": str(row["TradingDay"]).encode() if isinstance(row["TradingDay"], str) else row["TradingDay"],
|
||
"Turnover": float(row["Turnover"]) if "Turnover" in row else 0,
|
||
"OpenInterest": int(row["OpenInterest"]) if "OpenInterest" in row else 0,
|
||
}
|
||
|
||
# 处理 tick
|
||
self._process_tick(tick_data, param, prev_bartime)
|
||
prev_bartime = self.data_store.tickdata.get(self.symbol, {}).get("bartime", prev_bartime)
|
||
|
||
print(f"tick 回放完成: {len(tick_df)} 条, 最终持仓: {param.position}")
|
||
|
||
def _process_tick(self, data: dict, param, prev_bartime: str):
|
||
"""处理单个 tick(复用 run.py 的 tick 处理逻辑)"""
|
||
instrument_id = data["InstrumentID"].decode() if isinstance(data["InstrumentID"], bytes) else data["InstrumentID"]
|
||
action_day = str(data["ActionDay"].decode() if isinstance(data["ActionDay"], bytes) else data["ActionDay"])
|
||
update_time = str(data["UpdateTime"].decode() if isinstance(data["UpdateTime"], bytes) else data["UpdateTime"])
|
||
|
||
action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}"
|
||
created_at = datetime.strptime(
|
||
f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}",
|
||
"%Y-%m-%d %H:%M:%S.%f"
|
||
)
|
||
|
||
# 计算瞬时成交量
|
||
prev_vol = self.data_store.prev_volume.get(instrument_id, 0)
|
||
curr_vol = int(data["Volume"])
|
||
last_vol = curr_vol - prev_vol if prev_vol != 0 else 0
|
||
self.data_store.prev_volume[instrument_id] = curr_vol
|
||
|
||
if last_vol <= 0:
|
||
return
|
||
|
||
# 构建 tick
|
||
tick = {
|
||
"symbol": instrument_id,
|
||
"created_at": created_at,
|
||
"price": float(data["LastPrice"]),
|
||
"last_volume": last_vol,
|
||
"bid_p": float(data["BidPrice1"]),
|
||
"bid_v": int(data["BidVolume1"]),
|
||
"ask_p": float(data["AskPrice1"]),
|
||
"ask_v": int(data["AskVolume1"]),
|
||
"upper_limit": float(data["UpperLimitPrice"]),
|
||
"lower_limit": float(data["LowerLimitPrice"]),
|
||
"trading_day": data["TradingDay"].decode() if isinstance(data["TradingDay"], bytes) else str(data["TradingDay"]),
|
||
"cum_volume": curr_vol,
|
||
"cum_amount": float(data["Turnover"]),
|
||
"cum_position": int(data["OpenInterest"]),
|
||
}
|
||
self._on_tick(tick, param, prev_bartime)
|
||
|
||
def _on_tick(self, tick: dict, param, prev_bartime: str):
|
||
"""处理 tick(简化版)"""
|
||
tsymbol = tick["symbol"]
|
||
|
||
# 获取上条 tick
|
||
prev_tick = self.data_store.last_tick.get(tsymbol)
|
||
if prev_tick:
|
||
bid_p = prev_tick["bid_p"]
|
||
ask_p = prev_tick["ask_p"]
|
||
bid_v = prev_tick["bid_v"]
|
||
ask_v = prev_tick["ask_v"]
|
||
else:
|
||
bid_p = tick["bid_p"]
|
||
ask_p = tick["ask_p"]
|
||
bid_v = tick["bid_v"]
|
||
ask_v = tick["ask_v"]
|
||
|
||
self.data_store.last_tick[tsymbol] = tick
|
||
|
||
timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||
|
||
# 构建 DataFrame
|
||
tick_dt = pd.DataFrame({
|
||
"datetime": timetick,
|
||
"symbol": tick["symbol"],
|
||
"mainsym": tick["symbol"].rstrip("0123456789").upper(),
|
||
"lastprice": tick["price"],
|
||
"vol": tick["last_volume"],
|
||
"bid_p": bid_p,
|
||
"ask_p": ask_p,
|
||
"bid_v": bid_v,
|
||
"ask_v": ask_v,
|
||
}, index=[0])
|
||
|
||
self._tickdata(tick_dt, tsymbol, param, prev_bartime)
|
||
|
||
def _tickdata(self, df: pd.DataFrame, symbol: str, param, prev_bartime: str):
|
||
"""K线聚合(简化版)"""
|
||
from run import OrderFlowTrader
|
||
|
||
tickdata = df.copy()
|
||
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
|
||
|
||
rdf = self.data_store.tickdata.get(symbol)
|
||
|
||
if rdf is not None:
|
||
rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S")
|
||
now_bartime = pd.to_datetime(tickdata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
if now_bartime > rdftm:
|
||
# 新 K 线开始
|
||
with type('Lock', (), {'__enter__': lambda s: None, '__exit__': lambda s, *a: None})():
|
||
of = self.data_store.ofdata.pop(symbol, None)
|
||
if of is not None:
|
||
# 追加到 trade_dfs
|
||
existing = self.data_store.trade_dfs.get(symbol)
|
||
if existing is None:
|
||
self.data_store.trade_dfs[symbol] = of
|
||
else:
|
||
self.data_store.trade_dfs[symbol] = pd.concat([existing, of], ignore_index=True)
|
||
|
||
self.data_store.quote.pop(symbol, None)
|
||
self.data_store.tickdata.pop(symbol, None)
|
||
|
||
# 处理上一根 K 线的信号
|
||
trade_df = self.data_store.trade_dfs.get(symbol)
|
||
if trade_df is not None and len(trade_df) > 0:
|
||
last_row = trade_df.iloc[[-1]].copy()
|
||
current_price = last_row["close"].iloc[0]
|
||
|
||
# 计算 delta 累计
|
||
self.trader.calculate_delta_cumulative(trade_df, param)
|
||
|
||
# 检查止损
|
||
self.trader.check_stop_loss(last_row, param, current_price)
|
||
|
||
# 计算信号
|
||
bull_signal, bear_signal, dj_last, delta_last = self.trader.calculate_signals(trade_df, param)
|
||
|
||
if bull_signal or bear_signal:
|
||
if param.position < 0 and (bear_signal or bull_signal):
|
||
self.trader.execute_close_short(last_row, param, current_price)
|
||
elif param.position > 0 and (bull_signal or bear_signal):
|
||
self.trader.execute_close_long(last_row, param, current_price)
|
||
if bull_signal and param.position == 0:
|
||
self.trader.execute_open_long(last_row, param, current_price)
|
||
if bear_signal and param.position == 0:
|
||
self.trader.execute_open_short(last_row, param, current_price)
|
||
|
||
# 更新权益曲线
|
||
self.equity_tracker.update_equity(
|
||
last_row["datetime"].iloc[0],
|
||
current_price,
|
||
param.position
|
||
)
|
||
|
||
# 初始化新 K 线
|
||
tickdata["open"] = tickdata["lastprice"]
|
||
tickdata["high"] = tickdata["lastprice"]
|
||
tickdata["low"] = tickdata["lastprice"]
|
||
tickdata["close"] = tickdata["lastprice"]
|
||
tickdata["starttime"] = tickdata["datetime"]
|
||
else:
|
||
# 同一 K 线
|
||
tickdata["bartime"] = rdf["bartime"]
|
||
tickdata["open"] = rdf["open"]
|
||
lastprice_val = float(tickdata["lastprice"].values[0])
|
||
tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]))
|
||
tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]))
|
||
tickdata["close"] = tickdata["lastprice"]
|
||
tickdata["vol"] = df["vol"].values + rdf["vol"].values
|
||
tickdata["starttime"] = rdf["starttime"]
|
||
else:
|
||
# 首条 tick
|
||
tickdata["open"] = tickdata["lastprice"]
|
||
tickdata["high"] = tickdata["lastprice"]
|
||
tickdata["low"] = tickdata["lastprice"]
|
||
tickdata["close"] = tickdata["lastprice"]
|
||
tickdata["starttime"] = tickdata["datetime"]
|
||
|
||
self.data_store.tickdata[symbol] = tickdata
|
||
|
||
# 生成订单流数据
|
||
self._orderflow_df_new(tickdata, symbol, param)
|
||
|
||
def _orderflow_df_new(self, tickdata: pd.DataFrame, symbol: str, param):
|
||
"""生成订单流(简化版)"""
|
||
bar_vol = int(tickdata["vol"].values[0])
|
||
bar_close = float(tickdata["close"].values[0])
|
||
bar_open = float(tickdata["open"].values[0])
|
||
bar_low = float(tickdata["low"].values[0])
|
||
bar_high = float(tickdata["high"].values[0])
|
||
dt = pd.to_datetime(tickdata["bartime"].values[0]).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
bid_p = float(tickdata["bid_p"].values[0])
|
||
ask_p = float(tickdata["ask_p"].values[0])
|
||
last_price = float(tickdata["lastprice"].values[0])
|
||
|
||
bid_dict = {}
|
||
ask_dict = {}
|
||
|
||
if last_price >= ask_p:
|
||
ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + bar_vol
|
||
if last_price <= bid_p:
|
||
bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + bar_vol
|
||
|
||
# 合并买卖盘
|
||
bid_result, ask_result = self._process_quote(bid_dict, ask_dict, symbol)
|
||
|
||
# 价格排序
|
||
price_list = sorted(bid_result.keys())
|
||
ask_list = [ask_result.get(p, 0) for p in price_list]
|
||
bid_list = [bid_result.get(p, 0) for p in price_list]
|
||
|
||
delta = sum(ask_list) - sum(bid_list)
|
||
|
||
# 构建 DataFrame
|
||
df = pd.DataFrame({
|
||
"price": [price_list],
|
||
"Ask": [ask_list],
|
||
"Bid": [bid_list],
|
||
})
|
||
df["symbol"] = symbol
|
||
df["datetime"] = dt
|
||
df["delta"] = delta
|
||
df["close"] = bar_close
|
||
df["open"] = bar_open
|
||
df["high"] = bar_high
|
||
df["low"] = bar_low
|
||
df["vol"] = bar_vol
|
||
df["dj"] = 0
|
||
df["dj_price_high"] = 0
|
||
df["dj_price_low"] = 0
|
||
|
||
self.data_store.ofdata[symbol] = df
|
||
|
||
def _process_quote(self, bid_dict: dict, ask_dict: dict, symbol: str):
|
||
"""合并买卖盘"""
|
||
dic = self.data_store.quote.get(symbol)
|
||
if dic:
|
||
bid_result = dic["bid_result"].copy()
|
||
ask_result = dic["ask_result"].copy()
|
||
else:
|
||
bid_result, ask_result = {}, {}
|
||
|
||
price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys()))
|
||
|
||
for price in price_list:
|
||
bid_val = int(bid_dict.get(price, 0))
|
||
ask_val = int(ask_dict.get(price, 0))
|
||
|
||
if bid_val:
|
||
bid_result[price] = bid_result.get(price, 0) + bid_val
|
||
ask_result.setdefault(price, 0)
|
||
if ask_val:
|
||
ask_result[price] = ask_result.get(price, 0) + ask_val
|
||
bid_result.setdefault(price, 0)
|
||
|
||
self.data_store.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result}
|
||
return bid_result, ask_result
|
||
|
||
|
||
# ============ 回测主类 ============
|
||
|
||
class Backtester:
|
||
"""回测主协调器"""
|
||
|
||
def __init__(self, param_dict: dict):
|
||
self.param_dict = param_dict
|
||
self.data_store = BacktestDataStore()
|
||
self.equity_tracker = EquityTracker(SYSTEM_CONFIG["initial_equity"])
|
||
self.trader = BacktestTrader(param_dict, self.equity_tracker)
|
||
self.results = {}
|
||
|
||
def run(self):
|
||
"""执行回测"""
|
||
mode = SYSTEM_CONFIG["backtest_mode"]
|
||
|
||
for symbol in self.param_dict.keys():
|
||
print(f"\n{'='*50}")
|
||
print(f"回测合约: {symbol},模式: {mode}")
|
||
print(f"{'='*50}")
|
||
|
||
# 每次重新初始化
|
||
self.data_store = BacktestDataStore()
|
||
self.equity_tracker = EquityTracker(SYSTEM_CONFIG["initial_equity"])
|
||
self.trader = BacktestTrader(self.param_dict, self.equity_tracker)
|
||
|
||
if mode == "ofdata":
|
||
replayer = OfdataReplayer(self, symbol)
|
||
else:
|
||
replayer = TickReplayer(self, symbol)
|
||
|
||
replayer.replay()
|
||
|
||
# 保存结果
|
||
self.equity_tracker.save_results(symbol)
|
||
|
||
# 打印指标
|
||
metrics = self.equity_tracker.get_metrics()
|
||
print(f"\n回测结果:")
|
||
print(f" 总收益率: {metrics['total_return_pct']}%")
|
||
print(f" 最大回撤: {metrics['max_drawdown_pct']}%")
|
||
print(f" 胜率: {metrics['win_rate']}%")
|
||
print(f" 总交易次数: {metrics['total_trades']}")
|
||
print(f" 盈利交易: {metrics['winning_trades']}, 亏损交易: {metrics['losing_trades']}")
|
||
print(f" 最终权益: {metrics['final_equity']}")
|
||
|
||
self.results[symbol] = metrics
|
||
|
||
return self.results
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试回测模块
|
||
from run import TradingParam
|
||
|
||
param_dict = {}
|
||
for symbol, config in TRADING_PARAMS.items():
|
||
p = TradingParam.from_config(symbol, config)
|
||
p.symbol = symbol
|
||
param_dict[symbol] = p
|
||
|
||
backtester = Backtester(param_dict)
|
||
backtester.run() |