166 lines
5.3 KiB
Python
166 lines
5.3 KiB
Python
import akshare as ak
|
||
import pandas as pd
|
||
import numpy as np
|
||
import os
|
||
import ast
|
||
import time
|
||
import mplfinance as mpf
|
||
import matplotlib.pyplot as plt
|
||
|
||
# from flask import Flask, render_template, jsonify
|
||
|
||
# # 读取数据(注意替换实际文件路径)
|
||
file_path = r"C:\Users\zhouj\Desktop\IM888_5MIN_2024_ofdata.csv"
|
||
df = pd.read_csv(file_path, parse_dates=["datetime"], index_col="datetime").sort_index()
|
||
|
||
# 使用akshare获取数据
|
||
symbol_name = 'IM2503'
|
||
time_period = 30
|
||
# df = ak.futures_zh_minute_sina(symbol=symbol_name, period="15")
|
||
|
||
# ultimate_smoother函数
|
||
def ultimate_smoother(price, period):
|
||
# 初始化变量(修正角度单位为弧度)
|
||
a1 = np.exp(-1.414 * np.pi / period)
|
||
b1 = 2 * a1 * np.cos(1.414 * np.pi / period) # 将180改为np.pi
|
||
c2 = b1
|
||
c3 = -a1**2
|
||
c1 = (1 + c2 - c3) / 4
|
||
|
||
# 准备输出序列
|
||
us = np.zeros(len(price))
|
||
us_new = np.zeros(len(price))
|
||
trend = [None] * (len(price))
|
||
ma_close = np.zeros(len(price))
|
||
|
||
# 前4个点用原始价格初始化
|
||
for i in range(len(price)):
|
||
if i < 4:
|
||
us[i] = price[i]
|
||
else:
|
||
# 应用递归公式
|
||
us[i] = (1 - c1) * price[i] + (2 * c1 - c2) * price[i-1] \
|
||
- (c1 + c3) * price[i-2] + c2 * us[i-1] + c3 * us[i-2]
|
||
|
||
us_new = np.around(us, decimals=2)
|
||
ma_close = price.rolling(window=5 * period).mean()
|
||
|
||
if us_new[i] > price[i] and ma_close[i] > price[i]:
|
||
trend[i] = '空头趋势'
|
||
elif us_new[i] < price[i] and ma_close[i] < price[i]:
|
||
trend[i] = '多头趋势'
|
||
else:
|
||
trend[i] = '无趋势'
|
||
|
||
return us_new, trend
|
||
|
||
def safe_literal_eval(x):
|
||
"""带异常处理的安全转换"""
|
||
try:
|
||
return ast.literal_eval(x)
|
||
except:
|
||
return [] # 返回空列表作为占位符
|
||
|
||
def add_poc_column(df):
|
||
# 安全转换列数据
|
||
df['price'] = df['price'].apply(safe_literal_eval)
|
||
df['Ask'] = df['Ask'].apply(lambda x: list(map(int, safe_literal_eval(x))))
|
||
df['Bid'] = df['Bid'].apply(lambda x: list(map(int, safe_literal_eval(x))))
|
||
|
||
# 定义处理函数(带数据验证)
|
||
def find_poc(row):
|
||
# 验证三个列表长度一致且非空
|
||
if not (len(row['price']) == len(row['Ask']) == len(row['Bid']) > 0):
|
||
return '缺值' # 返回空值标记异常数据
|
||
|
||
sums = [a + b for a, b in zip(row['Ask'], row['Bid'])]
|
||
try:
|
||
max_index = sums.index(max(sums))
|
||
return row['price'][max_index]
|
||
except ValueError:
|
||
return '缺值' # 处理空求和列表情况
|
||
|
||
# 应用处理函数
|
||
df['POC'] = df.apply(find_poc, axis=1)
|
||
|
||
# 可选:统计异常数据
|
||
error_count = df['POC'].isnull().sum()
|
||
if error_count > 0:
|
||
print(f"警告:发现 {error_count} 行异常数据(已标记为NaN)")
|
||
|
||
return df['POC']
|
||
df.reset_index()
|
||
# df['datetime'] = pd.to_datetime(df['datetime'])
|
||
df['终极平滑值'], df['趋势方向'] = ultimate_smoother(df["close"], time_period)
|
||
df['POC'] = add_poc_column(df)
|
||
df.index = pd.to_datetime(df.index)
|
||
print(df.head(5))
|
||
|
||
# 规范列名(兼容MA列)
|
||
df = df.rename(columns={"close": "Close", "open": "Open", "high": "High", "low": "Low", "volume":"Volume"})
|
||
|
||
# 创建一个新的DataFrame来存储有效的POC值
|
||
valid_poc = pd.DataFrame(index=df.index) # 创建与主数据相同索引的DataFrame
|
||
valid_poc['POC'] = df['POC'].apply(lambda x: float(x) if x != '缺值' else np.nan) # 将有效POC值转换为float,无效值设为NaN
|
||
|
||
# 创建附加绘图对象(关键步骤)
|
||
apd = [
|
||
mpf.make_addplot(
|
||
df["终极平滑值"], # MA数据列
|
||
color="dodgerblue", # 线条颜色
|
||
width=1, # 线宽
|
||
panel=0, # 显示在主图区域
|
||
ylabel="终极平滑值", # 右侧Y轴标签
|
||
mav=(60),#绘制均线
|
||
secondary_y=True, # 与主图共享左侧Y轴
|
||
),
|
||
mpf.make_addplot(
|
||
valid_poc['POC'], # 使用处理后的POC值
|
||
type='scatter', # 使用散点图
|
||
marker='o', # 圆形标记
|
||
markersize=100, # 标记大小
|
||
color='yellow', # 标记颜色
|
||
alpha=0.5, # 透明度
|
||
panel=0, # 显示在主图区域
|
||
ylabel="POC", # 右侧Y轴标签
|
||
secondary_y=False, # 与主图共享左侧Y轴
|
||
)
|
||
]
|
||
|
||
# 定义 mplfinance 的自定义风格
|
||
mc = mpf.make_marketcolors(up='r', down='g', volume='inherit')
|
||
s = mpf.make_mpf_style(base_mpf_style='charles', marketcolors=mc, rc={'font.sans-serif': ['Microsoft YaHei']})
|
||
|
||
# 创建图形
|
||
fig, axes = mpf.plot(
|
||
df,
|
||
type="candle",
|
||
style=s,
|
||
addplot=apd, # 添加ultimate_smoother线和POC散点
|
||
title=f"{symbol_name} K-Line with ultimate_smoother and POC",
|
||
ylabel="Price",
|
||
show_nontrading=False,
|
||
returnfig=True # 返回图形对象
|
||
)
|
||
|
||
# 获取主图轴对象
|
||
ax = axes[0]
|
||
|
||
# 添加POC值标注
|
||
for idx, row in valid_poc.iterrows():
|
||
if pd.notnull(row['POC']):
|
||
ax.annotate(
|
||
f'{row["POC"]:.2f}',
|
||
xy=(idx, row['POC']),
|
||
xytext=(10, 10),
|
||
textcoords='offset points',
|
||
color='black',
|
||
fontsize=8,
|
||
bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.5),
|
||
arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0')
|
||
)
|
||
|
||
# 显示图形
|
||
plt.show()
|
||
|