184 lines
5.3 KiB
Python
184 lines
5.3 KiB
Python
"""Technical analysis indicators implementation."""
|
||
from typing import List, Optional, Sequence
|
||
import numpy as np
|
||
from .indicators import rolling_mean
|
||
|
||
def rsi(prices: Sequence[float], period: int = 14) -> Optional[float]:
|
||
"""计算相对强弱指标(RSI)。
|
||
|
||
Args:
|
||
prices: 价格序列,从新到旧排序
|
||
period: RSI周期,默认14天
|
||
|
||
Returns:
|
||
RSI值 (0-100) 或 None(数据不足时)
|
||
"""
|
||
if len(prices) < period + 1:
|
||
return None
|
||
|
||
# 计算价格变化
|
||
deltas = [prices[i] - prices[i+1] for i in range(len(prices)-1)]
|
||
deltas = deltas[:period] # 只使用所需周期的数据
|
||
|
||
gain = [delta if delta > 0 else 0 for delta in deltas]
|
||
loss = [-delta if delta < 0 else 0 for delta in deltas]
|
||
|
||
avg_gain = sum(gain) / period
|
||
avg_loss = sum(loss) / period
|
||
|
||
if avg_loss == 0:
|
||
return 100.0
|
||
|
||
rs = avg_gain / avg_loss
|
||
rsi = 100 - (100 / (1 + rs))
|
||
return rsi
|
||
|
||
def macd(prices: Sequence[float],
|
||
fast_period: int = 12,
|
||
slow_period: int = 26,
|
||
signal_period: int = 9) -> Optional[float]:
|
||
"""计算MACD信号。
|
||
|
||
Args:
|
||
prices: 价格序列,从新到旧排序
|
||
fast_period: 快线周期
|
||
slow_period: 慢线周期
|
||
signal_period: 信号线周期
|
||
|
||
Returns:
|
||
MACD柱状值,或None(数据不足时)
|
||
"""
|
||
if len(prices) < slow_period + signal_period:
|
||
return None
|
||
|
||
# 计算快慢线EMA
|
||
fast_ema = _ema(prices, fast_period)
|
||
slow_ema = _ema(prices, slow_period)
|
||
if fast_ema is None or slow_ema is None:
|
||
return None
|
||
|
||
# 计算MACD线
|
||
macd_line = fast_ema - slow_ema
|
||
|
||
# 计算信号线
|
||
macd_values = []
|
||
for i in range(len(prices) - slow_period + 1):
|
||
fast_ema = _ema(prices[i:], fast_period)
|
||
slow_ema = _ema(prices[i:], slow_period)
|
||
if fast_ema is not None and slow_ema is not None:
|
||
macd_values.append(fast_ema - slow_ema)
|
||
|
||
if len(macd_values) < signal_period:
|
||
return None
|
||
|
||
signal_line = _ema(macd_values, signal_period)
|
||
if signal_line is None:
|
||
return None
|
||
|
||
# 返回MACD柱状图值
|
||
return macd_line - signal_line
|
||
|
||
def _ema(data: Sequence[float], period: int) -> Optional[float]:
|
||
"""计算指数移动平均。"""
|
||
if len(data) < period:
|
||
return None
|
||
|
||
multiplier = 2 / (period + 1)
|
||
ema = data[0]
|
||
for price in data[1:period]:
|
||
ema = (price - ema) * multiplier + ema
|
||
return ema
|
||
|
||
def bollinger_bands(prices: Sequence[float],
|
||
period: int = 20,
|
||
std_dev: float = 2.0) -> Optional[float]:
|
||
"""计算布林带位置。
|
||
|
||
Args:
|
||
prices: 价格序列,从新到旧排序
|
||
period: 移动平均周期
|
||
std_dev: 标准差倍数
|
||
|
||
Returns:
|
||
价格在布林带中的位置(-1到1),或None(数据不足时)
|
||
"""
|
||
if len(prices) < period:
|
||
return None
|
||
|
||
# 获取周期内数据
|
||
price_window = prices[:period]
|
||
|
||
# 计算移动平均和标准差
|
||
ma = sum(price_window) / period
|
||
std = np.std(price_window)
|
||
|
||
# 计算布林带
|
||
upper = ma + (std_dev * std)
|
||
lower = ma - (std_dev * std)
|
||
|
||
# 计算当前价格在带中的位置
|
||
current_price = prices[0]
|
||
band_width = upper - lower
|
||
if band_width == 0:
|
||
return 0
|
||
|
||
position = 2 * (current_price - lower) / band_width - 1
|
||
return max(-1, min(1, position)) # 确保值在-1到1之间
|
||
|
||
def obv_momentum(volumes: Sequence[float],
|
||
prices: Sequence[float],
|
||
period: int = 20) -> Optional[float]:
|
||
"""计算能量潮(OBV)动量。
|
||
|
||
Args:
|
||
volumes: 成交量序列,从新到旧排序
|
||
prices: 价格序列,从新到旧排序
|
||
period: 计算周期
|
||
|
||
Returns:
|
||
OBV动量值,或None(数据不足时)
|
||
"""
|
||
if len(volumes) < period + 1 or len(prices) < period + 1:
|
||
return None
|
||
|
||
# 计算OBV序列
|
||
obv = [0.0] # 初始OBV值
|
||
for i in range(1, period):
|
||
price_change = prices[i-1] - prices[i]
|
||
if price_change > 0:
|
||
obv.append(obv[-1] + volumes[i-1])
|
||
elif price_change < 0:
|
||
obv.append(obv[-1] - volumes[i-1])
|
||
else:
|
||
obv.append(obv[-1])
|
||
|
||
# 计算OBV动量(当前值与N日前的差值)
|
||
obv_momentum = (obv[0] - obv[-1]) / sum(volumes[:period])
|
||
return obv_momentum
|
||
|
||
def price_volume_trend(prices: Sequence[float],
|
||
volumes: Sequence[float],
|
||
period: int = 20) -> Optional[float]:
|
||
"""计算价量趋势指标。
|
||
|
||
Args:
|
||
prices: 价格序列,从新到旧排序
|
||
volumes: 成交量序列,从新到旧排序
|
||
period: 计算周期
|
||
|
||
Returns:
|
||
价量趋势值,或None(数据不足时)
|
||
"""
|
||
if len(prices) < period or len(volumes) < period:
|
||
return None
|
||
|
||
# 计算价格变动和成交量的乘积
|
||
pv_values = []
|
||
for i in range(period-1):
|
||
price_change = (prices[i] - prices[i+1]) / prices[i+1]
|
||
pv_values.append(price_change * volumes[i])
|
||
|
||
# 使用移动平均平滑处理
|
||
pv_trend = sum(pv_values) / sum(volumes[:period])
|
||
return pv_trend
|