筹码分布
筹码分布是股票分析里面的一个比较简单的部分,通过查看筹码的分布图形,判断当前的股票的压力的指数,通过查看获利的比例来计算有多少人愿意出,有多少人愿意保持价格
目前的很多工具也是可以直接去查看的,但是因为可能需要直接计算多个股票的筹码分布和历史分布,所以我们需要使用的代码来进行一些量化计算,当然需要获取一些数据,这个大家可以依靠自己的数据的接口来获取
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import tushare as ts#plt字体
plt.rcParams['font.family'] = ['SimHei']
def prepare_data(df: pd.DataFrame,latest_factor:float) -> pd.DataFrame:"""预处理 tushare 数据- vol: 成交量(手)需要转换为股数- close_x: 收盘价- adj_factor: 复权因子"""data = df.copy()# 将成交量从手转换为股data['volume'] = data['vol'] * 100# 计算复权后的收盘价#latest_factor = data['adj_factor'].iloc[-1]data['adj_close'] = data['close_x'] * data['adj_factor'] / latest_factor# 确保日期格式正确data['trade_date'] = pd.to_datetime(data['trade_date'])return dataclass ChipDistribution:def __init__(self):self.distribution: Dict[float, float] = {}self.fifo_queue: List[Tuple[float, float]] = []def adjust_price_and_volume(self, price: float, volume: float, old_factor: float, new_factor: float) -> Tuple[float, float]:"""同时调整价格和成交量"""ratio = new_factor / old_factornew_price = price * rationew_volume = volume / ratioreturn new_price, new_volumedef adjust_distribution(self, old_factor: float, new_factor: float) -> None:"""调整整个分布的价格和成交量"""new_distribution = {}new_queue = []for price, volume in self.distribution.items():new_price, new_volume = self.adjust_price_and_volume(price, volume, old_factor, new_factor)new_distribution[new_price] = new_volumefor price, volume in self.fifo_queue:new_price, new_volume = self.adjust_price_and_volume(price, volume, old_factor, new_factor)new_queue.append((new_price, new_volume))self.distribution = new_distributionself.fifo_queue = new_queuedef initialize_distribution(self, historical_data: pd.DataFrame) -> None:"""初始化筹码分布"""init_period = min(60, len(historical_data))data = historical_data.iloc[:init_period]latest_factor = historical_data['adj_factor'].iloc[-1]# 调整成交量adjusted_volume = data['volume'] * (data['adj_factor'] / latest_factor)# 计算VWAPvwap = (data['adj_close'] * adjusted_volume).sum() / adjusted_volume.sum()total_shares = adjusted_volume.sum()# 计算价格范围price_std = data['adj_close'].std()price_range = np.linspace(vwap - 3*price_std, vwap + 3*price_std, 100)# 生成正态分布distribution = np.exp(-((price_range - vwap)**2) / (2*price_std**2))distribution = distribution / distribution.sum() * total_sharesself.distribution = dict(zip(price_range, distribution))self.fifo_queue = [(price, volume) for price, volume in zip(price_range, distribution)]def process_transaction(self, price: float, volume: float, is_buy: bool) -> None:"""处理单次交易"""if is_buy:self.distribution[price] = self.distribution.get(price, 0) + volumeself.fifo_queue.append((price, volume))else:remain_volume = volumewhile remain_volume > 0 and self.fifo_queue:oldest_price, oldest_volume = self.fifo_queue[0]if oldest_volume <= remain_volume:remain_volume -= oldest_volumeself.distribution[oldest_price] -= oldest_volumeif self.distribution[oldest_price] <= 0:del self.distribution[oldest_price]self.fifo_queue.pop(0)else:self.distribution[oldest_price] -= remain_volumeself.fifo_queue[0] = (oldest_price, oldest_volume - remain_volume)remain_volume = 0def calculate_distribution(self, data: pd.DataFrame) -> None:"""计算整体筹码分布"""self.initialize_distribution(data)latest_factor = data['adj_factor'].iloc[-1]prev_factor = latest_factorfor _, row in data.iterrows():if row['adj_factor'] != prev_factor:self.adjust_distribution(prev_factor, row['adj_factor'])prev_factor = row['adj_factor']adjusted_volume = row['volume'] * (row['adj_factor'] / latest_factor)adj_price = row['adj_close']self.process_transaction(adj_price, adjusted_volume/2, True)self.process_transaction(adj_price, adjusted_volume/2, False)def plot_distribution(self, current_price: float, title: str = None) -> None:"""绘制筹码分布图"""if not self.distribution:returnprices = np.array(list(self.distribution.keys()))volumes = np.array(list(self.distribution.values()))volumes = volumes / volumes.sum()profit_ratio = sum(v for p, v in zip(prices, volumes) if p <= current_price)loss_ratio = 1 - profit_ratioplt.figure(figsize=(10, 12))profit_mask = prices <= current_priceloss_mask = prices > current_priceplt.barh(prices[profit_mask], volumes[profit_mask], height=0.1, alpha=0.6, color='red', label='获利筹码')plt.barh(prices[loss_mask], volumes[loss_mask], height=0.1, alpha=0.6, color='green', label='套牢筹码')plt.axhline(y=current_price, color='blue', linestyle='--', label=f'当前价格: {current_price:.2f}')plt.xlabel('筹码占比')plt.ylabel('价格')if title:plt.title(f'{title}\n获利: {profit_ratio:.2%} 套牢: {loss_ratio:.2%}')else:plt.title(f'筹码分布\n获利: {profit_ratio:.2%} 套牢: {loss_ratio:.2%}')plt.grid(True, alpha=0.3)plt.legend()price_range = current_price * 0.5plt.ylim(current_price - price_range, current_price + price_range)plt.tight_layout()plt.show()def analyze_chip_distribution(df: pd.DataFrame,latest_adj_factor: float, stock_code: str = None) -> ChipDistribution:"""主函数:分析并展示筹码分布"""# 预处理数据data = prepare_data(df,latest_adj_factor)# 按日期排序data = data.sort_values('trade_date')# 创建筹码分布对象chip_dist = ChipDistribution()# 计算分布chip_dist.calculate_distribution(data)# 绘制分布图current_price = data['adj_close'].iloc[-1]title = f'筹码分布 - {stock_code}' if stock_code else Nonechip_dist.plot_distribution(current_price, title)return chip_dist# 使用示例:
"""
# 假设 df 是从 tushare 获取的数据
chip_dist = analyze_chip_distribution(df, '000001.SZ')
"""#使用tushare获取过去1-2年的价格数据,复权因子的数据,成交量的数据
def get_stock_data_chip_distribution(ts_code,start_date,end_date):#具体的获取数据的方法在这,#df里面需要包含历史的close的价格,成交量,adj复权因子等analyze_chip_distribution(df,latest_adj_factor, stock_code=ts_code)return df # 分析筹码分布
chip_dist = get_stock_data_chip_distribution("688268.SH","20220101","20241211")
这个是我计算出来的结果,20241212,基本上和我在招商证券上看到的筹码分布图形非常类似,大家可以作为参考自己去评估和摸索
Talk is cheap , 直接分享了code,拿走不谢