滑动均值和标准差
为了更好利用向量化来加速,滑动窗口使用np.lib.stride_tricks.sliding_window_view(x, win)
提取,它会返回所有x[i]
开头并且长度为win
的数组的数组。
def rolling(x, win):r = np.lib.stride_tricks.sliding_window_view(x, win)pad = np.zeros([len(x) - len(r), win]) * np.nanreturn np.vstack([pad, r])def rolling_mean(x, win):return rolling(x, win).mean(-1)def rolling_std(x, win):return rolling(x, win).std(-1)
布林带
def bollinger(close, win=10, nstd=2):means = rolling_mean(close, win)stds = rolling_std(close, win)upper = means + nstd * stdslower = means - nstd * stdsreturn upper, means, lower
指数滑动均值
这是原始实现:
# 计算指数平滑
# y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]
def exp_smooth_naive(x, alpha):y = x.copy()for i in range(1, len(y)):y[i] = y[i] * alpha + y[i - 1] * (1 - alpha)return y
原始公式是递归的,需要改成通项才能向量化,这是推导过程:
y[0] = x[0] = inity[t] = alpha * x[t] + (1-alpha) * y[t-1]= alpha * x[t] + (1-alpha) * alpha * x[t-1] + (1-alpha) ** 2 * y[t-2]= alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + (1-alpha)** t * init= alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + alpha * (1-alpha)** t * init + (1 - alpha) ** (t + 1) * init= Σ(alpha * (1 - alpha) ** i * x[t-i]; i: 0 -> t) + (1 - alpha) ** (t + 1) * initcorr[i] = alpha * (1-alpha) ** i
supl[t] = (1 - alpha) ** (t + 1) * init y[t] = Σ(corr[i] * x(t-i); i: 0 -> t) + supl[t]
y = conv(corr, x) + supl
这就完成了向量化,因为 NumPy 或者 PyTorch 都针对卷积做了特殊优化。
def exp_smooth_vec(x, alpha):init, n = x[0], len(x)corr = alpha * (1 - alpha) ** np.arange(0, n)supl = (1 - alpha) ** (np.arange(0, n) + 1) * inity = np.convolve(corr, x, 'full')[:n] + suplreturn yexp_smooth = exp_smooth_vecdef rolling_ema(x, win):x = np.asarray(x)alpha = 2 / (win + 1.0)return exp_smooth(x, alpha)
MACD
def macd(close, fast_win=12, slow_win=26, sig_win=9):fast = rolling_ema(close, fast_win)slow = rolling_ema(close, slow_win)dif = fast - slowdea = rolling_ema(dif, sig_win)macd_ = dif - dea * 2return macd_, dif, dea
RSI
def rsi(close, win=3):change = np.diff(close)up = np.where(change > 0, change, 0)down = np.where(change < 0, change, 0)sum_up = rolling(up, win).sum(-1)sum_down = rolling(down, win).sum(-1)eps = 1e-12rs = sum_up / (sum_down + eps)rsi_ = 100 - 100 / (1 + rs)return np.hstack([[np.nan], rsi_])
KDJ
def kdj(close, low, high, n=9):hn = rolling(high, n).max(-1)ln = rolling(low, n).min(-1)rsv = (close - ln) / (hn - ln) * 100rsv = [x for x in rsv if not np.isnan(x)]rsv = np.hstack([[50], rsv])k = exp_smooth(rsv, 2/3)d = exp_smooth(k, 2/3)j = 3 * k - 2 * dpad = [np.nan] * (len(close) - len(k))k = np.hstack([pad, k])d = np.hstack([pad, d])j = np.hstack([pad, j])return k, d, j
OBV
def obv(close, vol):change = np.diff(close)sig = np.hstack([[1], np.sign(change)])obv_ = np.cumsum(vol * sig)return obv_