Files

836 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
订单流回测模块
支持两种回测模式:
1. Mode ofdata:读取 ofdata.json 历史数据重放
2. Mode tick: 读取原始 tick CSV 重放完整流程
"""
import os
import json
import csv
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from config import SYSTEM_CONFIG, TRADING_PARAMS
# ============ 隔离数据存储 ============
class BacktestDataStore:
"""回测数据存储(隔离实例,避免与实盘共享状态)"""
tickdata = {}
quote = {}
ofdata = {}
trade_dfs = {}
prev_volume = {}
last_tick = {}
# ============ 交易记录数据结构 ============
@dataclass
class TradeRecord:
"""交易记录"""
trade_id: int
entry_datetime: str
entry_price: float
direction: str # "long" / "short"
exit_datetime: Optional[str] = None
exit_price: Optional[float] = None
lots: int = 1
pnl: float = 0.0
pnl_pct: float = 0.0
status: str = "open" # "open" / "closed"
# ============ 权益跟踪器 ============
class EquityTracker:
"""跟踪权益曲线和回撤"""
def __init__(self, initial_equity: float = 100000.0):
self.initial_equity = initial_equity
self.current_equity = initial_equity
self.peak_equity = initial_equity
self.max_drawdown = 0.0
self.max_drawdown_pct = 0.0
# 权益曲线记录
self.equity_curve = [] # [(datetime, equity, drawdown, drawdown_pct), ...]
self.trades = [] # TradeRecord list
self.trade_id_counter = 0
def update_equity(self, datetime_str: str, price: float, position: int):
"""更新权益曲线(每个 bar 更新一次)"""
if position != 0:
# 有持仓时不计入权益(按结算价计算 unrealized P&L
return
# 无持仓时权益不变
drawdown = self.current_equity - self.peak_equity
drawdown_pct = (drawdown / self.peak_equity * 100) if self.peak_equity > 0 else 0
self.equity_curve.append({
"datetime": datetime_str,
"equity": self.current_equity,
"drawdown": drawdown,
"drawdown_pct": round(drawdown_pct, 2)
})
def record_trade(self, trade: TradeRecord):
"""记录交易"""
self.trades.append(trade)
def close_trade(self, trade_id: int, exit_datetime: str, exit_price: float,
multiplier: float, lots: int):
"""平仓并计算 P&L"""
for trade in self.trades:
if trade.trade_id == trade_id and trade.status == "open":
trade.exit_datetime = exit_datetime
trade.exit_price = exit_price
trade.status = "closed"
if trade.direction == "long":
trade.pnl = (exit_price - trade.entry_price) * multiplier * lots
else: # short
trade.pnl = (trade.entry_price - exit_price) * multiplier * lots
trade.pnl_pct = (trade.pnl / self.initial_equity * 100)
# 更新权益
self.current_equity += trade.pnl
if self.current_equity > self.peak_equity:
self.peak_equity = self.current_equity
return trade
return None
def get_metrics(self) -> dict:
"""计算性能指标"""
closed_trades = [t for t in self.trades if t.status == "closed"]
winning_trades = [t for t in closed_trades if t.pnl > 0]
losing_trades = [t for t in closed_trades if t.pnl <= 0]
total_return = self.current_equity - self.initial_equity
total_return_pct = (total_return / self.initial_equity * 100)
# 计算最大回撤
equity_df = pd.DataFrame(self.equity_curve)
if not equity_df.empty and "equity" in equity_df.columns:
rolling_max = equity_df["equity"].cummax()
drawdowns = equity_df["equity"] - rolling_max
self.max_drawdown = drawdowns.min()
self.max_drawdown_pct = (drawdowns.min() / rolling_max.max() * 100) if rolling_max.max() > 0 else 0
avg_win = np.mean([t.pnl for t in winning_trades]) if winning_trades else 0
avg_loss = np.mean([t.pnl for t in losing_trades]) if losing_trades else 0
total_win = sum([t.pnl for t in winning_trades])
total_loss = abs(sum([t.pnl for t in losing_trades]))
return {
"total_return": round(total_return, 2),
"total_return_pct": round(total_return_pct, 2),
"max_drawdown": round(self.max_drawdown, 2),
"max_drawdown_pct": round(self.max_drawdown_pct, 2),
"win_rate": round(len(winning_trades) / len(closed_trades), 4) if closed_trades else 0,
"total_trades": len(closed_trades),
"winning_trades": len(winning_trades),
"losing_trades": len(losing_trades),
"avg_win": round(avg_win, 2),
"avg_loss": round(avg_loss, 2),
"profit_factor": round(total_win / total_loss, 2) if total_loss > 0 else 0,
"final_equity": round(self.current_equity, 2),
}
def save_results(self, symbol: str):
"""保存回测结果到文件"""
results_dir = SYSTEM_CONFIG["backtest_results_dir"]
os.makedirs(results_dir, exist_ok=True)
prefix = f"{results_dir}/{symbol}"
# 保存权益曲线 CSV
if self.equity_curve:
equity_df = pd.DataFrame(self.equity_curve)
equity_df.to_csv(f"{prefix}_equity.csv", index=False)
# 保存交易记录 CSV
if self.trades:
trades_data = []
for t in self.trades:
trades_data.append({
"trade_id": t.trade_id,
"entry_datetime": t.entry_datetime,
"entry_price": t.entry_price,
"direction": t.direction,
"exit_datetime": t.exit_datetime or "",
"exit_price": t.exit_price or "",
"lots": t.lots,
"pnl": round(t.pnl, 2),
"pnl_pct": round(t.pnl_pct, 2),
"status": t.status
})
trades_df = pd.DataFrame(trades_data)
trades_df.to_csv(f"{prefix}_trades.csv", index=False)
# 保存性能指标 JSON
metrics = self.get_metrics()
with open(f"{prefix}_metrics.json", 'w', encoding='utf-8') as f:
json.dump(metrics, f, indent=2, ensure_ascii=False)
# ============ 回测交易执行器 ============
class BacktestTrader:
"""回测交易执行器(复用信号计算逻辑)"""
def __init__(self, param_dict: dict, equity_tracker: EquityTracker):
self.param_dict = param_dict
self.equity_tracker = equity_tracker
self.current_symbol = " "
def set_symbol(self, symbol: str):
"""设置当前合约"""
self.current_symbol = symbol
def execute_open_long(self, trade_df: pd.DataFrame, param, current_price: float):
"""开多信号 — 记录待买入价格和对应的止损价(模拟实盘 pending order模式)"""
if param.position != 0:
return None
param.pending_long_price = trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else current_price
param.pending_sl_long_price = trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else 0
print(f"开多信号: 记录待买入价={param.pending_long_price}, datetime={trade_df['datetime'].iloc[-1]}")
def execute_open_short(self, trade_df: pd.DataFrame, param, current_price: float):
"""开空信号 — 记录待卖出价格和对应的止损价(模拟实盘 pending order 模式)"""
if param.position != 0:
return None
param.pending_short_price = trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else current_price
param.pending_sl_short_price = trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else 0
print(f"开空信号: 记录待卖出价={param.pending_short_price}, datetime={trade_df['datetime'].iloc[-1]}")
def check_pending_orders(self, trade_df: pd.DataFrame, param, current_price: float):
"""检查并执行待成交订单(模拟实盘 check_pending_orders"""
# 检查待开多订单(价格回落到堆积区间高价时买入)
if param.position == 0 and param.pending_long_price > 0:
if current_price <= param.pending_long_price:
price = current_price + param.price_offset
self._do_open_long(trade_df, param, price)
param.pending_long_price = 0
param.pending_sl_long_price = 0
# 检查待开空订单(价格反弹到堆积区间低价时卖出)
if param.position == 0 and param.pending_short_price > 0:
if current_price >= param.pending_short_price:
price = current_price - param.price_offset
self._do_open_short(trade_df, param, price)
param.pending_short_price = 0
param.pending_sl_short_price = 0
def _do_open_long(self, trade_df: pd.DataFrame, param, price: float):
"""执行开多(内部方法)"""
trade_id = self.equity_tracker.trade_id_counter + 1
self.equity_tracker.trade_id_counter += 1
trade = TradeRecord(
trade_id=trade_id,
entry_datetime=trade_df["datetime"].iloc[-1],
entry_price=price,
direction="long",
lots=param.lots,
status="open"
)
self.equity_tracker.record_trade(trade)
param.position = 1
param.sl_long_price = param.pending_sl_long_price if param.pending_sl_long_price > 0 else trade_df['dj_price_low'].iloc[-1] if 'dj_price_low' in trade_df.columns else 0
print(f"执行开多: price={price}, sl={param.sl_long_price}, datetime={trade_df['datetime'].iloc[-1]}")
return trade
def _do_open_short(self, trade_df: pd.DataFrame, param, price: float):
"""执行开空(内部方法)"""
trade_id = self.equity_tracker.trade_id_counter + 1
self.equity_tracker.trade_id_counter += 1
trade = TradeRecord(
trade_id=trade_id,
entry_datetime=trade_df["datetime"].iloc[-1],
entry_price=price,
direction="short",
lots=param.lots,
status="open"
)
self.equity_tracker.record_trade(trade)
param.position = -1
param.sl_short_price = param.pending_sl_short_price if param.pending_sl_short_price > 0 else trade_df['dj_price_high'].iloc[-1] if 'dj_price_high' in trade_df.columns else 0
print(f"执行开空: price={price}, sl={param.sl_short_price}, datetime={trade_df['datetime'].iloc[-1]}")
return trade
def execute_close_long(self, trade_df: pd.DataFrame, param, current_price: float):
"""平多"""
if param.position != 1:
return None
#找到对应的 open trade
for trade in self.equity_tracker.trades:
if trade.direction == "long" and trade.status == "open":
trade.exit_datetime = trade_df["datetime"].iloc[-1]
trade.exit_price = current_price
trade.status = "closed"
multiplier = SYSTEM_CONFIG["contract_multiplier"].get(
param.symbol, 1
)
trade.pnl = (current_price - trade.entry_price) * multiplier * param.lots
trade.pnl_pct = (trade.pnl / self.equity_tracker.initial_equity * 100)
self.equity_tracker.current_equity += trade.pnl
if self.equity_tracker.current_equity > self.equity_tracker.peak_equity:
self.equity_tracker.peak_equity = self.equity_tracker.current_equity
param.position = 0
param.sl_long_price = 0
param.pending_long_price = 0
return trade
return None
def execute_close_short(self, trade_df: pd.DataFrame, param, current_price: float):
"""平空"""
if param.position != -1:
return None
for trade in self.equity_tracker.trades:
if trade.direction == "short" and trade.status == "open":
trade.exit_datetime = trade_df["datetime"].iloc[-1]
trade.exit_price = current_price
trade.status = "closed"
multiplier = SYSTEM_CONFIG["contract_multiplier"].get(
param.symbol, 1
)
trade.pnl = (trade.entry_price - current_price) * multiplier * param.lots
trade.pnl_pct = (trade.pnl / self.equity_tracker.initial_equity * 100)
self.equity_tracker.current_equity += trade.pnl
if self.equity_tracker.current_equity > self.equity_tracker.peak_equity:
self.equity_tracker.peak_equity = self.equity_tracker.current_equity
param.position = 0
param.sl_short_price = 0
param.pending_short_price = 0
return trade
return None
def check_stop_loss(self, trade_df: pd.DataFrame, param, current_price: float) -> bool:
"""检查止损"""
if param.position > 0 and param.sl_long_price > 0:
if current_price < param.sl_long_price:
self.execute_close_long(trade_df, param, current_price)
return True
elif param.position < 0 and param.sl_short_price > 0:
if current_price > param.sl_short_price:
self.execute_close_short(trade_df, param, current_price)
return True
return False
def calculate_signals(self, trade_df: pd.DataFrame, param) -> Tuple[bool, bool, float, float]:
"""计算交易信号(复用 OrderFlowTrader 逻辑)"""
if len(trade_df) < 2:
return False, False, 0, 0
dj_last = trade_df["dj"].iloc[-1]
delta_last = trade_df["delta"].iloc[-1]
bull_signal = dj_last >= param.accumulation_threshold and \
delta_last >= param.delta_threshold
bear_signal = dj_last <= -param.accumulation_threshold and \
delta_last <= -param.delta_threshold
return bull_signal, bear_signal, dj_last, delta_last
def calculate_delta_cumulative(self, trade_df: pd.DataFrame, param):
"""计算 Delta 累计"""
if trade_df["delta"].dtype == object:
trade_df["delta"] = trade_df["delta"].astype(float)
trade_df["datetime"] = pd.to_datetime(trade_df["datetime"])
last_dt = pd.to_datetime(trade_df["datetime"].iloc[-1])
hour = last_dt.hour
if hour >= 21:
trading_day = (last_dt + pd.Timedelta(days=1)).date()
elif hour < 15:
trading_day = last_dt.date()
else:
trading_day = last_dt.date()
trade_df["trading_day"] = str(trading_day)
trade_df["delta累计"] = trade_df.groupby("trading_day")["delta"].cumsum()
trade_df["datetime"] = trade_df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S")
# ============ ofdata 回放器 (Mode 1) ============
class OfdataReplayer:
"""Mode 1: 回放 ofdata.json 数据"""
def __init__(self, backtester: 'Backtester', symbol: str):
self.backtester = backtester
self.symbol = symbol
self.param = backtester.param_dict[symbol]
self.trader = backtester.trader
self.equity_tracker = backtester.equity_tracker
self.data_store = backtester.data_store
def replay(self):
"""执行 ofdata 回放"""
data_dir = SYSTEM_CONFIG["data_dir"]
json_path = f"{data_dir}/{self.symbol}_ofdata.json"
if not os.path.exists(json_path):
print(f"ofdata 文件不存在: {json_path}")
return
# 加载 ofdata.json
trade_df = pd.read_json(json_path, lines=True)
if trade_df.empty:
print("ofdata 数据为空")
return
print(f"加载 ofdata: {json_path}, 共 {len(trade_df)}")
self.data_store.trade_dfs[self.symbol] = trade_df.copy()
self.trader.set_symbol(self.symbol)
param = self.param
param.position = 0
param.processed_rows = 0
# 逐行处理
for idx in range(len(trade_df)):
# 构建单行 DataFrame
row_df = trade_df.iloc[[idx]].copy()
current_dt = row_df["datetime"].iloc[0]
current_price = row_df["close"].iloc[0]
# 计算 delta 累计
if idx > 0:
# 重新计算到当前行
temp_df = trade_df.iloc[:idx+1].copy()
self.trader.calculate_delta_cumulative(temp_df, param)
signal_df = temp_df
else:
signal_df = trade_df.iloc[[idx]].copy()
# 更新 DataStore
self.data_store.trade_dfs[self.symbol] = trade_df.iloc[[idx]].copy()
# 检查止损(已有持仓的情况下)
self.trader.check_stop_loss(signal_df, param, current_price)
# 检查待成交订单(价格触达 pending 则开仓)
self.trader.check_pending_orders(signal_df, param, current_price)
# 计算信号(仅在无持仓时触发 pending order
bull_signal, bear_signal, dj_last, delta_last = self.trader.calculate_signals(signal_df, param)
if bull_signal or bear_signal:
# 平仓信号
if param.position < 0 and (bear_signal or bull_signal):
self.trader.execute_close_short(row_df, param, current_price)
elif param.position > 0 and (bull_signal or bear_signal):
self.trader.execute_close_long(row_df, param, current_price)
# 开仓信号(设置 pending price,不立即开仓)
if bull_signal and param.position == 0:
self.trader.execute_open_long(row_df, param, current_price)
if bear_signal and param.position == 0:
self.trader.execute_open_short(row_df, param, current_price)
# 更新权益曲线(每行)
self.equity_tracker.update_equity(current_dt, current_price, param.position)
param.processed_rows = idx + 1
print(f"回放完成: {len(trade_df)} 条, 最终持仓: {param.position}")
# ============ tick 回放器 (Mode 2) ============
class TickReplayer:
"""Mode 2: 回放原始 tick CSV 数据"""
def __init__(self, backtester: 'Backtester', symbol: str):
self.backtester = backtester
self.symbol = symbol
self.param = backtester.param_dict[symbol]
self.trader = backtester.trader
self.equity_tracker = backtester.equity_tracker
self.data_store = backtester.data_store
# tick CSV 路径
data_dir = SYSTEM_CONFIG["data_dir"]
self.tick_csv_path = f"{data_dir}/{symbol}_tick.csv"
def replay(self):
"""执行 tick 回放"""
if not os.path.exists(self.tick_csv_path):
print(f"tick CSV 不存在: {self.tick_csv_path}")
return
# 读取 tick CSV
tick_df = pd.read_csv(self.tick_csv_path)
if tick_df.empty:
print("tick 数据为空")
return
print(f"加载 tick: {self.tick_csv_path}, 共 {len(tick_df)}")
self.trader.set_symbol(self.symbol)
param = self.param
param.position = 0
param.processed_rows = 0
prev_bartime = None
for idx, row in tick_df.iterrows():
# 构建 tick 数据
tick_data = {
"InstrumentID": row["InstrumentID"].encode() if isinstance(row["InstrumentID"], str) else row["InstrumentID"],
"ActionDay": str(row["ActionDay"]).encode() if isinstance(row["ActionDay"], (int, str)) else row["ActionDay"],
"UpdateTime": str(row["UpdateTime"]).encode() if isinstance(row["UpdateTime"], str) else row["UpdateTime"],
"UpdateMillisec": int(row["UpdateMillisec"]) if "UpdateMillisec" in row else 0,
"LastPrice": float(row["LastPrice"]),
"Volume": int(row["Volume"]),
"BidPrice1": float(row["BidPrice1"]),
"BidVolume1": int(row["BidVolume1"]),
"AskPrice1": float(row["AskPrice1"]),
"AskVolume1": int(row["AskVolume1"]),
"UpperLimitPrice": float(row["UpperLimitPrice"]) if "UpperLimitPrice" in row else 0,
"LowerLimitPrice": float(row["LowerLimitPrice"]) if "LowerLimitPrice" in row else 0,
"TradingDay": str(row["TradingDay"]).encode() if isinstance(row["TradingDay"], str) else row["TradingDay"],
"Turnover": float(row["Turnover"]) if "Turnover" in row else 0,
"OpenInterest": int(row["OpenInterest"]) if "OpenInterest" in row else 0,
}
# 处理 tick
self._process_tick(tick_data, param, prev_bartime)
prev_bartime = self.data_store.tickdata.get(self.symbol, {}).get("bartime", prev_bartime)
print(f"tick 回放完成: {len(tick_df)} 条, 最终持仓: {param.position}")
def _process_tick(self, data: dict, param, prev_bartime: str):
"""处理单个 tick(复用 run.py 的 tick 处理逻辑)"""
instrument_id = data["InstrumentID"].decode() if isinstance(data["InstrumentID"], bytes) else data["InstrumentID"]
action_day = str(data["ActionDay"].decode() if isinstance(data["ActionDay"], bytes) else data["ActionDay"])
update_time = str(data["UpdateTime"].decode() if isinstance(data["UpdateTime"], bytes) else data["UpdateTime"])
action_day_fmt = f"{action_day[:4]}-{action_day[4:6]}-{action_day[6:]}"
created_at = datetime.strptime(
f"{action_day_fmt} {update_time}.{data['UpdateMillisec']}",
"%Y-%m-%d %H:%M:%S.%f"
)
# 计算瞬时成交量
prev_vol = self.data_store.prev_volume.get(instrument_id, 0)
curr_vol = int(data["Volume"])
last_vol = curr_vol - prev_vol if prev_vol != 0 else 0
self.data_store.prev_volume[instrument_id] = curr_vol
if last_vol <= 0:
return
# 构建 tick
tick = {
"symbol": instrument_id,
"created_at": created_at,
"price": float(data["LastPrice"]),
"last_volume": last_vol,
"bid_p": float(data["BidPrice1"]),
"bid_v": int(data["BidVolume1"]),
"ask_p": float(data["AskPrice1"]),
"ask_v": int(data["AskVolume1"]),
"upper_limit": float(data["UpperLimitPrice"]),
"lower_limit": float(data["LowerLimitPrice"]),
"trading_day": data["TradingDay"].decode() if isinstance(data["TradingDay"], bytes) else str(data["TradingDay"]),
"cum_volume": curr_vol,
"cum_amount": float(data["Turnover"]),
"cum_position": int(data["OpenInterest"]),
}
self._on_tick(tick, param, prev_bartime)
def _on_tick(self, tick: dict, param, prev_bartime: str):
"""处理 tick(简化版)"""
tsymbol = tick["symbol"]
# 获取上条 tick
prev_tick = self.data_store.last_tick.get(tsymbol)
if prev_tick:
bid_p = prev_tick["bid_p"]
ask_p = prev_tick["ask_p"]
bid_v = prev_tick["bid_v"]
ask_v = prev_tick["ask_v"]
else:
bid_p = tick["bid_p"]
ask_p = tick["ask_p"]
bid_v = tick["bid_v"]
ask_v = tick["ask_v"]
self.data_store.last_tick[tsymbol] = tick
timetick = tick["created_at"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 构建 DataFrame
tick_dt = pd.DataFrame({
"datetime": timetick,
"symbol": tick["symbol"],
"mainsym": tick["symbol"].rstrip("0123456789").upper(),
"lastprice": tick["price"],
"vol": tick["last_volume"],
"bid_p": bid_p,
"ask_p": ask_p,
"bid_v": bid_v,
"ask_v": ask_v,
}, index=[0])
self._tickdata(tick_dt, tsymbol, param, prev_bartime)
def _tickdata(self, df: pd.DataFrame, symbol: str, param, prev_bartime: str):
"""K线聚合(简化版)"""
from run import OrderFlowTrader
tickdata = df.copy()
tickdata["bartime"] = pd.to_datetime(tickdata["datetime"])
rdf = self.data_store.tickdata.get(symbol)
if rdf is not None:
rdftm = pd.to_datetime(rdf["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S")
now_bartime = pd.to_datetime(tickdata["bartime"][0]).strftime("%Y-%m-%d %H:%M:%S")
if now_bartime > rdftm:
# 新 K 线开始
with type('Lock', (), {'__enter__': lambda s: None, '__exit__': lambda s, *a: None})():
of = self.data_store.ofdata.pop(symbol, None)
if of is not None:
# 追加到 trade_dfs
existing = self.data_store.trade_dfs.get(symbol)
if existing is None:
self.data_store.trade_dfs[symbol] = of
else:
self.data_store.trade_dfs[symbol] = pd.concat([existing, of], ignore_index=True)
self.data_store.quote.pop(symbol, None)
self.data_store.tickdata.pop(symbol, None)
# 处理上一根 K 线的信号
trade_df = self.data_store.trade_dfs.get(symbol)
if trade_df is not None and len(trade_df) > 0:
last_row = trade_df.iloc[[-1]].copy()
current_price = last_row["close"].iloc[0]
# 计算 delta 累计
self.trader.calculate_delta_cumulative(trade_df, param)
# 检查止损
self.trader.check_stop_loss(last_row, param, current_price)
# 计算信号
bull_signal, bear_signal, dj_last, delta_last = self.trader.calculate_signals(trade_df, param)
if bull_signal or bear_signal:
if param.position < 0 and (bear_signal or bull_signal):
self.trader.execute_close_short(last_row, param, current_price)
elif param.position > 0 and (bull_signal or bear_signal):
self.trader.execute_close_long(last_row, param, current_price)
if bull_signal and param.position == 0:
self.trader.execute_open_long(last_row, param, current_price)
if bear_signal and param.position == 0:
self.trader.execute_open_short(last_row, param, current_price)
# 更新权益曲线
self.equity_tracker.update_equity(
last_row["datetime"].iloc[0],
current_price,
param.position
)
# 初始化新 K 线
tickdata["open"] = tickdata["lastprice"]
tickdata["high"] = tickdata["lastprice"]
tickdata["low"] = tickdata["lastprice"]
tickdata["close"] = tickdata["lastprice"]
tickdata["starttime"] = tickdata["datetime"]
else:
# 同一 K 线
tickdata["bartime"] = rdf["bartime"]
tickdata["open"] = rdf["open"]
lastprice_val = float(tickdata["lastprice"].values[0])
tickdata["high"] = max(lastprice_val, float(rdf["high"].values[0]))
tickdata["low"] = min(lastprice_val, float(rdf["low"].values[0]))
tickdata["close"] = tickdata["lastprice"]
tickdata["vol"] = df["vol"].values + rdf["vol"].values
tickdata["starttime"] = rdf["starttime"]
else:
# 首条 tick
tickdata["open"] = tickdata["lastprice"]
tickdata["high"] = tickdata["lastprice"]
tickdata["low"] = tickdata["lastprice"]
tickdata["close"] = tickdata["lastprice"]
tickdata["starttime"] = tickdata["datetime"]
self.data_store.tickdata[symbol] = tickdata
# 生成订单流数据
self._orderflow_df_new(tickdata, symbol, param)
def _orderflow_df_new(self, tickdata: pd.DataFrame, symbol: str, param):
"""生成订单流(简化版)"""
bar_vol = int(tickdata["vol"].values[0])
bar_close = float(tickdata["close"].values[0])
bar_open = float(tickdata["open"].values[0])
bar_low = float(tickdata["low"].values[0])
bar_high = float(tickdata["high"].values[0])
dt = pd.to_datetime(tickdata["bartime"].values[0]).strftime("%Y-%m-%d %H:%M:%S")
bid_p = float(tickdata["bid_p"].values[0])
ask_p = float(tickdata["ask_p"].values[0])
last_price = float(tickdata["lastprice"].values[0])
bid_dict = {}
ask_dict = {}
if last_price >= ask_p:
ask_dict[str(last_price)] = ask_dict.get(str(last_price), 0) + bar_vol
if last_price <= bid_p:
bid_dict[str(last_price)] = bid_dict.get(str(last_price), 0) + bar_vol
# 合并买卖盘
bid_result, ask_result = self._process_quote(bid_dict, ask_dict, symbol)
# 价格排序
price_list = sorted(bid_result.keys())
ask_list = [ask_result.get(p, 0) for p in price_list]
bid_list = [bid_result.get(p, 0) for p in price_list]
delta = sum(ask_list) - sum(bid_list)
# 构建 DataFrame
df = pd.DataFrame({
"price": [price_list],
"Ask": [ask_list],
"Bid": [bid_list],
})
df["symbol"] = symbol
df["datetime"] = dt
df["delta"] = delta
df["close"] = bar_close
df["open"] = bar_open
df["high"] = bar_high
df["low"] = bar_low
df["vol"] = bar_vol
df["dj"] = 0
df["dj_price_high"] = 0
df["dj_price_low"] = 0
self.data_store.ofdata[symbol] = df
def _process_quote(self, bid_dict: dict, ask_dict: dict, symbol: str):
"""合并买卖盘"""
dic = self.data_store.quote.get(symbol)
if dic:
bid_result = dic["bid_result"].copy()
ask_result = dic["ask_result"].copy()
else:
bid_result, ask_result = {}, {}
price_list = sorted(set(bid_dict.keys()) | set(ask_dict.keys()))
for price in price_list:
bid_val = int(bid_dict.get(price, 0))
ask_val = int(ask_dict.get(price, 0))
if bid_val:
bid_result[price] = bid_result.get(price, 0) + bid_val
ask_result.setdefault(price, 0)
if ask_val:
ask_result[price] = ask_result.get(price, 0) + ask_val
bid_result.setdefault(price, 0)
self.data_store.quote[symbol] = {"bid_result": bid_result, "ask_result": ask_result}
return bid_result, ask_result
# ============ 回测主类 ============
class Backtester:
"""回测主协调器"""
def __init__(self, param_dict: dict):
self.param_dict = param_dict
self.data_store = BacktestDataStore()
self.equity_tracker = EquityTracker(SYSTEM_CONFIG["initial_equity"])
self.trader = BacktestTrader(param_dict, self.equity_tracker)
self.results = {}
def run(self):
"""执行回测"""
mode = SYSTEM_CONFIG["backtest_mode"]
for symbol in self.param_dict.keys():
print(f"\n{'='*50}")
print(f"回测合约: {symbol},模式: {mode}")
print(f"{'='*50}")
# 每次重新初始化
self.data_store = BacktestDataStore()
self.equity_tracker = EquityTracker(SYSTEM_CONFIG["initial_equity"])
self.trader = BacktestTrader(self.param_dict, self.equity_tracker)
if mode == "ofdata":
replayer = OfdataReplayer(self, symbol)
else:
replayer = TickReplayer(self, symbol)
replayer.replay()
# 保存结果
self.equity_tracker.save_results(symbol)
# 打印指标
metrics = self.equity_tracker.get_metrics()
print(f"\n回测结果:")
print(f" 总收益率: {metrics['total_return_pct']}%")
print(f" 最大回撤: {metrics['max_drawdown_pct']}%")
print(f" 胜率: {metrics['win_rate']}%")
print(f" 总交易次数: {metrics['total_trades']}")
print(f" 盈利交易: {metrics['winning_trades']}, 亏损交易: {metrics['losing_trades']}")
print(f" 最终权益: {metrics['final_equity']}")
self.results[symbol] = metrics
return self.results
if __name__ == "__main__":
# 测试回测模块
from run import TradingParam
param_dict = {}
for symbol, config in TRADING_PARAMS.items():
p = TradingParam.from_config(symbol, config)
p.symbol = symbol
param_dict[symbol] = p
backtester = Backtester(param_dict)
backtester.run()