Files

455 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
'''
以下是代码的详细说明:
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!
#版权归松鼠Quant所有禁止转发、转卖源码违者必究。
1.
导入必要的模块和库:
backtrader 用于回测功能
datetime 用于处理日期和时间
GenericCSVData 用于从CSV文件加载数据
numpy 用于数值操作
time 用于时间相关操作
matplotlib.pyplot 用于绘图
2. 定义自定义手续费模板MyCommission
继承自bt.CommInfoBase
3.
定义自定义数据源类 GenericCSV_SIG
继承自 GenericCSVData并添加了两个额外的行'sig''delta'
定义了参数 'sig''delta'
4.
定义 MyStrategy_固定止损_跟踪止盈 类:
继承自 bt.Strategybacktrader的基础策略类
定义了两个参数trailing_stop_percent 和 fixed_stop_loss_percent
初始化策略并设置各种变量和指标
实现了 next 方法该方法在数据源的每个新的K线出现时被调用
根据当前K线数据更新跟踪止盈价格
实现了跟踪止盈出场和固定止损出场
根据信号处理多头和空头仓位
在策略执行过程中打印调试信息
5.
if __name__ == "__main__": 代码块:
使用 Cerebro 实例设置回测环境
使用 GenericCSV_SIG 数据源从CSV文件加载数据
将数据源和策略添加到 Cerebro 实例中
添加观察者和分析器以评估性能
设置初始资金和经纪人参数
运行回测并获取结果
打印回测报告,包括收益率、回撤、胜率和交易统计数据
使用 matplotlib 绘制回测结果
使用说明:使用前需要调整的相关参数如下
1.确定python到csv文件夹下运行,修改csv文件为需要运行的csv
2.MyStrategy_固定止损_跟踪止盈可以修改跟踪百分比和移动周期均线。
3.__init__函数中可以修改lost手数
4.next函数一、修改清仓时间参数每个品种不一致二、window_size和window_size_delta的周期暂为10三、修改“开多组合”和“开空组合”
5.__main__函数:一、修改回测时间段fromdate和todate二、根据交易平中设置初始资金、手续费单手保证金合约倍数
'''
# 需要进一步了解windows_size的计算规则日线
import backtrader as bt
from datetime import datetime
from datetime import time as s_time
from backtrader.feeds import GenericCSVData
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
import os
# 导入表头解决图标中中文显示问题
from pylab import mpl
手续费汇总=0
class GenericCSV_SIG(GenericCSVData):
# 从基类继承,添加一个 'sig'delta
lines = ('sig','delta')
# 添加参数为从基类继承的参数
params = (('sig',6),('delta', 8))
class MyStrategy_固定止损_跟踪止盈(bt.Strategy):
params = (
('trailing_stop_percent', 0.02), # 跟踪止盈百分比
('fixed_stop_loss_percent', 0.01), # 固定止损百分比
# ('sma1_period', 60), # 移动平均线周期
# ('sma2_period',120),
)
def __init__(self):
self.Lots=1 #下单手数
self.signal = self.datas[0].sig # 使用sig字段作为策略的信号字段
self.delta= self.datas[0].delta
# 获取数据序列别名列表
line_aliases = self.datas[0].getlinealiases()
self.pos=0
print(line_aliases)
self.high=self.datas[0].high
self.low=self.datas[0].low
self.closes=self.datas[0].close
self.open=self.datas[0].open
self.trailing_stop_percent = self.params.trailing_stop_percent
self.short_trailing_stop_price = 0
self.long_trailing_stop_price = 0
self.fixed_stop_loss_percent = self.params.fixed_stop_loss_percent
self.sl_long_price=0
self.sl_shor_price=0
#240884432
self.out_long=0
self.out_short=0
self.rinei_ma=[]
self.rinei_mean=0
self.datetime_list= []
self.high_list = []
self.low_list = []
self.close_list = []
self.opens_list = []
self.deltas_list = []
self.delta_cumsum=[]
self.barN = 0
# self.sma1 = bt.indicators.SimpleMovingAverage(
# self.data, period=self.params.sma1_period
# )
# self.sma2 = bt.indicators.SimpleMovingAverage(
# self.data, period=self.params.sma2_period
# )
self.df = pd.DataFrame(columns=['datetime', 'high', 'low', 'close', 'open', 'delta', 'delta_cumsum'])
self.trader_df=pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume', 'openInterest','delta'])
def log(self, txt, dt=None):
'''可选,构建策略打印日志的函数:可用于打印订单记录或交易记录等'''
dt = dt or self.datas[0].datetime.date(0)
print('%s, %s' % (dt.isoformat(), txt))
def notify_order(self, order):
# 未被处理的订单
if order.status in [order.Submitted, order.Accepted]:
return
# 已经处理的订单
if order.status in [order.Completed, order.Canceled, order.Margin]:
global 手续费汇总
if order.isbuy():
手续费汇总 +=order.executed.comm
self.log(
'BUY EXECUTED, 订单编号:%.0f,成交价格: %.2f, 手续费滑点:%.2f, 成交量: %.2f, 品种: %s,手续费汇总:%.2f' %
(order.ref, # 订单编号
order.executed.price, # 成交价
order.executed.comm, # 佣金
order.executed.size, # 成交量
order.data._name,# 品种名称
手续费汇总))
else: # Sell
手续费汇总 +=order.executed.comm
self.log('SELL EXECUTED, 订单编号:%.0f,成交价格: %.2f, 手续费滑点:%.2f, 成交量: %.2f, 品种: %s,手续费汇总:%.2f' %
(order.ref,
order.executed.price,
order.executed.comm,
order.executed.size,
order.data._name,
手续费汇总))
def next(self):
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!
#版权归松鼠Quant所有禁止转发、转卖源码违者必究。
#bar线计数初始化
self.barN += 1
position = self.getposition(self.datas[0]).size
#时间轴
dt = bt.num2date(self.data.datetime[0])
#更新跟踪止损价格
def 每日重置数据():
# 获取当前时间
current_time = dt.time()
#print(current_time)
# 设置清仓操作的时间范围114:55到15:00
clearing_time1_start = s_time(14, 55)
clearing_time1_end = s_time(15, 0)
# 设置清仓操作的时间范围200:55到01:00
clearing_time2_start = s_time(22, 55)
clearing_time2_end = s_time(23, 0)
# 创建一个标志变量
clearing_executed = False
if clearing_time1_start <= current_time <= clearing_time1_end and not clearing_executed :
clearing_executed = True # 设置标志变量为已执行
self.rinei_ma=[]
self.delta_cumsum=[]
self.deltas_list=[]
elif clearing_time2_start <= current_time <= clearing_time2_end and not clearing_executed :
clearing_executed = True # 设置标志变量为已执行
self.rinei_ma=[]
self.delta_cumsum=[]
self.deltas_list=[]
# 如果不在任何时间范围内,可以执行其他操作
else:
self.rinei_ma.append(self.closes[0])
self.rinei_mean = np.mean(self.rinei_ma)
#self.delta_cumsum=[]
#self.deltas_list=[]
#print('rinei_ma',self.rinei_ma)
clearing_executed = False
pass
return clearing_executed
run_kg=每日重置数据()
#过滤成交量为0或小于0
if self.data.volume[0] <= 0 :
return
#print(f'volume,{self.data.volume[0]}')
if self.long_trailing_stop_price >0 and self.pos>0:
#print('datetime+sig: ',dt,'旧多头出线',self.long_trailing_stop_price,'low',self.low[0])
self.long_trailing_stop_price = self.low[0] if self.long_trailing_stop_price<self.low[0] else self.long_trailing_stop_price
#print('datetime+sig: ',dt,'多头出线',self.long_trailing_stop_price)
if self.short_trailing_stop_price >0 and self.pos<0:
#print('datetime+sig: ',dt,'旧空头出线',self.short_trailing_stop_price,'high',self.high[0])
self.short_trailing_stop_price = self.high[0] if self.high[0] <self.short_trailing_stop_price else self.short_trailing_stop_price
#print('datetime+sig: ',dt,'空头出线',self.short_trailing_stop_price)
self.out_long=self.long_trailing_stop_price * (1 - self.trailing_stop_percent)
self.out_short=self.short_trailing_stop_price*(1 + self.trailing_stop_percent)
#print('datetime+sig: ',dt,'空头出线',self.out_short)
#print('datetime+sig: ',dt,'多头出线',self.out_long)
# 跟踪出场
if self.out_long >0:
if self.low[0] < self.out_long and self.pos>0 and self.sl_long_price>0 and self.low[0]>self.sl_long_price:
print('--多头止盈出场datetime+sig: ',dt,'Trailing stop triggered: Closing position','TR',self.out_long,'low', self.low[0])
self.close(data=self.data, price=self.data.close[0],size=self.Lots, exectype=bt.Order.Market)
self.long_trailing_stop_price = 0
self.sl_long_price=0
self.out_long=0
self.pos = 0
if self.out_short>0:
if self.high[0] > self.out_short and self.pos<0 and self.sl_shor_price>0 and self.high[0]<self.sl_shor_price:
print('--空头止盈出场datetime+sig: ',dt,'Trailing stop triggered: Closing position: ','TR',self.out_short,'high', self.high[0])
self.close(data=self.data, price=self.data.close[0],size=self.Lots, exectype=bt.Order.Market)
self.short_trailing_stop_price = 0
self.sl_shor_price=0
self.out_shor=0
self.pos = 0
# 固定止损
self.fixed_stop_loss_L = self.sl_long_price * (1 - self.fixed_stop_loss_percent)
if self.sl_long_price>0 and self.fixed_stop_loss_L>0 and self.pos > 0 and self.closes[0] < self.fixed_stop_loss_L:
print('--多头止损datetime+sig: ', dt, 'Fixed stop loss triggered: Closing position', 'SL', self.fixed_stop_loss_L, 'close', self.closes[0])
self.close(data=self.data, price=self.data.close[0],size=self.Lots, exectype=bt.Order.Market)
self.long_trailing_stop_price = 0
self.sl_long_price=0
self.out_long = 0
self.pos = 0
self.fixed_stop_loss_S = self.sl_shor_price * (1 + self.fixed_stop_loss_percent)
if self.sl_shor_price>0 and self.fixed_stop_loss_S>0 and self.pos < 0 and self.closes[0] > self.fixed_stop_loss_S:
print('--空头止损datetime+sig: ', dt, 'Fixed stop loss triggered: Closing position', 'SL', self.fixed_stop_loss_S, 'close', self.closes[0])
self.close(data=self.data, price=self.data.close[0], size=self.Lots,exectype=bt.Order.Market)
self.short_trailing_stop_price = 0
self.sl_shor_price=0
self.out_short = 0
self.pos = 0
# 更新最高价和最低价的列表
self.datetime_list.append(dt)
self.high_list.append(self.data.high[0])
self.low_list.append(self.data.low[0])
self.close_list.append(self.data.close[0])
self.opens_list.append(self.data.open[0])
self.deltas_list.append(self.data.delta[0])
# 计算delta累计
self.delta_cumsum.append(sum(self.deltas_list))
# 将当前行数据添加到 DataFrame
# new_row = {
# 'datetime': dt,
# 'high': self.data.high[0],
# 'low': self.data.low[0],
# 'close': self.data.close[0],
# 'open': self.data.open[0],
# 'delta': self.data.delta[0],
# 'delta_cumsum': sum(self.deltas_list)
# }
# # 使用pandas.concat代替append
# self.df = pd.concat([self.df, pd.DataFrame([new_row])], ignore_index=True)
# # 检查文件是否存在
# csv_file_path = f"output.csv"
# if os.path.exists(csv_file_path):
# # 仅保存最后一行数据
# self.df.tail(1).to_csv(csv_file_path, mode='a', header=False, index=False)
# else:
# # 创建新文件并保存整个DataFrame
# self.df.to_csv(csv_file_path, index=False)
#
if run_kg==False : #
# # 构建delta的正数和负数
# positive_nums = [x for x in self.data.delta if x > 0]
# negative_nums = [x for x in self.data.delta if x < 0]
# positive_sums = [x for x in self.delta_cumsum if x > 0]
# negative_sums = [x for x in self.delta_cumsum if x < 0]
# #
# # 开多组合= self.rinei_mean>0 and self.closes[0]>self.rinei_mean and self.signal[0] >1 and self.data.delta[0]>1000 and self.delta_cumsum[-1]>1500
# # 开空组合= self.rinei_mean>0 and self.closes[0]<self.rinei_mean and self.signal[0] <-1 and self.data.delta[0]<-1000 and self.delta_cumsum[-1]<-1500
# 开多组合= self.rinei_mean>0 and self.closes[0]>self.rinei_mean and self.signal[0] > 1 and self.data.delta[0]>max(self.data.delta[-60:-1]) #and self.delta_cumsum[-1] > np.max(self.delta_cumsum[-61:-2]) #np.mean(self.data.delta_cumsum[-61:-2])
# 开空组合= self.rinei_mean>0 and self.closes[0]<self.rinei_mean and self.signal[0] <-1 and self.data.delta[0]<min(self.data.delta[-60:-1]) #and self.delta_cumsum[-1] < np.min(self.delta_cumsum[-61:-2]) #np.mean(self.data.delta_cumsum[-61:-2])
#print(self.delta_cumsum)
开多组合= self.rinei_mean>0 and self.closes[0]>self.rinei_mean and self.signal[0] >1 and self.data.delta[0]>1500 and self.delta_cumsum[-1]>2000
开空组合= self.rinei_mean>0 and self.closes[0]<self.rinei_mean and self.signal[0] <-1 and self.data.delta[0]<-1500 and self.delta_cumsum[-1]<-2000
平多条件=self.pos<0 and self.signal[0] >1
平空条件=self.pos>0 and self.signal[0] <-1
if self.pos !=1 : #
if 平多条件:
#print('datetime+sig: ', dt, 'Fixed stop loss triggered: Closing position', 'SL', self.fixed_stop_loss_S, 'close', self.closes[0])
self.close(data=self.data, price=self.data.close[0], exectype=bt.Order.Market)
self.short_trailing_stop_price = 0
self.sl_shor_price=0
self.out_short = 0
self.pos = 0
if 开多组合 : #
self.buy(data=self.data, price=self.data.close[0], size=1, exectype=bt.Order.Market)
self.pos=1
self.long_trailing_stop_price=self.low[0]
self.sl_long_price=self.data.open[0]
#print('datetime+sig: ',dt,' sig: ',self.signal[0],'保存多头价格: ',self.long_trailing_stop_price)
if self.pos !=-1 : #
if 平空条件:
#print('datetime+sig: ', dt, 'Fixed stop loss triggered: Closing position', 'SL', self.fixed_stop_loss_L, 'close', self.closes[0])
self.close(data=self.data, price=self.data.close[0], exectype=bt.Order.Market)
self.long_trailing_stop_price = 0
self.sl_long_price=0
self.out_long = 0
self.pos = 0
if 开空组合: #
self.sell(data=self.data, price=self.data.close[0], size=1, exectype=bt.Order.Market)
self.pos=-1
self.short_trailing_stop_price=self.high[0]
self.sl_shor_price=self.data.open[0]
#print('datetime+sig: ',dt,' sig: ',self.signal[0],'保存空头价格: ',self.short_trailing_stop_price)
if __name__ == "__main__":
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!
#版权归松鼠Quant所有禁止转发、转卖源码违者必究。
# 创建Cerebro实例
cerebro = bt.Cerebro()
#数据
csv_file='./tick生成的OF数据-own/back_ofdata_dj.csv' #
# 从CSV文件加载数据
data = GenericCSV_SIG(
dataname=csv_file,
fromdate=datetime(2023,1,1),
todate=datetime(2023,12,29),
timeframe=bt.TimeFrame.Minutes,
nullvalue=0.0,
dtformat='%Y-%m-%d %H:%M:%S',
datetime=0,
high=3,
low=4,
open=2,
close=1,
volume=5,
openinterest=None,
sig=6,
delta=8
)
# 添加数据到Cerebro实例
cerebro.adddata(data)
# 添加策略到Cerebro实例
cerebro.addstrategy(MyStrategy_固定止损_跟踪止盈)
# 添加观察者和分析器到Cerebro实例
#cerebro.addobserver(bt.observers.BuySell)
cerebro.addobserver(bt.observers.Value)
cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
初始资金=10000
cerebro.broker.setcash(初始资金) # 设置初始资金
#手续费,单手保证金,合约倍数
cerebro.broker.setcommission(commission=14, margin=5000.0,mult=10)#回测参数
# 运行回测
result = cerebro.run()
# 获取策略分析器中的结果
analyzer = result[0].analyzers
total_trades = analyzer.trades.get_analysis()['total']['total']
winning_trades = analyzer.trades.get_analysis()['won']['total']
# 获取TradeAnalyzer分析器的结果
trade_analyzer_result = analyzer.trades.get_analysis()
# 获取总收益额
total_profit = trade_analyzer_result.pnl.net.total
if total_trades > 0:
win_rate = winning_trades / total_trades
else:
win_rate = 0.0
# 打印回测报告
print('回测报告:')
print('期初权益', 初始资金)
print('期末权益', 初始资金+round(total_profit))
print('盈亏额', round(total_profit))
print('最大回撤率,', round(analyzer.drawdown.get_analysis()['drawdown'],2),'%')
print('胜率,', round(win_rate*100,2),'%')
print("交易次数,", total_trades)
print("盈利次数,", winning_trades)
print("亏损次数,", total_trades - winning_trades)
print('总手续费+滑点,', 手续费汇总)
手续费汇总=0
# 设置中文显示
mpl.rcParams["font.sans-serif"] = ["SimHei"]
mpl.rcParams["axes.unicode_minus"] = False
# 保存回测图像文件
plot = cerebro.plot()[0][0]
plot_filename = os.path.splitext(os.path.basename(csv_file))[0] +'ss'+ '_plot.png'
plot_path = os.path.join('部分回测报告', plot_filename)
plot.savefig(plot_path)
#公众号松鼠Quant
#主页www.quant789.com
#本策略仅作学习交流使用,实盘交易盈亏投资者个人负责!!!