Files

223 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO
import pandas as pd
import numpy as np
import os
import ast
import time
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
# 加入邮件通知
import smtplib
from email.mime.text import MIMEText # 导入 MIMEText 类发送纯文本邮件
from email.mime.multipart import (
MIMEMultipart,
)
import akshare as ak
# from email.mime.application import MIMEApplication
# 配置邮件信息
receivers = ["240884432@qq.com"] # 设置邮件接收人地址
subject = "TD_Simnow_Signal" # 设置邮件主题 订单流策略交易信号
# 配置邮件服务器信息
smtp_server = "smtp.qq.com" # 设置发送邮件的 SMTP 服务器地址
smtp_port = 465 # 设置发送邮件的 SMTP 服务器端口号,一般为 25 端口 465
sender = "240884432@qq.com" # 设置发送邮件的邮箱地址
username = "240884432@qq.com" # 设置发送邮件的邮箱用户名
password = "osjyjmbqrzxtbjbf" # zrmpcgttataabhjh设置发送邮件的邮箱密码或授权码
last_sent_time = 0
time_period = 48
# current_dir = os.path.dirname(os.path.abspath(__file__))
# os.chdir(current_dir)
# print("已更改为新的工作目录:", current_dir)
# 获取当前工作目录
current_directory = os.getcwd()
print("当前工作目录:", current_directory)
# 设置新的工作目录
new_directory = r"C:\simnow_trader\traderdata"
os.chdir(new_directory)
# 验证新的工作目录
updated_directory = os.getcwd()
print("已更改为新的工作目录:", updated_directory)
# 获取当前文件夹中所有包含"ofdata"字符的CSV文件
def get_csv_files():
files = {}
for filename in os.listdir():
if "ofdata" in filename and filename.endswith(".csv"):
files[filename] = os.path.join(os.getcwd(), filename)
return files
def send_mail(text):
global last_sent_time
# 检查时间间隔
current_time = time.time()
if current_time - last_sent_time < 60:
print("current_time:",current_time)
print("last_sent_time:",last_sent_time)
print("一分钟内已发送过邮件,本次跳过")
return # 直接退出,不执行发送
msg = MIMEMultipart()
msg["From"] = sender
msg["To"] = ";".join(receivers)
msg["Subject"] = subject
html_content = f"""
<html>
<body>
<p>以下是数据的最后一列:</p>
{text}
</body>
</html>
"""
msg.attach(MIMEText(html_content, 'html'))
smtp = smtplib.SMTP_SSL(smtp_server, smtp_port)
smtp.login(username, password)
smtp.sendmail(sender, receivers, msg.as_string())
smtp.quit()
# 根据文件路径加载数据只读取前12列
def load_data(file_path):
df = pd.read_csv(file_path, usecols=range(12)) # 只读取前12列
df = df.drop_duplicates(subset='datetime', keep='first').reset_index(drop=True)
df = df[df['high'] != df['low']]
df["delta"] = df["delta"].astype(float)
df['datetime'] = pd.to_datetime(df['datetime'],format='ISO8601')#, dayfirst=True,
df['delta累计'] = df.groupby(df['datetime'].dt.date)['delta'].cumsum()
df = df.fillna('缺值')
df['终极平滑值'],df['趋势方向'] = ultimate_smoother(df['close'],time_period)
df['datetime'] = df['datetime'].dt.strftime("%Y-%m-%d %H:%M:%S")
df['POC'] = add_poc_column(df)
print(df.tail(2))
# if len(df) >=5*time_period and (df['趋势方向'].iloc[-1] != df['趋势方向'].iloc[-2]):
# table_text = df.iloc[:,3:].tail(1).to_html(index=False) #price,Ask,Bid,symbol,datetime,delta,close,open,high,low,volume,dj
# send_mail(table_text)
# else:
# pass
return df.iloc[-60:].to_dict(orient="records")
def safe_literal_eval(x):
"""带异常处理的安全转换"""
try:
return ast.literal_eval(x)
except:
return [] # 返回空列表作为占位符
def add_poc_column(df):
# 安全转换列数据
df['price'] = df['price'].apply(safe_literal_eval)
df['Ask'] = df['Ask'].apply(lambda x: list(map(int, safe_literal_eval(x))))
df['Bid'] = df['Bid'].apply(lambda x: list(map(int, safe_literal_eval(x))))
# 定义处理函数(带数据验证)
def find_poc(row):
# 验证三个列表长度一致且非空
if not (len(row['price']) == len(row['Ask']) == len(row['Bid']) > 0):
return '缺值' # 返回空值标记异常数据
sums = [a + b for a, b in zip(row['Ask'], row['Bid'])]
try:
max_index = sums.index(max(sums))
return row['price'][max_index]
except ValueError:
return '缺值' # 处理空求和列表情况
# 应用处理函数
df['POC'] = df.apply(find_poc, axis=1)
# 可选:统计异常数据
error_count = df['POC'].isnull().sum()
if error_count > 0:
print(f"警告:发现 {error_count} 行异常数据已标记为NaN")
return df['POC']
def ultimate_smoother(price, period):
# 初始化变量(修正角度单位为弧度)
a1 = np.exp(-1.414 * np.pi / period)
b1 = 2 * a1 * np.cos(1.414 * np.pi / period) # 将180改为np.pi
c2 = b1
c3 = -a1 ** 2
c1 = (1 + c2 - c3) / 4
# 准备输出序列
us = np.zeros(len(price))
us_new = np.zeros(len(price))
trend = [None]*(len(price))
ma_close = np.zeros(len(price))
# 前4个点用原始价格初始化
for i in range(len(price)):
if i < 4:
us[i] = price[i]
else:
# 应用递归公式
us[i] = (1 - c1) * price[i] + (2 * c1 - c2) * price[i-1] \
- (c1 + c3) * price[i-2] + c2 * us[i-1] + c3 * us[i-2]
us_new = np.around(us, decimals=2)
ma_close = price.rolling(window=5*period).mean()
if us_new[i]>price[i] and ma_close[i]>price[i]:
trend[i] = '空头趋势'
elif us_new[i]<price[i] and ma_close[i]<price[i]:
trend[i] = '多头趋势'
else:
trend[i] = '无趋势'
return us_new,trend
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/data")
def get_data():
try:
files = get_csv_files()
data = {}
for symbol, filename in files.items():
loaded_data = load_data(filename)
if loaded_data:
data[symbol] = loaded_data
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)})
def background_thread():
"""后台线程,定期发送数据更新"""
while True:
files = get_csv_files()
data = {}
for file_name, file_path in files.items():
data[file_name] = load_data(file_path)
socketio.emit('data_update', data)
time.sleep(1) # 每秒更新一次
@socketio.on('connect')
def handle_connect():
print('Client connected')
# 启动后台线程
socketio.start_background_task(background_thread)
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
if __name__ == "__main__":
socketio.run(app, host='0.0.0.0', port=5000, debug=True) # 监听所有网络接口