下面这个是星主的源码:
@staticmethod
def trend_score(series: pd.Series, period=25):
def _trend_score(close, period=25):
"""
向量化计算趋势评分:年化收益率 × R平方
:param close: 收盘价序列(np.array或pd.Series)
:param period: 计算窗口长度,默认25天
:return: 趋势评分数组,长度与输入相同,前period-1位为NaN
"""
if len(close) < period:
return np.full_like(close, np.nan)
y = np.log(close)
windows = np.lib.stride_tricks.sliding_window_view(y, window_shape=period)
x = np.arange(period)
# 预计算固定值
n = period
sum_x = x.sum()
sum_x2 = (x ** 2).sum()
denominator = n * sum_x2 - sum_x ** 2
# 滑动窗口统计量
sum_y = windows.sum(axis=1)
sum_xy = (windows * x).sum(axis=1)
# 回归系数
slope = (n * sum_xy - sum_x * sum_y) / denominator
intercept = (sum_y - slope * sum_x) / n
# 年化收益率
annualized_returns = np.exp(slope * 250) - 1
# R平方计算
y_pred = slope[:, None] * x + intercept[:, None]
residuals = windows - y_pred
ss_res = np.sum(residuals ** 2, axis=1)
sum_y2 = np.sum(windows ** 2, axis=1)
ss_tot = sum_y2 - (sum_y ** 2) / n
r_squared = 1 - (ss_res / ss_tot)
r_squared = np.nan_to_num(r_squared, nan=0.0) # 处理零方差情况
# 综合评分
score = annualized_returns * r_squared
# 对齐原始序列长度
full_score = np.full_like(y, np.nan)
full_score = pd.Series(index=close.index)
full_score[period - 1:] = score
return full_score
return series.groupby(level='symbol', group_keys=False).apply(_trend_score)