Book download: https://download.csdn.net/download/qfire/11175748
This post is a hands-on convolutional-network exercise on the MNIST dataset, built with an object-oriented design in which every independent module is wrapped in its own class. Training a convolutional network involves seven parts: activation functions, regularization, optimization methods, the basic CNN building blocks, the training procedure, the network structure, and the dataset. Each part is abstracted into a class. The first five (activation functions, regularization, optimizers, CNN building blocks, and training) are essentially fixed and can be reused with any network structure; the first four are designed as interface classes, and all data is stored in the network-structure class. The network-structure class composes the basic CNN blocks, so various architectures can be realized, such as ResNet.
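To preview how the pieces compose (the full version of this class appears in section 9 below), the concrete network simply inherits from the dataset, network-structure and interface classes, so every reusable module stays independent while the state lives in the network-structure class:
from vgg_net import VGGNet
from cnn_block_interface import CnnBlockInterface
from cnn_train_interface import CnnTrainInterface
from optimizer_interface import OptimizerInterface
from regulation_interface import RegulationInterface
from MNIST_interface import MNISTInterface

# the concrete network is just the composition of the dataset, structure and interface classes
class VGGTest(MNISTInterface, VGGNet, CnnBlockInterface, CnnTrainInterface,
              OptimizerInterface, RegulationInterface):
    pass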
1. Activation functions
activation_interface.py implements the ReLU and ELU activations.
import numpy as np
class ActivationInterface(object):
activations = ['ReLU', 'ELU']
@staticmethod
def activation(data, activation):
if activation == 'ReLU':
data = np.maximum(0, data)
return data
if activation == 'ELU':
expdata = np.exp(data) - 1
data = np.where(data > 0, data, expdata)
return data
@staticmethod
def dactivation(ddata, data, activation):
if activation == 'ReLU':
ddata[data <= 0] = 0
return ddata
if activation == 'ELU':
ddatatemp = ddata*(data+1)
ddata = np.where(data > 0, ddata, ddatatemp)
return ddata
@staticmethod
def check_activation(activation):
if activation not in ActivationInterface.activations:
raise ValueError('''Activation methods: ReLU, ELU!''')
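A quick sanity check of this interface (a small usage sketch, not part of the original file): ReLU zeroes out negative inputs, ELU maps them to exp(x)-1, and the backward pass masks or rescales the incoming gradient using the cached forward output.
import numpy as np
from activation_interface import ActivationInterface

x = np.array([[-2.0, -0.5, 0.0, 1.5]])
relu_out = ActivationInterface.activation(x.copy(), 'ReLU')  # [[0. 0. 0. 1.5]]
elu_out = ActivationInterface.activation(x.copy(), 'ELU')    # approx. [[-0.865 -0.393 0. 1.5]]

dout = np.ones_like(x)  # pretend the upstream gradient is all ones
drelu = ActivationInterface.dactivation(dout.copy(), relu_out, 'ReLU')  # zero where the output is <= 0
delu = ActivationInterface.dactivation(dout.copy(), elu_out, 'ELU')     # scaled by output+1 = exp(x) where x <= 0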
2. Regularization
regulation_interface.py implements two norm penalties: L2 and L1.
import numpy as np
class RegulationInterface(object):
regulations = ['L1', 'L2']
@staticmethod
def norm_reg(weight, reg, regulation):
if regulation == 'L2':
return np.sum(weight*weight)*reg/2
if regulation == 'L1':
return np.sum(np.abs(weight))*reg
@staticmethod
def dnorm_reg(weight, reg, regulation):
if regulation == 'L2':
return weight*reg
if regulation == 'L1':
return np.sign(weight)*reg
@staticmethod
def check_regulation(regulation):
if regulation not in RegulationInterface.regulations:
raise ValueError('''Regulation methods: L1, L2!''')
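A small usage sketch: the L2 penalty contributes reg/2 * sum(w*w) to the loss with gradient reg*w, while the L1 penalty is reg * sum(|w|) with gradient reg*sign(w).
import numpy as np
from regulation_interface import RegulationInterface

w = np.array([[1.0, -2.0], [0.5, 0.0]])
reg = 0.01
l2 = RegulationInterface.norm_reg(w, reg, 'L2')    # 0.01/2 * (1 + 4 + 0.25 + 0) = 0.02625
dl2 = RegulationInterface.dnorm_reg(w, reg, 'L2')  # 0.01 * w
l1 = RegulationInterface.norm_reg(w, reg, 'L1')    # 0.01 * (1 + 2 + 0.5 + 0) = 0.035
dl1 = RegulationInterface.dnorm_reg(w, reg, 'L1')  # 0.01 * sign(w)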
3. Optimization methods
optimizer_interface.py implements two commonly used optimizers: Nesterov momentum and Adam.
import numpy as np
class OptimizerInterface(object):
optimizers = [ 'Nesterov', 'adam']
decay_rate=0.999
eps = 10**(-8)
@staticmethod
    def nesterov_momentumGD(lr, param, vparam, dparam, mu=0.9):
        # update the velocity in place so the caller's state array accumulates across steps
        pre_vparam = vparam.copy()
        vparam *= mu
        vparam -= lr*dparam
        update_param = vparam + mu*(vparam - pre_vparam)
        update_ratio = np.sum(np.abs(update_param))/(np.sum(np.abs(param)) + OptimizerInterface.eps)
        param += update_param
        return update_ratio
@staticmethod
    def adam(lr, param, vparam, cache, dparam, t=1, mu=0.9):
        # update the first/second moment estimates in place so they persist between calls
        vparam *= mu
        vparam += (1-mu)*dparam
        vparamt = vparam/(1 - mu**t)
        cache *= OptimizerInterface.decay_rate
        cache += (1-OptimizerInterface.decay_rate)*(dparam**2)
        cachet = cache/(1 - OptimizerInterface.decay_rate**t)
        update_param = -(lr/(np.sqrt(cachet) + OptimizerInterface.eps)) * vparamt
        update_ratio = np.sum(np.abs(update_param))/(np.sum(np.abs(param)) + OptimizerInterface.eps)
        param += update_param
        return update_ratio
@staticmethod
def check_optimizer(optimizer):
if optimizer not in OptimizerInterface.optimizers:
raise ValueError('''updates methods: Nesterov and adam!''')
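As a toy usage sketch (the state arrays vparam and cache are owned by the caller, exactly as params_update does later with the per-layer lists), a few Adam steps on a small parameter vector look like this; the returned update_ratio, the size of the update relative to the size of the parameters, is what the training code later plots, and a value around 1e-3 is a common rule of thumb for a reasonable learning rate.
import numpy as np
from optimizer_interface import OptimizerInterface

param = np.array([1.0, -2.0, 0.5])
dparam = np.array([0.1, -0.3, 0.2])   # gradient from backprop
vparam = np.zeros_like(param)         # first-moment state, kept by the caller
cache = np.zeros_like(param)          # second-moment state, kept by the caller

for t in range(1, 4):                 # a few steps with the same toy gradient
    ratio = OptimizerInterface.adam(lr=1e-3, param=param, vparam=vparam,
                                    cache=cache, dparam=dparam, t=t)
    print(t, param, ratio)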
4. Basic CNN building blocks
cnn_block_interface.py implements the forward and backward passes of the convolution, pooling and fully connected layers and the softmax layer, plus parameter initialization.
import numpy as np
from activation_interface import ActivationInterface
class CnnBlockInterface(ActivationInterface):
'''
    the implementation of the three basic blocks of a cnn:
    the conv, pool and FC blocks,
    plus the softmax layer
'''
@staticmethod
def conv_layer(in_data, weights, biases, layer_param=(0,3,1,1), activation='ReLU'):
'''
in_data.shape = [batch,in_height,in_width,in_depth]
weights.shape = [filter_size*filter_size*in_depth, out_depth]
biases.shape = [1, out_depth]
out_data.shape = [batch,out_height,out_width,out_depth]
        the data cached for computing the gradients: matric_data, filter_data
'''
(batch, in_height, in_width, in_depth) = in_data.shape
(out_depth, filter_size, stride, padding) = layer_param
if padding:
padding_data = np.zeros((batch, in_height + 2*padding, in_width + 2*padding, in_depth) )
padding_data[:, padding : -padding, padding : -padding, :] = in_data
else:
padding_data = in_data
filter_size2 = filter_size*filter_size
height_ef = padding_data.shape[1] - filter_size + 1
width_ef = padding_data.shape[2] - filter_size + 1
out_height = (in_height - filter_size + 2*padding)//stride + 1
out_width = (in_width - filter_size + 2*padding)//stride + 1
out_size = out_height*out_width
matric_data = np.zeros( (out_size*batch, filter_size2*in_depth) )
for i_batch in range(batch):
i_batch_size = i_batch*out_size
for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
i_height_size = i_batch_size + i_h*out_width
for i_w, i_width in zip(range(out_width), range(0, width_ef, stride)):
matric_data[i_height_size + i_w, :] = padding_data[i_batch, i_height : i_height + filter_size,
i_width : i_width + filter_size, :].ravel()
filter_data = np.dot(matric_data, weights) + biases
filter_data = CnnBlockInterface.activation(filter_data, activation)
out_data = np.zeros((batch, out_height, out_width, out_depth))
for i_batch in range(batch):
i_batch_size = i_batch*out_size
for i_height in range(out_height):
i_height_size = i_batch_size + i_height*out_width
for i_width in range(out_width):
out_data[i_batch, i_height, i_width, :] = filter_data[i_height_size + i_width, :]
return (matric_data, filter_data, out_data)
@staticmethod
def dconv_layer(dout_data, matric_data, filter_data, weights, maps_shape, layer_param=(3,1,1), activation='ReLU'):
'''
inputs: dout_data, matric_data, filter_data
matric_data, filter_data are data produced in the forward
outputs: (dweight, dbias, din_data)
'''
(filter_size, stride, padding) = layer_param
(in_height, in_width, in_depth) = maps_shape
(batch, out_height, out_width, out_depth) = dout_data.shape
out_size = out_height*out_width
dfilter_data = np.zeros_like(filter_data)
for i_batch in range(batch):
i_batch_size = i_batch*out_size
for i_height in range(out_height):
i_height_size = i_batch_size + i_height*out_width
for i_width in range(out_width):
dfilter_data[i_height_size + i_width, :] = dout_data[i_batch, i_height, i_width, :]
dfilter_data = CnnBlockInterface.dactivation(dfilter_data, filter_data, activation)
#backprop the dot product filter_data = np.dot(matric_data, weights) + biases
dweight = np.dot(matric_data.T, dfilter_data)
dbias = np.sum(dfilter_data, axis=0, keepdims=True)
dmatric_data = np.dot(dfilter_data, weights.T)
#backprop the dmatric_data to dpadding_data, just change the shape.
padding_height = in_height + 2*padding
padding_width = in_width + 2*padding
dpadding_data = np.zeros((batch, padding_height, padding_width, in_depth) )
height_ef = padding_height - filter_size + 1
width_ef = padding_width - filter_size + 1
for i_batch in range(batch):
i_batch_size = i_batch*out_size
for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
i_height_size = i_batch_size + i_h*out_width
for i_w, i_width in zip(range(out_width), range(0, width_ef, stride)):
dpadding_data[i_batch, i_height : i_height + filter_size, i_width : i_width + filter_size, :] += dmatric_data[i_height_size + i_w, :].reshape(filter_size, filter_size, -1)
#backprop the dpadding_data to din_data
if padding:
din_data = dpadding_data[:,padding:-padding,padding:-padding,:]
else:
din_data = dpadding_data
return (dweight, dbias, din_data)
@staticmethod
def pooling_layer(in_data, filter_size=2, stride=2):
'''
in_data.shape = [batch,in_height,in_width,in_depth]
out_data.shape = [batch,out_height,out_width,out_depth=in_depth]
        the data cached for computing the gradients: matric_data_max_pos
'''
(batch, in_height, in_width, in_depth) = in_data.shape
filter_size2 = filter_size*filter_size
height_ef = in_height - filter_size + 1
width_ef = in_width - filter_size + 1
out_height = (in_height - filter_size)//stride + 1
out_width = (in_width - filter_size)//stride + 1
out_size = out_height*out_width
matric_data = np.zeros( (out_size*in_depth*batch, filter_size2) )
for i_batch in range(batch):
i_batch_size = i_batch*out_size*in_depth
for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
i_height_size = i_batch_size + i_h*out_width*in_depth
for i_w, i_width in zip(range(0, in_depth*out_width, in_depth), range(0, width_ef, stride)):
md = matric_data[i_height_size + i_w : i_height_size + i_w + in_depth, : ]
src = in_data[i_batch, i_height : i_height + filter_size, i_width : i_width + filter_size, :]
for i in range(filter_size):
for j in range(filter_size):
md[:, i*filter_size + j] = src[i, j, :]
matric_data_max_value = matric_data.max(axis = 1, keepdims = True)
        matric_data_max_pos = matric_data == matric_data_max_value  # cached for computing the gradient
out_depth = in_depth
out_data = np.zeros((batch, out_height, out_width, out_depth))
for i_batch in range(batch):
i_batch_size = i_batch*out_size*out_depth
for i_height in range(out_height):
i_height_size = i_batch_size + i_height*out_width*out_depth
for i_width in range(out_width):
out_data[i_batch, i_height, i_width, :] = matric_data_max_value[i_height_size + i_width*out_depth :
i_height_size + i_width*out_depth + out_depth].ravel()
return (out_data, matric_data_max_pos)
@staticmethod
def dpooling_layer(dout_data, matric_data_max_pos, maps_shape, filter_size=2, stride=2):
'''
dout_data.shape = [batch,out_height,out_width,out_depth=in_depth]
        matric_data_max_pos.shape = [out_size*in_depth*batch, filter_size*filter_size]
din_data.shape = [batch,in_height,in_width,in_depth]
'''
(in_height, in_width, not_used) = maps_shape
matric_data_not_max_pos = ~matric_data_max_pos
(batch, out_height, out_width, in_depth) = dout_data.shape
out_size = out_height*out_width
din_data = np.zeros((batch, in_height, in_width, in_depth), dtype = np.float64)
height_ef = in_height - filter_size + 1
width_ef = in_width - filter_size + 1
for i_batch in range(batch):
i_batch_size = i_batch*out_size*in_depth
for i_h_out, i_height in zip(range(out_height), range(0, height_ef, stride)):
i_height_size = i_batch_size + i_h_out*out_width*in_depth
for i_w_dout, i_w, i_width in zip(range(out_width), range(0, in_depth*out_width, in_depth),
range(0, width_ef, stride)):
md = matric_data_not_max_pos[i_height_size + i_w : i_height_size + i_w + in_depth, : ]
din = din_data[i_batch, i_height : i_height + filter_size, i_width : i_width + filter_size, :]
dout = dout_data[i_batch, i_h_out, i_w_dout, :]
for i in range(filter_size):
for j in range(filter_size):
din[i, j, :] = dout[:]
din[i, j, :][md[:, i*filter_size + j]] = 0
return din_data
@staticmethod
def FC_layer(in_data, weights, biases, out_depth, last, activation='ReLU'):
'''
        in_data.shape = [batch, in_height, in_width, in_depth]
        weights.shape = [in_height*in_width*in_depth, out_depth]
        biases.shape = [1, out_depth]
        last=1 if this FC layer is the last one (no non-linearity applied)
        out_data.shape = [batch, 1, 1, out_depth]
        the data cached for computing the gradients: matric_data, filter_data
'''
(batch, in_height, in_width, in_depth) = in_data.shape
matric_data = np.zeros( (batch, in_height*in_width*in_depth) )
for i_batch in range(batch):
matric_data[i_batch] = in_data[i_batch].ravel()
filter_data = np.dot(matric_data, weights) + biases
        if not last:  # the last FC layer does not apply the non-linearity
filter_data = CnnBlockInterface.activation(filter_data, activation)
out_data = np.zeros((batch, 1, 1, out_depth))
for i_batch in range(batch):
out_data[i_batch] = filter_data[i_batch]
return (matric_data, filter_data, out_data)
@staticmethod
def dFC_layer(dout_data, matric_data, filter_data, weights, maps_shape, last, activation='ReLU'):
'''
inputs: dout_data, matric_data, filter_data
matric_data, filter_data are data produced in the forward
outputs: (dweight, dbias, din_data)
'''
(in_height, in_width, in_depth) = maps_shape
(batch, out_height, out_width, out_depth) = dout_data.shape
dfilter_data = np.zeros_like(filter_data)
for i_batch in range(batch):
dfilter_data[i_batch] = dout_data[i_batch].ravel()
#backprop the ReLU non-linearity
if not last:
dfilter_data = CnnBlockInterface.dactivation(dfilter_data, filter_data, activation)
#backprop the dot product filter_data = np.dot(matric_data, weights) + biases
dweight = np.dot(matric_data.T, dfilter_data)
dbias = np.sum(dfilter_data, axis=0, keepdims=True)
dmatric_data = np.dot(dfilter_data, weights.T)
#backprop the dmatric_data to din_data, just change the shape.
din_data = np.zeros((batch, in_height, in_width, in_depth) )
for i_batch in range(batch):
din_data[i_batch] = dmatric_data[i_batch].reshape(in_height, in_width, -1)
return (dweight, dbias, din_data)
@staticmethod
def softmax_layer(scores):
"""
scores.shape = [batch,1,1,in_depth]
probs.shape = [batch,1,1,in_depth]
"""
scores -= np.max(scores, axis=3, keepdims=True)
        exp_scores = np.exp(scores) + 10**(-8)  # small constant for numerical stability
exp_scores_sum = np.sum(exp_scores, axis=3, keepdims=True)
probs = exp_scores/exp_scores_sum
return probs
@staticmethod
def data_loss(probs, labels):
"""
labels is array of integers specifying correct class
probs.shape = [batch,1,1,in_depth]
"""
probs_correct = probs[range(probs.shape[0]), :, :, labels]
logprobs_correct = -np.log(probs_correct)
data_loss = np.sum(logprobs_correct)/labels.shape[0]
return data_loss
@staticmethod
def evaluate_dscores(probs, labels):
'''
probs.shape = [batch,1,1,in_depth]
labels is array of integers specifying correct class
dscores.shape = [batch,1,1,in_depth]
'''
dscores = probs.copy()
dscores[range(probs.shape[0]), :, :, labels] -= 1
dscores /= labels.shape[0]
return dscores
@staticmethod
def param_init(out_depth, in_depth, filter_size2):
'''
filter_size2 = filter_size*filter_size
weights.shape = [filter_size2*in_depth, out_depth]
'''
std = np.sqrt(2)/np.sqrt(filter_size2*in_depth)
weights = std * np.random.randn(filter_size2*in_depth, out_depth)
biases = np.zeros((1, out_depth))
return (weights, biases)
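A small shape-check sketch ties the layer parameters to the output-size formula out = (in - filter_size + 2*padding)//stride + 1; for a 28x28 input with a 5x5 filter, stride 1 and no padding this gives 24x24, and a subsequent 2x2/stride-2 pooling gives 12x12.
import numpy as np
from cnn_block_interface import CnnBlockInterface

batch_data = np.random.randn(2, 28, 28, 1)   # [batch, height, width, depth]
out_depth, filter_size, stride, padding = 32, 5, 1, 0
weights, biases = CnnBlockInterface.param_init(out_depth, 1, filter_size*filter_size)
matric_data, filter_data, out_data = CnnBlockInterface.conv_layer(
    batch_data, weights, biases, (out_depth, filter_size, stride, padding))
print(out_data.shape)   # (2, 24, 24, 32), since (28 - 5 + 2*0)//1 + 1 = 24

out_pool, max_pos = CnnBlockInterface.pooling_layer(out_data)   # 2x2 max pooling, stride 2
print(out_pool.shape)   # (2, 12, 12, 32)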
5. Training procedure
cnn_train_interface.py
import numpy as np
import matplotlib.pyplot as plt
class CnnTrainInterface(object):
'''
decay the learning rate every epoch using an exponential rate of lr_decay
support learning rate and regularization random search
also support train and test from checkpoint
'''
def __shuffle_data(self):
shuffle_no = list(range(self.num_train_samples))
np.random.shuffle(shuffle_no)
self.train_labels = self.train_labels[shuffle_no]
self.train_data = self.train_data[shuffle_no]
shuffle_no = list(range(self.num_val_samples))
np.random.shuffle(shuffle_no)
self.val_labels = self.val_labels[shuffle_no]
self.val_data = self.val_data[shuffle_no]
def __train(self, epoch_more=20, lr=10**(-4), reg=10**(-5), batch=64, lr_decay=0.8, mu=0.9,
optimizer='Nesterov', regulation='L2', activation='ReLU'):
        # visualize the data loss, the train/validation accuracy and the update ratio
plt.close()
fig=plt.figure('')
ax=fig.add_subplot(3,1,1)
ax.grid(True)
ax2=fig.add_subplot(3,1,2)
ax2.grid(True)
ax3=fig.add_subplot(3,1,3)
ax3.grid(True)
plt.xlabel( 'log10(lr)=' + str(round((np.log10(lr)),2)) + ' ' + 'log10(reg)=' + str(round((np.log10(reg)),2)), fontsize=14)
plt.ylabel(' update_ratio accuracy log10(data loss)', fontsize=14)
epoch = 0
val_no = 0
per_epoch_time = self.num_train_samples//batch
while epoch < epoch_more:
losses = 0
self.__shuffle_data()
for i in range(0, self.num_train_samples, batch):
batch_data = self.train_data[i:i+batch,:]
labels = self.train_labels[i:i+batch]
(data_loss, reg_loss) = self.forward(batch_data, labels, reg, regulation, activation)
losses += data_loss + reg_loss
self.backpropagation(labels, reg, regulation, activation)
self.params_update(lr, per_epoch_time*epoch + i+1, mu, optimizer)
update_ratio = self.update_ratio[0][0]
if i % (batch*20) == 0:
ax.scatter(i/self.num_train_samples+epoch, np.log10(data_loss), c='b',marker='.')
train_accuracy = self.predict(batch_data, labels, activation)
batch_data_val = self.val_data[val_no:val_no+batch,:]
labels_val = self.val_labels[val_no:val_no+batch]
val_accuracy = self.predict(batch_data_val, labels_val, activation)
val_no += batch
if val_no >= self.num_val_samples - batch:
val_no = 0
ax2.scatter(i/self.num_train_samples+epoch, (train_accuracy), c='r',marker='*')
ax2.scatter(i/self.num_train_samples+epoch, (val_accuracy), c='b',marker='.')
ax3.scatter(i/self.num_train_samples+epoch, np.log10(update_ratio), c='r',marker='.')
plt.pause(0.000001)
epoch += 1
plt.savefig('checkpoint_' + '(loss_' + str(round(np.log10(losses/per_epoch_time),2)) +
')_(epoch_' + str(round(epoch,2)) + ')_' + '_[(lr reg)_' + '(' + str(round((np.log10(lr)),2)) +
' ' + str(round((np.log10(reg)),2)) + ')]' + '_' +
' ' + optimizer + ' '+ regulation + ' ' + activation + '.png')
self.context[0] = lr
self.save_checkpoint('checkpoint_' + '(loss_' + str(round(np.log10(losses/per_epoch_time),2)) +
')_(epoch_' + str(round(epoch,2)) + ')_' + '_[(lr reg)_' + '(' + str(round((np.log10(lr)),2)) +
' ' + str(round((np.log10(reg)),2)) + ')]' + '_' +
' ' + optimizer + ' '+ regulation + ' ' + activation + '.npy')
lr *= lr_decay #decayed every epoch using an exponential rate
self.test(batch, activation)
def __methods_check(self, optimizer, regulation, activation):
self.check_optimizer(optimizer)
self.check_regulation(regulation)
self.check_activation(activation)
@staticmethod
def __gen_lr_reg(lr=[0, -6], reg=[-3, -6], num_try=10):
minlr = min(lr)
maxlr = max(lr)
randn = np.random.rand(num_try*2)
lr_array = 10**(minlr + (maxlr-minlr)*randn[0: num_try])
minreg = min(reg)
maxreg = max(reg)
reg_array = 10**(minreg + (maxreg-minreg)*randn[num_try: 2*num_try])
lr_regs = zip(lr_array, reg_array)
return lr_regs
def train_random_search(self, lr=[-1, -5], reg=[-1, -5], num_try=10, epoch_more=1,batch=64, lr_decay=0.8, mu=0.9,
optimizer='Nesterov', regulation='L2', activation='ReLU'):
self.__methods_check(optimizer, regulation, activation)
self.featuremap_shape()
lr_regs = self.__gen_lr_reg(lr, reg, num_try)
for lr_reg in lr_regs:
try:
self.init_params()
self.context = [*lr_reg, batch, lr_decay, mu, optimizer, regulation, activation]
self.__train(epoch_more, *lr_reg, batch, lr_decay, mu, optimizer, regulation, activation)
except KeyboardInterrupt:
pass
def train_from_checkpoint(self, epoch_more=10, checkpoint_fname=''):
self.load_checkpoint(checkpoint_fname)
[lr, reg, batch, lr_decay, mu, optimizer, regulation, activation] = self.context
lr = np.double(lr)
reg = np.double(reg)
        batch = int(batch)  # use the builtin int (np.int was removed from NumPy)
lr_decay = np.double(lr_decay)
mu = np.double(mu)
self.__train(epoch_more, lr, reg, batch, lr_decay, mu, optimizer, regulation, activation)
def test_from_checkpoint(self, checkpoint_fname):
self.load_test_data()
self.load_checkpoint(checkpoint_fname)
[lr, reg, batch, lr_decay, mu, optimizer, regulation, activation] = self.context
        batch = int(batch)
accuracys = np.zeros(shape=(self.test_labels.shape[0],))
for i in range(0, self.test_labels.shape[0], batch):
batch_data = self.test_data[i:i+batch,:]
label = self.test_labels[i:i+batch]
accuracys[i:i+batch] = self.predict(batch_data, label, activation)
accuracy = np.mean(accuracys)
print('the test accuracy: %.5f' % accuracy)
return accuracy
def test(self, batch, activation):
self.load_test_data()
accuracys = np.zeros(shape=(self.test_labels.shape[0],))
for i in range(0, self.test_labels.shape[0], batch):
batch_data = self.test_data[i:i+batch,:]
label = self.test_labels[i:i+batch]
accuracys[i:i+batch] = self.predict(batch_data, label, activation)
accuracy = np.mean(accuracys)
print('the test accuracy: %.5f' % accuracy)
return accuracy
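The random search in train_random_search samples the learning rate and regularization strength log-uniformly, which is why the bounds are given as exponents of 10; a standalone sketch of the same sampling idea (sample_log_uniform is an illustrative helper, not part of the code above):
import numpy as np

def sample_log_uniform(low_exp, high_exp, num_try):
    # sample exponents uniformly, then map back through 10**x
    r = np.random.rand(num_try)
    return 10**(low_exp + (high_exp - low_exp)*r)

lrs = sample_log_uniform(-5.0, -2.0, 5)    # learning rates between 1e-5 and 1e-2
regs = sample_log_uniform(-5.0, -3.0, 5)   # regularization strengths between 1e-5 and 1e-3
for lr, reg in zip(lrs, regs):
    print('log10(lr)=%.2f  log10(reg)=%.2f' % (np.log10(lr), np.log10(reg)))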
6. VGG-style network structure
vgg_net.py
import numpy as np
import re
class VGGNet(object):
'''
    only supports VGG-like cnn structs, e.g.
    struct = ['conv_16_5_2_2'] + ['conv_32']*2 + ['pool'] + ['conv_64']*3 + ['pool'] + ['FC_128']
    conv_N_f_s_p means conv_numFeatureMaps_filterSize_stride_padding; the defaults are filterSize=3, stride=1, padding=1
    the last TWO layers are always: FC, softmax
    all pool layers have filterSize=2 and stride=2
    supports saving/loading checkpoints
'''
    def __init__(self, struct=None):
        # avoid a mutable default argument and do not mutate the caller's list in place
        if not struct:
            struct = []
            print('you are using a linear model!')
        self.__struct_parse(struct)
        self.__struct = list(struct) + ['FC', 'softmax']
def __struct_parse(self, struct):
layers = []
for layer in struct:
            convfull = re.match(r'^conv_(\d{1,3})_(\d{1})_(\d{1})_(\d{1})$', layer)
            convdefault = re.match(r'^conv_(\d{1,3})$', layer)
            pool = re.match(r'^pool$', layer)
            fc = re.match(r'^FC_(\d{1,4})$', layer)
if convfull:
layers.append(( int(convfull.group(1)), int(convfull.group(2)),
int(convfull.group(3)), int(convfull.group(4)), 'conv'))
elif convdefault:
layers.append(( int(convdefault.group(1)),3,1,1,'conv'))
elif pool:
layers.append( (layers[-1][0], 'pool') )
elif fc:
layers.append( (int(fc.group(1)), 'FC') )
else:
raise ValueError('the layer must like conv_16_5_2_2 or conv_16 or pool or FC_64')
layers.append(('', 'Last_FC'))
self.__layers_params = layers
def featuremap_shape(self):
maps_shape = []
in_map_shape = (self.im_height, self.im_width, self.im_dims)
maps_shape.append(in_map_shape)
for layer in self.__layers_params:
if layer[-1] == 'Last_FC':
break
elif layer[-1] == 'FC':
in_map_shape = (1, 1, layer[0])
elif layer[-1] == 'conv':
(out_depth, filter_size, stride, padding, not_used) = layer
out_height = (in_map_shape[0] - filter_size + 2*padding)//stride + 1
out_width = (in_map_shape[1] - filter_size + 2*padding)//stride + 1
in_map_shape = (out_height, out_width, out_depth)
if out_height < filter_size or out_width < filter_size:
raise ValueError('the cnn struct is not compatible with the image size!\n')
elif layer[-1] == 'pool':
filter_size = 2
stride = 2
out_height = (in_map_shape[0] - filter_size)//stride + 1
out_width = (in_map_shape[1] - filter_size)//stride + 1
in_map_shape = (out_height, out_width, layer[0])
if out_height < filter_size or out_width < filter_size:
raise ValueError('the cnn struct is not compatible with the image size!\n')
else:
pass
maps_shape.append(in_map_shape)
self.maps_shape = maps_shape
def init_params(self):
self.__weights = []
self.__biases = []
in_depth = self.im_dims
out_depth = in_depth
for layer_param, map_shape in zip(self.__layers_params, self.maps_shape):
weight = np.array([])
bias = np.array([])
if layer_param[-1] == 'Last_FC':
in_depth = out_depth
out_depth = self.num_class
(weight, bias) = self.param_init(out_depth, in_depth, map_shape[0]*map_shape[1])
elif layer_param[-1] == 'FC':
out_depth = layer_param[0]
in_depth = map_shape[2]
(weight, bias) = self.param_init(out_depth, in_depth, map_shape[0]*map_shape[1])
elif layer_param[-1] == 'conv':
filter_size = layer_param[1]
out_depth = layer_param[0]
(weight, bias) = self.param_init(out_depth, in_depth, filter_size*filter_size)
elif layer_param[-1] == 'pool': # pool has no params
pass
else:
pass
in_depth = out_depth
self.__weights.append(weight)
self.__biases.append(bias)
#softmax layer: no params
#for backprop
self.__vweights = []
self.__vbiases = []
self.__cache_biases = []
self.__cache_weights = []
for weight, bias in zip(self.__weights, self.__biases):
self.__vweights.append(np.zeros_like(weight))
self.__vbiases.append(np.zeros_like(bias))
self.__cache_weights.append(np.zeros_like(weight))
self.__cache_biases.append(np.zeros_like(bias))
def reg_loss(self, reg=10**(-5), regulation='L2'):
reg_loss = 0
for weight in self.__weights:
if weight.size != 0:
reg_loss += self.norm_reg(weight, reg, regulation)
return reg_loss
def forward(self, batch_data, labels, reg=10**(-5), regulation='L2', activation='ReLU'):
self.__matric_data = []
self.__filter_data = []
self.__matric_data_max_pos = []
in_maps = batch_data
for layer_param, weight, bias in zip(self.__layers_params, self.__weights, self.__biases):
matric_data = np.array([])
filter_data = np.array([])
matric_data_max_pos = np.array([])
if layer_param[-1] == 'Last_FC': #last FC layer, no non linearity
(matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, self.num_class, 1, activation)
elif layer_param[-1] == 'FC':
(matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, layer_param[0], 0, activation)
elif layer_param[-1] == 'conv':
(matric_data, filter_data, out_maps) = self.conv_layer(in_maps, weight, bias, layer_param[0:-1], activation)
elif layer_param[-1] == 'pool':
(out_maps, matric_data_max_pos) = self.pooling_layer(in_maps)
else:
pass
in_maps = out_maps
self.__matric_data.append(matric_data)
self.__filter_data.append(filter_data)
self.__matric_data_max_pos.append(matric_data_max_pos)
self.__probs = self.softmax_layer(out_maps)
data_loss = self.data_loss(self.__probs, labels)
reg_loss = self.reg_loss(reg, regulation)
return (data_loss, reg_loss)
def predict(self, batch_data, labels, activation='ReLU'):
in_maps = batch_data
for layer_param, weight, bias in zip(self.__layers_params, self.__weights, self.__biases):
if layer_param[-1] == 'Last_FC': #last FC layer, no non linearity
(matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, self.num_class, 1, activation)
elif layer_param[-1] == 'FC':
(matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, layer_param[0], 0, activation)
elif layer_param[-1] == 'conv':
(matric_data, filter_data, out_maps) = self.conv_layer(in_maps, weight, bias, layer_param[0:-1], activation)
elif layer_param[-1] == 'pool':
(out_maps, matric_data_max_pos) = self.pooling_layer(in_maps)
else:
pass
in_maps = out_maps
predicted_class = np.argmax(out_maps, axis=3)
accuracy = predicted_class.ravel() == labels
return np.mean(accuracy)
def dweight_reg(self, reg=10**(-5), regulation='L2'):
for i in range(len(self.__weights)):
weight = self.__weights[i]
if weight.size != 0:
self.__dweights[-1-i] += self.dnorm_reg(weight, reg, regulation)
def backpropagation(self, labels, reg=10**(-5), regulation='L2', activation='ReLU'):
dscores = self.evaluate_dscores(self.__probs, labels)
dout_maps = dscores
self.__dweights = []
self.__dbiases = []
for (layer_param, maps_shape, weight,
matric_data, filter_data, matric_data_max_pos) in zip(reversed(self.__layers_params),
reversed(self.maps_shape), reversed(self.__weights),
reversed(self.__matric_data), reversed(self.__filter_data), reversed(self.__matric_data_max_pos) ):
if layer_param[-1] == 'Last_FC':
(dweight, dbias, din_maps) = self.dFC_layer(dout_maps, matric_data, filter_data,
weight, maps_shape, 1, activation)
elif layer_param[-1] == 'FC':
(dweight, dbias, din_maps) = self.dFC_layer(dout_maps, matric_data, filter_data,
weight, maps_shape, 0, activation)
elif layer_param[-1] == 'conv':
(dweight, dbias, din_maps) = self.dconv_layer(dout_maps, matric_data, filter_data,
weight, maps_shape, layer_param[1:-1], activation)
elif layer_param[-1] == 'pool':
dweight = np.array([])
dbias = np.array([])
din_maps = self.dpooling_layer(dout_maps, matric_data_max_pos, maps_shape)
else:
pass
dout_maps = din_maps
self.__dweights.append(dweight)
self.__dbiases.append(dbias)
self.__dbatch_data = din_maps #grad of input image batch
self.dweight_reg(reg, regulation)
def params_update(self, lr=10**(-4), t=1, mu=0.9, optimizer='Nesterov'):
self.update_ratio = []
if optimizer == 'adam':
for i in range(len(self.__weights)):
weight = self.__weights[i]
bias = self.__biases[i]
dweight = self.__dweights[-1-i]
dbias = self.__dbiases[-1-i]
v_weight = self.__vweights[i]
v_bias = self.__vbiases[i]
cache_weight = self.__cache_weights[i]
cache_bias = self.__cache_biases[i]
if weight.size != 0:
update_ratio_w = self.adam(lr, weight, v_weight, cache_weight, dweight, t, mu)
update_ratio_b = self.adam(lr, bias, v_bias, cache_bias, dbias, t, mu)
self.update_ratio.append((update_ratio_w,update_ratio_b))
if optimizer == 'Nesterov':
for i in range(len(self.__weights)):
weight = self.__weights[i]
bias = self.__biases[i]
dweight = self.__dweights[-1-i]
dbias = self.__dbiases[-1-i]
v_weight = self.__vweights[i]
v_bias = self.__vbiases[i]
if weight.size != 0:
update_ratio_w = self.nesterov_momentumGD(lr, weight, v_weight, dweight, mu)
update_ratio_b = self.nesterov_momentumGD(lr, bias, v_bias, dbias, mu)
self.update_ratio.append((update_ratio_w,update_ratio_b))
def save_checkpoint(self, fname):
with open(fname, 'wb') as f:
np.save(f, np.array([3,1,4,1,5,9,2,8,8])) # magic number
np.save(f, np.array( self.__struct) )
np.save(f, np.array([self.num_class, self.im_dims, self.im_height, self.im_width]) )
np.save(f, np.array(self.__layers_params))
np.save(f, np.array(self.maps_shape))
np.save(f, np.array(self.context))
for array in self.__weights:
np.save(f, array)
for array in self.__biases:
np.save(f, array)
for array in self.__vweights:
np.save(f, array)
for array in self.__vbiases:
np.save(f, array)
for array in self.__cache_weights:
np.save(f, array)
for array in self.__cache_biases:
np.save(f, array)
def load_checkpoint(self, fname):
with open(fname, 'rb') as f:
magic_number = np.load(f)
if not all(magic_number == np.array([3,1,4,1,5,9,2,8,8])): # magic number
raise ValueError('the file format is wrong!\n')
self.__struct = np.load(f)
print('\n\nthe net struct is: \n', self.__struct)
self.num_class, self.im_dims, self.im_height, self.im_width = np.load(f)
self.__layers_params = np.load(f)
self.maps_shape = np.load(f)
self.context = np.load(f)
self.__weights=[]
self.__biases=[]
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__weights.append(array)
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__biases.append(array)
self.__vweights=[]
self.__vbiases=[]
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__vweights.append(array)
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__vbiases.append(array)
self.__cache_weights=[]
self.__cache_biases=[]
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__cache_weights.append(array)
for i in range(len(self.__layers_params)):
array = np.load(f)
self.__cache_biases.append(array)
print('the struct hyper parameters:\n', self.__layers_params)
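As a worked example of featuremap_shape, using the struct trained in section 9 on 28x28x1 MNIST images and the formula out = (in - filter_size + 2*padding)//stride + 1, the feature-map shapes evolve as follows:
input                      28 x 28 x 1
conv_8  (3x3, s=1, p=1)    28 x 28 x 8
pool    (2x2, s=2)         14 x 14 x 8
conv_12 x3                 14 x 14 x 12
pool                        7 x 7 x 12
conv_36 x3                  7 x 7 x 36
pool                        3 x 3 x 36
FC_64                       1 x 1 x 64
last FC (class scores)      1 x 1 x 10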
7. The MNIST dataset
Download the four MNIST archives from http://yann.lecun.com/exdb/mnist/ into an MNIST/ directory.
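If the four archives are not already present, a small download sketch (assuming the standard file names used by the reader below and that the mirror is reachable):
import os
import urllib.request

base = 'http://yann.lecun.com/exdb/mnist/'
files = ['train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz',
         't10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz']
os.makedirs('MNIST', exist_ok=True)
for fname in files:
    dst = os.path.join('MNIST', fname)
    if not os.path.exists(dst):
        urllib.request.urlretrieve(base + fname, dst)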
MNIST_interface.py
import numpy as np
import gzip, struct
class MNISTInterface(object):
'''
load the mnist dataset
and shuffle split the train set into train and validation set
the ratio of train and validation may be 7:3
'''
#
def load_train_data(self, num_ratio):
(imgs, labels) = MNISTInterface.get_mnist_train()
#data preprocess
imgs = imgs/255 # normalize to [0 1]
# split the data into train and val data subset and shuffle
self.num_samples = labels.size
if isinstance(num_ratio, int):
self.num_train_samples = num_ratio
else:
self.num_train_samples = int(self.num_samples*num_ratio)
self.num_val_samples = self.num_samples - self.num_train_samples
shuffle_no = list(range(self.num_samples))
np.random.shuffle(shuffle_no)
imgs = imgs[shuffle_no]
labels = labels[shuffle_no]
self.train_data = imgs[0:self.num_train_samples]
self.train_labels = labels[0:self.num_train_samples]
self.val_data = imgs[self.num_train_samples::]
self.val_labels = labels[self.num_train_samples::]
self.__set_data_pro()
def load_test_data(self):
(imgs, labels) = MNISTInterface.get_mnist_test()
#data preprocess
imgs = imgs/255 # normalize to [0 1]
self.test_data = imgs
self.test_labels = labels
self.__set_data_pro()
def __set_data_pro(self, num_class=10, im_height=28, im_width=28, im_dims=1):
self.num_class = num_class
self.im_height = im_height
self.im_width = im_width
self.im_dims = im_dims
@staticmethod
def __read(image, label):
mnist_dir = 'MNIST/'
with gzip.open(mnist_dir + label) as flbl:
magic, num = struct.unpack(">II", flbl.read(8))
            label = np.frombuffer(flbl.read(), dtype=np.uint8)
with gzip.open(mnist_dir + image, 'rb') as fimg:
magic, num, rows, cols = struct.unpack(">IIII", fimg.read(16))
            image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows, cols)
return (image, label)
@staticmethod
def get_mnist_train():
train_img, train_label = MNISTInterface.__read('train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz')
train_img = train_img.reshape((*train_img.shape,1))
return (train_img, train_label)
@staticmethod
def get_mnist_test():
test_img, test_label = MNISTInterface.__read('t10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz')
test_img = test_img.reshape((*test_img.shape,1))
return (test_img, test_label)
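Note that load_train_data accepts either a fraction of the 60000 training images or an absolute number of training samples (the remainder becomes the validation set); this is the knob varied in the sample-efficiency results quoted at the end of the post. For example, with vgg being the VGGTest instance created in section 9:
vgg.load_train_data(0.7)   # 42000 training images, 18000 validation images
vgg.load_train_data(400)   # only 400 training images, the other 59600 go to validation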
8. Gradient checking
vgg_grad_check.py
import numpy as np
from vgg_net import VGGNet
from cnn_block_interface import CnnBlockInterface
from regulation_interface import RegulationInterface
class VGGTest(VGGNet, CnnBlockInterface, RegulationInterface):
def set_data_pro(self, num_class=4, im_height=32, im_width=32, im_dims=3):
self.num_class = num_class
self.im_height = im_height
self.im_width = im_width
self.im_dims = im_dims
def gen_random_data(self):
self.num_samples = self.num_class*20
self.data = np.random.randn(self.num_samples, self.im_height, self.im_width, self.im_dims)
self.labels = np.random.randint(self.num_class, size=self.num_samples)
def check_gradient(self, check_weight_or_bias=1, step=10**(-5), reg=10**(-1), regulation='L1', activation='ELU'):
# check_weight_or_bias: 1 for weight, 0 for bias
self.set_data_pro()
self.gen_random_data()
self.featuremap_shape()
self.init_params()
for layer in range(len(self.maps_shape)):
if check_weight_or_bias:
weight = self._VGGNet__weights[layer]
if weight.size == 0:
continue
else:
row = np.random.randint(weight.shape[0])
col = np.random.randint(weight.shape[1])
param = weight[row][col]
else:
bias = self._VGGNet__biases[layer]
if bias.size == 0:
continue
else:
row = np.random.randint(bias.shape[1])
param = bias[0][row]
(data_loss, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
self.backpropagation(self.labels, reg, regulation, activation)
if check_weight_or_bias:
danalytic = self._VGGNet__dweights[-1-layer][row][col]
else:
danalytic = self._VGGNet__dbiases[-1-layer][0][row]
if check_weight_or_bias:
self._VGGNet__weights[layer][row][col] = param - step
else:
self._VGGNet__biases[layer][0][row] = param - step
(data_loss1, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
loss1 = data_loss1 + reg_loss
if check_weight_or_bias:
self._VGGNet__weights[layer][row][col] = param + step
else:
self._VGGNet__biases[layer][0][row] = param + step
(data_loss2, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
loss2 = data_loss2 + reg_loss
dnumeric = (loss2 - loss1)/(2*step)
print(layer, data_loss1, data_loss2)
            error_relative = np.abs(danalytic - dnumeric)/np.maximum(np.abs(danalytic), np.abs(dnumeric))
print(danalytic, dnumeric, error_relative)
if __name__ == '__main__':
    # network structure
struct = ['conv_32_5_1_0'] + ['pool'] + ['conv_64'] + ['pool'] + ['conv_128']*2 + ['pool'] + ['conv_256'] + ['FC_100']
    vgg = VGGTest(struct)  # create a network instance
vgg.check_gradient(check_weight_or_bias=1, step=10**(-5), reg=10**(-50), regulation='L1', activation='ReLU')
With the settings above, the analytic and numerical gradients agree to a relative error (|danalytic - dnumeric| / max(|danalytic|, |dnumeric|)) of about 1e-9, which indicates the gradient computation is correct.
9. Training
vgg_test.py
from vgg_net import VGGNet
from cnn_block_interface import CnnBlockInterface
from cnn_train_interface import CnnTrainInterface
from optimizer_interface import OptimizerInterface
from regulation_interface import RegulationInterface
from MNIST_interface import MNISTInterface
class VGGTest(MNISTInterface, VGGNet, CnnBlockInterface, CnnTrainInterface, OptimizerInterface, RegulationInterface):
pass
if __name__ == '__main__':
    # struct = []  # linear model
# struct = ['FC_64'] # one hidden layer network
struct = ['conv_8'] + ['pool'] + ['conv_12']*3 + ['pool'] + ['conv_36']*3 + ['pool'] + ['FC_64']
vgg = VGGTest(struct)
    train_ratio = 0.7  # fraction of the 60000 MNIST training images used for training; the rest is validation
    vgg.load_train_data(train_ratio)
train = 1
scratch = 1
if train:
if scratch:
            vgg.train_random_search(lr=[-2.0, -5.0], reg=[-3, -5], num_try=1, epoch_more=20, batch=64, lr_decay=1, mu=0.9, optimizer='adam', regulation='L2', activation='ReLU')  # random hyper-parameter search
else:
vgg.train_from_checkpoint(epoch_more=2, checkpoint_fname='checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU.npy')
else:
vgg.test_from_checkpoint('checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU.npy')
#%%
'''
test accuracy 98.73 with checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU
with only 40 random training samples, the test accuracy reaches 63.68
with 400 training samples: 90.05
train ratio 0.5:  97.15
train ratio 0.25: 97.49
train ratio 0.1:  96.34
train ratio 0.05: 92.93
train ratio 0.01: 83.25
'''