1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
| import math import numpy as np from sklearn.neighbors import NearestNeighbors from scipy.special import gamma from scipy.spatial import ConvexHull
def CDC(k_num, ratio, X): ''' CDC 聚类算法实现 输入: k_num : int, 最近邻数量 k ratio : float, 内部点比例阈值 (0~1之间) X : ndarray, (num, d),样本数据 输出: cluster : ndarray, 每个点所属的簇标签 (1..C) ''' [num, d] = X.shape nbrs = NearestNeighbors(n_neighbors=k_num+1, algorithm='ball_tree').fit(X) indices = nbrs.kneighbors(X, return_distance=False) get_knn = indices[:, 1:k_num+1]
angle_var = np.zeros(num)
if (d == 2): angle = np.zeros((num, k_num)) for i in range(num): for j in range(k_num): delta_x = X[get_knn[i, j], 0] - X[i, 0] delta_y = X[get_knn[i, j], 1] - X[i, 1] if delta_x == 0: if delta_y == 0: angle[i, j] = 0 elif delta_y > 0: angle[i, j] = math.pi / 2 else: angle[i, j] = 3 * math.pi / 2 elif delta_x > 0: if math.atan(delta_y / delta_x) >= 0: angle[i, j] = math.atan(delta_y / delta_x) else: angle[i, j] = 2 * math.pi + math.atan(delta_y / delta_x) else: angle[i, j] = math.pi + math.atan(delta_y / delta_x)
for i in range(num): angle_order = sorted(angle[i, :])
for j in range(k_num - 1): point_angle = angle_order[j + 1] - angle_order[j] angle_var[i] += pow(point_angle - 2 * math.pi / k_num, 2)
point_angle = angle_order[0] - angle_order[k_num - 1] + 2 * math.pi angle_var[i] += pow(point_angle - 2 * math.pi / k_num, 2)
angle_var[i] /= k_num
angle_var = angle_var / ((k_num - 1) * 4 * pow(math.pi, 2) / pow(k_num, 2))
else: for i in range(num): try: dif_x = X[get_knn[i], :] - X[i, :] map_x = np.linalg.inv(np.diag(np.sqrt(np.diag(np.dot(dif_x, dif_x.T))))) @ dif_x ''' 具体步骤: 1、先算 dif_x @ dif_x.T → 得到 k*k 的 Gram 矩阵(前置知识1) 2、取其对角线 → 相当于每个向量的平方长度 3、开根号 → 得到长度 4、取对角矩阵的逆 → 相当于对每个向量除以自己的模长 5、右乘 dif_x → 得到一组归一化后的向量 换句话说:把邻居点投射到近似单位球面上 ''' hull = ConvexHull(map_x) ''' hull 这个对象里存有很多属性: hull.vertices :凸包的顶点索引 hull.simplices :凸包的“面片”(由哪些点组成的 (d-1)-单形) (d-单形:d 维空间里,由 (d+1) 个点构成的最小凸集) ''' simplex_num = len(hull.simplices) simplex_vol = np.zeros(simplex_num)
for j in range(simplex_num): simplex_coord = map_x[hull.simplices[j], :] simplex_vol[j] = np.sqrt(max(0, np.linalg.det(np.dot(simplex_coord, simplex_coord.T)))) / gamma(d-1) ''' np.dot(simplex_coord, simplex_coord.T) = 单形顶点的 Gram 矩阵,它的行列式(det)代表这些顶点向量张成的平行体的体积平方 max(0,x)是防止浮点误差变为负数 在几何体积公式里,单形体积通常要除以阶乘,这里用了 gamma(gamma函数是阶乘的推广,gamma(n)=(n-1)!) 来通用化 ''' angle_var[i] = np.var(simplex_vol)
except Exception as e: angle_var[i] = 1
sort_dcm = sorted(angle_var) T_DCM = sort_dcm[math.ceil(num*ratio)] ind = np.zeros(num) for i in range(num): if angle_var[i] < T_DCM: ind[i] = 1
near_dis = np.zeros(num) for i in range(num): knn_ind = ind[get_knn[i, :]] if ind[i] == 1: if 0 in knn_ind: bdpts_ind = np.where(knn_ind == 0) bd_id = get_knn[i, bdpts_ind[0][0]] near_dis[i] = math.sqrt(sum(pow((X[i, :] - X[bd_id, :]), 2))) else: near_dis[i] = float("inf") for j in range(num): if ind[j] == 0: temp_dis = math.sqrt(sum(pow((X[i, :] - X[j, :]), 2))) if temp_dis < near_dis[i]: near_dis[i] = temp_dis else: if 1 in knn_ind: bdpts_ind = np.where(knn_ind == 1) bd_id = get_knn[i, bdpts_ind[0][0]] near_dis[i] = bd_id else: mark_dis = float("inf") for j in range(num): if ind[j] == 1: temp_dis = math.sqrt(sum(pow((X[i, :] - X[j, :]), 2))) if temp_dis < mark_dis: mark_dis = temp_dis near_dis[i] = j
cluster = np.zeros(num) mark = 1 for i in range(num): if ind[i] == 1 and cluster[i] == 0: cluster[i] = mark for j in range(num): if ind[j] == 1 and math.sqrt(sum(pow((X[i, :] - X[j, :]), 2))) <= near_dis[i] + near_dis[j]: if cluster[j] == 0: cluster[j] = cluster[i] else: temp_cluster = cluster[j] temp_ind = np.where(cluster == temp_cluster) cluster[temp_ind] = cluster[i] mark += 1
for i in range(num): if ind[i] == 0: cluster[i] = cluster[int(near_dis[i])]
mark = 1 storage = np.zeros(num) for i in range(num): if cluster[i] in storage: temp_ind = np.where(storage == cluster[i]) cluster[i] = cluster[temp_ind[0][0]] else: storage[i] = cluster[i] cluster[i] = mark mark += 1
return cluster
|