K-Means 聚类并将数据分成多个簇,可以使用以下方法:
实现思路
完整代码:
import torch
import matplotlib.pyplot as pltdef kmeans(X, k, max_iters=100, tol=1e-4):"""使用 PyTorch 实现 K-Means 聚类,并返回聚类结果:param X: (n, d) 输入数据:param k: 簇的个数:param max_iters: 最大迭代次数:param tol: 收敛阈值:return: (最终聚类中心, 每个样本的簇索引)"""device = torch.device("cuda" if torch.cuda.is_available() else "cpu")X = X.to(device)n, d = X.shapeindices = torch.randperm(n)[:k] # 随机选择 k 个数据点作为初始聚类中心centroids = X[indices].clone()for i in range(max_iters):distances = torch.cdist(X, centroids) # 计算所有点到聚类中心的欧式距离cluster_assignments = torch.argmin(distances, dim=1) # 分配每个点到最近的簇new_centroids = torch.stack([X[cluster_assignments == j].mean(dim=0) if (cluster_assignments == j).sum() > 0else centroids[j] # 避免空簇for j in range(k)])shift = torch.norm(new_centroids - centroids, p=2) # 计算变化量if shift < tol:print(f'K-Means 提前收敛于第 {i+1} 轮')breakcentroids = new_centroidsreturn centroids.cpu(), cluster_assignments.cpu()# 生成数据
torch.manual_seed(42)
X = torch.randn(200, 2) # 200 个 2D 点
k = 3# 运行 K-Means
centroids, labels = kmeans(X, k)# 输出最终结果
print("最终聚类中心:")
print(centroids)# 统计每个簇的样本数量
for i in range(k):count = (labels == i).sum().item()print(f"簇 {i} 的数据点数量: {count}")# 可视化聚类结果
def plot_kmeans(X, labels, centroids, k):"""可视化 K-Means 聚类结果:param X: 数据点:param labels: 聚类标签:param centroids: 聚类中心:param k: 簇的个数"""X = X.numpy()labels = labels.numpy()centroids = centroids.numpy()plt.figure(figsize=(8, 6))# 画出每个簇的点colors = ['r', 'g', 'b', 'c', 'm', 'y', 'k']for i in range(k):plt.scatter(X[labels == i, 0], X[labels == i, 1],c=colors[i % len(colors)], label=f'Cluster {i}', alpha=0.6)# 画出聚类中心plt.scatter(centroids[:, 0], centroids[:, 1],c='black', marker='X', s=200, label='Centroids')plt.legend()plt.title("K-Means Clustering using PyTorch")plt.xlabel("Feature 1")plt.ylabel("Feature 2")plt.grid()plt.show()# 绘制聚类结果
plot_kmeans(X, labels, centroids, k)
备注:
- 初始化:
- 采用
torch.randperm(n)[:k]
选择k
个数据点作为初始聚类中心。
- 采用
- 计算距离:
torch.cdist(X, centroids)
计算所有点到各个聚类中心的欧式距离。
- 分配簇:
torch.argmin(distances, dim=1)
选择最近的聚类中心。
- 更新中心:
X[cluster_assignments == j].mean(dim=0)
计算每个簇的新中心。- 如果某个簇为空,保持原来的中心不变,避免空簇问题。
- 判断收敛:
torch.norm(new_centroids - centroids, p=2)
计算中心点的移动量,若小于阈值tol
,则提前终止。
- 按簇分类数据:
clusters = [X[labels == i] for i in range(k)]
将数据划分到不同簇。