信息量计算的实现

(本文会用到的所有代码都在这里

由于决策树的生成算法中会用到各种定义下的信息量的计算,所以我们应该先把这些计算信息量相关的算法实现出来。注意到这些算法同样是在不断地进行计数工作,所以我们同样需要尽量尝试利用好上一章讲述过的 bincount 方法。由于我们是在决策树模型中调用这些算法的,所以数据预处理应该交由决策树来做、这里就只需要专注于算法本身。值得一提的是,这一套算法不仅能够应用在决策树中,在遇到任何其它需要计算信息量的场合时都能够进行应用

首先实现其基本结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import math
import numpy as np
class Cluster:
"""
初始化结构
self._x, self._y:记录数据集的变量
self._counters:类别向量的计数器,记录第i类数据的个数
self._sample_weight:记录样本权重的属性
self._con_chaos_cache, self._ent_cache, self._gini_cache:记录中间结果的属性
self._base:记录对数的底的属性
"""
def __init__(self, x, y, sample_weight=None, base=2):
# 这里我们要求输入的是Numpy向量(矩阵)
self._x, self._y = x.T, y
# 利用样本权重对类别向量y进行计数
if sample_weight is None:
self._counters = np.bincount(self._y)
else:
self._counters = np.bincount(self._y,
weights=sample_weight*len(sample_weight))
self._sample_weight = sample_weight
self._con_chaos_cache = self._ent_cache = self._gini_cache = None
self._base = base

接下来就需要定义计算不确定性的两个函数。由于一个 Cluster 只接受一份数据,所以其实总的不确定性只用计算一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 定义计算信息熵的函数
def ent(self, ent=None, eps=1e-12):
# 如果已经计算过且调用时没有额外给各类别样本的个数、就直接调用结果
if self._ent_cache is not None and ent is None:
return self._ent_cache
_len = len(self._y)
# 如果调用时没有给各类别样本的个数,就利用结构本身的计数器来获取相应个数
if ent is None:
ent = self._counters
# 使用eps来让算法的数值稳定性更好
_ent_cache = max(eps, -sum(
[_c / _len * math.log(_c / _len, self._base) if _c != 0 else 0 for _c in ent]))
# 如果调用时没有给各类别样本的个数、就将计算好的信息熵储存下来
if ent is None:
self._ent_cache = _ent_cache
return _ent_cache
# 定义计算基尼系数的函数,和计算信息熵的函数很类似、所以略去注释
def gini(self, p=None):
if self._gini_cache is not None and p is None:
return self._gini_cache
if p is None:
p = self._counters
_gini_cache = 1 - np.sum((p / len(self._y)) ** 2)
if p is None:
self._gini_cache = _gini_cache
return _gini_cache

然后就需要定义计算的函数。从算法公式可以看出它们具有形式一致性,所以我们可以把它们的实现整合在一起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 定义计算和的函数
def con_chaos(self, idx, criterion="ent", features=None):
# 根据不同的准则、调用不同的方法
if criterion == "ent":
_method = lambda cluster: cluster.ent()
elif criterion == "gini":
_method = lambda cluster: cluster.gini()
# 根据输入获取相应维度的向量
data = self._x[idx]
# 如果调用时没有给该维度的取值空间features、就调用set方法获得该取值空间
# 由于调用set方法比较耗时,在决策树实现时应努力将features传入
if features is None:
features = set(data)
# 获得该维度特征各取值所对应的数据的下标
# 用self._con_chaos_cache记录下相应结果以加速后面定义的相关函数
tmp_labels = [data == feature for feature in features]
self._con_chaos_cache = [np.sum(_label) for _label in tmp_labels]
# 利用下标获取相应的类别向量
label_lst = [self._y[label] for label in tmp_labels]
rs, chaos_lst = 0, []
# 遍历各下标和对应的类别向量
for data_label, tar_label in zip(tmp_labels, label_lst):
# 获取相应的数据
tmp_data = self._x.T[data_label]
# 根据相应数据、类别向量和样本权重计算出不确定性
if self._sample_weight is None:
_chaos = _method(Cluster(tmp_data, tar_label, base=self._base))
else:
_new_weights = self._sample_weight[data_label]
_chaos = _method(Cluster(tmp_data, tar_label, _new_weights / np.sum(
_new_weights), base=self._base))
# 依概率加权,同时把各个初始条件不确定性记录下来
rs += len(tmp_data) / len(data) * _chaos
chaos_lst.append(_chaos)
return rs, chaos_lst

注意:如果仅仅是为了获得总的条件不确定性、是不用将划分后数据的各个部分的条件不确定记录下来的;之所以我们把它记录下来、是因为在决策树生成的过程里会用到这个中间变量。我们会在后面讲解决策树结构时进行相应的说明

最后需要定义计算信息增益的函数。我们将会实现涉及过的三种定义方法,而且由于它们同样具有形式一致性、所以它们的实现同样可以整合在一起:

1
2
3
4
5
6
7
8
9
10
11
12
# 定义计算信息增益的函数,参数get_chaos_lst用于控制输出
def info_gain(self, idx, criterion="ent", get_chaos_lst=False, features=None):
# 根据不同的准则、获取相应的“条件不确定性”
if criterion in ("ent", "ratio"):
_con_chaos, _chaos_lst = self.con_chaos(idx, "ent", features)
_gain = self.ent() - _con_chaos
if criterion == "ratio":
_gain /= self.ent(self._con_chaos_cache)
elif criterion == "gini":
_con_chaos, _chaos_lst = self.con_chaos(idx, "gini", features)
_gain = self.gini() - _con_chaos
return (_gain, _chaos_lst) if get_chaos_lst else _gain

考虑到二类问题的特殊性,我们需要定义专门处理二类问题的、计算信息增益相关的函数。它们大部分和以上定义的函数没有区别、代码也有大量重复,只是它会多传进一个代表二分标准的参数。为简洁,我们略去上文已经给出过的注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 定义计算二类问题条件不确定性的函数
# 参数tar即是二分标准,参数continuous则告诉我们该维度的特征是否连续
def bin_con_chaos(self, idx, tar, criterion="gini", continuous=False):
if criterion == "ent":
_method = lambda cluster: cluster.ent()
elif criterion == "gini":
_method = lambda cluster: cluster.gini()
data = self._x[idx]
# 根据二分标准划分数据,注意要分离散和连续两种情况讨论
tar = data == tar if not continuous else data < tar
tmp_labels = [tar, ~tar]
self._con_chaos_cache = [np.sum(_label) for _label in tmp_labels]
label_lst = [self._y[label] for label in tmp_labels]
rs, chaos_lst = 0, []
for data_label, tar_label in zip(tmp_labels, label_lst):
tmp_data = self._x.T[data_label]
if self._sample_weight is None:
_chaos = _method(Cluster(tmp_data, tar_label, base=self._base))
else:
_new_weights = self._sample_weight[data_label]
_chaos = _method(Cluster(tmp_data, tar_label, _new_weights / np.sum(
_new_weights), base=self._base))
rs += len(tmp_data) / len(data) * _chaos
chaos_lst.append(_chaos)
return rs, chaos_lst

定义计算二类问题信息增益的函数时,只需将之前定义过的、计算信息增益的函数中计算条件不确定性的函数替换成计算二类问题条件不确定性的函数即可

观众老爷们能赏个脸么 ( σ'ω')σ