Python 实现 K-Means 聚类算法:完整代码 + 详细注释
K-Means 聚类是一种无监督学习算法,用于将数据集中的数据点划分为 K 个不同的簇(cluster)。每个簇由其中心点(centroid)表示,算法的目标是最小化每个数据点与其所属簇中心点之间的距离。K-Means 算法简单、高效,广泛应用于数据挖掘、图像处理、模式识别等领域。
本文将详细介绍如何使用 Python 从零开始实现 K-Means 聚类算法,包括完整的代码和详细的注释,帮助你深入理解算法的原理和实现细节。
1. K-Means 算法原理
K-Means 算法的核心思想是通过迭代更新簇中心点和数据点所属簇的关系,最终使得每个数据点与其所属簇中心点之间的距离之和最小化。算法的具体步骤如下:
-
初始化:
- 随机选择 K 个数据点作为初始簇中心点。
- 或者使用更智能的初始化方法,如 K-Means++。
-
分配数据点:
- 对于每个数据点,计算其与所有簇中心点的距离(通常使用欧氏距离)。
- 将数据点分配给距离其最近的簇中心点所在的簇。
-
更新簇中心点:
- 对于每个簇,计算该簇中所有数据点的平均值(即坐标的平均值),并将该平均值作为新的簇中心点。
-
重复步骤 2 和 3:
- 重复执行“分配数据点”和“更新簇中心点”这两个步骤,直到满足以下条件之一:
- 簇中心点不再发生变化(或变化很小)。
- 达到预定义的最大迭代次数。
- 重复执行“分配数据点”和“更新簇中心点”这两个步骤,直到满足以下条件之一:
目标函数:
K-Means 算法的目标是最小化所有数据点与其所属簇中心点之间的距离平方和,也称为簇内平方和(Within-Cluster Sum of Squares,WCSS)。
WCSS = Σ Σ ||xᵢ – μₖ||²
其中:
xᵢ
表示第 i 个数据点。μₖ
表示第 k 个簇的中心点。- 第一个 Σ 对所有数据点求和。
- 第二个 Σ 对每个簇内的所有数据点求和。
2. Python 代码实现
下面是使用 Python 实现 K-Means 聚类算法的完整代码,包括详细的注释:
“`python
import numpy as np
import matplotlib.pyplot as plt
import random
class KMeans:
“””
K-Means 聚类算法实现
“””
def __init__(self, n_clusters=2, max_iters=100, tol=1e-4, init_method='random'):
"""
初始化 K-Means 模型
Args:
n_clusters (int): 簇的数量 (K 值).
max_iters (int): 最大迭代次数.
tol (float): 簇中心点变化的容差,如果变化小于此值,则停止迭代.
init_method (str): 初始化簇中心点的方法 ('random' 或 'kmeans++').
"""
self.n_clusters = n_clusters
self.max_iters = max_iters
self.tol = tol
self.init_method = init_method
self.centroids = None # 簇中心点
self.labels = None # 每个数据点所属的簇标签
def _initialize_centroids(self, X):
"""
初始化簇中心点
Args:
X (numpy.ndarray): 数据集,形状为 (n_samples, n_features).
Returns:
numpy.ndarray: 初始化的簇中心点,形状为 (n_clusters, n_features).
"""
n_samples = X.shape[0]
if self.init_method == 'random':
# 随机选择 K 个数据点作为初始簇中心点
random_indices = random.sample(range(n_samples), self.n_clusters)
return X[random_indices]
elif self.init_method == 'kmeans++':
# 使用 K-Means++ 算法初始化簇中心点
centroids = [X[random.choice(range(n_samples))]] #先随机取第一个点
for _ in range(1, self.n_clusters):
# 计算每个点到当前所有簇中心点的最短距离
distances = np.array([min([np.linalg.norm(x - c)**2 for c in centroids]) for x in X])
# 根据距离的平方作为权重,选择下一个簇中心点 (距离越远的点,被选中的概率越大)
probabilities = distances / distances.sum()
cumulative_probabilities = probabilities.cumsum()
r = random.random()
for j, p in enumerate(cumulative_probabilities):
if r < p:
centroids.append(X[j])
break
return np.array(centroids)
else:
raise ValueError("Invalid init_method. Choose 'random' or 'kmeans++'.")
def _assign_labels(self, X):
"""
为每个数据点分配簇标签
Args:
X (numpy.ndarray): 数据集,形状为 (n_samples, n_features).
Returns:
numpy.ndarray: 每个数据点的簇标签,形状为 (n_samples,).
"""
# 计算每个数据点到每个簇中心点的距离
distances = np.array([[np.linalg.norm(x - c) for c in self.centroids] for x in X])
# 将数据点分配给距离最近的簇中心点所在的簇
return np.argmin(distances, axis=1)
def _update_centroids(self, X):
"""
更新簇中心点
Args:
X (numpy.ndarray): 数据集,形状为 (n_samples, n_features).
Returns:
numpy.ndarray: 更新后的簇中心点,形状为 (n_clusters, n_features).
"""
new_centroids = np.zeros((self.n_clusters, X.shape[1]))
for k in range(self.n_clusters):
# 计算每个簇中所有数据点的平均值
cluster_points = X[self.labels == k]
if len(cluster_points) > 0: # 避免空簇
new_centroids[k] = np.mean(cluster_points, axis=0)
else:
#如果出现空簇,则重新随机初始化该簇中心点
new_centroids[k] = X[random.choice(range(X.shape[0]))]
return new_centroids
def fit(self, X):
"""
训练 K-Means 模型
Args:
X (numpy.ndarray): 数据集,形状为 (n_samples, n_features).
"""
# 1. 初始化簇中心点
self.centroids = self._initialize_centroids(X)
# 2. 迭代更新
for _ in range(self.max_iters):
# 2.1 分配数据点到簇
self.labels = self._assign_labels(X)
# 2.2 更新簇中心点
new_centroids = self._update_centroids(X)
# 2.3 检查是否收敛
if np.linalg.norm(new_centroids - self.centroids) < self.tol:
break
self.centroids = new_centroids
def predict(self, X):
"""
预测数据点所属的簇
Args:
X (numpy.ndarray): 数据集,形状为 (n_samples, n_features).
Returns:
numpy.ndarray: 每个数据点的簇标签,形状为 (n_samples,).
"""
if self.centroids is None:
raise Exception("Model not trained yet. Call fit() first.")
return self._assign_labels(X)
def plot_clusters(self, X):
"""
绘制聚类结果 (仅适用于二维数据)
Args:
X (numpy.ndarray): 数据集 (二维),形状为 (n_samples, 2).
"""
if X.shape[1] != 2:
raise ValueError("plot_clusters can only be used for 2D data.")
plt.figure(figsize=(8, 6))
plt.scatter(X[:, 0], X[:, 1], c=self.labels, cmap='viridis', s=50)
plt.scatter(self.centroids[:, 0], self.centroids[:, 1], marker='X', s=200, color='red', label='Centroids')
plt.title('K-Means Clustering')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.show()
生成示例数据 (二维数据)
np.random.seed(42)
n_samples = 300
X = np.concatenate([
np.random.normal(loc=[2, 2], scale=0.5, size=(n_samples // 3, 2)),
np.random.normal(loc=[8, 8], scale=0.5, size=(n_samples // 3, 2)),
np.random.normal(loc=[2, 8], scale=0.5, size=(n_samples // 3, 2))
])
创建 K-Means 模型并训练
kmeans = KMeans(n_clusters=3, max_iters=300, tol=1e-4, init_method=’kmeans++’)
kmeans.fit(X)
预测数据点所属的簇
labels = kmeans.predict(X)
打印簇中心点
print(“簇中心点:\n”, kmeans.centroids)
绘制聚类结果
kmeans.plot_clusters(X)
计算WCSS
def calculate_wcss(X, labels, centroids):
“””计算 WCSS (Within-Cluster Sum of Squares)
Args:
X (numpy.ndarray): 数据集.
labels (numpy.ndarray): 数据点的簇标签.
centroids (numpy.ndarray): 簇中心点.
Returns:
float: WCSS 值.
“””
wcss = 0
for k in range(len(centroids)):
cluster_points = X[labels == k]
if len(cluster_points) > 0:
wcss += np.sum((cluster_points – centroids[k])**2)
return wcss
wcss = calculate_wcss(X, labels, kmeans.centroids)
print(“WCSS:”, wcss)
“`
3. 代码详解
3.1 KMeans
类
-
__init__(self, n_clusters=2, max_iters=100, tol=1e-4, init_method='random')
:- 初始化方法,设置 K-Means 模型的参数:
n_clusters
:簇的数量(K 值)。max_iters
:最大迭代次数。tol
:簇中心点变化的容差,如果变化小于此值,则认为算法已收敛,停止迭代。init_method
:初始化簇中心点的方法,可以是'random'
(随机选择)或'kmeans++'
(使用 K-Means++ 算法)。
- 初始化
self.centroids
(簇中心点)和self.labels
(数据点所属簇标签)为None
。
- 初始化方法,设置 K-Means 模型的参数:
-
_initialize_centroids(self, X)
:- 私有方法,用于初始化簇中心点。
init_method='random'
:从数据集中随机选择 K 个数据点作为初始簇中心点。init_method='kmeans++'
:使用 K-Means++ 算法初始化簇中心点。K-Means++ 算法通过选择彼此距离较远的点作为初始簇中心点,可以提高聚类结果的质量和收敛速度。 具体做法是:- 随机选择第一个簇中心。
- 对于每个数据点,计算其与已选择的所有簇中心点的最短距离。
- 根据距离的平方作为权重,选择下一个簇中心点(距离越远的点,被选中的概率越大)。
- 重复步骤 2 和 3,直到选出 K 个簇中心点。
- 返回初始化的簇中心点。
-
_assign_labels(self, X)
:- 私有方法,为每个数据点分配簇标签。
- 计算每个数据点到每个簇中心点的欧氏距离。
- 将数据点分配给距离其最近的簇中心点所在的簇。
- 返回每个数据点的簇标签。
-
_update_centroids(self, X)
:- 私有方法,更新簇中心点。
- 对于每个簇,计算该簇中所有数据点的平均值(即坐标的平均值)。
- 将该平均值作为新的簇中心点。
- 如果出现空簇(没有数据点被分配到该簇),则重新随机初始化该簇的中心点。 避免除以0的错误。
- 返回更新后的簇中心点。
-
fit(self, X)
:- 训练 K-Means 模型。
- 调用
_initialize_centroids()
方法初始化簇中心点。 - 迭代执行以下步骤:
- 调用
_assign_labels()
方法为每个数据点分配簇标签。 - 调用
_update_centroids()
方法更新簇中心点。 - 检查簇中心点是否收敛(变化小于
tol
)或达到最大迭代次数 (max_iters
)。如果满足其中一个条件,则停止迭代。
- 调用
-
predict(self, X)
:- 预测数据点所属的簇。
- 如果模型尚未训练(
self.centroids
为None
),则抛出异常。 - 调用
_assign_labels()
方法为每个数据点分配簇标签。 - 返回预测的簇标签。
-
plot_clusters(self, X)
:- 绘制聚类结果(仅支持二维数据可视化)
- 使用
matplotlib
库绘制散点图,其中:- 数据点根据其簇标签着色。
- 簇中心点用叉号(’X’)标记。
- 设置图表标题、坐标轴标签和图例。
3.2 示例用法
- 生成示例数据: 使用了
numpy.random.normal()
生成了三个二维高斯分布的数据点,模拟了三个簇。 - 创建并训练模型: 创建了一个
KMeans
对象,设置簇的数量为 3,并使用fit()
方法在示例数据上训练模型。 - 预测: 使用
predict()
方法预测每个数据点所属的簇。 - 打印并可视化结果:
- 打印计算出的簇中心点。
- 调用
plot_clusters
方法将聚类后的数据和簇中心点绘制出来。
- 计算WCSS:定义了一个函数
calculate_wcss
,计算并打印了WCSS。
4. 总结与扩展
4.1 总结
本文详细介绍了如何使用 Python 从零开始实现 K-Means 聚类算法,包括算法原理、完整代码和详细注释。通过阅读本文,你应该能够:
- 理解 K-Means 算法的核心思想和步骤。
- 掌握如何使用 Python 实现 K-Means 算法。
- 了解如何初始化簇中心点(随机初始化和 K-Means++)。
- 能够使用 K-Means 算法对数据进行聚类,并可视化聚类结果。
- 了解如何评估聚类结果(WCSS)。
4.2 扩展
-
K 值的选择:
- K-Means 算法需要预先指定簇的数量(K 值)。如何选择合适的 K 值是一个重要的问题。常用的方法有:
- 肘部法则(Elbow Method): 绘制 WCSS 随 K 值变化的曲线。通常,WCSS 会随着 K 值的增加而减小。选择 WCSS 下降速度明显变缓的那个 K 值(“肘部”)。
- 轮廓系数(Silhouette Coefficient): 轮廓系数衡量了数据点与其所属簇的相似度以及与其他簇的差异度。轮廓系数的取值范围为 [-1, 1],值越大表示聚类效果越好。可以计算不同 K 值下的平均轮廓系数,选择平均轮廓系数最大的 K 值。
- 基于业务知识: 根据具体的业务场景和需求,人为地设定一个合理的 K 值。
- K-Means 算法需要预先指定簇的数量(K 值)。如何选择合适的 K 值是一个重要的问题。常用的方法有:
-
距离度量:
- 本文使用了欧氏距离作为距离度量。K-Means 算法也可以使用其他距离度量,如曼哈顿距离、余弦相似度等。选择合适的距离度量取决于数据的类型和特点。
-
处理高维数据:
- 对于高维数据,K-Means 算法可能会受到“维度灾难”的影响。可以考虑使用降维技术(如 PCA)对数据进行预处理,或者使用专门针对高维数据的聚类算法(如谱聚类)。
-
处理大规模数据:
- 对于大规模数据,标准的 K-Means 算法可能效率较低。可以考虑使用 Mini-Batch K-Means 算法,该算法每次只使用部分数据来更新簇中心点,可以显著提高计算速度。
-
K-Means 的局限性:
- K-Means 算法假设簇是凸形的、大小相似的,并且对初始簇中心点的选择比较敏感。对于非凸形簇、大小差异较大的簇,K-Means 算法可能无法得到理想的聚类结果。
- 容易受到噪点和离群点的影响。
希望这篇文章能帮助你全面深入地理解 K-Means 聚类算法及其 Python 实现!