跳转至

基于机器学习的高频收益预测

第二个作业有四个可选的选题(如果都不喜欢也可以自己另选),总体来说难度都不大,我选择了机器学习高频收益预测。

选题介绍

基于data目录高频数据文件中的002507的快照数据,参考Modeling High Frequency Limit Order Book Dynamics with Support Vector Machines中的特征构建,也可以构建其他特征,基于机器学习模型预测股票未来短期的收益,并设置一定的测试集,构建合理的回测框架进行模型的有效性验证。

虽然看上去很高级,但实际上只要不到200行代码就可以搞定。如果你开心的话,还可以试着调用下DeepSeek的API,我估计准确度比sklearn会高上不少。

当然,作为高频收益预估,在追求准确度的同时,也要关注程序的运行效率。所以我这个动不动就要跑十分钟的破代码还有很大的改进空间。

数据预处理

数据读取

使用dask库进行并行读取,以提高读取速度,保证大数据量下的读取效率。当然,我也为程序设计了冗余,如果dask读取失败,也会自动调整为常规方式进行读取。

def read_parquet_files(data_directory, use_dask=False):
    print("开始读取目录:", data_directory)

    if use_dask:  # 使用 Dask 并行加载数据,提升数据读取速度
        print("    使用 Dask 加载数据")
        ddf = dd.read_parquet(os.path.join(data_directory, "*.parquet"))
        full_data = ddf.compute()

    else:  # 常规方式:顺序加载数据
        file_list = [os.path.join(data_directory, f) for f in os.listdir(data_directory) if f.endswith('.parquet')]
        file_list = sorted(file_list)
        data_frames = []
        for file in tqdm(file_list, desc="加载文件", ncols=100):
            df = pd.read_parquet(file)
            data_frames.append(df)
        full_data = pd.concat(data_frames, ignore_index=True)

    print("数据读入完成\n")
    return full_data

设置标签

因为机器给出的预测只有3个类别——“上升”、“下跌”和“不变”,所以需要预先为数据打上相应的标签,以便后续模型训练。这里我采用了简单的方法,计算未来5秒的收益率,大于0.001的标记为“上升”,小于-0.001的标记为“下跌”,其余则视为“不变”。

我感觉我在这个地方的处理欠妥,存在着非常大的改进空间。或许可以考虑采用标准差等更合理的方式来确定标签,以保证训练数据的均衡性。

def process_data_and_define_labels(data, window_size=5):
    """标签处理保持不变"""
    print("开始处理数据...")
    processed_data = data.copy()
    processed_data['future_mid_price'] = processed_data['mid_price'].shift(-window_size)
    processed_data['return'] = (processed_data['future_mid_price'] - processed_data['mid_price']) / processed_data['mid_price']
    processed_data['label'] = np.where(processed_data['return'] > 0.001, 1, np.where(processed_data['return'] < -0.001, -1, 0))
    processed_data = processed_data.dropna().reset_index(drop=True) 
    features = processed_data.drop(columns=['future_mid_price', 'return', 'label'], errors='ignore')
    labels = processed_data['label']
    print("数据处理完成\n")
    return features, labels

特征构造

其实这里所谓的“特征”就是低频中的因子,且相比低频中那些花里胡哨的因子往往更为简单(我猜是因为过于复杂的因子会消耗过多的算力?而且高频的数据类型比较单一,本身也无法组合出非常复杂的因子)。

特征的构建可以通过joblibParallel实现并行计算,以提升计算效率。样例在订单强度特征中给出(因为给到学生的数据集只有一个,构建因子比较快,所以我没有每个都使用并行计算)。

基础特征

标准化的原始盘口数据,包括十档卖出价格、十档买入价格、对应档位挂单成交量。

for i in range(1, 11):
    features[f'ask_price{i}'] = data[f'ask_price{i}']  # 十档卖出价格
    features[f'bid_price{i}'] = data[f'bid_price{i}']  # 十档买入价格
    features[f'ask_volume{i}'] = data[f'ask_volume{i}']  # 十档卖出量
    features[f'bid_volume{i}'] = data[f'bid_volume{i}']  # 十档买入量

时间不敏感特征

间不敏感因子反映盘口数据的静态特征,是市场的一个「截面状态」

  • 中间价格mid_price,买一和卖一价格的均值;
  • 价差spread,买一与卖一价格的差值,反映市场流动性;
  • 成交量总和total_ask_volumntotal_bid_volumn,十档买卖总挂单量,更直观地反应市场流动性;
  • 价格不均衡price_imbalance,买卖挂单量的差值,反映市场的买卖压力(买卖盘价格不对称性);
  • 成交量不均衡volume_imbalance,买卖挂单量的差值,反映市场的买卖压力。
data['mid_price'] = (data['ask_price1'] + data['bid_price1']) / 2
data['spread'] = data['ask_price1'] - data['bid_price1']
data['total_ask_volume'] = data[[f'ask_volume{i}' for i in range(1, 11)]].sum(axis=1)
data['total_bid_volume'] = data[[f'bid_volume{i}' for i in range(1, 11)]].sum(axis=1)
data['price_imbalance'] = (data['ask_price1'] - data['bid_price1']) / (data['ask_price1'] + data['bid_price1'])
data['volume_imbalance'] = (data['total_ask_volume'] - data['total_bid_volume']) / (data['total_ask_volume'] + data['total_bid_volume'])

时间敏感特征

反映市场短期的趋势(动量?反转?)。计算中间价格、价差、成交量等指标在时间窗口内的变化率。

window_size = 5  # 时间窗口为5秒
data = data.sort_values(by='time')  # 按时间排序
for col in ['mid_price', 'spread', 'total_ask_volume', 'total_bid_volume']:
    features[f'{col}_diff'] = data[col].diff(periods=1) / window_size

订单强度特征

订单强度因子衡量的是盘口深度中的活跃程度,通过挂单量与价差的比值计算。

这里我采用了joblibParallel库进行并行计算,其余因子也可以使用相同的方法,不过对这样的小规模数据来说并不是很必要。

# 采用joblib的Parallel并行计算加速
def calculate_intensity(i):
    return (
        data[f'ask_volume{i}'] / (data['ask_price1'] - data['bid_price1']).clip(lower=1e-6),
        data[f'bid_volume{i}'] / (data['ask_price1'] - data['bid_price1']).clip(lower=1e-6),
    )

results = Parallel(n_jobs=-1)(delayed(calculate_intensity)(i) for i in range(1, 11))
for i, (ask_intensity, bid_intensity) in enumerate(results, start=1):
    features[f'ask_intensity_{i}'] = ask_intensity
    features[f'bid_intensity_{i}'] = bid_intensity

滚动窗口特征

滚动窗口特征因子通过在指定的时间窗口内计算盘口数据的均值和标准差,反映出市场的短期动态变化和波动性。

  • 滚动均值mid_price_roll_meanspread_roll_mean,平滑市场短期波动,提取更加稳定的趋势特征。
  • 滚动标准差mid_price_roll_stdspread_roll_std,衡量市场短期波动的波动性。

我设置了不同的滚动窗口大小(5秒、10秒、20秒),以反映不同时间维度的市场动态特征。

window_sizes = [5, 10, 20]  # 滚动窗口大小列表
for window in window_sizes:
    # 滚动均值
    features[f'mid_price_roll_mean_{window}'] = data['mid_price'].rolling(window=window).mean()
    features[f'spread_roll_mean_{window}'] = data['spread'].rolling(window=window).mean()

    # 滚动标准差
    features[f'mid_price_roll_std_{window}'] = data['mid_price'].rolling(window=window).std()
    features[f'spread_roll_std_{window}'] = data['spread'].rolling(window=window).std()

features = features.dropna()

机器学习收益预测

数据采样平衡

因为在短期收益率预测中,“上涨”、“下跌”和”不变”的类别样本数量存在很大差异。我比较了静态划分和SMOTE方法,发现后者的效果显著优于前者,故采用后者。

SMOTE通过生成少数类别的合成样本,使类别分布更加平衡。是一种「样本的虚构」。

smote = SMOTE(random_state=42)
features_resampled, labels_resampled = smote.fit_resample(features, labels)
X_train, X_test, y_train, y_test = train_test_split(
    features_resampled, 
    labels_resampled, 
    test_size=0.2, 
    random_state=42
    )

机器学习模型选择

我先后采用了两种实现方式——LightGBM和Stacking集成学习,回测结果表明后者的预测准确度略优于前者,但运行时间也远长于前者。

LightGBM的优点是内存占用低,训练速度快,通过设置n_jobs=-1实现并行计算,可以提高训练速度。

Stacking通过组合多个基础模型的预测结果,实现更准确的预测。我选择LightGBM、XGBoost和CatBoost作为基础学习器,LightGBM作为最终学习器,构建了如下Stacking模型:

lgbm_model = LGBMClassifier(
    n_estimators=100, 
    max_depth=10, 
    learning_rate=0.05, 
    random_state=42
    )
xgb_model = XGBClassifier(
    n_estimators=100, 
    max_depth=10, 
    learning_rate=0.05, 
    random_state=42
    )
catboost_model = CatBoostClassifier
    n_estimators=100, 
    max_depth=10, 
    learning_rate=0.05, 
    random_state=42, 
    verbose=0
    )

# 集成模型:StackingClassifier
stacking_model = StackingClassifier(
    estimators=[('lgbm', lgbm_model), ('xgb', xgb_model), ('catboost', catboost_model)],
    final_estimator=LGBMClassifier(n_estimators=50, random_state=42)
    )

# 训练集成模型
stacking_model.fit(X_train, y_train)
y_pred = stacking_model.predict(X_test)

预测效果分析

特征重要性分析

整体来看,时间敏感因子对模型预测的贡献最大,时间不敏感次之,订单强度因子反而影响较小。可能是因为后者一定程度上具有中低频「价值属性」,在短期的高频预测中无法起到足够的作用。

模型性能分析

选择效果最好的Stacking模型进行分析。它在“不变”类别上表现最好,这也是因为“不变”类别的样本量最大,模型更容易学习到其中的规律。

相比“不变”,我们实际上更关心“上涨”的预测效果,其准确度为0.72,召回率为0.67,表现尚可。或许在打标签时可以将更多的数据分入“上涨”类别中,以为模型提供更多可靠的训练数据,而不是依赖SMOTE虚构。

完整代码

'''
⚠️建议在venv虚拟环境中运行,如果遇到如下报错
ImportError: cannot import name 'Series' from partially initialized module 'pandas'
经检查是pandas冲突导致的,请卸载seaborn,将pandas升级到最新版本,并重新安装seaborn
'''
import os
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.ensemble import StackingClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import dask.dataframe as dd
from tqdm import tqdm  # 导入进度条模块

# ==================== 中文显示配置 ====================
try:
    plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
except Exception as e:
    print("⚠️ 中文显示配置失败:", e)


# =====================================================

def read_parquet_files(data_directory, use_dask=False):
    """读取 parquet 文件"""
    print("开始读取目录:", data_directory)
    if use_dask:
        print("    使用 Dask 加载数据")
        ddf = dd.read_parquet(os.path.join(data_directory, "*.parquet"))
        full_data = ddf.compute()
    else:
        file_list = [os.path.join(data_directory, f) for f in os.listdir(data_directory) if f.endswith('.parquet')]
        file_list = sorted(file_list)
        data_frames = []
        for file in tqdm(file_list, desc="加载文件", ncols=100):  # 添加进度条
            df = pd.read_parquet(file)
            data_frames.append(df)
        full_data = pd.concat(data_frames, ignore_index=True)
    print("数据读入完成\n")
    return full_data


def construct_features(data):
    """特征工程的改进版,增加滚动窗口特征"""
    print("开始构建特征...")
    features = pd.DataFrame()
    data['time'] = pd.to_datetime(data['date'] + ' ' + data['time'])
    data = data.sort_values(by='time').reset_index(drop=True)
    for i in range(1, 11):
        features[f'ask_price{i}'] = data[f'ask_price{i}']
        features[f'bid_price{i}'] = data[f'bid_price{i}']
        features[f'ask_volume{i}'] = data[f'ask_volume{i}']
        features[f'bid_volume{i}'] = data[f'bid_volume{i}']

    data['mid_price'] = (data['ask_price1'] + data['bid_price1']) / 2
    data['spread'] = data['ask_price1'] - data['bid_price1']
    data['total_ask_volume'] = data[[f'ask_volume{i}' for i in range(1, 11)]].sum(axis=1)
    data['total_bid_volume'] = data[[f'bid_volume{i}' for i in range(1, 11)]].sum(axis=1)
    data['price_imbalance'] = (data['ask_price1'] - data['bid_price1']) / (data['ask_price1'] + data['bid_price1'])
    data['volume_imbalance'] = (data['total_ask_volume'] - data['total_bid_volume']) / (data['total_ask_volume'] + data['total_bid_volume'])

    features['mid_price'] = data['mid_price']
    features['spread'] = data['spread']
    features['total_ask_volume'] = data['total_ask_volume']
    features['total_bid_volume'] = data['total_bid_volume']
    features['price_imbalance'] = data['price_imbalance']
    features['volume_imbalance'] = data['volume_imbalance']

    # 新增滚动窗口特征
    window_sizes = [5, 10, 20]
    for window in window_sizes:
        features[f'mid_price_roll_mean_{window}'] = data['mid_price'].rolling(window=window).mean()
        features[f'spread_roll_mean_{window}'] = data['spread'].rolling(window=window).mean()
        features[f'mid_price_roll_std_{window}'] = data['mid_price'].rolling(window=window).std()
        features[f'spread_roll_std_{window}'] = data['spread'].rolling(window=window).std()

    features = features.dropna()
    print("特征构建完成\n")
    return features


def process_data_and_define_labels(data, window_size=5):
    """标签处理保持不变"""
    print("开始处理数据...")
    processed_data = data.copy()
    processed_data['future_mid_price'] = processed_data['mid_price'].shift(-window_size)
    processed_data['return'] = (processed_data['future_mid_price'] - processed_data['mid_price']) / processed_data['mid_price']
    processed_data['label'] = np.where(
        processed_data['return'] > 0.001, 1,
        np.where(processed_data['return'] < -0.001, -1, 0)
    )
    processed_data = processed_data.dropna().reset_index(drop=True)
    features = processed_data.drop(columns=['future_mid_price', 'return', 'label'], errors='ignore')
    labels = processed_data['label']
    print("数据处理完成\n")
    return features, labels


# 主程序
if __name__ == "__main__":
    data_directory = "002507"  # 你的数据目录
    read_data = read_parquet_files(data_directory, use_dask=True)
    constructed_features = construct_features(read_data)
    features, labels = process_data_and_define_labels(constructed_features)

    print("开始训练模型...")

    # 使用 SMOTE 进行过采样
    smote = SMOTE(random_state=42)
    features_resampled, labels_resampled = smote.fit_resample(features, labels)

    X_train, X_test, y_train, y_test = train_test_split(features_resampled, labels_resampled, test_size=0.2, random_state=42)

    # 构建模型集成
    lgbm_model = LGBMClassifier(n_estimators=100, max_depth=10, learning_rate=0.05, random_state=42)
    xgb_model = XGBClassifier(n_estimators=100, max_depth=10, learning_rate=0.05, random_state=42)
    catboost_model = CatBoostClassifier(n_estimators=100, max_depth=10, learning_rate=0.05, random_state=42, verbose=0)

    # 集成模型:StackingClassifier
    stacking_model = StackingClassifier(
        estimators=[('lgbm', lgbm_model), ('xgb', xgb_model), ('catboost', catboost_model)],
        final_estimator=LGBMClassifier(n_estimators=50, random_state=42)
    )

    # 训练集成模型
    stacking_model.fit(X_train, y_train)

    y_pred = stacking_model.predict(X_test)

    print("分类报告:")
    print(classification_report(y_test, y_pred, target_names=["下跌", "不变", "上涨"]))

    # 混淆矩阵显示
    plt.figure(figsize=(8, 6), dpi=100)
    conf_matrix = confusion_matrix(y_test, y_pred, labels=[-1, 0, 1])
    disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["下跌", "不变", "上涨"])
    disp.plot(cmap=plt.cm.Blues, values_format='d')
    plt.title('分类结果混淆矩阵', fontsize=14, pad=20)
    plt.xlabel('预测标签', fontsize=12)
    plt.ylabel('真实标签', fontsize=12)
    plt.xticks(fontsize=10)
    plt.yticks(fontsize=10)
    plt.tight_layout()
    plt.show()

笔记已完善