机器学习-CDC聚类算法

·

前置知识准备:

1、Gram矩阵:

定义:
$$
G = \left[ \langle v_i, v_j \rangle \right]_{i,j=1}^k
$$
G 是一个 k*k 的对称矩阵;

第 (i,j)(i,j)(i,j) 元素就是向量 vi 和 vj 的内积。

意义:用于刻画向量之间的角度与长度关系,Gram矩阵就是向量组的内积表

2、聚类算法:

传统聚类方法包括:

**(1)K均值聚类和K中心点聚类:**用质心划分数据。需要预先指定簇数 k,并且假设簇是“球形+均匀密度”,遇到非凸形簇会失败;另外它们对噪声点/离群点敏感。

**(2)层次聚类:**自底向上或自顶向下逐步合并/划分。时间复杂度高(通常 O(n²)),且一旦合并/分裂,不能回退,容易出错,所以不适合大数据集。

**(3)密度峰值的聚类(CDP):**选择高密度且据其他高密度点远的点作为中心点,然后将其他点归属到离自己最近的高密度点所属的簇。这种算法不需要预设簇数,同时结果可视化性强;但是无法解决弱联通问题,同时参数复杂。

**(4)基于密度的聚类(DBSCAN):**将密度超过阈值的连通点邻域进行聚类。能保留聚类形态的局部细节,但容易将稀疏聚类误判为噪声点,甚至在点分布不均匀时导致整个聚类被分割,也就是不能解决稀疏簇问题。

**(5)基于网格的聚类算法(WaveCluster):**把数据空间转化为不同分辨率的网格表示,再在变换后的空间里识别高密度区域。弱连接的聚类难以有效分离,低密度聚类边界也常被误判为噪声点。

**(6)基于子空间的聚类算法(CLIQUE):**和(5)一样使用网格映射技术,在低维空间里找到稠密单元,再逐层组合扩展到高维子空间。缺点也相似。

**(7)局部引力聚类(LGC):**类比物理中的“引力”概念,提出基于均值漂移的两种度量指标——中心性(CE)和协调性(CO),用于衡量邻近点的局部吸引力与均值漂移方向的一致性,直观性强,且抗噪作用强,但是稀疏聚类中的内部点检测存在困难,另外,算法是模拟迭代的方式,可能需要多次更新才能收敛,自身的计算复杂度很高(O(n²))。

**(8)基于密度的度量方法(RKNN,逆向K近邻):**通过统计K近邻中将某点视为成员的对象数量来实现,如果 p 出现在很多点的 KNN 集合里,说明它处在一个“高密度区”;反之就说明它处于“低密度区”。但可能无法有效识别低密度区域的边界(k值过大,k值即考虑k个最近邻居)或结果过于局部,受噪声干扰大(k值过小)。

算法分析:

核心思路是先检测聚类的边界点,再将由外围边界点生成的封闭笼状结构内的内部点进行连接。具体而言,聚类内部点通常会被其K近邻点全方位包围,而边界点仅包含特定方向范围内的邻近点。我们利用这一差异,通过计算K近邻点的方向一致性来衡量局部中心性,从而区分内部点与边界点。因此,CDC算法能有效避免跨聚类连接,并分离出弱连接的聚类。同时,由于采用K近邻搜索不涉及点密度的邻近点,该方法还能保持稀疏聚类的完整性。

优点:

1、能处理异质密度数据

2、弱连通簇分离能力强,在面对弱连通簇与异质密度数据时鲁棒性好

3、簇形状灵活

4、边界点处理自然

缺点:

1、计算复杂度高:

需要计算 KNN + 邻居角度分布(或高维凸包 + 单形体积),高维情况下凸包计算复杂度非常高,O(n^d) 最坏情况不可用。实际复杂度常为 O(n·k²),比 DBSCAN(近似 O(n log n))要慢。

2、参数敏感:需要设定:

  • k(邻居数):太小 → 噪声影响大;太大 → 稀疏簇被淹没。
  • ratio(内部点比例):过大 → 边界点太少;过小 → 核心簇不稳定。

没有普适默认值,需要调参。

3、高维噪声问题:

  • 在高维空间,点都稀疏,“方向均匀性”度量容易失效。
  • 凸包计算容易数值不稳定,代码里不得在失败时直接判边界点,结果可能偏差。

代码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import math
import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.special import gamma
from scipy.spatial import ConvexHull

def CDC(k_num, ratio, X):
'''
CDC 聚类算法实现
输入:
k_num : int, 最近邻数量 k
ratio : float, 内部点比例阈值 (0~1之间)
X : ndarray, (num, d),样本数据
输出:
cluster : ndarray, 每个点所属的簇标签 (1..C)
'''
[num, d] = X.shape # num: 样本数, d: 维度

# KNN 搜索,得到每个点的 k 个最近邻下标
nbrs = NearestNeighbors(n_neighbors=k_num+1, algorithm='ball_tree').fit(X)
indices = nbrs.kneighbors(X, return_distance=False)
# indices是kneighbors默认会返回的两个矩阵之一(分别是distances和indices,distance存储的是距离,这里将其禁用了),存储的是最邻近的邻居(包括本身)
get_knn = indices[:, 1:k_num+1] # 去掉自身(第0个邻居)

angle_var = np.zeros(num) # 存储每个点的方向方差指标

# 计算方向中心性
if (d == 2): # 二维情况
angle = np.zeros((num, k_num))
for i in range(num): # 遍历每个点
for j in range(k_num): # 遍历每个邻居
delta_x = X[get_knn[i, j], 0] - X[i, 0]
delta_y = X[get_knn[i, j], 1] - X[i, 1]
# 计算极角,分情况避免除零
if delta_x == 0:
if delta_y == 0:
angle[i, j] = 0
elif delta_y > 0:
angle[i, j] = math.pi / 2
else:
angle[i, j] = 3 * math.pi / 2
elif delta_x > 0:
if math.atan(delta_y / delta_x) >= 0:
angle[i, j] = math.atan(delta_y / delta_x)
else:
angle[i, j] = 2 * math.pi + math.atan(delta_y / delta_x)
else:
angle[i, j] = math.pi + math.atan(delta_y / delta_x)

# 计算角度间隔方差,越均匀则越像内部点
for i in range(num):
angle_order = sorted(angle[i, :]) # 邻居角度排序

for j in range(k_num - 1):
point_angle = angle_order[j + 1] - angle_order[j]
angle_var[i] += pow(point_angle - 2 * math.pi / k_num, 2)

# 首尾回绕的夹角
point_angle = angle_order[0] - angle_order[k_num - 1] + 2 * math.pi
angle_var[i] += pow(point_angle - 2 * math.pi / k_num, 2)

angle_var[i] /= k_num

# 归一化
angle_var = angle_var / ((k_num - 1) * 4 * pow(math.pi, 2) / pow(k_num, 2))

else: # 高维情况
#在高维里,圆周被替换成球面(d 维空间的单位球面),于是要检查的内容变成了:邻居点在球面上的覆盖是否均匀。
for i in range(num):
try:
# 取邻居相对向量
dif_x = X[get_knn[i], :] - X[i, :]
# 标准化映射
map_x = np.linalg.inv(np.diag(np.sqrt(np.diag(np.dot(dif_x, dif_x.T))))) @ dif_x
'''
具体步骤:
1、先算 dif_x @ dif_x.T → 得到 k*k 的 Gram 矩阵(前置知识1)
2、取其对角线 → 相当于每个向量的平方长度
3、开根号 → 得到长度
4、取对角矩阵的逆 → 相当于对每个向量除以自己的模长
5、右乘 dif_x → 得到一组归一化后的向量
换句话说:把邻居点投射到近似单位球面上
'''
# 计算凸包
hull = ConvexHull(map_x)
'''
hull 这个对象里存有很多属性:
hull.vertices :凸包的顶点索引
hull.simplices :凸包的“面片”(由哪些点组成的 (d-1)-单形)
(d-单形:d 维空间里,由 (d+1) 个点构成的最小凸集)
'''
simplex_num = len(hull.simplices) # 统计凸包的面片数量(也就是多少个单形)
simplex_vol = np.zeros(simplex_num)# 准备存放每个单形体积

for j in range(simplex_num):
simplex_coord = map_x[hull.simplices[j], :]
# 单形体积近似
simplex_vol[j] = np.sqrt(max(0, np.linalg.det(np.dot(simplex_coord, simplex_coord.T)))) / gamma(d-1)
'''
np.dot(simplex_coord, simplex_coord.T) = 单形顶点的 Gram 矩阵,它的行列式(det)代表这些顶点向量张成的平行体的体积平方
max(0,x)是防止浮点误差变为负数
在几何体积公式里,单形体积通常要除以阶乘,这里用了 gamma(gamma函数是阶乘的推广,gamma(n)=(n-1)!) 来通用化
'''
# 方差作为指标
angle_var[i] = np.var(simplex_vol)

except Exception as e:
# 出错时设为边界点倾向
angle_var[i] = 1

# 内部点/边界点划分
sort_dcm = sorted(angle_var)
T_DCM = sort_dcm[math.ceil(num*ratio)] # 阈值
ind = np.zeros(num) # 标记数组:1=内部点,0=边界点
for i in range(num):
if angle_var[i] < T_DCM:
ind[i] = 1

# 计算最近的跨类距离或索引
near_dis = np.zeros(num)
for i in range(num):
knn_ind = ind[get_knn[i, :]] # KNN 的点类型
if ind[i] == 1: # 内部点
if 0 in knn_ind: # 邻居有边界点
bdpts_ind = np.where(knn_ind == 0)
bd_id = get_knn[i, bdpts_ind[0][0]]
near_dis[i] = math.sqrt(sum(pow((X[i, :] - X[bd_id, :]), 2)))
else: # 全局找最近边界点
near_dis[i] = float("inf")
for j in range(num):
if ind[j] == 0:
temp_dis = math.sqrt(sum(pow((X[i, :] - X[j, :]), 2)))
if temp_dis < near_dis[i]:
near_dis[i] = temp_dis
else: # 边界点
if 1 in knn_ind: # 邻居有内部点
bdpts_ind = np.where(knn_ind == 1)
bd_id = get_knn[i, bdpts_ind[0][0]]
near_dis[i] = bd_id # 存索引
else: # 全局找最近内部点
mark_dis = float("inf")#先赋值为无穷大,方便查找替换
for j in range(num):
if ind[j] == 1:
temp_dis = math.sqrt(sum(pow((X[i, :] - X[j, :]), 2)))
if temp_dis < mark_dis:
mark_dis = temp_dis
near_dis[i] = j

# 内部点聚类 (基于连通性)
cluster = np.zeros(num) # 簇标签
mark = 1
for i in range(num):
if ind[i] == 1 and cluster[i] == 0:
cluster[i] = mark
for j in range(num):
if ind[j] == 1 and math.sqrt(sum(pow((X[i, :] - X[j, :]), 2))) <= near_dis[i] + near_dis[j]:
if cluster[j] == 0:
cluster[j] = cluster[i]
else:
# 如果 j 已经在别的簇,则把那个簇并入当前簇
temp_cluster = cluster[j]
temp_ind = np.where(cluster == temp_cluster)
cluster[temp_ind] = cluster[i]
mark += 1

# 边界点挂到最近内部点的簇
for i in range(num):
if ind[i] == 0:
cluster[i] = cluster[int(near_dis[i])]

# 簇标签重新压缩为 1..C
mark = 1
storage = np.zeros(num)
for i in range(num):
if cluster[i] in storage:
temp_ind = np.where(storage == cluster[i])
cluster[i] = cluster[temp_ind[0][0]]
else:
storage[i] = cluster[i]
cluster[i] = mark
mark += 1

return cluster


机器学习-CDC聚类算法
http://example.com/2025/09/10/机器学习-CDC聚类算法/
作者
oxygen
发布于
2025年9月10日
许可协议