几个因子的复现与测试
关于我的代码
我这部分的代码很可能存在严重的错误,但是因为时间比较紧,一下子我也没检查出bug在哪。我建议大家还是从零开始构建自己的框架,不要试图将我的代码改对(如果你发现了我的错误,请联系我,非常感谢🙇♀️)。我的代码仅供参考,不保证结果的正确性。
在开始前:关于作业的几点建议
- 如果你追求效率,尽可能不要使用
.ipynb
文件编写代码。它虽然可以写一步看一步,在缺乏完整思路时提供更加直观的参考;但一旦文件大到一定程度,运行流畅度将是灾难性的,尤其是当你试图可视化某个庞大的DataFrame
时; - 当然,如果你对代码没有完整的思路,
.ipynb
确实是一种更好的选择,因为它可以让你及时查看当前的运行结果; - 建议在搭建框架时避免
DataFrame
的碎片化操作,尽可能不要对DataFrame
频繁操作,这会导致你的程序性能大幅降低。等框架写好了再进行调整很可能会引发意想不到的问题; - 上手
joblib
的使用。joblib
是一个用于并行计算的库,因为因子的计算是互不依赖的,具有并行计算的先天优势,一个简单的Parallel
就可以实现大幅加速。以计算速度最慢的std_FF3factor_1m
因子为例,在并行优化前,我的函数需要运行约30分钟;而经过优化后,它可以在2分钟内给出结果
作业描述
基本流程
- 股票池筛选:剔除上市不满252个交易日的股票(
data/IPO_date.csv
);剔除已退市股票(data/delist_date.csv
);剔除当月末截面日停牌股票 - 原始因子值:计算2010年12月31日~2024年11月30日的因子值,如果在第一步中股票被筛选出股票池,因子值设置为
NaN
- 因子预处理
- 去除极值:将因子值大于
mean + 3 * std
的因子置为mean + 3 * std
,将因子值小于mean - 3 * std
的因子置为mean - 3 * std
- 行业市值中性化:因子为因变量,以A股市值因子(流通市值取对数、去极值、zscore标准化)、中信一级行业哑变量(1~29转换为29个的0/1变量;行业30忽略)为自变量,OLS,取残差
- 标准化:zscore标准化,转换为均值为0、标准差为1的分布
- 去除极值:将因子值大于
- 因子检验
- RankIC法:计算因子值和下个自然月收益率的Spearman相关系数
- 分层测试法:对非
NaN
因子值进行降序排列,前20%股票视作第1层,20%-40%股票视作第2层……,下个自然月持有该层股票,等权计算收益率
因子介绍
return_1m
:近21个交易日区间收益率;turn_1m
:近21个交易日每日换手率的均值;std_1m
:近21个交易日每日收益率的标准差;std_FF3factor_1m
:近21个交易日内,日收益率序列对中证全指日收益率、SMB因子日收益率、HML因子日收益率序列进行多元线性回归的残差的标准差。- SMB因子日收益率:在每个交易日,对股票按总市值排序,后30%的股票算术平均收益率减去前30%股票算术平均收益率;
- HML因子日收益率:在每个交易日,对股票按市净率倒序排序,前30%股票算术平均收益率减去后30%股票算术平均收益率。
import和初始化
因为我用了
pandas_market_calendars
,所以实际上并没有用到day
,但我还是初始化了。但建议还是使用老师的数据,两者似乎存在微小的差异
# import
import pandas as pd
from pandas_market_calendars import get_calendar
import numpy as np
from tqdm import tqdm # 进度条,可视化程序运行进度
from joblib import Parallel, delayed # 并行计算
import concurrent.futures # 并行计算
from pytz import timezone
import statsmodels.api as sm
from scipy.stats import zscore
from scipy.stats import spearmanr
import matplotlib.pyplot as plt
# 读入数据,初始化
calendar = get_calendar('XSHG')
stock_code = pd.read_csv('data/stock_code_info.csv')
stock_code = stock_code['stock_code']
day = pd.read_csv('data/day.csv')
day = day.loc[(day['date'] > '2010-11-30') & (day['date'] <= '2024-11-30')].reset_index(drop=True)
day = day['date']
month = pd.read_csv('data/month.csv')
month = month.loc[(month['date'] > '2010-11-30') & (month['date'] <= '2024-11-30')].reset_index(drop=True)
month = month['date']
amt_day = pd.read_csv('data/amt_day.csv')
IPO_date = pd.read_csv('data/IPO_date_info.csv')
IPO_date = IPO_date.rename(columns={'Unnamed: 0': 'stock_code'})
delist_date = pd.read_csv('data/delist_date_info.csv')
delist_date = delist_date.rename(columns={'Unnamed: 0': 'stock_code'})
close_adj_day = pd.read_csv('data/close_adj_day.csv')
close_adj_day = close_adj_day.rename(columns={'Unnamed: 0': 'stock_code'})
turn_day = pd.read_csv('data/turn_day.csv')
turn_day = turn_day.rename(columns={'Unnamed: 0': 'stock_code'})
close_adj_day_sf = close_adj_day.shift(1, axis=1)
close_adj_day_value = close_adj_day.iloc[:, 2:]
close_adj_day_sf_value = close_adj_day_sf.iloc[:, 2:]
csiall_day = pd.read_csv('data/csiall_day.csv')
close_day = pd.read_csv('data/close_day.csv')
share_totala_day = pd.read_csv('data/share_totala_day.csv')
pb_lf_day = pd.read_csv('data/pb_lf_day.csv')
cs_indus_code = pd.read_csv('data/cs_indus_code_day.csv')
float_a_shares_day = pd.read_csv('data/float_a_shares_day.csv')
股票池筛选
- 剔除上市不满252个交易日的股票:遍历每一只股票的IPO日期,加上252个交易日(直接加在下标上,相比while循环速度快得多),如果超过当前日期,剔除该股票;
- 剔除已退市股票:在
data/delist_date.csv
中读取退市日期,如果该日期有效,剔除该股票; - 剔除当月末截面日停牌股票:遍历每一只股票的月末截面日,如果停牌,剔除该股票。
stock_chosen = []
for month_cut in month: # 遍历每个月末截面日
day_amt = amt_day[month_cut] # 获取月末截面日的销售数据
day_amt = pd.DataFrame({'stock_code': stock_code, 'day_amt': day_amt, 'cut_day': month_cut, 'in_pool': 1})
day_amt = pd.merge(day_amt, IPO_date, on='stock_code') # 合并IPO数据
day_amt = pd.merge(day_amt, delist_date, on='stock_code') # 合并退市数据
# 筛选股票
day_amt.loc[day_amt['cut_day'] < day_amt['to_date'], 'in_pool'] = 0 # 剔除上市不满252个交易日的股票
day_amt.loc[day_amt['day_amt'].isnull(), 'in_pool'] = 0 # 剔除月末截面停牌股票
day_amt.loc[(day_amt['delist_date'].notnull()) & (day_amt['cut_day'] > day_amt['delist_date']), 'in_pool'] = 0 # 剔除退市股票,空值表示还没退市
day_amt = day_amt[day_amt['in_pool'] == 1]
day_pool = day_amt['stock_code'].tolist()
day_pool.insert(0, month_cut)
stock_chosen.append(day_pool)
stock_chosen = pd.DataFrame(stock_chosen)
stock_chosen.columns = ['cut_day'] + [f'stock_{i}' for i in range(1, stock_chosen.shape[1])]
原始因子计算
因子1:return_1m_raw
把每个月末截面日的复权收盘价和21天前的复权收盘价拿出来,计算收益率
return_1m_raw = pd.DataFrame({'stock_code': stock_code})
for i in range(len(month)):
stock = stock_chosen.iloc[i, 1:] # 取出该月末截面日的股票池
stock = stock.dropna()
stock = pd.DataFrame({'stock_code': stock})
day = month[i]
close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner') # 取出需要的股票收盘价数据
date_index = close_adj_today.columns.get_loc(day) # 找到当月截面日的列索引
price_now = close_adj_today.iloc[:, date_index]
price_21_before = close_adj_today.iloc[:, date_index - 21] # 回溯21天
return_1m = (price_now - price_21_before) / price_21_before
stock = stock.reset_index(drop=True)
return_1m = return_1m.reset_index(drop=True)
# 重置索引,保证stock和return_1m索引一致,否则会因为索引不一致而报错
return_1m = pd.DataFrame({day: return_1m})
return_1m = pd.concat([stock, return_1m], axis=1)
return_1m_raw = pd.merge(return_1m_raw, return_1m, on='stock_code', how='outer')
return_1m_raw_T = return_1m_raw.T
return_1m_raw_T.columns = return_1m_raw_T.iloc[0]
return_1m_raw_T = return_1m_raw_T.iloc[1:]
return_1m_raw_T.to_csv('answer/factor_1_raw.csv') # 转置后再存储,以符合格式要求
因子2:turn_1m_raw
遍历每个月末截面日,并向前取出连续21天的换手率,取均值。只要将上面的代码复制下来然后微改一下就可以了
turn_1m_raw = pd.DataFrame({'stock_code': stock_code})
for i in range(len(month)):
stock = stock_chosen.iloc[i, 1:] # 取出该月末截面日的股票池
stock = stock.dropna()
stock = pd.DataFrame({'stock_code': stock})
day = month[i]
turn_today = pd.merge(stock, turn_day, on='stock_code', how='inner') # 取出需要的换手率数据
date_index = turn_today.columns.get_loc(day) # 找到当月截面日的列索引
turn_21 = turn_today.iloc[:, (date_index - 20):(date_index + 1)] # 取出21天的换手率
turn_1m = turn_21.mean(axis=1)
stock = stock.reset_index(drop=True)
turn_1m = turn_1m.reset_index(drop=True)
turn_1m = pd.DataFrame({day: turn_1m})
turn_1m = pd.concat([stock, turn_1m], axis=1)
turn_1m_raw = pd.merge(turn_1m_raw, turn_1m, on='stock_code', how='outer')
turn_1m_raw_T = turn_1m_raw.T
turn_1m_raw_T.columns = turn_1m_raw_T.iloc[0]
turn_1m_raw_T = turn_1m_raw_T.iloc[1:]
turn_1m_raw_T.to_csv('answer/factor_2_raw.csv')
因子3:std_1m_raw
和上面的两个因子思路一致,取出过去22天的股价,计算21天的收益率,然后计算标准差
std_1m_raw = pd.DataFrame({'stock_code': stock_code})
for i in range(len(month)):
stock = stock_chosen.iloc[i, 1:] # 取出该月末截面日的股票池
stock = stock.dropna()
stock = pd.DataFrame({'stock_code': stock})
day = month[i]
close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner') # 取出需要的股票收盘价数据
date_index = close_adj_today.columns.get_loc(day) # 找到当月截面日的列索引
price_22 = close_adj_today.iloc[:, (date_index - 21):(date_index + 1)] # 获取过去22天的股价数据
return_21 = price_22.pct_change(axis=1, fill_method=None).iloc[:, 1:] # 计算21天的收益率
std_1m = return_21.std(axis=1) # 计算收益率的标准差
stock = stock.reset_index(drop=True)
std_1m = std_1m.reset_index(drop=True)
std_1m = pd.DataFrame({day: std_1m})
std_1m = pd.concat([stock, std_1m], axis=1)
std_1m_raw = pd.merge(std_1m_raw, std_1m, on='stock_code', how='outer')
std_1m_raw_T = std_1m_raw.T
std_1m_raw_T.columns = std_1m_raw_T.iloc[0]
std_1m_raw_T = std_1m_raw_T.iloc[1:]
std_1m_raw_T.to_csv('answer/factor_3_raw.csv')
因子4:std_FF3factor_1m_raw
-
计算中证全指日收益率:按照中证全指的定义进行计算即可,是个GPT就会写
# 计算收益率 return_1 = (close_adj_day_value - close_adj_day_sf_value) / close_adj_day_sf_value return_1 = pd.concat([stock_code, return_1], axis=1) # 计算市值 market_value = (share_totala_day.iloc[:, 1:] * close_day.iloc[:, 1:]).reset_index(drop=True) market_value = pd.concat([stock_code, market_value], axis=1) # 排名 market_rank = market_value.rank(axis=0, ascending=False) pb_rank = pb_lf_day.rank(axis=0, ascending=False) day_list = market_rank.columns[2:] # 计算日历股价增长率(CSA) csiall_day_value = csiall_day.iloc[0, 2:].values # 直接取值,避免DataFrame不必要的转换 csiall_day_value_sf = csiall_day.shift(1, axis=1).iloc[0, 2:].values # 同样处理 csa = (csiall_day_value - csiall_day_value_sf) / csiall_day_value_sf # 创建最终的DataFrame stock_csa_df = pd.DataFrame({'date': day_list, 'CSA': csa})
-
计算SMB:先计算市值(收盘价乘以股本),然后根据市值排序,注意排序时忽略
NaN
。此外,这里stock_code
不再保留真正的股票代码,而用排序前的下标代替,方便后面直接通过下标定位到股票,避免不必要的查找# 提前计算所有股票的收益率,这样每次就不需要进行merge returns_dict = return_1.set_index('stock_code')[market_value.columns[2:]] # 每只股票在每一天的收益率 SMB = [] for date in tqdm(market_value.columns[2:]): top_30_num = int(market_rank[date].count() * 0.3) # 找到前30%和后30%的股票下标 top_30_idx = market_rank[date].nsmallest(top_30_num).index bottom_30_idx = market_rank[date].nlargest(top_30_num).index # 提取收益率 top_30_return = returns_dict.loc[stock_code[top_30_idx], date] bottom_30_return = returns_dict.loc[stock_code[bottom_30_idx], date] # 计算SMB SMB_top = top_30_return.mean() SMB_bottom = bottom_30_return.mean() SMB.append(SMB_bottom - SMB_top) # 将结果转化为DataFrame SMB_df = pd.DataFrame({'date': day_list, 'SMB': SMB})
-
计算HML:先计算市净率,因为市净率不会为负,所以按照市净率的倒数排序和按市净率倒序排序等效,为避免不必要的计算,选择后者;SMB复制下来微改一下即可
# 提前计算所有股票的收益率,避免碎片化操作 returns_dict = return_1.set_index('stock_code')[market_value.columns[2:]] # 每只股票在每一天的收益率 HML = [] for date in tqdm(market_value.columns[2:]): top_30_num = int(pb_rank[date].count() * 0.3) # 找到前30%和后30%的股票下标 top_30_idx = pb_rank[date].nlargest(top_30_num).index bottom_30_idx = pb_rank[date].nsmallest(top_30_num).index # 提取收益率 top_30_return = returns_dict.loc[stock_code[top_30_idx], date] bottom_30_return = returns_dict.loc[stock_code[bottom_30_idx], date] # 计算HML HML_top = top_30_return.mean() HML_bottom = bottom_30_return.mean() HML.append(HML_bottom - HML_top) # 将结果转化为 DataFrame HML_df = pd.DataFrame({'date': market_value.columns[2:], 'HML': HML})
-
OLS计算因子:建议采用并行计算,否则运行速度过慢。同时,要尽可能避免在二重循环中对
Dataframe
进行过多操作,这样的碎片化操作会导致程序会越跑越慢std_ff3_raw = pd.DataFrame(index=month, columns=stock_code) def process_month(i): day = month[i] stock = stock_chosen.iloc[i].dropna() stock = pd.DataFrame({'stock_code': stock}) stock = stock[1:] # 右移一天计算收益率序列 close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner') date_index = close_adj_today.columns.get_loc(day) day_range = close_adj_today.iloc[:, (date_index - 21):(date_index + 1)] day_range_sf = day_range.shift(1, axis=1) day_range = day_range.drop(day_range.columns[0], axis=1) day_range_sf = day_range_sf.drop(day_range_sf.columns[0], axis=1) return_day = (day_range - day_range_sf) / day_range_sf # print(return_day) result_dict = {} # 计算因子 for stock_index in return_day.index: returns = return_day.loc[stock_index] start_date = returns.index[-21] end_date = returns.index[-1] # 获取因子数据(提前计算索引) start_index = HML_df[HML_df['date'] == start_date].index[0] end_index = HML_df[HML_df['date'] == end_date].index[0] hml_data = HML_df.loc[start_index:end_index, 'HML'].reset_index(drop=True) smb_data = SMB_df.loc[start_index:end_index, 'SMB'].reset_index(drop=True) csa_data = stock_csa_df.loc[start_index:end_index, 'CSA'].reset_index(drop=True) # OLS回归 X = sm.add_constant(pd.concat([hml_data, smb_data, csa_data], axis=1)) returns = returns.reset_index(drop=True) X['CSA'] = X['CSA'].astype(float) model = sm.OLS(returns, X) results = model.fit() # 取残差标准差 residuals = results.resid residual_std = residuals.std() result_dict[stock['stock_code'].iloc[stock_index]] = residual_std # print(stock['stock_code'].iloc[stock_index], residual_std) # print(stock) return result_dict # 使用并行化加速 results = Parallel(n_jobs=-1)(delayed(process_month)(i) for i in tqdm(range(len(month)))) # results = Parallel(n_jobs=-1)(delayed(process_month)(i) for i in tqdm(range(1))) # 将结果合并到 std_ff3_raw 中 for i, result_dict in enumerate(results): for key, value in result_dict.items(): std_ff3_raw.loc[month[i], key] = value # 保存结果 std_ff3_raw.to_csv('answer/factor_4_raw.csv')
因子处理
初始化
虽然上面已经生成了因子处理需要用到的DataFrame
(那四个factor_x_raw
),但我还是选择从导出的数据中读入,方便测试和调整,也避免了对之前的结果进行破坏性操作。同时,重新读入一遍可以一定程度上避免原DataFrame
中一些未发现的问题
factor_1 = pd.read_csv('answer/factor_1_raw.csv')
factor_1 = factor_1.rename(columns={'Unnamed: 0': 'date'})
factor_2 = pd.read_csv('answer/factor_2_raw.csv')
factor_2 = factor_2.rename(columns={'Unnamed: 0': 'date'})
factor_3 = pd.read_csv('answer/factor_3_raw.csv')
factor_3 = factor_3.rename(columns={'Unnamed: 0': 'date'})
factor_4 = pd.read_csv('answer/factor_4_raw.csv')
factor_4 = factor_4.rename(columns={'Unnamed: 0': 'date'})
去极值
计算因子标准差,然后遍历所有因子值,将所有超出范围的都设定在边界上
import pandas as pd
days = factor_1['date']
def process_std(factor):
mean = factor.mean()
std = factor.std()
bottom = mean - 3 * std
top = mean + 3 * std
# 截断超出上下限的数据
factor_value = factor.clip(lower=bottom, upper=top, axis=1)
# 拼接日期列并设置索引
processed_factor = pd.concat([days, factor_value], axis=1)
processed_factor.set_index(processed_factor.columns[0], inplace=True)
return processed_factor
# 调用函数
pro_factor_1_value = process_std(factor_1.iloc[:, 1:])
pro_factor_2_value = process_std(factor_2.iloc[:, 1:])
pro_factor_3_value = process_std(factor_3.iloc[:, 1:])
pro_factor_4_value = process_std(factor_4.iloc[:, 1:])
行业市值中性化
首先要计算A股市值因子,然后以其和中信一级行业哑变量为自变量,OLS去残差。
-
计算A股市值因子:取流通市值,取对数、去极值、zscore标准化
# 读取数据 float_a_shares_day = float_a_shares_day.iloc[:, 1:] close_day_value = close_day.iloc[:, 1:] # 计算市值 float_market_value = float_a_shares_day * close_day_value # 拼接股票代码,并转置,简化列名设置 float_market_value_adj = pd.concat([stock_code, float_market_value], axis=1).T float_market_value_adj.columns = float_market_value_adj.iloc[0] float_market_value_adj = float_market_value_adj[1:] float_market_value_adj.index.name = 'date' float_market_value_adj.columns.name = None # 获取在days列表中的日期 days = factor_1['date'] float_market_value = float_market_value_adj[float_market_value_adj.index.isin(days)] # 显式转换为 float 类型,然后再替换 0 为 NaN float_market_value = float_market_value.astype(float) # 先转换为float float_market_value = float_market_value.replace(0, np.nan) # 然后进行替换 # 取对数 float_market_value = np.log(float_market_value) # 计算平均值和标准差 mean = float_market_value.mean() std = float_market_value.std() # 设置去极值的阈值为平均值±3倍标准差 threshold = 3 * std # 去极值操作 float_market_value = float_market_value.clip(lower=mean - threshold, upper=mean + threshold, axis=1) # zscore标准化 float_market_value_zs = float_market_value.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)
-
计算中信一级行业哑变量:中信一级行业哑变量(将1-29转换为29个的1/0变量;行业30可忽略)为自变量
# 1. 调整格式 cs_indus_code_01 = cs_indus_code.T cs_indus_code_01.columns = cs_indus_code_01.iloc[0] cs_indus_code_01 = cs_indus_code_01[1:] cs_indus_code_01.index.name = 'date' cs_indus_code_01.columns.name = None # 2. 过滤日期 cs_indus_code_01 = cs_indus_code_01[cs_indus_code_01.index.isin(days)] # 3. 使用apply替代applymap,直接进行值替换 converted_dfs = {i: cs_indus_code_01.apply(lambda col: np.where(col == i, 1, 0), axis=0) for i in range(1, 30)}
-
行业市值中性化:以A股市值因子和中信一级行业哑变量为自变量,OLS多元线性回归,取残差(我这个函数写得比较混乱,稍微有些低效,不过平均处理一个因子花15秒也能接受)
stock = stock_chosen.iloc[0] stock = stock.dropna() stock = stock[1:] def process_neu(pro_factor_value): factor_neu = pd.DataFrame({'stock_code': stock_code}) for i in tqdm(range(len(month))): dep_var = pro_factor_value.iloc[i] # 因变量 dep_var = dep_var.dropna() dep_var = dep_var[dep_var.index.isin(stock)] indep_var = float_market_value_zs.iloc[i] # 自变量 for tmp in range(1, 30): indep_var = pd.concat([indep_var, converted_dfs[tmp].iloc[i]], axis=1) indep_var = indep_var.dropna() # 确保只有同时出现在自变量和因变量里的股票会被做回归 common_index = dep_var.index.intersection(indep_var.index) dep_var = dep_var[common_index] indep_var = indep_var.loc[common_index] # 检查索引是否对齐 indep_var = indep_var.reindex(dep_var.index) # 拟合模型 model = sm.OLS(dep_var, indep_var) results = model.fit() residuals = results.resid # 重命名残差 residuals = residuals.rename(month[i]) residuals = pd.DataFrame(residuals) residuals['stock_code'] = residuals.index residuals = residuals.rename(columns={residuals.columns[0]: month[i]}) # 合并结果 factor_neu = pd.merge(factor_neu, residuals, on='stock_code', how='outer') return factor_neu factor_1_neu = process_neu(pro_factor_1_value) factor_2_neu = process_neu(pro_factor_2_value) factor_3_neu = process_neu(pro_factor_3_value) factor_4_neu = process_neu(pro_factor_4_value)
标准化
zscore标准化:转换为均值为0、标准差为1的分布。这个比较简单,直接套一个zscore
函数就完成了
def process_std(factor_neu):
factor_neu_T = factor_neu.T
factor_neu_T.columns = factor_neu_T.iloc[0]
factor_neu_T = factor_neu_T.iloc[1:]
factor_neu_T = factor_neu_T.astype(float)
factor_neu_T = factor_neu_T.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)
return factor_neu_T
factor_1_neu_std = process_std(factor_1_neu)
factor_1_neu_std.to_csv('answer/factor_1_processed.csv')
factor_2_neu_std = process_std(factor_2_neu)
factor_2_neu_std.to_csv('answer/factor_2_processed.csv')
factor_3_neu_std = process_std(factor_3_neu)
factor_3_neu_std.to_csv('answer/factor_3_processed.csv')
factor_4_neu_std = process_std(factor_4_neu)
factor_4_neu_std.to_csv('answer/factor_4_processed.csv')
因子检验
- RankIC法:每个月末截面日计算因子值和下个自然月收益率的Spearman相关系数;
- 分层测试法:每个月末截面日对非
NaN
因子值进行降序排列,前20%股票视作第1层,20%-40%股票视作第2层……,下个自然月持有该层股票,等权计算收益率。
初始化
factor_1 = pd.read_csv('answer/factor_1_processed.csv')
factor_1 = factor_1.rename(columns={'Unnamed: 0': 'date'})
factor_2 = pd.read_csv('answer/factor_2_processed.csv')
factor_2 = factor_2.rename(columns={'Unnamed: 0': 'date'})
factor_3 = pd.read_csv('answer/factor_3_processed.csv')
factor_3 = factor_3.rename(columns={'Unnamed: 0': 'date'})
factor_4 = pd.read_csv('answer/factor_4_processed.csv')
factor_4 = factor_4.rename(columns={'Unnamed: 0': 'date'})
# 检查发现close_adj_day截止到2024-11-30,因此最后一个月无法计算因子收益,只能舍弃
# mon = month # 避免破坏性操作,不然每多运行一次本段程序,month末尾就会多出一行,导致索引不匹配
# mon[len(month)] = '2024-12-31' # 补充最后一天的日期
# month
RankIC法
遍历月末截面,对每个截面计算当前截面和下一个月末截面日间每支股票的收益率,然后对所有股票的相关系数进行Spearman相关系数计算。
def rankIC_test(factor):
factor_test = pd.DataFrame(index=days,
columns=['rankIC', 'layer_1', 'layer_2', 'layer_3', 'layer_4', 'layer_5'])
for i in range(149):
date_1 = month[i]
date_2 = month[i + 1]
price_1 = close_adj_day[date_1]
price_2 = close_adj_day[date_2]
return_rate = (price_2 - price_1) / price_1
factors = factor[factor['date'] == date_1]
factors = factors.iloc[:, 1:].squeeze()
factor_without_index = factors.reset_index(drop=True)
valid_mask = factor_without_index.notna() & return_rate.notna()
factor_valid = factor_without_index[valid_mask]
return_rate_valid = return_rate[valid_mask]
rankIC = spearmanr(factor_valid, return_rate_valid).correlation
factor_test.loc[date_1, 'rankIC'] = rankIC
return factor_test
factor_1_test = rankIC_test(factor_1)
factor_2_test = rankIC_test(factor_2)
factor_3_test = rankIC_test(factor_3)
factor_4_test = rankIC_test(factor_4)
分层测试法
遍历月末截面,对每个截面计算当前截面和下一个月末截面日间每支股票的收益率,然后对所有股票进行分层测试。
def layer_test(factor, factor_test):
for i in range(len(month) - 1):
date_1 = month[i]
date_2 = month[i + 1]
factors = factor[factor['date'] == date_1]
factors = factors.iloc[:, 1:].squeeze()
rank_factors = factors.rank(na_option='keep')
num_stock = rank_factors.count()
# 每一层的数量
num_per_group = num_stock // 5
close_1 = close_adj_day[date_1]
close_2 = close_adj_day[date_2]
return_rate = (close_2 - close_1) / close_1
layer_return = []
for j in range(5):
start = j * num_per_group
end = (j + 1) * num_per_group
rank_factors = rank_factors.reset_index(drop=True)
ind = rank_factors[(rank_factors >= start) & (rank_factors < end)]
ind = ind.index
layer_return.append(return_rate[ind].mean())
factor_test.loc[date_1, 'layer_' + str(j + 1)] = layer_return[j]
return factor_test
factor_1_test = layer_test(factor_1, factor_1_test)
factor_1_test.to_csv('answer/factor_1_test.csv')
factor_2_test = layer_test(factor_2, factor_2_test)
factor_2_test.to_csv('answer/factor_2_test.csv')
factor_3_test = layer_test(factor_3, factor_3_test)
factor_3_test.to_csv('answer/factor_3_test.csv')
factor_4_test = layer_test(factor_4, factor_4_test)
factor_4_test.to_csv('answer/factor_4_test.csv')
图像绘制
def illustrate_test(factor_test, i, x_tick_interval=29, date_format='%Y-%m'):
# 确保索引是日期类型
if not pd.api.types.is_datetime64_any_dtype(factor_test.index):
factor_test.index = pd.to_datetime(factor_test.index)
# 绘制累计 RankIC
rank_ic = factor_test['rankIC']
cumulative_rank_ic = -1 * rank_ic.cumsum()
plt.figure(figsize=(10, 6))
plt.plot(cumulative_rank_ic.index, cumulative_rank_ic.values, label='Cumulative RankIC')
plt.axhline(y=0, color='red', linestyle='--', linewidth=1, label='y=0') # 添加 y=0 水平线
x_ticks = cumulative_rank_ic.index[::x_tick_interval]
plt.xticks(x_ticks, labels=[x.strftime(date_format) for x in x_ticks])
plt.title(f'Factor_{i} Cumulative RankIC')
plt.xlabel('Date')
plt.ylabel('Cumulative RankIC')
plt.grid(True, linestyle='--', alpha=0.7)
plt.legend()
plt.show()
# 绘制累计收益
cumulative_returns = factor_test.iloc[:, 1:].cumsum()
cumulative_returns['Date'] = factor_test.index
cumulative_returns.set_index('Date', inplace=True)
plt.figure(figsize=(10, 6))
cumulative_returns.plot(ax=plt.gca())
plt.axhline(y=0, color='red', linestyle='--', linewidth=1, label='y=0') # 添加 y=0 水平线
x_ticks = cumulative_returns.index[::x_tick_interval]
plt.xticks(x_ticks, labels=[x.strftime(date_format) for x in x_ticks])
plt.title(f'Factor_{i} Cumulative Returns')
plt.xlabel('Date')
plt.ylabel('Cumulative Returns')
plt.grid(True, linestyle='--', alpha=0.7)
plt.legend(title='Factors')
plt.show()
illustrate_test(factor_1_test, 1)
illustrate_test(factor_2_test, 2)
illustrate_test(factor_3_test, 3)
illustrate_test(factor_4_test, 4)
图像参考
RankIC图应该近似是一条上升的直线,而分层收益图呈喇叭形发散,五条曲线有较高的分离度。如果你画出的图像和参考很不一样,大概率是代码存在问题,建议仔细检查。


笔记已完善