作者:chen_h
微信号 & QQ:862251340
微信公众号:coderpai
(一)机器学习中的集成学习入门
(二)bagging 方法
(三)使用Python进行交易的随机森林算法
(四)Python中随机森林的实现与解释
(五)如何用 Python 从头开始实现 Bagging 算法
决策树是一种简单而强大的预测建模技术,但它们存在高方差。这意味着在给定不同的训练数据的情况下,树可以得到非常不同的结果。为了使决策树更加健壮并实现更好性能,我们会采用集成学习方法,其中一种是 Bagging 方法。
在本教程中,您将了解如何使用 Python从头开始使用决策树的 bagging 过程。完成本教程后,您将了解:
- 如何创建数据集的自举过程;
- 如何使用自举模型进行预测;
- 如何将 bagging 算法应用到你的预测模型中;
Bootstrap Aggregation 算法
Bootstrap 是一种有放回的数据采集方式。这还意味着一个新的数据集是从原来数据中进行随机采用得到的,并且会把数据进行放回,然后进行下一次采样。
当我们在估算一个非常庞大的数据集的时候,这种估算方式是非常好的。我们可以通过计算一个有限集合的均值从而来得到整个数据集的均值。这种方法我们一般都是和一些具有高方差的算法一起使用,比如决策树。我们通过对每个自举样本进行单独模型计算,然后输出多个模型结果的平均值。这种技术称为 bootstrap 或者 bagging。
方差意味着算法的性能对训练数据敏感,高方差表明训练数据的变化越多,算法的性能就越差。我们可以通过训练许多树并且取其预测的平均值,可以改善诸如未修剪的决策树之类的高方差机器学习算法的性能。模型取得的结果通常会优于单个决策树的表现。
除了提高性能之外,bagging 的另一个好处是它不会过度拟合问题,我们可以通过继续添加树木,知道达到最佳性能。
Sonar 数据集
在本教程中我们使用的是 Sonar 数据集。这是一个描述声呐信号从不同表面反弹的数据集。输入数据是由 60 个特征数据组成的,输出数据是一个二分类,来判断物体表面是岩石还是金属圆柱。数据一共有 208 条。这是一个非常简单的数据集。所有的输入变量都是连续的,取值在 0 到 1 之间。输出变量是 M(金属圆柱) 和 R(岩石),我们需要将这个分类结果转变成 1 和 0。数据我们通过 UCI Machine Learing 进行下载。下载链接:https://archive.ics.uci.edu/ml/datasets/Connectionist+Bench+(Sonar,+Mines+vs.+Rocks)
实战例子
本教程分为两部分:
- Bootstrap 采样;
- 声呐数据分析;
这些步骤提供了数据采样和算法编写的基本功能,我们可以学习bagging算法是如何进行基础工作的。
1. Bootstrap 采样
让我们首先深入了解 bootstrap 方法的工作原理。
我们可以通过从数据集中随机选择行数据,并将它们添加到新列表来创建数据集成为新样本。我们可以针对固定数量的行重复进行此操作,或者知道新数据集的大小与原始数据集的大小的比率达到我们的要求。我们每采集一次数据,都会进行放回,然后再次采集。
下面是一个名为 subsample() 的函数,它实现了这个过程。随机模块中的 randrange() 函数用于选择随机行索引,以便在循环的每次迭代中添加到样本中。样本的默认数量大小是原始数据集的大小。
def
subsample
(
dataset
,
ratio
=
1.0
)
:
sample
=
list
(
)
n_sample
=
round
(
len
(
dataset
)
*
ratio
)
while
len
(
sample
)
<
n_sample
:
index
=
randrange
(
len
(
dataset
)
)
sample
.
append
(
dataset
[
index
]
)
return
sample
我们可以使用这个函数来评估一个人造的数据集的平均值。
首先,我们创建一个包含 20 行,里面的数字时 0 到 9 之间的随机值,并且我们计算他们的平均值。
然后,我们可以制作原始数据集的自举样本集,我们不断重复这个过程,直到我们有一个均值列表,然后计算平均值。这个平均值跟我们整个样本的平均值是非常接近的。
下面列出了一个完整的示例。
每个自举样本是原始样本的 10 %,也就是 2 个样本。然后,我们通过创建原始数据集的 1个,10个,100个自举样本,计算他们的平均值,然后平均所有这些估计的平均值来进行实验。
from
random
import
seed
from
random
import
random
from
random
import
randrange
# Create a random subsample from the dataset with replacement
def
subsample
(
dataset
,
ratio
=
1.0
)
:
sample
=
list
(
)
n_sample
=
round
(
len
(
dataset
)
*
ratio
)
while
len
(
sample
)
<
n_sample
:
index
=
randrange
(
len
(
dataset
)
)
sample
.
append
(
dataset
[
index
]
)
return
sample
# Calculate the mean of a list of numbers
def
mean
(
numbers
)
:
return
sum
(
numbers
)
/
float
(
len
(
numbers
)
)
seed
(
1
)
# True mean
dataset
=
[
[
randrange
(
10
)
]
for
i
in
range
(
20
)
]
print
(
'True Mean: %.3f'
%
mean
(
[
row
[
0
]
for
row
in
dataset
]
)
)
# Estimated means
ratio
=
0.10
for
size
in
[
1
,
10
,
100
]
:
sample_means
=
list
(
)
for
i
in
range
(
size
)
:
sample
=
subsample
(
dataset
,
ratio
)
sample_mean
=
mean
(
[
row
[
0
]
for
row
in
sample
]
)
sample_means
.
append
(
sample_mean
)
print
(
'Samples=%d, Estimated Mean: %.3f'
%
(
size
,
mean
(
sample_means
)
)
)
运行该示例将打印我们要估计的原始数据平均值。
然后我们可以从各种不同数量的自举样本中看到估计的平均值。我们可以看到,通过 100 个样本,我们可以很好的估计平均值。
True
Mean
:
4.450
Samples
=
1
,
Estimated Mean
:
4.500
Samples
=
10
,
Estimated Mean
:
3.300
Samples
=
100
,
Estimated Mean
:
4.480
我们可以为每个子样本创建一个模型,而不是简单的计算平均值。
接下来,让我们看看如何组合多个 bootstrap 模型的预测。
2. 声呐数据集案例研究
在这一节中,我们将随机森林算法应用于声呐数据集。
首先,我们需要导入数据集,将字符串值转换为数值型,并将输出列从字符串转换为 0 和 1 的整数值。这是通过辅助函数 load_csv() ,str_column_to_float() 和 str_column_to_int() 来实现的,以便预处理数据集。
我们将使用 k-fold 交叉验证来估计学习模型在未知数据上的性能。这意味着我们将构建和评估 k 个模型,并将性能估计为平均模型误差。分类精度将评估每个模型,这些算法都在 cross_validation_split() ,accuracy_metric() 和 evaluate_algoritm() 函数中得到解决。
我们使用 CART 算法来实现我们的 bagging 过程,在实现的过程中我们还设计了一些辅助函数:test_split() 函数将数据集拆分成组,gini_index() 用于评估拆分点,get_split() 用于查找最佳拆分点,to_terminal(),split() 和 build_tree() 用于创建单个决策树,predict() 用于使用决策树进行预测,并使用前一步骤中描述的 subsample() 函数来创建训练的子样本训练集。
我们还开发了一个 bagging_predict() 函数,该函数负责使用每个决策树进行预测并将预测组合成单个返回值。这是 bagging 方法常用的一种模式。
最后,我们设计一个新的 bagging() 函数,负责创建训练数据集的样本,在每个样本上训练决策树,然后使用bagging() 列表对测试数据集进行预测。
下面给出了完整代码:
# Bagging Algorithm on the Sonar dataset
from
random
import
seed
from
random
import
randrange
from
csv
import
reader
# Load a CSV file
def
load_csv
(
filename
)
:
dataset
=
list
(
)
with
open
(
filename
,
'r'
)
as
file
:
csv_reader
=
reader
(
file
)
for
row
in
csv_reader
:
if
not
row
:
continue
dataset
.
append
(
row
)
return
dataset
# Convert string column to float
def
str_column_to_float
(
dataset
,
column
)
:
for
row
in
dataset
:
row
[
column
]
=
float
(
row
[
column
]
.
strip
(
)
)
# Convert string column to integer
def
str_column_to_int
(
dataset
,
column
)
:
class_values
=
[
row
[
column
]
for
row
in
dataset
]
unique
=
set
(
class_values
)
lookup
=
dict
(
)
for
i
,
value
in
enumerate
(
unique
)
:
lookup
[
value
]
=
i
for
row
in
dataset
:
row
[
column
]
=
lookup
[
row
[
column
]
]
return
lookup
# Split a dataset into k folds
def
cross_validation_split
(
dataset
,
n_folds
)
:
dataset_split
=
list
(
)
dataset_copy
=
list
(
dataset
)
fold_size
=
int
(
len
(
dataset
)
/
n_folds
)
for
i
in
range
(
n_folds
)
:
fold
=
list
(
)
while
len
(
fold
)
<
fold_size
:
index
=
randrange
(
len
(
dataset_copy
)
)
fold
.
append
(
dataset_copy
.
pop
(
index
)
)
dataset_split
.
append
(
fold
)
return
dataset_split
# Calculate accuracy percentage
def
accuracy_metric
(
actual
,
predicted
)
:
correct
=
0
for
i
in
range
(
len
(
actual
)
)
:
if
actual
[
i
]
==
predicted
[
i
]
:
correct
+=
1
return
correct
/
float
(
len
(
actual
)
)
*
100.0
# Evaluate an algorithm using a cross validation split
def
evaluate_algorithm
(
dataset
,
algorithm
,
n_folds
,
*
args
)
:
folds
=
cross_validation_split
(
dataset
,
n_folds
)
scores
=
list
(
)
for
fold
in
folds
:
train_set
=
list
(
folds
)
train_set
.
remove
(
fold
)
train_set
=
sum
(
train_set
,
[
]
)
test_set
=
list
(
)
for
row
in
fold
:
row_copy
=
list
(
row
)
test_set
.
append
(
row_copy
)
row_copy
[
-
1
]
=
None
predicted
=
algorithm
(
train_set
,
test_set
,
*
args
)
actual
=
[
row
[
-
1
]
for
row
in
fold
]
accuracy
=
accuracy_metric
(
actual
,
predicted
)
scores
.
append
(
accuracy
)
return
scores
# Split a dataset based on an attribute and an attribute value
def
test_split
(
index
,
value
,
dataset
)
:
left
,
right
=
list
(
)
,
list
(
)
for
row
in
dataset
:
if
row
[
index
]
<
value
:
left
.
append
(
row
)
else
:
right
.
append
(
row
)
return
left
,
right
# Calculate the Gini index for a split dataset
def
gini_index
(
groups
,
classes
)
:
# count all samples at split point
n_instances
=
float
(
sum
(
[
len
(
group
)
for
group
in
groups
]
)
)
# sum weighted Gini index for each group
gini
=
0.0
for
group
in
groups
:
size
=
float
(
len
(
group
)
)
# avoid divide by zero
if
size
==
0
:
continue
score
=
0.0
# score the group based on the score for each class
for
class_val
in
classes
:
p
=
[
row
[
-
1
]
for
row
in
group
]
.
count
(
class_val
)
/
size
score
+=
p
*
p
# weight the group score by its relative size
gini
+=
(
1.0
-
score
)
*
(
size
/
n_instances
)
return
gini
# Select the best split point for a dataset
def
get_split
(
dataset
)
:
class_values
=
list
(
set
(
row
[
-
1
]
for
row
in
dataset
)
)
b_index
,
b_value
,
b_score
,
b_groups
=
999
,
999
,
999
,
None
for
index
in
range
(
len
(
dataset
[
0
]
)
-
1
)
:
for
row
in
dataset
:
# for i in range(len(dataset)):
# row = dataset[randrange(len(dataset))]
groups
=
test_split
(
index
,
row
[
index
]
,
dataset
)
gini
=
gini_index
(
groups
,
class_values
)
if
gini
<
b_score
:
b_index
,
b_value
,
b_score
,
b_groups
=
index
,
row
[
index
]
,
gini
,
groups
return
{
'index'
:
b_index
,
'value'
:
b_value
,
'groups'
:
b_groups
}
# Create a terminal node value
def
to_terminal
(
group
)
:
outcomes
=
[
row
[
-
1
]
for
row
in
group
]
return
max
(
set
(
outcomes
)
,
key
=
outcomes
.
count
)
# Create child splits for a node or make terminal
def
split
(
node
,
max_depth
,
min_size
,
depth
)
:
left
,
right
=
node
[
'groups'
]
del
(
node
[
'groups'
]
)
# check for a no split
if
not
left
or
not
right
:
node
[
'left'
]
=
node
[
'right'
]
=
to_terminal
(
left
+
right
)
return
# check for max depth
if
depth
>=
max_depth
:
node
[
'left'
]
,
node
[
'right'
]
=
to_terminal
(
left
)
,
to_terminal
(
right
)
return
# process left child
if
len
(
left
)
<=
min_size
:
node
[
'left'
]
=
to_terminal
(
left
)
else
:
node
[
'left'
]
=
get_split
(
left
)
split
(
node
[
'left'
]
,
max_depth
,
min_size
,
depth
+
1
)
# process right child
if
len
(
right
)
<=
min_size
:
node
[
'right'
]
=
to_terminal
(
right
)
else
:
node
[
'right'
]
=
get_split
(
right
)
split
(
node
[
'right'
]
,
max_depth
,
min_size
,
depth
+
1
)
# Build a decision tree
def
build_tree
(
train
,
max_depth
,
min_size
)
:
root
=
get_split
(
train
)
split
(
root
,
max_depth
,
min_size
,
1
)
return
root
# Make a prediction with a decision tree
def
predict
(
node
,
row
)
:
if
row
[
node
[
'index'
]
]
<
node
[
'value'
]
:
if
isinstance
(
node
[
'left'
]
,
dict
)
:
return
predict
(
node
[
'left'
]
,
row
)
else
:
return
node
[
'left'
]
else
:
if
isinstance
(
node
[
'right'
]
,
dict
)
:
return
predict
(
node
[
'right'
]
,
row
)
else
:
return
node
[
'right'
]
# Create a random subsample from the dataset with replacement
def
subsample
(
dataset
,
ratio
)
:
sample
=
list
(
)
n_sample
=
round
(
len
(
dataset
)
*
ratio
)
while
len
(
sample
)
<
n_sample
:
index
=
randrange
(
len
(
dataset
)
)
sample
.
append
(
dataset
[
index
]
)
return
sample
# Make a prediction with a list of bagged trees
def
bagging_predict
(
trees
,
row
)
:
predictions
=
[
predict
(
tree
,
row
)
for
tree
in
trees
]
return
max
(
set
(
predictions
)
,
key
=
predictions
.
count
)
# Bootstrap Aggregation Algorithm
def
bagging
(
train
,
test
,
max_depth
,
min_size
,
sample_size
,
n_trees
)
:
trees
=
list
(
)
for
i
in
range
(
n_trees
)
:
sample
=
subsample
(
train
,
sample_size
)
tree
=
build_tree
(
sample
,
max_depth
,
min_size
)
trees
.
append
(
tree
)
predictions
=
[
bagging_predict
(
trees
,
row
)
for
row
in
test
]
return
(
predictions
)
# Test bagging on the sonar dataset
seed
(
1
)
# load and prepare data
filename
=
'sonar.all-data.csv'
dataset
=
load_csv
(
filename
)
# convert string attributes to integers
for
i
in
range
(
len
(
dataset
[
0
]
)
-
1
)
:
str_column_to_float
(
dataset
,
i
)
# convert class column to integers
str_column_to_int
(
dataset
,
len
(
dataset
[
0
]
)
-
1
)
# evaluate algorithm
n_folds
=
5
max_depth
=
6
min_size
=
2
sample_size
=
0.50
for
n_trees
in
[
1
,
5
,
10
,
50
]
:
scores
=
evaluate_algorithm
(
dataset
,
bagging
,
n_folds
,
max_depth
,
min_size
,
sample_size
,
n_trees
)
print
(
'Trees: %d'
%
n_trees
)
print
(
'Scores: %s'
%
scores
)
print
(
'Mean Accuracy: %.3f%%'
%
(
sum
(
scores
)
/
float
(
len
(
scores
)
)
)
)
k 值为 5 时用于交叉验证,每次迭代评估的数据量为 208/5 = 41.6 或者直接使用 40 条进行循环迭代。
构建树的最大深度,我们设为 6,每个节点为 2 的最小训练行数。训练数据集的样本创建为原始数据集大小的 50% 。这是为了强制用于训练每棵树的训练集子样本中的某些变体。bagging 的默认设置是使样本数据集的大小与原始训练数据集的大小相匹配。
接下来我们打印每个类别的结果:
Trees
:
1
Scores
:
[
87.8048780487805
,
65.85365853658537
,
65.85365853658537
,
65.85365853658537
,
73.17073170731707
]
Mean Accuracy
:
71.707
%
Trees
:
5
Scores
:
[
60.97560975609756
,
80.48780487804879
,
78.04878048780488
,
82.92682926829268
,
63.41463414634146
]
Mean Accuracy
:
73.171
%
Trees
:
10
Scores
:
[
60.97560975609756
,
73.17073170731707
,
82.92682926829268
,
80.48780487804879
,
68.29268292682927
]
Mean Accuracy
:
73.171
%
Trees
:
50
Scores
:
[
63.41463414634146
,
75.60975609756098
,
80.48780487804879
,
75.60975609756098
,
85.36585365853658
]
Mean Accuracy
:
76.098
%
这种方法的一个难点是,即使我们构建了一定深度的树,但是 bagging 树得到的结果也是非常相似的。但是我们希望在训练的过程中可以降低高方差。这是因为我们在构造的过程中选择了相同或者相似的分裂节点,这是一种贪婪算法。
本教程试图通过约束用于训练每棵树的样本大小来重新计算方差。更强大的技术是约束在创建每个分割点时对特征的评估。这是随机森林中使用的方法。
扩展
-
调整树:调整树的大小,深度,以及单个树的配置;
-
bagging 中构建不同的树结构:我们可以通过使用不同的算法进行平均预测,比如贝叶斯,决策树,神经网络等等;