机器学习-增量更新的流数据聚类

增量更新的流数据聚类


A) 通用 Density-Grid 流水线(对应综述中的典型网格法)

流程图

flowchart TD
    S["新样本 x_t 到达"] --> N["归一化"]
    N --> Q["量化到网格单元"]
    Q --> U["更新格子密度(含时间衰减)"]
    U --> T{"密度是否超过阈值?"}
    T -- "是" --> C1["标记为核心格"]
    T -- "否"  --> C2["标记为稀疏/噪声候选"]
    C1 --> A["连接相邻核心格"]
    C2 --> A
    A --> CC["提取连通分量 → 聚类ID"]
    CC --> O["输出 x_t 的聚类结果"]

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import numpy as np
from collections import defaultdict, deque

class GridStreamClustering:
    """Generic online density-grid clustering.

    - Equal-width grid quantization of incoming points.
    - Exponentially decayed per-cell density.
    - Cells whose density exceeds a threshold are "core"; adjacent core
      cells are linked and connected components become clusters.
    - Each arriving point immediately gets a cluster id (-1 = noise).
    """

    def __init__(self, bounds, cell_size, lambda_decay=0.01, core_threshold=3.0,
                 neighbor_conn=1):
        """
        bounds: (d, 2) per-dimension (min, max) of the data space.
        cell_size: grid edge length per dimension (scalar or vector).
        lambda_decay: exponential decay rate for cell densities.
        core_threshold: density threshold for a cell to count as core.
        neighbor_conn: adjacency radius; cells within Manhattan distance
            <= neighbor_conn are neighbors (conn=1 is von Neumann adjacency).
        """
        self.bounds = np.asarray(bounds)
        self.d = self.bounds.shape[0]
        self.size = np.asarray(cell_size) if np.ndim(cell_size) else np.full(self.d, cell_size)
        self.decay = lambda_decay
        self.th = core_threshold
        self.conn = neighbor_conn
        # Cell store: key=tuple(grid index), val={"density": float, "time": last-update tick}
        self.cells = {}
        self.t = 0

    def _coord_to_cell(self, x):
        """Quantize point x to its grid-cell index tuple."""
        idx = np.floor((x - self.bounds[:, 0]) / self.size).astype(int)
        return tuple(idx)

    def _decay_to_now(self, cell):
        """Apply exponential time decay to one cell up to the current tick."""
        last_t = cell.get("time", self.t)
        dt = self.t - last_t
        if dt > 0:
            cell["density"] *= np.exp(-self.decay * dt)
        cell["time"] = self.t

    def _neighbors(self, g):
        """Yield neighboring cell indices within Manhattan distance <= self.conn."""
        base = np.array(g)
        for offset in np.ndindex(*([2 * self.conn + 1] * self.d)):
            off = np.array(offset) - self.conn
            if np.abs(off).sum() <= self.conn and not np.all(off == 0):
                yield tuple(base + off)

    def _cell(self, g):
        """Fetch (and decay) the cell at index g, creating it if absent."""
        c = self.cells.get(g)
        if c is None:
            c = {"density": 0.0, "time": self.t}
            self.cells[g] = c
        else:
            self._decay_to_now(c)
        return c

    def partial_fit_predict(self, x):
        """Process one point online and return its cluster id (-1 = noise for now)."""
        self.t += 1
        x = np.asarray(x)
        g = self._coord_to_cell(x)
        c = self._cell(g)
        c["density"] += 1.0  # simple count + decay serves as the density estimate

        # Collect current core cells (decay every cell to the present first).
        core = set()
        for key, cell in self.cells.items():
            self._decay_to_now(cell)
            if cell["density"] >= self.th:
                core.add(key)

        if not core:
            return -1

        # BFS connected components over the core-cell adjacency graph.
        # (Production code would cache this graph and update it incrementally.)
        visited = set()
        comp_id = {}
        cid = 0
        for node in core:
            if node in visited:
                continue
            cid += 1
            q = deque([node])
            visited.add(node)
            comp_id[node] = cid
            while q:
                u = q.popleft()
                for v in self._neighbors(u):
                    if v in core and v not in visited:
                        visited.add(v)
                        comp_id[v] = cid
                        q.append(v)

        # The point's cluster id is the component id of its own cell.
        return comp_id.get(g, -1)

# 实现备注:
# 1) 生产中会对 core 图连通做缓存 + 增量更新;
# 2) density 可用“到达率/核密度估计/加权核”替代;
# 3) 也可加入空格修剪策略,周期性清理 density 很小的格。

B) CEDGM(密度网格 + 核心微簇 + 宏簇相交)

流程图

flowchart TD
    X["点 x_t 到达"] --> Q["量化到网格"]
    Q --> G["更新网格密度 / 异常缓冲"]
    G --> J{"是否落入某微簇的核区或壳层?"}
    J -- "核区" --> U1["更新该微簇中心/计数/半径"]
    J -- "壳层" --> U2["吸纳并检查是否变核心"]
    J -- "否"   --> O1["作为异常候选或新建微簇"]
    U1 --> H["检查微簇之间相交"]
    U2 --> H
    O1 --> H
    H --> M["构建微簇相交图"]
    M --> C["连通分量 → 宏簇"]
    C --> Y["输出所属宏簇ID"]

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numpy as np
from collections import defaultdict, deque

class CEDGM:
    """Lightweight CEDGM (density grid + core micro-clusters + macro merge).

    - A coarse grid compresses points / buffers anomalies.
    - Each Core Micro-Cluster (CMC) tracks center, radius r0 and count;
      points within r0/2 hit the "core" zone, within r0 the "shell".
    - Two CMCs are linked when their core/shell regions intersect; the
      connected components of that graph are the macro clusters.
    """

    def __init__(self, bounds, cell_size, lambda_decay=0.01, core_th=5,
                 init_radius=0.5, max_outlier_age=200):
        self.bounds = np.asarray(bounds)
        self.size = np.asarray(cell_size) if np.ndim(cell_size) else np.full(self.bounds.shape[0], cell_size)
        self.decay = lambda_decay
        self.core_th = core_th
        self.init_r = init_radius
        # NOTE(review): max_outlier_age is stored but never used below —
        # presumably intended for pruning stale outlier CMCs; confirm intent.
        self.max_age = max_outlier_age
        self.t = 0
        # Grid density store: key=cell index tuple, val={"rho": density, "t": last tick}.
        self.grid = {}
        # Micro-cluster list: each element is dict(center, r0, N, last).
        self.cmcs = []

    def _cell_index(self, x):
        """Quantize point x to its grid-cell index tuple."""
        return tuple(np.floor((x - self.bounds[:, 0]) / self.size).astype(int))

    def _update_grid(self, g):
        """Decay-then-increment the density of grid cell g."""
        c = self.grid.get(g)
        if c is None:
            self.grid[g] = {"rho": 1.0, "t": self.t}
        else:
            dt = self.t - c["t"]
            if dt > 0:
                c["rho"] *= np.exp(-self.decay * dt)
            c["rho"] += 1.0
            c["t"] = self.t

    def _assign_to_cmc(self, x):
        """Assign x to the nearest CMC (core/shell) or spawn a new one.

        Returns (index, status) with status in {"new", "core", "shell", "outlier"}.
        """
        if not self.cmcs:
            # First point ever: bootstrap the first micro-cluster.
            self.cmcs.append({"center": x.copy(), "r0": self.init_r, "N": 1.0, "last": self.t})
            return 0, "new"
        dists = [np.linalg.norm(x - c["center"]) for c in self.cmcs]
        j = int(np.argmin(dists))
        c = self.cmcs[j]
        r = dists[j]
        if r <= c["r0"] / 2:
            # Core zone: absorb with a running-mean center update.
            c["N"] += 1.0
            eta = 1.0 / c["N"]
            c["center"] = (1 - eta) * c["center"] + eta * x
            c["last"] = self.t
            return j, "core"
        elif r <= c["r0"]:
            # Shell zone: absorb and slightly grow the radius to track drift.
            c["N"] += 1.0
            eta = 1.0 / c["N"]
            c["center"] = (1 - eta) * c["center"] + eta * x
            c["r0"] = max(c["r0"], r * 1.05)
            c["last"] = self.t
            return j, "shell"
        else:
            # Outside every CMC: treat as anomaly and start a new micro-cluster.
            self.cmcs.append({"center": x.copy(), "r0": self.init_r, "N": 1.0, "last": self.t})
            return len(self.cmcs) - 1, "outlier"

    def _cmc_intersect(self, i, j):
        """True when CMCs i and j overlap (center-distance vs. radii tests)."""
        a, b = self.cmcs[i], self.cmcs[j]
        d = np.linalg.norm(a["center"] - b["center"])
        return (d <= a["r0"]) or (d <= b["r0"]) or (d <= (a["r0"] / 2 + b["r0"]))

    def _macro_clusters(self):
        """Label every CMC with a macro-cluster id via connected components (BFS)."""
        n = len(self.cmcs)
        g = [[] for _ in range(n)]
        for i in range(n):
            for j in range(i + 1, n):
                if self._cmc_intersect(i, j):
                    g[i].append(j)
                    g[j].append(i)
        comp = [-1] * n
        cid = 0
        for i in range(n):
            if comp[i] != -1:
                continue
            cid += 1
            q = deque([i])
            comp[i] = cid
            while q:
                u = q.popleft()
                for v in g[u]:
                    if comp[v] == -1:
                        comp[v] = cid
                        q.append(v)
        return comp

    def partial_fit_predict(self, x):
        """Process one point online and return its macro-cluster id."""
        self.t += 1
        x = np.asarray(x)
        g = self._cell_index(x)
        self._update_grid(g)
        idx, st = self._assign_to_cmc(x)
        comp = self._macro_clusters()
        return comp[idx]

C) DenStream(指数衰减 + 核心/潜在/离群 微簇)

流程图

flowchart TD
    X["点 x_t 到达"] --> A["寻找最近微簇"]
    A --> B{"距离是否 ≤ ε?"}
    B -- "是" --> U["吸纳并(带衰减)更新中心/权重/半径"]
    B -- "否" --> O["新建离群微簇"]
    U --> P{"权重 ≥ β 且 半径 ≤ ε?"}
    P -- "是" --> C["标记为核心/潜在微簇"]
    P -- "否" --> R["保持为潜在/离群"]
    O --> S["周期修剪低权重离群微簇"]
    C --> Y["输出簇ID(可离线合并)"]
    R --> Y
    S --> Y

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import numpy as np
from collections import deque

class DenStream:
    """Simplified DenStream.

    - Exponential fading: w(t) = w(t-1) * exp(-lambda * dt), +1 on absorption.
    - Micro-clusters are labeled core / potential / outlier from the weight
      threshold beta and radius bound eps.
    - Low-weight outlier micro-clusters are pruned periodically.
    """

    def __init__(self, eps=0.5, beta=10.0, lambd=0.01, cleanup_period=100):
        self.eps = eps                # absorption radius
        self.beta = beta              # core weight threshold
        self.lambd = lambd            # fading rate
        self.period = cleanup_period  # prune every this many points
        self.t = 0
        self.clusters = []            # each: {center, weight, radius, last, kind}

    def _decay(self, c):
        """Fade a micro-cluster's weight up to the current tick."""
        dt = self.t - c["last"]
        if dt > 0:
            c["weight"] *= np.exp(-self.lambd * dt)
        c["last"] = self.t

    def _nearest(self, x):
        """Return (index, distance) of the closest micro-cluster, or (None, inf)."""
        if not self.clusters:
            return None, np.inf
        d = [np.linalg.norm(x - c["center"]) for c in self.clusters]
        j = int(np.argmin(d))
        return j, d[j]

    def _cleanup(self):
        """Drop outlier micro-clusters whose faded weight fell below 1."""
        keep = []
        for c in self.clusters:
            self._decay(c)
            if c["kind"] == "outlier" and c["weight"] < 1.0:
                continue
            keep.append(c)
        self.clusters = keep

    def partial_fit_predict(self, x):
        """Process one point online; return the index of its micro-cluster."""
        self.t += 1
        x = np.asarray(x)
        j, dist = self._nearest(x)
        if j is None or dist > self.eps:
            # Too far from everything: start a new outlier micro-cluster.
            self.clusters.append({
                "center": x.copy(),
                "weight": 1.0,
                "radius": 0.0,
                "last": self.t,
                "kind": "outlier"
            })
            cid = len(self.clusters) - 1
        else:
            # Absorb into the nearest micro-cluster.
            c = self.clusters[j]
            self._decay(c)
            c["weight"] += 1.0
            eta = 1.0 / c["weight"]
            c["center"] = (1 - eta) * c["center"] + eta * x
            c["radius"] = max(c["radius"], dist)
            c["kind"] = "core" if (c["weight"] >= self.beta and c["radius"] <= self.eps) else "potential"
            cid = j
        if self.t % self.period == 0:
            self._cleanup()
        # Micro-cluster index approximates the cluster id; production code
        # would merge core micro-clusters via connectivity / offline pass.
        return cid

D) HPStream(Projected Clustering with Fading)

流程图

flowchart TD
    X["点 x_t 到达"] --> W["为各簇衰减更新均值/方差/权重"]
    W --> D["计算各维重要性(1/方差)"]
    D --> L["为每簇选择前 l 个相关维"]
    L --> A["在所选子空间上计算与中心距离"]
    A --> J{"是否在吸纳阈值内?"}
    J -- "是" --> U["吸纳并增量更新统计量"]
    J -- "否" --> N["新建小簇或缓冲,定期合并"]
    U --> O["输出当前簇ID"]
    N --> O

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import numpy as np

class HPStream:
    """HPStream-style projected stream clustering.

    - Each cluster keeps fading statistics: mean, per-dimension variance, weight.
    - Dimension relevance ~ 1/variance; the top-l dimensions form the
      cluster's projected subspace.
    - Distance and absorption are computed in that subspace.
    - The offline phase is simplified here.
    """

    def __init__(self, l=5, eps=1.0, lambd=0.01, min_weight=5.0):
        self.l = l            # number of projected dimensions per cluster
        self.eps = eps        # absorption threshold in the subspace
        self.lambd = lambd    # fading rate
        # NOTE(review): min_weight is stored but never used below —
        # presumably for the omitted offline merge step; confirm intent.
        self.minw = min_weight
        self.t = 0
        self.clusters = []    # each: {mu, var, w, last}

    def _decay(self, c):
        """Fade one cluster's weight (and, simplified, its variance) to now."""
        dt = self.t - c["last"]
        if dt > 0:
            factor = np.exp(-self.lambd * dt)
            c["w"] *= factor
            # Mean kept as-is; variance shrunk by the same factor (simplification).
            c["var"] *= factor
        c["last"] = self.t

    def _subspace_dims(self, c):
        """Top-l dimensions by relevance score ~ 1 / (var + 1e-6)."""
        score = 1.0 / (c["var"] + 1e-6)
        return np.argsort(-score)[:self.l]

    def _dist_subspace(self, x, c, dims):
        """Euclidean distance between x and the cluster mean, restricted to dims."""
        d = x[dims] - c["mu"][dims]
        return np.linalg.norm(d)

    def partial_fit_predict(self, x):
        """Process one point online; return the index of its cluster."""
        self.t += 1
        x = np.asarray(x)
        if not self.clusters:
            self.clusters.append({
                "mu": x.copy(),
                "var": np.ones_like(x) * 1e-2,
                "w": 1.0,
                "last": self.t
            })
            return 0
        # Find the nearest cluster, each measured in its own subspace.
        best, bd = None, np.inf
        for i, c in enumerate(self.clusters):
            self._decay(c)
            dims = self._subspace_dims(c)
            d = self._dist_subspace(x, c, dims)
            if d < bd:
                bd, best = d, i
        if bd <= self.eps:
            # Absorb: incremental mean/variance update with step 1/w.
            c = self.clusters[best]
            c["w"] += 1.0
            eta = 1.0 / c["w"]
            delta = x - c["mu"]
            c["mu"] += eta * delta
            c["var"] = (1 - eta) * c["var"] + eta * (delta ** 2)
            return best
        else:
            # Too far in every subspace: start a new small cluster.
            self.clusters.append({
                "mu": x.copy(),
                "var": np.ones_like(x) * 1e-2,
                "w": 1.0,
                "last": self.t
            })
            return len(self.clusters) - 1

E) EDSSC(动态稀疏子空间聚类 for 演化高维流)

流程图

flowchart TD
    S["静态缓冲数据 X0"] --> L["Lasso 自表达求 C"]
    L --> W["构亲和矩阵(|C|+|C^T|)并谱聚类"]
    W --> B["每簇做 PCA 得到子空间基 U_k"]
    B --> R["进入在线阶段"]
    R --> X["新样本 x_t"]
    X --> E["到各子空间的残差 r_k"]
    E --> D{"min(r_k) 是否 < 阈值 τ?"}
    D -- "是" --> A["分配给子空间 k,并做增量 PCA 更新"]
    D -- "否" --> N["创建新子空间 / 触发分裂"]
    A --> O["输出子空间ID"]
    N --> O

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numpy as np
from sklearn.linear_model import Lasso
from sklearn.cluster import SpectralClustering

class EDSSC:
    """EDSSC-style sparse-subspace clustering for evolving high-dim streams.

    - Warm-up window: Lasso self-expression + spectral clustering on a
      static buffer yield the initial subspaces (uses the sklearn imports
      at the top of this file).
    - Online phase: each sample is assigned to the subspace with the
      smallest reconstruction residual; if all residuals exceed tau a new
      subspace is spawned.
    - Bases are refreshed with a simple Oja-style incremental-PCA step.
    """

    def __init__(self, alpha=0.01, n_init_subspaces=5, subspace_rank=5, tau=0.5, lr=0.05):
        self.alpha = alpha       # Lasso sparsity weight
        self.K = n_init_subspaces
        self.r = subspace_rank   # rank of each subspace basis
        self.tau = tau           # residual threshold for assignment
        self.lr = lr             # incremental-PCA step size
        self.bases = []          # list of U_k (d x r), orthonormal columns
        self.labels_ = None      # warm-up labels from fit_init

    @staticmethod
    def _normalize(X):
        """Scale each column of X to unit l2 norm (eps guards zero columns)."""
        n = np.linalg.norm(X, axis=0) + 1e-8
        return X / n

    def _self_expressive(self, X):
        """Solve X ≈ X C column-wise with Lasso, excluding self-representation."""
        n = X.shape[1]
        C = np.zeros((n, n))
        for i in range(n):
            Xi = np.delete(X, i, axis=1)
            yi = X[:, i]
            lasso = Lasso(alpha=self.alpha, fit_intercept=False, max_iter=2000)
            lasso.fit(Xi, yi)
            coef = lasso.coef_
            # Re-insert around the skipped diagonal position (C[i, i] stays 0).
            C[:i, i] = coef[:i]
            C[i+1:, i] = coef[i:]
        return C

    def _spectral_cluster(self, C, K):
        """Spectral clustering on the symmetrized affinity |C| + |C^T|."""
        W = np.abs(C) + np.abs(C.T)
        sc = SpectralClustering(n_clusters=K, affinity='precomputed', assign_labels='kmeans')
        y = sc.fit_predict(W)
        return y

    def _init_bases(self, X, y):
        """Build a rank-r PCA basis for each warm-up cluster."""
        d = X.shape[0]
        self.bases = []
        for k in range(self.K):
            Xk = X[:, y == k]
            if Xk.shape[1] < self.r:
                # Too few columns for rank r: fall back to the first r
                # standard basis vectors (identity columns).
                U = np.eye(d, self.r)
            else:
                # PCA: top-r left singular vectors.
                U, _, _ = np.linalg.svd(Xk, full_matrices=False)
                U = U[:, :self.r]
            self.bases.append(U)

    def fit_init(self, X0):
        """Static warm-up phase.

        X0: d x n0 data matrix. Returns the warm-up cluster labels.
        """
        X0 = self._normalize(X0)
        C = self._self_expressive(X0)
        y = self._spectral_cluster(C, self.K)
        self._init_bases(X0, y)
        self.labels_ = y
        return y

    def _residuals(self, x):
        """Reconstruction residual of x against every subspace basis."""
        rs = []
        for U in self.bases:
            proj = U @ (U.T @ x)
            rs.append(np.linalg.norm(x - proj))
        return np.array(rs)

    def _update_basis(self, k, x):
        """Oja-style incremental-PCA step toward x, then re-orthonormalize via QR."""
        U = self.bases[k]
        proj = U @ (U.T @ x)
        err = x - proj
        U_new = U + self.lr * np.outer(err, (U.T @ x))
        q, _ = np.linalg.qr(U_new)
        self.bases[k] = q[:, :self.r]

    def partial_fit_predict(self, x):
        """Assign one (normalized) sample online; return its subspace id."""
        x = x / (np.linalg.norm(x) + 1e-8)
        rs = self._residuals(x)
        k = int(np.argmin(rs))
        if rs[k] < self.tau:
            self._update_basis(k, x)
            return k
        else:
            # Nothing fits: seed a new subspace at x, completed with a random
            # orthonormal complement. (A cap on the number of subspaces or a
            # merge policy could be added here.)
            d = x.shape[0]
            U = np.zeros((d, self.r))
            U[:, 0] = x / (np.linalg.norm(x) + 1e-8)
            for j in range(1, self.r):
                v = np.random.randn(d)
                v -= U[:, :j] @ (U[:, :j].T @ v)
                U[:, j] = v / (np.linalg.norm(v) + 1e-8)
            self.bases.append(U)
            self.K += 1
            return self.K - 1

F) 流数据聚类总结

事实上,这几个算法无非就是在线维护、动态插入并调整思路的几种不同的呈现形式,这种思路的确最受欢迎,在这种思路上逐渐改动确实是较为省力且讨巧的方法。自然,这些算法存在共性问题:

稳定–敏捷两难(遗忘/衰减的权衡)
衰减快:对新模式灵敏但簇抖动、遗忘历史;衰减慢:稳定但跟不上漂移,出现滞后与过时簇占内存。

超参敏感且在线难调
半径/密度阈值、网格粒度、衰减系数、子空间维数/秩、残差阈值等对结果影响巨大,而在线场景缺少可靠的即时监督信号来调参。

可扩展性与内存控制

高频流 + 高维度 → 网格/微簇数量暴涨;自表达/谱分解在长时运行有累积成本。

每个算法也存在着不同的适应场景与独特优缺点:

DenStream/网格法在早期样本少、密度不稳定;EDSSC 需要预热窗口做自表达与谱聚类;CEDGM/网格法需要维护核心格或CMC的连通/相交图;若簇数量大或频繁变化,增量更新可能成为瓶颈…

所以,我们能做些什么以求改进呢?我也不清楚(要是我知道就不在这敲这篇不知道有没有用处的blog了……)


机器学习-增量更新的流数据聚类
http://example.com/2025/10/27/机器学习-增量更新的流数据聚类/
作者
oxygen
发布于
2025年10月27日
许可协议