Enhance trading workflow with new order flow management

- Added dingdanliu_nb_mflow for improved order processing
- Updated related scripts and configurations to support new functionality
This commit is contained in:
Win_home
2025-03-15 22:45:08 +08:00
parent e2c54c6409
commit f925dff46b
21 changed files with 5345 additions and 0 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,342 @@
import pandas as pd
import os
from datetime import time as s_time
from datetime import datetime
import chardet
import numpy as np
# 日盘商品期货交易品种
commodity_day_dict = {'bb': s_time(15,00), 'jd': s_time(15,00), 'lh': s_time(15,00), 'l': s_time(15,00), 'fb': s_time(15,00), 'ec': s_time(15,00),
'AP': s_time(15,00), 'CJ': s_time(15,00), 'JR': s_time(15,00), 'LR': s_time(15,00), 'RS': s_time(15,00), 'PK': s_time(15,00),
'PM': s_time(15,00), 'PX': s_time(15,00), 'RI': s_time(15,00), 'SF': s_time(15,00), 'SM': s_time(15,00), 'UR': s_time(15,00),
'WH': s_time(15,00), 'ao': s_time(15,00), 'br': s_time(15,00), 'wr': s_time(15,00),}
# 夜盘商品期货交易品种
commodity_night_dict = {'sc': s_time(2,30), 'bc': s_time(1,0), 'lu': s_time(23,0), 'nr': s_time(23,0),'au': s_time(2,30), 'ag': s_time(2,30),
'ss': s_time(1,0), 'sn': s_time(1,0), 'ni': s_time(1,0), 'pb': s_time(1,0),'zn': s_time(1,0), 'al': s_time(1,0), 'cu': s_time(1,0),
'ru': s_time(23,0), 'rb': s_time(23,0), 'hc': s_time(23,0), 'fu': s_time(23,0), 'bu': s_time(23,0), 'sp': s_time(23,0),
'PF': s_time(23,0), 'SR': s_time(23,0), 'CF': s_time(23,0), 'CY': s_time(23,0), 'RM': s_time(23,0), 'MA': s_time(23,0),
'TA': s_time(23,0), 'ZC': s_time(23,0), 'FG': s_time(23,0), 'OI': s_time(23,0), 'SA': s_time(23,0),
'p': s_time(23,0), 'j': s_time(23,0), 'jm': s_time(23,0), 'i': s_time(23,0), 'l': s_time(23,0), 'v': s_time(23,0),
'pp': s_time(23,0), 'eg': s_time(23,0), 'c': s_time(23,0), 'cs': s_time(23,0), 'y': s_time(23,0), 'm': s_time(23,0),
'a': s_time(23,0), 'b': s_time(23,0), 'rr': s_time(23,0), 'eb': s_time(23,0), 'pg': s_time(23,0), 'SH': s_time(23,00)}
# 金融期货交易品种
financial_time_dict = {'IH': s_time(15,00), 'IF': s_time(15,00), 'IC': s_time(15,00), 'IM': s_time(15,00),'T': s_time(15,00), 'TS': s_time(15,00),
'TF': s_time(15,00), 'TL': s_time(15,00)}
# 所有已列入的筛选品种
all_dict = {k: v for d in [commodity_day_dict, commodity_night_dict, financial_time_dict] for k, v in d.items()}
def split_alpha_numeric(string):
alpha_chars = ""
numeric_chars = ""
for char in string:
if char.isalpha():
alpha_chars += char
elif char.isdigit():
numeric_chars += char
return alpha_chars, numeric_chars
def merged_old_tickdata(merged_up_df, sp_char, alpha_chars, code_value):
# merged_up_df = pd.DataFrame()
# merged_up_df,alpha_chars,code_value = merged_old_unprocessed_tickdata(all_csv_files, sp_char)
while alpha_chars not in all_dict.keys():
print("%s期货品种未列入所有筛选条件中!!!"%(code_value))
continue
merged_df = pd.DataFrame()
merged_df =pd.DataFrame({'main_contract':merged_up_df['统一代码'],'symbol':merged_up_df['合约代码'],'datetime':merged_up_df['时间'],'lastprice':merged_up_df['最新'],'volume':merged_up_df['成交量'],
'bid_p':merged_up_df['买一价'],'ask_p':merged_up_df['卖一价'],'bid_v':merged_up_df['买一量'],'ask_v':merged_up_df['卖一量']})
del merged_up_df
merged_df['datetime'] = pd.to_datetime(merged_df['datetime'])
merged_df['tmp_time'] = merged_df['datetime'].dt.strftime('%H:%M:%S.%f')
merged_df['time'] = merged_df['tmp_time'].apply(lambda x: datetime.strptime(x, '%H:%M:%S.%f')).dt.time
del merged_df['tmp_time']
merged_df = filter_tickdata_time(merged_df, alpha_chars)
del merged_df['time']
merged_df['datetime'] = sorted(merged_df['datetime'])
print("%s%s数据生成成功!"%(code_value,sp_char))
return merged_df
def merged_new_tickdata(merged_up_df, sp_char, alpha_chars, code_value):
# merged_up_df = pd.DataFrame()
# merged_up_df,alpha_chars,code_value = merged_new_unprocessed_tickdata(all_csv_files, sp_char)
while alpha_chars not in all_dict.keys():
print("%s期货品种未列入所有筛选条件中!!!"%(code_value))
continue
#日期修正
# merged_df['业务日期'] = pd.to_datetime(merged_df['业务日期'])
# merged_df['业务日期'] = merged_df['业务日期'].dt.strftime('%Y-%m-%d')
# merged_df['最后修改时间'] = pd.to_datetime(merged_df['最后修改时间'])
merged_up_df['datetime'] = merged_up_df['业务日期'].astype(str) + ' '+merged_up_df['最后修改时间'].astype(str) + '.' + merged_up_df['最后修改毫秒'].astype(str) # merged_df['最后修改时间'].dt.time.astype(str)
# 将'datetime' 列的数据类型更改为 datetime 格式如果数据转换少8个小时可以用timedelta处理
merged_up_df['datetime'] = pd.to_datetime(merged_up_df['datetime'], errors='coerce', format='%Y-%m-%d %H:%M:%S.%f')
#计算瞬时成交量
merged_up_df['volume'] = merged_up_df['数量'] - merged_up_df['数量'].shift(1)
merged_up_df['volume'] = merged_up_df['volume'].fillna(0)
merged_df = pd.DataFrame()
merged_df =pd.DataFrame({'main_contract':merged_up_df['统一代码'],'symbol':merged_up_df['合约代码'],'datetime':merged_up_df['datetime'],'lastprice':merged_up_df['最新价'],'volume':merged_up_df['volume'],
'bid_p':merged_up_df['申买价一'],'ask_p':merged_up_df['申卖价一'],'bid_v':merged_up_df['申买量一'],'ask_v':merged_up_df['申卖量一']})
del merged_up_df
# merged_df['datetime'] = pd.to_datetime(merged_df['datetime'])
merged_df['tmp_time'] = merged_df['datetime'].dt.strftime('%H:%M:%S.%f')
merged_df['time'] = merged_df['tmp_time'].apply(lambda x: datetime.strptime(x, '%H:%M:%S.%f')).dt.time
del merged_df['tmp_time']
merged_df = filter_tickdata_time(merged_df, alpha_chars)
del merged_df['time']
# merged_df['datetime'] = sorted(merged_df['datetime'])
sorted_merged_df = merged_df.sort_values(by = ['datetime'], inplace=True)
print("%s%s数据生成成功!"%(code_value,sp_char))
return merged_df
def filter_tickdata_time(filter_df, alpha_chars):
if alpha_chars in financial_time_dict.keys():
drop_index1 = pd.DataFrame().index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 0, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) | (filter_df['time'] < s_time(9, 30, 0, 000000))].index
drop_index4 = pd.DataFrame().index
print("按照中金所交易时间筛选金融期货品种")
elif alpha_chars in commodity_night_dict.keys():
if commodity_night_dict[alpha_chars] == s_time(23,00):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(23, 0, 0, 000000)) | (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为23:00筛选商品期货品种")
elif commodity_night_dict[alpha_chars] == s_time(1,00):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(1, 0, 0, 000000)) & (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为1:00筛选商品期货品种")
elif commodity_night_dict[alpha_chars] == s_time(2,30):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(2, 30, 0, 000000)) & (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为2:30筛选商品期货品种")
else:
print("夜盘截止交易时间未设置或者设置错误!!!")
elif alpha_chars in commodity_day_dict.keys():
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) | (filter_df['time'] < s_time(9, 0, 0, 000000))].index
drop_index4 = pd.DataFrame().index
print("按照无夜盘筛选商品期货品种")
else:
print("%s期货品种未执行时间筛选中!!!"%(alpha_chars))
# 清理不在交易时间段的数据
# 数据清理
filter_df.drop(labels=drop_index1, axis=0, inplace=True)
filter_df.drop(drop_index2, axis=0, inplace=True)
filter_df.drop(drop_index3, axis=0, inplace=True)
filter_df.drop(drop_index4, axis=0, inplace=True)
return filter_df
def insert_main_contract(df):
# 添加主力连续的合约代码主力连续为888指数连续可以用999次主力连续可以使用889表头用“统一代码”
alpha_chars, numeric_chars = split_alpha_numeric(df.loc[0,'合约代码'])
code_value = alpha_chars + "889"
print("code_value characters:", code_value)
df.insert(loc=0,column="统一代码", value=code_value)
return df, alpha_chars, code_value
def merged_old_unprocessed_tickdata(all_csv_files, sp_char):
csv_files = [sp_file for sp_file in all_csv_files if sp_char in sp_file]
print("csv_files:", csv_files)
merged_up_df = pd.DataFrame()
dir = os.getcwd()
fileNum_errors = 0
# 循环遍历每个csv文件
for file in csv_files:
try:
# 读取csv文件并使用第一行为列标题编译不通过可以改为gbk
df = pd.read_csv(file,
header=0,
# usecols=[ 1, 2, 3, 7, 12, 13, 14, 15],
# names=[
# "合约代码",
# "时间",
# "最新",
# "成交量",
# "买一价",
# "卖一价",
# "买一量",
# "卖一量",
# ],
encoding='gbk',
low_memory= False,
# skiprows=0,
# parse_dates=['时间'] # 注意此处增加的排序,为了后面按时间排序
)
except:
file_path = os.path.join(dir, file)
fileNum_errors += 1
with open(file_path, 'rb') as file:
data = file.read()
# 使用chardet检测编码
detected_encoding = chardet.detect(data)['encoding']
# print("%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(file,detected_encoding,fileNum_errors))
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors))
with open('output_error.txt', 'a') as f:
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors), file = f)
# 删除重复行
df.drop_duplicates(inplace=True)
# 将数据合并到新的DataFrame中
merged_up_df = pd.concat([merged_up_df, df], ignore_index=True)
# 删除重复列
merged_up_df.drop_duplicates(subset=merged_up_df.columns.tolist(), inplace=True)
# 重置行索引
merged_up_df.reset_index(inplace=True, drop=True)
merged_up_df,alpha_chars,code_value = insert_main_contract(merged_up_df)
# 打印提示信息
# print("按年份未处理的CSV文件合并成功")
return merged_up_df,alpha_chars,code_value
def merged_new_unprocessed_tickdata(all_csv_files, sp_char):
csv_files = [sp_file for sp_file in all_csv_files if sp_char in sp_file]
print("csv_files:", csv_files)
merged_up_df = pd.DataFrame()
dir = os.getcwd()
fileNum_errors = 0
# 循环遍历每个csv文件
for file in csv_files:
try:
# 读取csv文件并使用第一行为列标题编译不通过可以改为gbk
df = pd.read_csv(
file,
header=0,
# usecols=[0, 1, 4, 11, 20, 21, 22, 23, 24, 25, 43],
# names=[
# "交易日",
# "合约代码",
# "最新价",
# "数量",
# "最后修改时间",
# "最后修改毫秒",
# "申买价一",
# "申买量一",
# "申卖价一",
# "申卖量一",
# "业务日期",
# ],
encoding='gbk',
low_memory= False,
# skiprows=0,
# parse_dates=['业务日期','最后修改时间','最后修改毫秒'] # 注意此处增加的排序,为了后面按时间排序
)
except:
file_path = os.path.join(dir, file)
fileNum_errors += 1
with open(file_path, 'rb') as file:
data = file.read()
# 使用chardet检测编码
detected_encoding = chardet.detect(data)['encoding']
# print("%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(file_path,detected_encoding,fileNum_errors))
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors))
with open('output_error.txt', 'a') as f:
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors), file = f)
# 删除重复行
df.drop_duplicates(inplace=True)
# 将数据合并到新的DataFrame中
merged_up_df = pd.concat([merged_up_df, df], ignore_index=True)
# 删除重复列
merged_up_df.drop_duplicates(subset=merged_up_df.columns.tolist(), inplace=True)
# 重置行索引
merged_up_df.reset_index(inplace=True, drop=True)
merged_up_df,alpha_chars,code_value = insert_main_contract(merged_up_df)
# 打印提示信息
# print("按年份未处理的CSV文件合并成功")
return merged_up_df,alpha_chars,code_value
def reinstatement_tickdata(merged_rs_df):
merged_rs_df['main_contract'] = merged_rs_df['main_contract'].astype(str)
merged_rs_df['symbol'] = merged_rs_df['symbol'].astype(str)
merged_rs_df['datetime'] = pd.to_datetime(merged_rs_df['datetime'], errors='coerce', format='%Y-%m-%d %H:%M:%S.%f')
# merged_rs_df['lastprice'] = merged_rs_df['lastprice'].astype(float)
merged_rs_df['volume'] = merged_rs_df['volume'].astype(int)
# merged_rs_df['bid_p'] = merged_rs_df['bid_p'].astype(float)
# merged_rs_df['ask_p'] = merged_rs_df['ask_p'].astype(float)
merged_rs_df['bid_v'] = merged_rs_df['bid_v'].astype(int)
merged_rs_df['ask_v'] = merged_rs_df['ask_v'].astype(int)
# 等比复权,先不考虑
# df['复权因子'] = df['卖一价'].shift() / df['买一价']
# df['复权因子'] = np.where(df['合约代码'] != df['合约代码'].shift(), df['卖一价'].shift() / df['买一价'], 1)
# df['复权因子'] = df['复权因子'].fillna(1)
# df['买一价_adj'] = df['买一价'] * df['复权因子'].cumprod()
# df['卖一价_adj'] = df['卖一价'] * df['复权因子'].cumprod()
# df['最新_adj'] = df['最新'] * df['复权因子'].cumprod()
# 等差复权
merged_rs_df['复权因子'] = np.where(merged_rs_df['symbol'] != merged_rs_df['symbol'].shift(), merged_rs_df['ask_p'].shift() - merged_rs_df['bid_p'], 0)
merged_rs_df['复权因子'] = merged_rs_df['复权因子'].fillna(0)
merged_rs_df['bid_p_adj'] = merged_rs_df['bid_p'] + merged_rs_df['复权因子'].cumsum()
merged_rs_df['ask_p_adj'] = merged_rs_df['ask_p'] + merged_rs_df['复权因子'].cumsum()
merged_rs_df['lastprice_adj'] = merged_rs_df['lastprice'] + merged_rs_df['复权因子'].cumsum()
# 将调整后的数值替换原来的值
merged_rs_df['bid_p'] = merged_rs_df['bid_p_adj'].round(4)
merged_rs_df['ask_p'] = merged_rs_df['ask_p_adj'].round(4)
merged_rs_df['lastprice'] = merged_rs_df['lastprice_adj'].round(4)
# 删除多余的值
del merged_rs_df['复权因子']
del merged_rs_df['bid_p_adj']
del merged_rs_df['ask_p_adj']
del merged_rs_df['lastprice_adj']
return merged_rs_df
# def find_files(all_csv_files):
# all_csv_files = sorted(all_csv_files)
# sp_old_chars = ['_2019','_2020','_2021']
# sp_old_chars = sorted(sp_old_chars)
# sp_new_chars = ['_2022','_2023']
# sp_new_chars = sorted(sp_new_chars)
# csv_old_files = [file for file in all_csv_files if any(sp_char in file for sp_char in sp_old_chars)]
# csv_new_files = [file for file in all_csv_files if any(sp_char in file for sp_char in sp_new_chars)]
# return csv_old_files, csv_new_files

View File

@@ -0,0 +1,174 @@
import pandas as pd
import os
from datetime import time as s_time
from datetime import datetime
import chardet
import numpy as np
# 日盘商品期货交易品种
commodity_day_dict = {'bb': s_time(15,00), 'jd': s_time(15,00), 'lh': s_time(15,00), 'l': s_time(15,00), 'fb': s_time(15,00), 'ec': s_time(15,00),
'AP': s_time(15,00), 'CJ': s_time(15,00), 'JR': s_time(15,00), 'LR': s_time(15,00), 'RS': s_time(15,00), 'PK': s_time(15,00),
'PM': s_time(15,00), 'PX': s_time(15,00), 'RI': s_time(15,00), 'SF': s_time(15,00), 'SM': s_time(15,00), 'UR': s_time(15,00),
'WH': s_time(15,00), 'ao': s_time(15,00), 'br': s_time(15,00), 'wr': s_time(15,00),}
# 夜盘商品期货交易品种
commodity_night_dict = {'sc': s_time(2,30), 'bc': s_time(1,0), 'lu': s_time(23,0), 'nr': s_time(23,0),'au': s_time(2,30), 'ag': s_time(2,30),
'ss': s_time(1,0), 'sn': s_time(1,0), 'ni': s_time(1,0), 'pb': s_time(1,0),'zn': s_time(1,0), 'al': s_time(1,0), 'cu': s_time(1,0),
'ru': s_time(23,0), 'rb': s_time(23,0), 'hc': s_time(23,0), 'fu': s_time(23,0), 'bu': s_time(23,0), 'sp': s_time(23,0),
'PF': s_time(23,0), 'SR': s_time(23,0), 'CF': s_time(23,0), 'CY': s_time(23,0), 'RM': s_time(23,0), 'MA': s_time(23,0),
'TA': s_time(23,0), 'ZC': s_time(23,0), 'FG': s_time(23,0), 'OI': s_time(23,0), 'SA': s_time(23,0),
'p': s_time(23,0), 'j': s_time(23,0), 'jm': s_time(23,0), 'i': s_time(23,0), 'l': s_time(23,0), 'v': s_time(23,0),
'pp': s_time(23,0), 'eg': s_time(23,0), 'c': s_time(23,0), 'cs': s_time(23,0), 'y': s_time(23,0), 'm': s_time(23,0),
'a': s_time(23,0), 'b': s_time(23,0), 'rr': s_time(23,0), 'eb': s_time(23,0), 'pg': s_time(23,0), 'SH': s_time(23,00)}
# 金融期货交易品种
financial_time_dict = {'IH': s_time(15,00), 'IF': s_time(15,00), 'IC': s_time(15,00), 'IM': s_time(15,00),'T': s_time(15,15), 'TS': s_time(15,15),
'TF': s_time(15,15), 'TL': s_time(15,15)}
# 所有已列入的筛选品种
all_dict = {k: v for d in [commodity_day_dict, commodity_night_dict, financial_time_dict] for k, v in d.items()}
def split_alpha_numeric(string):
alpha_chars = ""
numeric_chars = ""
for char in string:
if char.isalpha():
alpha_chars += char
elif char.isdigit():
numeric_chars += char
return alpha_chars, numeric_chars
def merged_new_tickdata(merged_up_df, alpha_chars):
merged_up_df['datetime'] = merged_up_df['交易日'].astype(str) + ' '+merged_up_df['最后修改时间'].astype(str) + '.' + merged_up_df['最后修改毫秒'].astype(str) # merged_df['最后修改时间'].dt.time.astype(str)
# 将'datetime' 列的数据类型更改为 datetime 格式如果数据转换少8个小时可以用timedelta处理
merged_up_df['datetime'] = pd.to_datetime(merged_up_df['datetime'], errors='coerce', format='%Y-%m-%d %H:%M:%S.%f')
#计算瞬时成交量
merged_up_df['volume'] = merged_up_df['数量'] - merged_up_df['数量'].shift(1)
merged_up_df['volume'] = merged_up_df['volume'].fillna(0)
merged_df = pd.DataFrame()
merged_df =pd.DataFrame({'main_contract':merged_up_df['统一代码'],'symbol':merged_up_df['合约代码'],'datetime':merged_up_df['datetime'],'lastprice':merged_up_df['最新价'],'volume':merged_up_df['数量'],
'bid_p':merged_up_df['申买价一'],'ask_p':merged_up_df['申卖价一'],'bid_v':merged_up_df['申买量一'],'ask_v':merged_up_df['申卖量一']})
del merged_up_df
# merged_df['datetime'] = pd.to_datetime(merged_df['datetime'])
merged_df['tmp_time'] = merged_df['datetime'].dt.strftime('%H:%M:%S.%f')
merged_df['time'] = merged_df['tmp_time'].apply(lambda x: datetime.strptime(x, '%H:%M:%S.%f')).dt.time
del merged_df['tmp_time']
merged_df = filter_tickdata_time(merged_df, alpha_chars)
del merged_df['time']
# merged_df['datetime'] = sorted(merged_df['datetime'])
sorted_merged_df = merged_df.sort_values(by = ['datetime'], inplace=True)
# print("%s%s数据生成成功!"%(code_value,sp_char))
return merged_df
def filter_tickdata_time(filter_df, alpha_chars):
# 由于落到本地的时间有延迟建议结束时间延迟1秒。
if alpha_chars in financial_time_dict.keys():
drop_index1 = pd.DataFrame().index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 500000)) & (filter_df['time'] < s_time(13, 0, 0, 000000))].index
if alpha_chars in ['IH', 'IF', 'IC', 'IM']:
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 500000)) | (filter_df['time'] < s_time(9, 30, 0, 000000))].index
print("按照中金所股指期货交易时间筛选金融期货品种")
else:
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 15, 0, 500000)) | (filter_df['time'] < s_time(9, 30, 0, 000000))].index
print("按照中金所国债期货交易时间筛选金融期货品种")
drop_index4 = pd.DataFrame().index
print("按照中金所交易时间筛选金融期货品种")
elif alpha_chars in commodity_night_dict.keys():
if commodity_night_dict[alpha_chars] == s_time(23,00):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(23, 0, 0, 000000)) | (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为23:00筛选商品期货品种")
elif commodity_night_dict[alpha_chars] == s_time(1,00):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(1, 0, 0, 000000)) & (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为1:00筛选商品期货品种")
elif commodity_night_dict[alpha_chars] == s_time(2,30):
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) & (filter_df['time'] < s_time(21, 0, 0, 000000))].index
drop_index4 = filter_df.loc[(filter_df['time'] > s_time(2, 30, 0, 000000)) & (filter_df['time'] < s_time(9, 0, 0, 000000))].index
print("按照夜盘截止交易时间为2:30筛选商品期货品种")
else:
print("夜盘截止交易时间未设置或者设置错误!!!")
elif alpha_chars in commodity_day_dict.keys():
drop_index1 = filter_df.loc[(filter_df['time'] > s_time(10, 15, 0, 000000)) & (filter_df['time'] < s_time(10, 30, 0, 000000))].index
drop_index2 = filter_df.loc[(filter_df['time'] > s_time(11, 30, 0, 000000)) & (filter_df['time'] < s_time(13, 30, 0, 000000))].index
drop_index3 = filter_df.loc[(filter_df['time'] > s_time(15, 0, 0, 000000)) | (filter_df['time'] < s_time(9, 0, 0, 000000))].index
drop_index4 = pd.DataFrame().index
print("按照无夜盘筛选商品期货品种")
else:
print("%s期货品种未执行时间筛选中!!!"%(alpha_chars))
# 清理不在交易时间段的数据
# 数据清理
filter_df.drop(labels=drop_index1, axis=0, inplace=True)
filter_df.drop(drop_index2, axis=0, inplace=True)
filter_df.drop(drop_index3, axis=0, inplace=True)
filter_df.drop(drop_index4, axis=0, inplace=True)
return filter_df
def insert_main_contract(df):
# 添加主力连续的合约代码主力连续为888指数连续可以用999次主力连续可以使用889表头用“统一代码”
alpha_chars, numeric_chars = split_alpha_numeric(df.loc[0,'合约代码'])
code_value = alpha_chars + "889"
print("code_value characters:", code_value)
df.insert(loc=0,column="统一代码", value=code_value)
return df, alpha_chars, code_value
def reinstatement_tickdata(merged_rs_df):
merged_rs_df['main_contract'] = merged_rs_df['main_contract'].astype(str)
merged_rs_df['symbol'] = merged_rs_df['symbol'].astype(str)
merged_rs_df['datetime'] = pd.to_datetime(merged_rs_df['datetime'], errors='coerce', format='%Y-%m-%d %H:%M:%S.%f')
# merged_rs_df['lastprice'] = merged_rs_df['lastprice'].astype(float)
merged_rs_df['volume'] = merged_rs_df['volume'].astype(int)
# merged_rs_df['bid_p'] = merged_rs_df['bid_p'].astype(float)
# merged_rs_df['ask_p'] = merged_rs_df['ask_p'].astype(float)
merged_rs_df['bid_v'] = merged_rs_df['bid_v'].astype(int)
merged_rs_df['ask_v'] = merged_rs_df['ask_v'].astype(int)
# 等比复权,先不考虑
# df['复权因子'] = df['卖一价'].shift() / df['买一价']
# df['复权因子'] = np.where(df['合约代码'] != df['合约代码'].shift(), df['卖一价'].shift() / df['买一价'], 1)
# df['复权因子'] = df['复权因子'].fillna(1)
# df['买一价_adj'] = df['买一价'] * df['复权因子'].cumprod()
# df['卖一价_adj'] = df['卖一价'] * df['复权因子'].cumprod()
# df['最新_adj'] = df['最新'] * df['复权因子'].cumprod()
# 等差复权
merged_rs_df['复权因子'] = np.where(merged_rs_df['symbol'] != merged_rs_df['symbol'].shift(), merged_rs_df['ask_p'].shift() - merged_rs_df['bid_p'], 0)
merged_rs_df['复权因子'] = merged_rs_df['复权因子'].fillna(0)
merged_rs_df['bid_p_adj'] = merged_rs_df['bid_p'] + merged_rs_df['复权因子'].cumsum()
merged_rs_df['ask_p_adj'] = merged_rs_df['ask_p'] + merged_rs_df['复权因子'].cumsum()
merged_rs_df['lastprice_adj'] = merged_rs_df['lastprice'] + merged_rs_df['复权因子'].cumsum()
# 将调整后的数值替换原来的值
merged_rs_df['bid_p'] = merged_rs_df['bid_p_adj'].round(4)
merged_rs_df['ask_p'] = merged_rs_df['ask_p_adj'].round(4)
merged_rs_df['lastprice'] = merged_rs_df['lastprice_adj'].round(4)
# 删除多余的值
del merged_rs_df['复权因子']
del merged_rs_df['bid_p_adj']
del merged_rs_df['ask_p_adj']
del merged_rs_df['lastprice_adj']
return merged_rs_df

View File

@@ -0,0 +1,68 @@
import pandas as pd
import os
from datetime import time as s_time
from datetime import datetime
import chardet
import numpy as np
def split_alpha_numeric(string):
alpha_chars = ""
numeric_chars = ""
for char in string:
if char.isalpha():
alpha_chars += char
elif char.isdigit():
numeric_chars += char
return alpha_chars, numeric_chars
def merged_old_unprocessed_tickdata(all_csv_files, sp_char):
csv_files = [sp_file for sp_file in all_csv_files if sp_char in sp_file]
print("csv_files:", csv_files)
merged_up_df = pd.DataFrame()
dir = os.getcwd()
fileNum_errors = 0
# 循环遍历每个csv文件
for file in csv_files:
try:
df = pd.read_csv(file,
header=0,
encoding='gbk',
low_memory= False,
# skiprows=0,
# parse_dates=['时间'] # 注意此处增加的排序,为了后面按时间排序
)
except:
file_path = os.path.join(dir, file)
fileNum_errors += 1
with open(file_path, 'rb') as file:
data = file.read()
# 使用chardet检测编码
detected_encoding = chardet.detect(data)['encoding']
# print("%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(file,detected_encoding,fileNum_errors))
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors))
with open('output_error.txt', 'a') as f:
print("%s:%s当前文件不为gbk格式,其文件格式为%s,需要转换为gbk格式,错误总数为%s"%(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),file_path,detected_encoding,fileNum_errors), file = f)
# 删除重复行
df.drop_duplicates(inplace=True)
# 将数据合并到新的DataFrame中
merged_up_df = pd.concat([merged_up_df, df], ignore_index=True)
# 删除重复列
merged_up_df.drop_duplicates(subset=merged_up_df.columns.tolist(), inplace=True)
# 重置行索引
merged_up_df.reset_index(inplace=True, drop=True)
# merged_up_df,alpha_chars,code_value = insert_main_contract(merged_up_df)
# 打印提示信息
# print("按年份未处理的CSV文件合并成功")
return merged_up_df #,alpha_chars,code_value