from flask import Flask, render_template, jsonify import pandas as pd import numpy as np import os import ast import time app = Flask(__name__) # 加入邮件通知 import smtplib from email.mime.text import MIMEText # 导入 MIMEText 类发送纯文本邮件 from email.mime.multipart import ( MIMEMultipart, ) import akshare as ak # from email.mime.application import MIMEApplication # 配置邮件信息 receivers = ["240884432@qq.com"] # 设置邮件接收人地址 subject = "TD_Simnow_Signal" # 设置邮件主题 订单流策略交易信号 # 配置邮件服务器信息 smtp_server = "smtp.qq.com" # 设置发送邮件的 SMTP 服务器地址 smtp_port = 465 # 设置发送邮件的 SMTP 服务器端口号,一般为 25 端口 465 sender = "240884432@qq.com" # 设置发送邮件的邮箱地址 username = "240884432@qq.com" # 设置发送邮件的邮箱用户名 password = "osjyjmbqrzxtbjbf" # zrmpcgttataabhjh,设置发送邮件的邮箱密码或授权码 last_sent_time = 0 time_period = 48 # current_dir = os.path.dirname(os.path.abspath(__file__)) # os.chdir(current_dir) # print("已更改为新的工作目录:", current_dir) # 获取当前工作目录 current_directory = os.getcwd() print("当前工作目录:", current_directory) # 设置新的工作目录 new_directory = "C:/simnow_trader/traderdata" os.chdir(new_directory) # 验证新的工作目录 updated_directory = os.getcwd() print("已更改为新的工作目录:", updated_directory) # 获取当前文件夹中所有包含"ofdata"字符的CSV文件 def get_csv_files(): files = {} for filename in os.listdir(): if "ofdata" in filename and filename.endswith(".csv"): files[filename] = os.path.join(os.getcwd(), filename) return files def send_mail(text): global last_sent_time # 检查时间间隔 current_time = time.time() if current_time - last_sent_time < 60: print("current_time:", current_time) print("last_sent_time:", last_sent_time) print("一分钟内已发送过邮件,本次跳过") return # 直接退出,不执行发送 msg = MIMEMultipart() msg["From"] = sender msg["To"] = ";".join(receivers) msg["Subject"] = subject html_content = f"""
以下是数据的最后一列:
{text} """ msg.attach(MIMEText(html_content, "html")) smtp = smtplib.SMTP_SSL(smtp_server, smtp_port) smtp.login(username, password) smtp.sendmail(sender, receivers, msg.as_string()) smtp.quit() # 根据文件路径加载数据,只读取前12列 def load_data(file_path): df = pd.read_csv(file_path, usecols=range(12)) # 只读取前12列 df = df.drop_duplicates(subset="datetime", keep="first").reset_index(drop=True) df = df[df["high"] != df["low"]] df["delta"] = df["delta"].astype(float) df["datetime"] = pd.to_datetime( df["datetime"], format="mixed" ) # , dayfirst=True, format='mixed' df["delta累计"] = df.groupby(df["datetime"].dt.date)["delta"].cumsum() df = df.fillna("缺值") df["终极平滑值"], df["趋势方向"] = ultimate_smoother(df["close"], time_period) df["datetime"] = df["datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") df["POC"] = add_poc_column(df) if len(df) >= 5 * time_period and ( df["趋势方向"].iloc[-1] != df["趋势方向"].iloc[-2] ): table_text = ( df.iloc[:, 3:].tail(1).to_html(index=False) ) # price,Ask,Bid,symbol,datetime,delta,close,open,high,low,volume,dj send_mail(table_text) else: pass return df.iloc[-20:].to_dict(orient="records") def safe_literal_eval(x): """带异常处理的安全转换""" try: return ast.literal_eval(x) except: return [] # 返回空列表作为占位符 def add_poc_column(df): # 安全转换列数据 df["price"] = df["price"].apply(safe_literal_eval) df["Ask"] = df["Ask"].apply(lambda x: list(map(int, safe_literal_eval(x)))) df["Bid"] = df["Bid"].apply(lambda x: list(map(int, safe_literal_eval(x)))) # 定义处理函数(带数据验证) def find_poc(row): # 验证三个列表长度一致且非空 if not (len(row["price"]) == len(row["Ask"]) == len(row["Bid"]) > 0): return "缺值" # 返回空值标记异常数据 sums = [a + b for a, b in zip(row["Ask"], row["Bid"])] try: max_index = sums.index(max(sums)) return row["price"][max_index] except ValueError: return "缺值" # 处理空求和列表情况 # 应用处理函数 df["POC"] = df.apply(find_poc, axis=1) # 可选:统计异常数据 error_count = df["POC"].isnull().sum() if error_count > 0: print(f"警告:发现 {error_count} 行异常数据(已标记为NaN)") return df["POC"] def ultimate_smoother(price, period): # 初始化变量(修正角度单位为弧度) a1 = np.exp(-1.414 * np.pi / period) b1 = 2 * a1 * np.cos(1.414 * np.pi / period) # 将180改为np.pi c2 = b1 c3 = -(a1**2) c1 = (1 + c2 - c3) / 4 # 准备输出序列 us = np.zeros(len(price)) us_new = np.zeros(len(price)) trend = [None] * (len(price)) ma_close = np.zeros(len(price)) # 前4个点用原始价格初始化 for i in range(len(price)): if i < 4: us[i] = price[i] else: # 应用递归公式 us[i] = ( (1 - c1) * price[i] + (2 * c1 - c2) * price[i - 1] - (c1 + c3) * price[i - 2] + c2 * us[i - 1] + c3 * us[i - 2] ) us_new = np.around(us, decimals=2) ma_close = price.rolling(window=5 * period).mean() if us_new[i] > price[i] and ma_close[i] > price[i]: trend[i] = "空头趋势" elif us_new[i] < price[i] and ma_close[i] < price[i]: trend[i] = "多头趋势" else: trend[i] = "无趋势" return us_new, trend @app.route("/") def index(): files = get_csv_files() # 获取所有符合条件的文件 # 默认显示第一个文件的数据 first_file = list(files.keys())[0] if files else None data = load_data(files[first_file]) if first_file else [] return render_template("index.html", data=data, files=files, file_name=first_file) @app.route("/data/