机器学习XGBoost股价预测实战
在量化金融领域,股价预测一直是备受关注的热点问题。XGBoost(Extreme Gradient Boosting)作为当前最优秀的机器学习算法之一,在金融时间序列预测中表现出色。本文将详细介绍如何使用XGBoost算法构建股价预测模型,涵盖从数据获取到模型评估的完整流程。
目录
XGBoost算法简介
什么是XGBoost
XGBoost是boosting算法的其中一种实现。Boosting算法的核心思想是将许多弱分类器集成在一起,形成一个强分类器。XGBoost作为提升树模型,将多个决策树模型集成,形成一个强大的分类器或回归器。
XGBoost的核心优势:
-
高效性
- 并行化训练,速度快
- 优化了内存使用
- 支持多线程处理
-
准确性
- 二阶导数优化
- 正则化防止过拟合
- 自动处理缺失值
-
灵活性
- 支持回归和分类问题
- 自定义损失函数
- 易于扩展和调优
-
鲁棒性
- 对异常值不敏感
- 自动处理特征选择
- 防止过拟合机制
XGBoost在金融中的应用
股价预测的优势:
- 能够捕捉非线性关系
- 自动特征选择
- 处理高维特征数据
- 对缺失值和异常值鲁棒
- 提供特征重要性分析
适用场景:
- 短期股价走势预测
- 技术指标信号生成
- 风险评估和预警
- 投资组合优化
- 量化交易策略开发
项目环境准备
必需依赖包
# 核心机器学习库
pip install xgboost==1.6.2
pip install scikit-learn==1.1.1
# 数据处理
pip install pandas==1.4.3
pip install numpy==1.23.1
pip install tushare==1.2.62
# 可视化
pip install matplotlib==3.5.2
pip install seaborn==0.11.2
pip install plotly==5.9.0
# 进度条
pip install tqdm==4.64.0
# 深度学习(可选)
pip install torch==1.12.0
环境验证
import xgboost as xgb
import pandas as pd
import numpy as np
import tushare as ts
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import tqdm
print("XGBoost version:", xgb.__version__)
print("Pandas version:", pd.__version__)
print("All dependencies installed successfully!")
数据获取
安装Tushare
Tushare是一个金融数据库,提供股票、期货、基金等金融数据。
# 安装Tushare
pip install tushare
# 升级到最新版本
pip install -U tushare
配置Token
import tushare as ts
import pandas as pd
import matplotlib.pyplot as plt
# 初始化pro_api
# 请替换为您的实际Token
pro = ts.pro_api("YOUR_API_TOKEN")
# 可选:从配置文件读取
import os
token = os.getenv('TUSHARE_TOKEN')
if token:
pro = ts.pro_api(token)
else:
print("请设置Tushare Token")
获取Token的方法:
- 访问 Tushare官网
- 注册账号并登录
- 进入个人中心 → 接口Token
- 复制Token到代码中
数据选择
# 获取平安银行股票数据(示例)
df = pro.daily(ts_code='000001.SZ', start_date='20180701', end_date='20180718')
# 数据预处理
df['date'] = pd.to_datetime(df['trade_date'])
df['adj_close'] = df['close']
df['volume'] = df['vol']
df['month'] = pd.DatetimeIndex(df['trade_date']).month
# 选择需要的列
df = df[['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'month']]
# 按日期排序
df = df.sort_values('date').reset_index(drop=True)
print(df.head())
print("\n数据形状:", df.shape)
print("\n数据信息:")
print(df.info())
数据字段说明:
| 字段名 | 含义 | 说明 |
|---|---|---|
| ts_code | 股票代码 | 格式:代码.市场(如000001.SZ) |
| trade_date | 交易日期 | YYYYMMDD格式 |
| open | 开盘价 | 当日开盘价格 |
| high | 最高价 | 当日最高价格 |
| low | 最低价 | 当日最低价格 |
| close | 收盘价 | 当日收盘价格 |
| adj_close | 复权收盘价 | 考虑分红配股的调整价格 |
| vol | 成交量 | 单位:手 |
| amount | 成交额 | 单位:千元 |
常用股票代码:
# 沪深300成分股示例
stock_codes = [
'000001.SZ', # 平安银行
'000002.SZ', # 万科A
'600036.SH', # 招商银行
'600519.SH', # 贵州茅台
'000858.SZ', # 五粮液
]
# 批量获取数据
def get_stock_data(stock_code, start_date, end_date):
"""获取单只股票数据"""
try:
df = pro.daily(ts_code=stock_code,
start_date=start_date,
end_date=end_date)
df['date'] = pd.to_datetime(df['trade_date'])
df = df.sort_values('date').reset_index(drop=True)
return df
except Exception as e:
print(f"获取 {stock_code} 数据失败: {e}")
return None
# 使用示例
stock_data = get_stock_data('000001.SZ', '20200101', '20221231')
特征工程
技术指标计算
def calculate_technical_indicators(df):
"""计算技术指标"""
# 价格区间
df['range_hl'] = df['high'] - df['low'] # 最高价-最低价
df['range_oc'] = df['open'] - df['close'] # 开盘价-收盘价
# 价格变化率
df['pct_change'] = df['adj_close'].pct_change()
# 波动率(5日滚动标准差)
df['volatility'] = df['pct_change'].rolling(window=5).std()
# RSI指标
def calculate_rsi(prices, window=14):
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
df['rsi'] = calculate_rsi(df['close'])
# 布林带
df['bb_middle'] = df['close'].rolling(window=20).mean()
df['bb_std'] = df['close'].rolling(window=20).std()
df['bb_upper'] = df['bb_middle'] + (df['bb_std'] * 2)
df['bb_lower'] = df['bb_middle'] - (df['bb_std'] * 2)
# MACD
exp1 = df['close'].ewm(span=12).mean()
exp2 = df['close'].ewm(span=26).mean()
df['macd'] = exp1 - exp2
df['macd_signal'] = df['macd'].ewm(span=9).mean()
df['macd_hist'] = df['macd'] - df['macd_signal']
return df
# 应用技术指标
df = calculate_technical_indicators(df)
滞后特征构建
def create_lag_features(df, target_col='adj_close', N=3):
"""创建滞后特征"""
df = df.copy()
# 需要滞后处理的列
lag_cols = [
'adj_close', 'range_hl', 'range_oc', 'volume',
'pct_change', 'volatility', 'rsi', 'macd'
]
# 创建滞 后特征
shift_range = list(range(1, N + 1))
for col in lag_cols:
if col in df.columns:
for i in shift_range:
new_col = f'{col}_lag_{i}'
df[new_col] = df[col].shift(i)
# 删除包含NaN的行
df = df[N:].reset_index(drop=True)
return df
# 创建滞后特征
N = 3 # 使用过去3天的数据预测
df_with_lags = create_lag_features(df, N=N)
print(f"原始数据形状: {df.shape}")
print(f"滞后特征后形状: {df_with_lags.shape}")
print(f"新增特征: {df_with_lags.shape[1] - df.shape[1]}")
移动平均线计算
import numpy as np
import copy
def get_mov_avg_std(df, col, N):
"""
计算移动平均和移动标准差
用于数据标准化
"""
# 计算移动平均
mean_list = df[col].rolling(window=N, min_periods=1).mean()
# 计算移动标准差
std_list = df[col].rolling(window=N, min_periods=1).std()
# 前移一位(避免未来信息泄露)
mean_list = np.concatenate((np.array([np.nan]), np.array(mean_list[:-1])))
std_list = np.concatenate((np.array([np.nan]), np.array(std_list[:-1])))
# 创建新DataFrame
df_out = df.copy()
df_out[col + '_mean'] = mean_list
df_out[col + '_std'] = std_list
return df_out
# 应用移动平均
df_ma = get_mov_avg_std(df_with_lags, "close", 3)
print(df_ma[['date', 'close', 'close_mean', 'close_std']].head())
数据预处理
数据标准化
def scale_row(row, feat_mean, feat_std):
"""
标准化单行数据
"""
feat_std = 0.001 if feat_std == 0 else feat_std
row_scaled = (row - feat_mean) / feat_std
return row_scaled
# 需要标准化的列
cols_list = [
"adj_close", "range_hl", "range_oc", "volume",
"pct_change", "volatility", "rsi", "macd"
]
# 对所有列应用移动平均标准化
for col in cols_list:
df_ma = get_mov_avg_std(df_ma, col, N)
# 使用sklearn的StandardScaler进行特征标准化
from sklearn.preprocessing import StandardScaler
cols_to_scale = ["adj_close"]
# 添加滞后特征到缩放列表
for i in range(1, N + 1):
cols_to_scale.extend([
f"adj_close_lag_{i}",
f"range_hl_lag_{i}",
f"range_oc_lag_{i}",
f"volume_lag_{i}",
f"pct_change_lag_{i}",
f"volatility_lag_{i}",
f"rsi_lag_{i}",
f"macd_lag_{i}"
])
# 训练集标准化
scaler = StandardScaler()
train_scaled = scaler.fit_transform(df_ma[cols_to_scale])
train_scaled = pd.DataFrame(train_scaled, columns=cols_to_scale)
train_scaled[['date', 'month']] = df_ma.reset_index()[['date', 'month']]
# 测试集标准化
test_scaled = df_ma[['date']]
for col in tqdm.tqdm(cols_list):
feat_list = [col + f'_lag_{shift}' for shift in range(1, N + 1)]
temp = df_ma.apply(
lambda row: scale_row(
row[feat_list],
row[col + '_mean'],
row[col + '_std']
),
axis=1
)
test_scaled = pd.concat([test_scaled, temp], axis=1)
print("数据标准化完成")
print(f"训练集标准化后形状: {train_scaled.shape}")
print(f"测试集标准化后形状: {test_scaled.shape}")
训练集与测试集划分
# 设置测试集比例
test_size = 0.2
num_test = int(test_size * len(df_ma))
num_train = len(df_ma) - num_test
# 划分数据
train = df_ma[:num_train]
test = df_ma[num_train:]
print(f"训练集大小: {len(train)} ({len(train)/len(df_ma)*100:.1f}%)")
print(f"测试集大小: {len(test)} ({len(test)/len(df_ma)*100:.1f}%)")
print(f"训练集日期范围: {train['date'].min()} 到 {train['date'].max()}")
print(f"测试集日期范围: {test['date'].min()} 到 {test['date'].max()}")
# 划分特征和标签
features = []
for i in range(1, N + 1):
features.extend([
f"adj_close_lag_{i}",
f"range_hl_lag_{i}",
f"range_oc_lag_{i}",
f"volume_lag_{i}"
])
target = "adj_close"
# 原始数据
X_train = train[features]
y_train = train[target]
X_test = test[features]
y_test = test[target]
# 标准化数据
X_train_scaled = train_scaled[features]
y_train_scaled = train_scaled[target]
X_test_scaled = test_scaled[features]
print(f"特征数量: {len(features)}")
print(f"特征列表: {features}")
模型训练与调优
参数设置
from xgboost import XGBRegressor
import math
# 基础参数
model_seed = 100
# XGBoost参数说明
xgb_params = {
'n_estimators': 100, # 树的数量
'max_depth': 3, # 树的最大深度
'learning_rate': 0.1, # 学习率
'min_child_weight': 1, # 叶子节点最小权重
'subsample': 1, # 采样比例
'colsample_bytree': 1, # 特征采样比例
'colsample_bylevel': 1, # 层级特征采样比例
'gamma': 0, # 最小损失减少
'reg_alpha': 0, # L1正则化
'reg_lambda': 1, # L2正则化
'eval_metric': 'rmse', # 评估指标
'random_state': model_seed # 随机种子
}
# 创建模型
model = XGBRegressor(**xgb_params)
网格搜索调优
from sklearn.model_selection import GridSearchCV
# 网格搜索参数
parameters = {
'n_estimators': [50, 100, 200],
'max_depth': [3, 5, 7],
'learning_rate': [0.05, 0.1, 0.2],
'min_child_weight': range(1, 11, 2),
'subsample': [0.8, 0.9, 1.0],
'colsample_bytree': [0.8, 0.9, 1.0]
}
# 网格搜索(使用5折交叉验证)
print("开始网格搜索调优...")
print(f"参数组合数: {len(parameters['n_estimators']) * len(parameters['max_depth']) * len(parameters['learning_rate'])}")
# 使用较小的参数范围进行快速调优
quick_parameters = {
'n_estimators': [90, 100, 110],
'max_depth': [5, 7, 9],
'learning_rate': [0.1, 0.15, 0.2],
'min_child_weight': range(5, 21, 5)
}
grid_search = GridSearchCV(
estimator=model,
param_grid=quick_parameters,
cv=5,
refit=True,
scoring='neg_mean_squared_error',
verbose=1,
n_jobs=-1
)
# 训练模型
grid_search.fit(X_train_scaled, y_train_scaled)
print("\n网格搜索完成!")
print(f"最优参数: {grid_search.best_params_}")
print(f"最优分数: {grid_search.best_score_:.4f}")
# 获取最优模型
best_model = grid_search.best_estimator_
模型训练
# 使用最优参数重新训练
print("\n使用最优参数训练最终模型...")
final_model = XGBRegressor(
**grid_search.best_params_,
eval_metric='rmse',
random_state=model_seed
)
# 训练模型
final_model.fit(X_train_scaled, y_train_scaled)
# 特征重要性
feature_importance = final_model.feature_importances_
feature_names = features
# 创建特征重要性DataFrame
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': feature_importance
}).sort_values('importance', ascending=False)
print("\n前10个最重要特征:")
print(importance_df.head(10))
模型评估
预测结果可视化
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
# 训练集预测
train_pred_scaled = final_model.predict(X_train_scaled)
train['pred'] = train_pred_scaled * math.sqrt(scaler.var_[0]) + scaler.mean_[0]
# 测试集预测
test_pred_scaled = final_model.predict(X_test_scaled)
test['pred'] = test_pred_scaled * test['adj_close_std'] + test['adj_close_mean']
# 可视化结果
plt.figure(figsize=(15, 10))
# 子图1:训练集预测结果
plt.subplot(2, 2, 1)
plt.plot(train['date'], train['adj_close'], label='实际价格', alpha=0.8)
plt.plot(train['date'], train['pred'], label='预测价格', alpha=0.8)
plt.title('训练集预测结果')
plt.xlabel('日期')
plt.ylabel('股价')
plt.legend()
plt.grid(True)
# 子图2:测试集预测结果
plt.subplot(2, 2, 2)
plt.plot(test['date'], test['adj_close'], label='实际价格', alpha=0.8)
plt.plot(test['date'], test['pred'], label='预测价格', alpha=0.8)
plt.title('测试集预测结果')
plt.xlabel('日期')
plt.ylabel('股价')
plt.legend()
plt.grid(True)
# 子图3:预测误差分布
plt.subplot(2, 2, 3)
train_error = train['adj_close'] - train['pred']
test_error = test['adj_close'] - test['pred']
plt.hist(train_error, bins=30, alpha=0.5, label='训练集误差')
plt.hist(test_error, bins=30, alpha=0.5, label='测试集误差')
plt.title('预测误差分布')
plt.xlabel('误差')
plt.ylabel('频次')
plt.legend()
plt.grid(True)
# 子图4:特征重要性
plt.subplot(2, 2, 4)
top_features = importance_df.head(10)
plt.barh(range(len(top_features)), top_features['importance'])
plt.yticks(range(len(top_features)), top_features['feature'])
plt.title('特征重要性 (Top 10)')
plt.xlabel('重要性')
plt.grid(True)
plt.tight_layout()
plt.show()
性能指标计算
# 计算评估指标
def calculate_metrics(y_true, y_pred):
"""计算预测性能指标"""
mse = mean_squared_error(y_true, y_pred)
rmse = math.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
# MAPE (平均绝对百分比误差)
mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
return {
'MSE': mse,
'RMSE': rmse,
'MAE': mae,
'R²': r2,
'MAPE(%)': mape
}
# 训练集性能
train_metrics = calculate_metrics(train['adj_close'], train['pred'])
print("