
Reproducing and Testing Several Factors

About My Code

My code for this part very likely contains serious errors, but time was tight and I couldn't immediately track down where the bug is. I recommend building your own framework from scratch rather than trying to fix my code (if you do find my mistake, please contact me, thank you very much 🙇‍♀️). My code is for reference only, and I make no guarantee that the results are correct.

Before You Start: A Few Suggestions on the Assignment

  1. If you care about efficiency, avoid writing your code in a .ipynb file whenever possible. A notebook lets you run one step and inspect the result, which is more intuitive when you don't yet have a complete plan; but once the file grows large enough, execution becomes painfully sluggish, especially when you try to visualize some huge DataFrame;
  2. That said, if you don't have a complete plan for the code, .ipynb really is the better choice, because it lets you inspect intermediate results as you go;
  3. When building the framework, avoid fragmented DataFrame operations and keep repeated DataFrame manipulation to a minimum, otherwise performance will drop sharply. Going back to adjust things after the framework is finished can easily trigger unexpected problems;
  4. Get comfortable with joblib. joblib is a library for parallel computing; since the factor computations are independent of one another, they are naturally parallelizable, and a single Parallel call buys a large speedup. Taking std_FF3factor_1m, the slowest factor to compute, as an example: before parallelization my function took about 30 minutes, while after the optimization it finishes within 2 minutes (a minimal sketch of the pattern follows this list)
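
A minimal, hypothetical sketch of the Parallel/delayed pattern described above; compute_factor and the twelve dummy month indices are placeholders, not part of the assignment data.

from joblib import Parallel, delayed

def compute_factor(month_index):
    # stand-in for an independent per-month factor computation
    return month_index ** 2

# n_jobs=-1 uses all available CPU cores; results come back in input order
results = Parallel(n_jobs=-1)(delayed(compute_factor)(i) for i in range(12))
print(results)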

Assignment Description

Basic Workflow

  1. Stock pool screening: exclude stocks listed for fewer than 252 trading days (data/IPO_date.csv); exclude delisted stocks (data/delist_date.csv); exclude stocks suspended on the month-end cross-section date
  2. Raw factor values: compute factor values for 2010-12-31 through 2024-11-30; if a stock was screened out of the pool in step 1, set its factor value to NaN
  3. Factor preprocessing (a compact single-cross-section sketch follows this list)
    • Winsorization: set factor values above mean + 3 * std to mean + 3 * std, and values below mean - 3 * std to mean - 3 * std
    • Industry and market-cap neutralization: with the factor as the dependent variable, regress (OLS) on the A-share market-cap factor (log of float market cap, winsorized, zscore-standardized) and the CITIC level-1 industry dummies (codes 1~29 converted into 29 0/1 variables; industry 30 ignored), and take the residual
    • Standardization: zscore standardization, transforming to a distribution with mean 0 and standard deviation 1
  4. Factor testing
    • RankIC: compute the Spearman correlation between factor values and next calendar month returns
    • Layered backtest: sort the non-NaN factor values in descending order; the top 20% of stocks form layer 1, the 20%-40% band forms layer 2, and so on; hold that layer's stocks over the next calendar month and compute the equal-weighted return
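
To make steps 3 and 4 concrete, here is a minimal sketch on a single synthetic cross-section (the random factor, size, and return series below are made up for illustration): winsorize at mean ± 3 std, neutralize against one stand-in regressor, zscore-standardize, then compute a RankIC against stand-in next-month returns.

import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import zscore, spearmanr

rng = np.random.default_rng(0)
factor = pd.Series(rng.normal(size=500))    # one cross-section of raw factor values
log_mcap = pd.Series(rng.normal(size=500))  # stand-in for the (already standardized) market-cap factor
next_ret = pd.Series(rng.normal(size=500))  # stand-in for next-month returns

# 1) winsorize at mean ± 3 std
m, s = factor.mean(), factor.std()
factor = factor.clip(lower=m - 3 * s, upper=m + 3 * s)

# 2) neutralize: OLS of the factor on the regressors, keep the residual
resid = sm.OLS(factor, sm.add_constant(log_mcap)).fit().resid

# 3) zscore-standardize the residual
factor_std = pd.Series(zscore(resid), index=factor.index)

# 4) RankIC against next-month returns
print(spearmanr(factor_std, next_ret).correlation)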

Factor Definitions

  1. return_1m: the return over the most recent 21 trading days;
  2. turn_1m: the mean of daily turnover over the most recent 21 trading days;
  3. std_1m: the standard deviation of daily returns over the most recent 21 trading days;
  4. std_FF3factor_1m: over the most recent 21 trading days, regress the stock's daily return series on the CSI All Share Index daily return, the SMB factor daily return, and the HML factor daily return (multiple linear regression), and take the standard deviation of the residuals (see the sketch after this list).
    • SMB factor daily return: on each trading day, rank stocks by total market cap; the arithmetic mean return of the bottom 30% minus that of the top 30%;
    • HML factor daily return: on each trading day, rank stocks by price-to-book in descending order; the arithmetic mean return of the top 30% minus that of the bottom 30%.
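
A minimal sketch of the regression behind factor 4 for a single stock and month-end; the 21-day return and factor series below are randomly generated placeholders, not the assignment data.

import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(1)
stock_ret = pd.Series(rng.normal(0, 0.02, 21))  # one stock's daily returns over the 21-day window
csa = pd.Series(rng.normal(0, 0.01, 21))        # CSI All Share Index daily returns
smb = pd.Series(rng.normal(0, 0.005, 21))       # SMB daily returns
hml = pd.Series(rng.normal(0, 0.005, 21))       # HML daily returns

# regress the stock's returns on the three factor series and keep the residuals
X = sm.add_constant(pd.concat({'CSA': csa, 'SMB': smb, 'HML': hml}, axis=1))
resid = sm.OLS(stock_ret, X).fit().resid
print(resid.std())  # std_FF3factor_1m for this stock on this cross-section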

Imports and Initialization

Because I used pandas_market_calendars, day never actually gets used, but I initialized it anyway. I still recommend using the instructor's data, since the two appear to differ slightly (a small calendar sketch follows the initialization code below).

# import
import pandas as pd
from pandas_market_calendars import get_calendar
import numpy as np
from tqdm import tqdm  # progress bar for visualizing loop progress
from joblib import Parallel, delayed  # parallel computing
import concurrent.futures  # parallel computing
from pytz import timezone
import statsmodels.api as sm
from scipy.stats import zscore
from scipy.stats import spearmanr
import matplotlib.pyplot as plt

# read data and initialize
calendar = get_calendar('XSHG')

stock_code = pd.read_csv('data/stock_code_info.csv')
stock_code = stock_code['stock_code']

day = pd.read_csv('data/day.csv')
day = day.loc[(day['date'] > '2010-11-30') & (day['date'] <= '2024-11-30')].reset_index(drop=True)
day = day['date']

month = pd.read_csv('data/month.csv')
month = month.loc[(month['date'] > '2010-11-30') & (month['date'] <= '2024-11-30')].reset_index(drop=True)
month = month['date']

amt_day = pd.read_csv('data/amt_day.csv')

IPO_date = pd.read_csv('data/IPO_date_info.csv')
IPO_date = IPO_date.rename(columns={'Unnamed: 0': 'stock_code'})

delist_date = pd.read_csv('data/delist_date_info.csv')
delist_date = delist_date.rename(columns={'Unnamed: 0': 'stock_code'})

close_adj_day = pd.read_csv('data/close_adj_day.csv')
close_adj_day = close_adj_day.rename(columns={'Unnamed: 0': 'stock_code'})

turn_day = pd.read_csv('data/turn_day.csv')
turn_day = turn_day.rename(columns={'Unnamed: 0': 'stock_code'})

close_adj_day_sf = close_adj_day.shift(1, axis=1)
close_adj_day_value = close_adj_day.iloc[:, 2:]
close_adj_day_sf_value = close_adj_day_sf.iloc[:, 2:]

csiall_day = pd.read_csv('data/csiall_day.csv')
close_day = pd.read_csv('data/close_day.csv')
share_totala_day = pd.read_csv('data/share_totala_day.csv')
pb_lf_day = pd.read_csv('data/pb_lf_day.csv')

cs_indus_code = pd.read_csv('data/cs_indus_code_day.csv')
float_a_shares_day = pd.read_csv('data/float_a_shares_day.csv')
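
Following up on the pandas_market_calendars remark above, this is a minimal sketch of pulling the XSHG trading-day list directly from the package (assuming its built-in Shanghai Stock Exchange calendar; as noted, it may differ slightly from day.csv):

from pandas_market_calendars import get_calendar

xshg = get_calendar('XSHG')  # Shanghai Stock Exchange calendar
# schedule() returns one row per trading session, indexed by the trading date
schedule = xshg.schedule(start_date='2010-11-30', end_date='2024-11-30')
trading_days = schedule.index  # DatetimeIndex of XSHG trading days in the window
print(len(trading_days), trading_days[:3])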

Stock Pool Screening

  1. Exclude stocks listed for fewer than 252 trading days: for each stock, take its IPO date and add 252 trading days (adding directly to the index position, which is much faster than a while loop); if the result is later than the current cross-section date, exclude the stock;
  2. Exclude delisted stocks: read the delisting date from data/delist_date.csv and exclude the stock if that date is valid;
  3. Exclude stocks suspended on the month-end cross-section date: for each stock, look at the month-end cross-section date and exclude it if trading was suspended.
stock_chosen = []

for month_cut in month:  # iterate over the month-end cross-section dates
    day_amt = amt_day[month_cut]  # trading amount on the month-end cross-section date (NaN means suspended)

    day_amt = pd.DataFrame({'stock_code': stock_code, 'day_amt': day_amt, 'cut_day': month_cut, 'in_pool': 1})
    day_amt = pd.merge(day_amt, IPO_date, on='stock_code')  # merge IPO dates
    day_amt = pd.merge(day_amt, delist_date, on='stock_code')  # merge delisting dates

    # screen stocks
    day_amt.loc[day_amt['cut_day'] < day_amt['to_date'], 'in_pool'] = 0  # exclude stocks listed for fewer than 252 trading days
    day_amt.loc[day_amt['day_amt'].isnull(), 'in_pool'] = 0  # exclude stocks suspended on the month-end cross-section date
    day_amt.loc[(day_amt['delist_date'].notnull()) & (day_amt['cut_day'] > day_amt['delist_date']), 'in_pool'] = 0  # exclude delisted stocks (a null delist_date means not yet delisted)

    day_amt = day_amt[day_amt['in_pool'] == 1]
    day_pool = day_amt['stock_code'].tolist()
    day_pool.insert(0, month_cut)
    stock_chosen.append(day_pool)

stock_chosen = pd.DataFrame(stock_chosen)
stock_chosen.columns = ['cut_day'] + [f'stock_{i}' for i in range(1, stock_chosen.shape[1])]

Raw Factor Computation

Factor 1: return_1m_raw

For each month-end cross-section date, take that day's adjusted close and the adjusted close 21 trading days earlier, and compute the return.

return_1m_raw = pd.DataFrame({'stock_code': stock_code})

for i in range(len(month)):
    stock = stock_chosen.iloc[i, 1:]  # stock pool on this month-end cross-section date
    stock = stock.dropna()
    stock = pd.DataFrame({'stock_code': stock})
    day = month[i]  # note: this overwrites the global day Series defined earlier (which is otherwise unused)

    close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner')  # adjusted close for the stocks in the pool
    date_index = close_adj_today.columns.get_loc(day)  # column index of the current cross-section date
    price_now = close_adj_today.iloc[:, date_index]
    price_21_before = close_adj_today.iloc[:, date_index - 21]  # look back 21 trading days

    return_1m = (price_now - price_21_before) / price_21_before

    stock = stock.reset_index(drop=True)
    return_1m = return_1m.reset_index(drop=True)
    # reset the indices so stock and return_1m align; otherwise the mismatched indices cause errors

    return_1m = pd.DataFrame({day: return_1m})
    return_1m = pd.concat([stock, return_1m], axis=1)
    return_1m_raw = pd.merge(return_1m_raw, return_1m, on='stock_code', how='outer')

return_1m_raw_T = return_1m_raw.T
return_1m_raw_T.columns = return_1m_raw_T.iloc[0]
return_1m_raw_T = return_1m_raw_T.iloc[1:]
return_1m_raw_T.to_csv('answer/factor_1_raw.csv')  # transpose before saving to match the required output format

Factor 2: turn_1m_raw

Iterate over the month-end cross-section dates, take the daily turnover over the preceding 21 consecutive trading days, and average it. Copying the code above and tweaking it slightly is all it takes.

turn_1m_raw = pd.DataFrame({'stock_code': stock_code})

for i in range(len(month)):
    stock = stock_chosen.iloc[i, 1:]  # stock pool on this month-end cross-section date
    stock = stock.dropna()
    stock = pd.DataFrame({'stock_code': stock})
    day = month[i]

    turn_today = pd.merge(stock, turn_day, on='stock_code', how='inner')  # turnover data for the stocks in the pool
    date_index = turn_today.columns.get_loc(day)  # column index of the current cross-section date
    turn_21 = turn_today.iloc[:, (date_index - 20):(date_index + 1)]  # turnover over the 21-day window

    turn_1m = turn_21.mean(axis=1)

    stock = stock.reset_index(drop=True)
    turn_1m = turn_1m.reset_index(drop=True)

    turn_1m = pd.DataFrame({day: turn_1m})
    turn_1m = pd.concat([stock, turn_1m], axis=1)
    turn_1m_raw = pd.merge(turn_1m_raw, turn_1m, on='stock_code', how='outer')

turn_1m_raw_T = turn_1m_raw.T
turn_1m_raw_T.columns = turn_1m_raw_T.iloc[0]
turn_1m_raw_T = turn_1m_raw_T.iloc[1:]
turn_1m_raw_T.to_csv('answer/factor_2_raw.csv')

Factor 3: std_1m_raw

Same idea as the two factors above: take the adjusted close over the past 22 trading days, compute the 21 daily returns, and then take their standard deviation.

std_1m_raw = pd.DataFrame({'stock_code': stock_code})

for i in range(len(month)):
    stock = stock_chosen.iloc[i, 1:]  # stock pool on this month-end cross-section date
    stock = stock.dropna()
    stock = pd.DataFrame({'stock_code': stock})
    day = month[i]

    close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner')  # adjusted close for the stocks in the pool
    date_index = close_adj_today.columns.get_loc(day)  # column index of the current cross-section date
    price_22 = close_adj_today.iloc[:, (date_index - 21):(date_index + 1)]  # adjusted close over the past 22 trading days
    return_21 = price_22.pct_change(axis=1, fill_method=None).iloc[:, 1:]  # the 21 daily returns

    std_1m = return_21.std(axis=1)  # standard deviation of the daily returns

    stock = stock.reset_index(drop=True)
    std_1m = std_1m.reset_index(drop=True)

    std_1m = pd.DataFrame({day: std_1m})
    std_1m = pd.concat([stock, std_1m], axis=1)
    std_1m_raw = pd.merge(std_1m_raw, std_1m, on='stock_code', how='outer')

std_1m_raw_T = std_1m_raw.T
std_1m_raw_T.columns = std_1m_raw_T.iloc[0]
std_1m_raw_T = std_1m_raw_T.iloc[1:]
std_1m_raw_T.to_csv('answer/factor_3_raw.csv')

Factor 4: std_FF3factor_1m_raw

  1. Compute the CSI All Share Index daily return: just follow the index's definition; it's simple enough that any GPT could write it

    # daily returns for all stocks
    return_1 = (close_adj_day_value - close_adj_day_sf_value) / close_adj_day_sf_value
    return_1 = pd.concat([stock_code, return_1], axis=1)
    
    # total market cap (total shares * close)
    market_value = (share_totala_day.iloc[:, 1:] * close_day.iloc[:, 1:]).reset_index(drop=True)
    market_value = pd.concat([stock_code, market_value], axis=1)
    
    # cross-sectional ranks (descending)
    market_rank = market_value.rank(axis=0, ascending=False)
    pb_rank = pb_lf_day.rank(axis=0, ascending=False)
    
    day_list = market_rank.columns[2:]
    
    # CSI All Share Index daily return (CSA)
    csiall_day_value = csiall_day.iloc[0, 2:].values  # take the raw values directly to avoid unnecessary DataFrame conversions
    csiall_day_value_sf = csiall_day.shift(1, axis=1).iloc[0, 2:].values  # same, shifted by one day
    csa = (csiall_day_value - csiall_day_value_sf) / csiall_day_value_sf

    # assemble the final DataFrame
    stock_csa_df = pd.DataFrame({'date': day_list, 'CSA': csa})
    
  2. Compute SMB: first compute market cap (close times total shares), then rank by market cap, taking care to ignore NaN when ranking. Also, stock_code here no longer carries the actual stock codes; the pre-sort positional indices are used instead, so stocks can later be located directly by index without unnecessary lookups

    # precompute every stock's daily returns so no merge is needed inside the loop
    returns_dict = return_1.set_index('stock_code')[market_value.columns[2:]]  # each stock's return on each day
    
    SMB = []
    for date in tqdm(market_value.columns[2:]):
        top_30_num = int(market_rank[date].count() * 0.3)
    
        # indices of the top 30% and bottom 30% by market cap
        top_30_idx = market_rank[date].nsmallest(top_30_num).index
        bottom_30_idx = market_rank[date].nlargest(top_30_num).index
    
        # extract their returns
        top_30_return = returns_dict.loc[stock_code[top_30_idx], date]
        bottom_30_return = returns_dict.loc[stock_code[bottom_30_idx], date]
    
        # SMB = mean return of the smallest 30% minus that of the largest 30%
        SMB_top = top_30_return.mean()
        SMB_bottom = bottom_30_return.mean()
        SMB.append(SMB_bottom - SMB_top)
    
    # collect the results into a DataFrame
    SMB_df = pd.DataFrame({'date': day_list, 'SMB': SMB})
    

  3. Compute HML: start from price-to-book; since price-to-book is never negative, sorting by its reciprocal is equivalent to sorting price-to-book in descending order, and the latter avoids an unnecessary computation. Just copy the SMB code and tweak it slightly

    # precompute every stock's daily returns to avoid fragmented operations
    returns_dict = return_1.set_index('stock_code')[market_value.columns[2:]]  # each stock's return on each day
    
    HML = []
    for date in tqdm(market_value.columns[2:]):
        top_30_num = int(pb_rank[date].count() * 0.3)
    
        # indices by descending-PB rank: top_30_idx = largest rank values (lowest PB), bottom_30_idx = smallest (highest PB)
        top_30_idx = pb_rank[date].nlargest(top_30_num).index
        bottom_30_idx = pb_rank[date].nsmallest(top_30_num).index
    
        # extract their returns
        top_30_return = returns_dict.loc[stock_code[top_30_idx], date]
        bottom_30_return = returns_dict.loc[stock_code[bottom_30_idx], date]
    
        # HML = mean return of the highest-PB 30% minus that of the lowest-PB 30% (per the definition above)
        HML_top = top_30_return.mean()
        HML_bottom = bottom_30_return.mean()
        HML.append(HML_bottom - HML_top)
    
    # collect the results into a DataFrame
    HML_df = pd.DataFrame({'date': market_value.columns[2:], 'HML': HML})
    
  4. OLS to compute the factor: parallel computation is strongly recommended, otherwise it runs far too slowly. Also, avoid doing too many DataFrame operations inside the double loop; that kind of fragmented manipulation makes the program slow down more and more as it runs (a vectorized numpy alternative is sketched right after the code below)

    std_ff3_raw = pd.DataFrame(index=month, columns=stock_code)
    
    
    def process_month(i):
        day = month[i]
        stock = stock_chosen.iloc[i].dropna()
        stock = pd.DataFrame({'stock_code': stock})
        stock = stock[1:]
    
        # shift one day to the right to compute the daily return series
        close_adj_today = pd.merge(stock, close_adj_day, on='stock_code', how='inner')
        date_index = close_adj_today.columns.get_loc(day)
        day_range = close_adj_today.iloc[:, (date_index - 21):(date_index + 1)]
        day_range_sf = day_range.shift(1, axis=1)
        day_range = day_range.drop(day_range.columns[0], axis=1)
        day_range_sf = day_range_sf.drop(day_range_sf.columns[0], axis=1)
    
        return_day = (day_range - day_range_sf) / day_range_sf
        # print(return_day)
    
        result_dict = {}
    
        # run the regression for each stock in the pool
        for stock_index in return_day.index:
            returns = return_day.loc[stock_index]
            start_date = returns.index[-21]
            end_date = returns.index[-1]
    
            # factor-return window (indices computed up front)
            start_index = HML_df[HML_df['date'] == start_date].index[0]
            end_index = HML_df[HML_df['date'] == end_date].index[0]
    
            hml_data = HML_df.loc[start_index:end_index, 'HML'].reset_index(drop=True)
            smb_data = SMB_df.loc[start_index:end_index, 'SMB'].reset_index(drop=True)
            csa_data = stock_csa_df.loc[start_index:end_index, 'CSA'].reset_index(drop=True)
    
            # OLS regression
            X = sm.add_constant(pd.concat([hml_data, smb_data, csa_data], axis=1))
            returns = returns.reset_index(drop=True)
            X['CSA'] = X['CSA'].astype(float)
    
            model = sm.OLS(returns, X)
            results = model.fit()
    
            # standard deviation of the residuals
            residuals = results.resid
            residual_std = residuals.std()
    
            # map back to the stock code via the merged frame's own index; indexing stock positionally
            # could misalign if any pool stock is missing from close_adj_day
            result_dict[close_adj_today.loc[stock_index, 'stock_code']] = residual_std
            # print(stock['stock_code'].iloc[stock_index], residual_std)
            # print(stock)
    
        return result_dict
    
    
    # parallelize across months for a large speedup
    results = Parallel(n_jobs=-1)(delayed(process_month)(i) for i in tqdm(range(len(month))))
    
    # results = Parallel(n_jobs=-1)(delayed(process_month)(i) for i in tqdm(range(1)))
    
    # merge the results into std_ff3_raw
    for i, result_dict in enumerate(results):
        for key, value in result_dict.items():
            std_ff3_raw.loc[month[i], key] = value
    
    # save the result
    std_ff3_raw.to_csv('answer/factor_4_raw.csv')
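
As an aside, and not part of the original solution: if only the residual standard deviation is needed, the per-stock statsmodels call inside process_month can be replaced with a plain numpy least-squares fit, which avoids most of the per-call overhead and should give the same number up to floating-point precision. A minimal sketch under that assumption; the four arguments are placeholders for one stock's 21-day return window and the three factor series:

import numpy as np

def residual_std_lstsq(returns_21, csa_21, smb_21, hml_21):
    # design matrix with an intercept column, mirroring sm.add_constant
    X = np.column_stack([np.ones(len(returns_21)), csa_21, smb_21, hml_21])
    y = np.asarray(returns_21, dtype=float)
    beta, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
    resid = y - X @ beta
    return resid.std(ddof=1)  # ddof=1 matches pandas' Series.std() used above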
    

Factor Processing

Initialization

Although the DataFrames needed for factor processing (the four factor_x_raw frames) were already generated above, I chose to read them back in from the exported files. This makes testing and tweaking more convenient and avoids destructive operations on the earlier results; re-reading the data also sidesteps, to some extent, any undiscovered issues lurking in the original DataFrames.

factor_1 = pd.read_csv('answer/factor_1_raw.csv')
factor_1 = factor_1.rename(columns={'Unnamed: 0': 'date'})

factor_2 = pd.read_csv('answer/factor_2_raw.csv')
factor_2 = factor_2.rename(columns={'Unnamed: 0': 'date'})

factor_3 = pd.read_csv('answer/factor_3_raw.csv')
factor_3 = factor_3.rename(columns={'Unnamed: 0': 'date'})

factor_4 = pd.read_csv('answer/factor_4_raw.csv')
factor_4 = factor_4.rename(columns={'Unnamed: 0': 'date'})

Winsorization

For each cross-section date, compute the factor's mean and standard deviation, then clip every value outside mean ± 3 std to the boundary.

import pandas as pd
days = factor_1['date']

def process_std(factor):
    # cross-sectional (per-date) mean and std, consistent with the per-date zscore applied later
    mean = factor.mean(axis=1)
    std = factor.std(axis=1)
    bottom = mean - 3 * std
    top = mean + 3 * std

    # clip values outside the bounds to the bounds, aligning the bounds along the date index
    factor_value = factor.clip(lower=bottom, upper=top, axis=0)

    # prepend the date column and set it as the index
    processed_factor = pd.concat([days, factor_value], axis=1)
    processed_factor.set_index(processed_factor.columns[0], inplace=True)

    return processed_factor


# apply the winsorization to each factor
pro_factor_1_value = process_std(factor_1.iloc[:, 1:])
pro_factor_2_value = process_std(factor_2.iloc[:, 1:])
pro_factor_3_value = process_std(factor_3.iloc[:, 1:])
pro_factor_4_value = process_std(factor_4.iloc[:, 1:])

Industry and Market-Cap Neutralization

First compute the A-share market-cap factor, then regress the factor (OLS) on the market-cap factor and the CITIC level-1 industry dummies, and take the residual.

  1. Compute the A-share market-cap factor: take the float market cap, then log it, winsorize it, and zscore-standardize it

    # slice out the value columns
    float_a_shares_day = float_a_shares_day.iloc[:, 1:]
    close_day_value = close_day.iloc[:, 1:]

    # free-float A-share market cap
    float_market_value = float_a_shares_day * close_day_value

    # attach the stock codes, then transpose to simplify setting the column names
    float_market_value_adj = pd.concat([stock_code, float_market_value], axis=1).T
    float_market_value_adj.columns = float_market_value_adj.iloc[0]
    float_market_value_adj = float_market_value_adj[1:]
    float_market_value_adj.index.name = 'date'
    float_market_value_adj.columns.name = None
    
    # keep only the month-end dates in days
    days = factor_1['date']
    float_market_value = float_market_value_adj[float_market_value_adj.index.isin(days)]
    
    # cast to float explicitly, then replace 0 with NaN
    float_market_value = float_market_value.astype(float)  # cast to float first
    float_market_value = float_market_value.replace(0, np.nan)  # then do the replacement
    
    # take the log
    float_market_value = np.log(float_market_value)
    
    # cross-sectional (per-date) mean and standard deviation
    mean = float_market_value.mean(axis=1)
    std = float_market_value.std(axis=1)

    # winsorization threshold: mean ± 3 * std
    threshold = 3 * std

    # winsorize, aligning the bounds along the date index
    float_market_value = float_market_value.clip(lower=mean - threshold, upper=mean + threshold, axis=0)
    
    # cross-sectional zscore standardization (per date)
    float_market_value_zs = float_market_value.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)
    
  2. Build the CITIC level-1 industry dummies: industry codes 1-29 are converted into 29 1/0 variables to serve as independent variables (industry 30 can be ignored); an alternative using pd.get_dummies is sketched after this list

    # 1. reshape
    cs_indus_code_01 = cs_indus_code.T
    cs_indus_code_01.columns = cs_indus_code_01.iloc[0]
    cs_indus_code_01 = cs_indus_code_01[1:]
    cs_indus_code_01.index.name = 'date'
    cs_indus_code_01.columns.name = None
    
    # 2. filter to the month-end dates
    cs_indus_code_01 = cs_indus_code_01[cs_indus_code_01.index.isin(days)]
    
    # 3. use apply instead of applymap and map the codes to 0/1 directly
    converted_dfs = {i: cs_indus_code_01.apply(lambda col: np.where(col == i, 1, 0), axis=0) for i in range(1, 30)}
    
  3. Industry and market-cap neutralization: with the A-share market-cap factor and the CITIC level-1 industry dummies as independent variables, run a multiple OLS regression and take the residual (my function here is rather messy and somewhat inefficient, but averaging about 15 seconds per factor is acceptable)

    # month-0 pool; note that the loop inside process_neu recomputes the pool for each month
    stock = stock_chosen.iloc[0]
    stock = stock.dropna()
    stock = stock[1:]
    
    def process_neu(pro_factor_value):
        factor_neu = pd.DataFrame({'stock_code': stock_code})
    
        for i in tqdm(range(len(month))):
            # use this month's stock pool (the pool changes month to month, so recompute it here
            # rather than reusing the month-0 pool defined above)
            stock = stock_chosen.iloc[i].dropna()[1:]

            dep_var = pro_factor_value.iloc[i]  # dependent variable
            dep_var = dep_var.dropna()
            dep_var = dep_var[dep_var.index.isin(stock)]
    
            indep_var = float_market_value_zs.iloc[i]  # independent variable: the market-cap factor
            for tmp in range(1, 30):
                indep_var = pd.concat([indep_var, converted_dfs[tmp].iloc[i]], axis=1)
                indep_var = indep_var.dropna()
    
            # keep only stocks that appear in both the dependent and independent variables
            common_index = dep_var.index.intersection(indep_var.index)
            dep_var = dep_var[common_index]
            indep_var = indep_var.loc[common_index]
    
            # align the independent variables with the dependent variable's index
            indep_var = indep_var.reindex(dep_var.index) 
    
            # fit the model
            model = sm.OLS(dep_var, indep_var)
            results = model.fit()
            residuals = results.resid
    
            # rename the residual series
            residuals = residuals.rename(month[i])
            residuals = pd.DataFrame(residuals)
            residuals['stock_code'] = residuals.index
            residuals = residuals.rename(columns={residuals.columns[0]: month[i]})
    
            # merge into the result frame
            factor_neu = pd.merge(factor_neu, residuals, on='stock_code', how='outer')
    
        return factor_neu
    
    
    factor_1_neu = process_neu(pro_factor_1_value)
    factor_2_neu = process_neu(pro_factor_2_value)
    factor_3_neu = process_neu(pro_factor_3_value)
    factor_4_neu = process_neu(pro_factor_4_value)
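
As mentioned in step 2 above, the industry dummies can also be built with pd.get_dummies instead of a dict of 29 separate DataFrames. A minimal sketch on a made-up cross-section of industry codes (the codes and index below are illustrative, not the assignment data):

import pandas as pd

codes = pd.Series([1, 3, 30, 5, 3], index=['s1', 's2', 's3', 's4', 's5'])  # hypothetical industry codes
dummies = pd.get_dummies(codes, prefix='ind').astype(int)
dummies = dummies.drop(columns=['ind_30'], errors='ignore')  # industry 30 is ignored per the assignment
print(dummies)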
    

Standardization

zscore standardization: transform to a distribution with mean 0 and standard deviation 1. This one is simple; wrapping the data in a zscore call is all it takes.

def process_std(factor_neu):  # note: this redefines the process_std used for winsorization above
    factor_neu_T = factor_neu.T
    factor_neu_T.columns = factor_neu_T.iloc[0]
    factor_neu_T = factor_neu_T.iloc[1:]
    factor_neu_T = factor_neu_T.astype(float)

    factor_neu_T = factor_neu_T.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)
    return factor_neu_T


factor_1_neu_std = process_std(factor_1_neu)
factor_1_neu_std.to_csv('answer/factor_1_processed.csv')
factor_2_neu_std = process_std(factor_2_neu)
factor_2_neu_std.to_csv('answer/factor_2_processed.csv')
factor_3_neu_std = process_std(factor_3_neu)
factor_3_neu_std.to_csv('answer/factor_3_processed.csv')
factor_4_neu_std = process_std(factor_4_neu)
factor_4_neu_std.to_csv('answer/factor_4_processed.csv')

Factor Testing

  1. RankIC: on each month-end cross-section date, compute the Spearman correlation between factor values and the next calendar month's returns;
  2. Layered backtest: on each month-end cross-section date, sort the non-NaN factor values in descending order; the top 20% of stocks form layer 1, the 20%-40% band forms layer 2, and so on; hold that layer's stocks over the next calendar month and compute the equal-weighted return (a pd.qcut-based sketch of the layering follows this list).
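
A minimal sketch of the layering idea using pd.qcut on synthetic data (the factor and return series below are randomly generated for illustration): one cross-section is split into five equal-sized groups by factor value and each group's equal-weighted next-month return is taken.

import numpy as np
import pandas as pd

rng = np.random.default_rng(2)
factor = pd.Series(rng.normal(size=1000))        # one cross-section of processed factor values
next_ret = pd.Series(rng.normal(0, 0.05, 1000))  # stand-in for next-month returns

# rank the negated factor so that label 0 corresponds to the largest factor values (layer 1);
# rank(method='first') breaks ties so qcut always yields five equal-sized bins
layer = pd.qcut((-factor).rank(method='first'), 5, labels=False)
layer_returns = next_ret.groupby(layer).mean()  # equal-weighted next-month return per layer
print(layer_returns)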

Initialization

factor_1 = pd.read_csv('answer/factor_1_processed.csv')
factor_1 = factor_1.rename(columns={'Unnamed: 0': 'date'})

factor_2 = pd.read_csv('answer/factor_2_processed.csv')
factor_2 = factor_2.rename(columns={'Unnamed: 0': 'date'})

factor_3 = pd.read_csv('answer/factor_3_processed.csv')
factor_3 = factor_3.rename(columns={'Unnamed: 0': 'date'})

factor_4 = pd.read_csv('answer/factor_4_processed.csv')
factor_4 = factor_4.rename(columns={'Unnamed: 0': 'date'})

# it turns out close_adj_day ends at 2024-11-30, so the last month's forward return cannot be computed and has to be dropped
# mon = month  # avoid destructive operations; otherwise every extra run of this block appends another row to month and the indices stop matching
# mon[len(month)] = '2024-12-31'  # append the final date
# month

RankIC Test

Iterate over the month-end cross sections; for each one, compute every stock's return between the current and the next month-end cross-section date, then compute the Spearman correlation between the factor values and those returns across all stocks.

def rankIC_test(factor):
    factor_test = pd.DataFrame(index=days,
                               columns=['rankIC', 'layer_1', 'layer_2', 'layer_3', 'layer_4', 'layer_5'])

    for i in range(len(month) - 1):  # every cross section except the last (no next-month price available)
        date_1 = month[i]
        date_2 = month[i + 1]
        price_1 = close_adj_day[date_1]
        price_2 = close_adj_day[date_2]

        return_rate = (price_2 - price_1) / price_1

        factors = factor[factor['date'] == date_1]
        factors = factors.iloc[:, 1:].squeeze()
        factor_without_index = factors.reset_index(drop=True)

        valid_mask = factor_without_index.notna() & return_rate.notna()
        factor_valid = factor_without_index[valid_mask]
        return_rate_valid = return_rate[valid_mask]

        rankIC = spearmanr(factor_valid, return_rate_valid).correlation
        factor_test.loc[date_1, 'rankIC'] = rankIC

    return factor_test


factor_1_test = rankIC_test(factor_1)
factor_2_test = rankIC_test(factor_2)
factor_3_test = rankIC_test(factor_3)
factor_4_test = rankIC_test(factor_4)

Layered Backtest

Iterate over the month-end cross sections; for each one, compute every stock's return between the current and the next month-end cross-section date, then run the layered test across all stocks.

def layer_test(factor, factor_test):
    for i in range(len(month) - 1):
        date_1 = month[i]
        date_2 = month[i + 1]

        factors = factor[factor['date'] == date_1]
        factors = factors.iloc[:, 1:].squeeze()

        # rank in descending order so that layer 1 holds the largest factor values, per the assignment
        rank_factors = factors.rank(ascending=False, na_option='keep')
        num_stock = rank_factors.count()

        # number of stocks per layer
        num_per_group = num_stock // 5
        close_1 = close_adj_day[date_1]
        close_2 = close_adj_day[date_2]
        return_rate = (close_2 - close_1) / close_1

        layer_return = []
        rank_factors = rank_factors.reset_index(drop=True)
        for j in range(5):
            # ranks are 1-based, so layer j covers ranks in (start, end]
            start = j * num_per_group
            end = (j + 1) * num_per_group
            ind = rank_factors[(rank_factors > start) & (rank_factors <= end)].index
            layer_return.append(return_rate[ind].mean())
            factor_test.loc[date_1, 'layer_' + str(j + 1)] = layer_return[j]

    return factor_test


factor_1_test = layer_test(factor_1, factor_1_test)
factor_1_test.to_csv('answer/factor_1_test.csv')
factor_2_test = layer_test(factor_2, factor_2_test)
factor_2_test.to_csv('answer/factor_2_test.csv')
factor_3_test = layer_test(factor_3, factor_3_test)
factor_3_test.to_csv('answer/factor_3_test.csv')
factor_4_test = layer_test(factor_4, factor_4_test)
factor_4_test.to_csv('answer/factor_4_test.csv')

Plotting

def illustrate_test(factor_test, i, x_tick_interval=29, date_format='%Y-%m'):
    # make sure the index is a datetime type
    if not pd.api.types.is_datetime64_any_dtype(factor_test.index):
        factor_test.index = pd.to_datetime(factor_test.index)

    # plot cumulative RankIC
    rank_ic = factor_test['rankIC']
    cumulative_rank_ic = -1 * rank_ic.cumsum()  # these factors' RankICs are typically negative, so flip the sign to get a rising cumulative curve
    plt.figure(figsize=(10, 6))
    plt.plot(cumulative_rank_ic.index, cumulative_rank_ic.values, label='Cumulative RankIC')
    plt.axhline(y=0, color='red', linestyle='--', linewidth=1, label='y=0')  # add a horizontal reference line at y=0
    x_ticks = cumulative_rank_ic.index[::x_tick_interval]
    plt.xticks(x_ticks, labels=[x.strftime(date_format) for x in x_ticks])
    plt.title(f'Factor_{i} Cumulative RankIC')
    plt.xlabel('Date')
    plt.ylabel('Cumulative RankIC')
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend()
    plt.show()


    # plot cumulative layer returns
    cumulative_returns = factor_test.iloc[:, 1:].cumsum()
    cumulative_returns['Date'] = factor_test.index
    cumulative_returns.set_index('Date', inplace=True)

    plt.figure(figsize=(10, 6))
    cumulative_returns.plot(ax=plt.gca())
    plt.axhline(y=0, color='red', linestyle='--', linewidth=1, label='y=0')  # add a horizontal reference line at y=0
    x_ticks = cumulative_returns.index[::x_tick_interval]
    plt.xticks(x_ticks, labels=[x.strftime(date_format) for x in x_ticks])
    plt.title(f'Factor_{i} Cumulative Returns')
    plt.xlabel('Date')
    plt.ylabel('Cumulative Returns')
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend(title='Factors')
    plt.show()


illustrate_test(factor_1_test, 1)
illustrate_test(factor_2_test, 2)
illustrate_test(factor_3_test, 3)
illustrate_test(factor_4_test, 4)

Reference Plots

The cumulative RankIC plot should be approximately a rising straight line, and the layered-return plot should fan out like a trumpet, with clear separation between the five curves. If your plots look very different from the reference, there is most likely a problem in your code, and I recommend checking it carefully.


These notes are complete.