卷积神经网络实例Python(VGG)

系统 1407 0

书籍下载https://download.csdn.net/download/qfire/11175748

    采用MNIST数据集进行卷积神经网络实战学习,面向对象设计方法,把每个独立模块封装成对象。训练一个卷积网络,主要包括7部分:激活函数、正则化、优化方法、卷积网络基本模块、训练方法、网络结构和数据集。每个部分都可以抽象成一个类,其中激活函数、正则化、优化方法、卷积网络基本模块和训练方法这5个类基本固定,可以适用于各种网络结构,并且前4个类都设计为接口类,数据存储在网络结构类种。网络结构类利用卷积网络的基本模块类进行组合,可以实现各种网络结构,如ResNet

1、激活函数

activation_interface.py,主要是ReLU和ELU

            
              import numpy as np

class ActivationInterface(object):
    activations = ['ReLU', 'ELU']
    @staticmethod
    def activation(data, activation):
        if activation == 'ReLU':
            data = np.maximum(0, data)
            return data
        if activation == 'ELU':
            expdata = np.exp(data) - 1
            data = np.where(data > 0, data, expdata) 
            return data
    @staticmethod    
    def dactivation(ddata, data, activation):
        if activation == 'ReLU':
            ddata[data <= 0] = 0
            return ddata
        if activation == 'ELU':
            ddatatemp = ddata*(data+1)
            ddata = np.where(data > 0, ddata, ddatatemp) 
            return ddata
    @staticmethod
    def check_activation(activation):
        if activation not in ActivationInterface.activations:        
            raise ValueError('''Activation methods: ReLU, ELU!''')
            
          

2、正则化

regulation_interface.py  两种范数正则化:L2和L1

            
              import numpy as np

class RegulationInterface(object):
    regulations = ['L1', 'L2']
    @staticmethod
    def norm_reg(weight, reg, regulation):
        if regulation == 'L2':
            return np.sum(weight*weight)*reg/2
        if regulation == 'L1':
            return np.sum(np.abs(weight))*reg
    @staticmethod    
    def dnorm_reg(weight, reg, regulation):
        if regulation == 'L2':
            return weight*reg
        if regulation == 'L1':
            return np.sign(weight)*reg
    @staticmethod
    def check_regulation(regulation):
        if regulation not in RegulationInterface.regulations:        
            raise ValueError('''Regulation methods: L1, L2!''')
            
          

3、优化方法

optimizer_interface.py 实现常用的两种优化方法

            
              import numpy as np

class OptimizerInterface(object):
    optimizers = [ 'Nesterov', 'adam']
    decay_rate=0.999
    eps = 10**(-8)
    @staticmethod
    def nesterov_momentumGD(lr, param, vparam, dparam, mu=0.9):
        pre_vparam = vparam
        vparam = mu*vparam - lr*dparam
        updata_param = vparam + mu*(vparam - pre_vparam)
        update_ratio= np.sum(np.abs(updata_param))/(np.sum(np.abs(param)) + OptimizerInterface.eps)
        param += updata_param
        return update_ratio      
    @staticmethod
    def adam(lr, param, vparam, cache, dparam, t=1, mu=0.9):
        vparam = mu*vparam + (1-mu)*dparam
        vparamt = vparam/(1 - mu**t)
        cache = OptimizerInterface.decay_rate*cache + (1-OptimizerInterface.decay_rate)*(dparam**2)
        cachet = cache/(1 - OptimizerInterface.decay_rate**t)
        updata_param = -(lr/(np.sqrt(cachet) + OptimizerInterface.eps)) * vparamt
        update_ratio = np.sum(np.abs(updata_param))/(np.sum(np.abs(param)) + OptimizerInterface.eps)
        param += updata_param
        return update_ratio
    @staticmethod
    def check_optimizer(optimizer):
        if optimizer not in OptimizerInterface.optimizers:        
            raise ValueError('''updates methods: Nesterov and adam!''')
            
          

4、卷积网络的基本模块

    cnn_block_interface.py 有卷积层、池化层、全连接层、softmax层的前向和反向,以及参数初始化

            
              import numpy as np

from activation_interface import ActivationInterface

class CnnBlockInterface(ActivationInterface):
    '''
    the implementation of three basic blocks of cnn net:
    the conv pool and fc block
    and the softmax layer
    '''
    @staticmethod
    def conv_layer(in_data, weights, biases, layer_param=(0,3,1,1), activation='ReLU'):        
        '''
        in_data.shape = [batch,in_height,in_width,in_depth]        
        weights.shape = [filter_size*filter_size*in_depth, out_depth]     
        biases.shape = [1, out_depth]
        
        out_data.shape = [batch,out_height,out_width,out_depth]
        the data for calu gradient: matric_data, filter_data
        '''
        (batch, in_height, in_width, in_depth) = in_data.shape
        (out_depth, filter_size, stride, padding) = layer_param
        if padding:
            padding_data = np.zeros((batch, in_height + 2*padding, in_width + 2*padding, in_depth) )
            padding_data[:, padding : -padding, padding : -padding, :] = in_data
        else:
            padding_data = in_data
            
        filter_size2 = filter_size*filter_size
        height_ef = padding_data.shape[1] - filter_size + 1
        width_ef = padding_data.shape[2] - filter_size + 1
        
        out_height = (in_height - filter_size + 2*padding)//stride + 1
        out_width = (in_width - filter_size + 2*padding)//stride + 1
        out_size = out_height*out_width       
        matric_data = np.zeros( (out_size*batch, filter_size2*in_depth) )

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size
            for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
                i_height_size = i_batch_size + i_h*out_width
                for i_w, i_width in zip(range(out_width), range(0, width_ef, stride)):
                    matric_data[i_height_size + i_w, :] = padding_data[i_batch,  i_height : i_height + filter_size,
                                                                       i_width : i_width  + filter_size, :].ravel()        
        
        filter_data = np.dot(matric_data, weights) + biases
        filter_data = CnnBlockInterface.activation(filter_data, activation)  
        
        out_data = np.zeros((batch, out_height, out_width, out_depth))

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size
            for i_height in range(out_height):
                i_height_size = i_batch_size + i_height*out_width
                for i_width in range(out_width):
                    out_data[i_batch, i_height, i_width, :] = filter_data[i_height_size + i_width, :]

        return (matric_data, filter_data, out_data)
    
    @staticmethod
    def dconv_layer(dout_data, matric_data, filter_data, weights, maps_shape, layer_param=(3,1,1), activation='ReLU'):
        '''
        inputs: dout_data, matric_data, filter_data
        matric_data, filter_data are data produced in the forward
        outputs: (dweight, dbias, din_data)
        '''
        (filter_size, stride, padding) = layer_param
        (in_height, in_width, in_depth) = maps_shape
        (batch, out_height, out_width, out_depth) = dout_data.shape
        out_size = out_height*out_width        
        dfilter_data = np.zeros_like(filter_data)
        
        for i_batch in range(batch):
            i_batch_size = i_batch*out_size
            for i_height in range(out_height):
                i_height_size = i_batch_size + i_height*out_width
                for i_width in range(out_width):
                    dfilter_data[i_height_size + i_width, :] = dout_data[i_batch, i_height, i_width, :]
        
        dfilter_data = CnnBlockInterface.dactivation(dfilter_data, filter_data, activation)

        #backprop the dot product filter_data = np.dot(matric_data, weights) + biases
        dweight = np.dot(matric_data.T, dfilter_data)
        dbias = np.sum(dfilter_data, axis=0, keepdims=True)
        dmatric_data = np.dot(dfilter_data, weights.T)
        
        #backprop the dmatric_data to dpadding_data, just change the shape.
        padding_height = in_height + 2*padding
        padding_width = in_width + 2*padding
        dpadding_data = np.zeros((batch, padding_height, padding_width, in_depth) )
        
        height_ef = padding_height - filter_size + 1
        width_ef = padding_width - filter_size + 1

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size
            for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
                i_height_size = i_batch_size + i_h*out_width
                for i_w, i_width in zip(range(out_width), range(0, width_ef, stride)):
                    dpadding_data[i_batch, i_height : i_height + filter_size, i_width : i_width  + filter_size, :] += dmatric_data[i_height_size + i_w, :].reshape(filter_size, filter_size, -1) 
                    
        #backprop the dpadding_data to din_data
        if padding:
            din_data = dpadding_data[:,padding:-padding,padding:-padding,:]
        else:
            din_data = dpadding_data          
        return (dweight, dbias, din_data)
    
    @staticmethod
    def pooling_layer(in_data, filter_size=2, stride=2):
        '''
        in_data.shape = [batch,in_height,in_width,in_depth]
        
        out_data.shape = [batch,out_height,out_width,out_depth=in_depth]
        the data for calu gradient: matric_data_max_pos
        '''
        (batch, in_height, in_width, in_depth) = in_data.shape
        filter_size2 = filter_size*filter_size
        height_ef = in_height - filter_size + 1
        width_ef = in_width - filter_size + 1
        out_height = (in_height - filter_size)//stride + 1 
        out_width = (in_width - filter_size)//stride + 1 
        out_size = out_height*out_width        
        matric_data = np.zeros( (out_size*in_depth*batch, filter_size2) )

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size*in_depth
            for i_h, i_height in zip(range(out_height), range(0, height_ef, stride)):
                i_height_size = i_batch_size + i_h*out_width*in_depth
                for i_w, i_width in zip(range(0, in_depth*out_width, in_depth), range(0, width_ef, stride)):
                    md = matric_data[i_height_size + i_w : i_height_size + i_w + in_depth, : ]
                    src = in_data[i_batch, i_height : i_height + filter_size, i_width : i_width + filter_size, :]
                    for i in range(filter_size):
                        for j in range(filter_size):
                            md[:, i*filter_size + j] = src[i, j, :]
                            
        matric_data_max_value = matric_data.max(axis = 1, keepdims = True)
        matric_data_max_pos = matric_data == matric_data_max_value #for calu grad
               
        out_depth = in_depth        
        out_data = np.zeros((batch, out_height, out_width, out_depth))

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size*out_depth
            for i_height in range(out_height):
                i_height_size = i_batch_size + i_height*out_width*out_depth
                for i_width in range(out_width):
                    out_data[i_batch, i_height, i_width, :] = matric_data_max_value[i_height_size + i_width*out_depth :
                                            					i_height_size + i_width*out_depth + out_depth].ravel()             
        return (out_data, matric_data_max_pos) 
    
    @staticmethod
    def dpooling_layer(dout_data, matric_data_max_pos, maps_shape, filter_size=2, stride=2):
        '''
        dout_data.shape = [batch,out_height,out_width,out_depth=in_depth]
        matric_data_max_pos.shape = [batch,in_height,in_width,in_depth]
        
        din_data.shape = [batch,in_height,in_width,in_depth]        
        '''               
        (in_height, in_width, not_used) = maps_shape        
        matric_data_not_max_pos =  ~matric_data_max_pos
        (batch, out_height, out_width, in_depth) = dout_data.shape
        out_size = out_height*out_width 
        din_data = np.zeros((batch, in_height, in_width, in_depth), dtype = np.float64)

        height_ef = in_height - filter_size + 1
        width_ef = in_width - filter_size + 1          

        for i_batch in range(batch):
            i_batch_size = i_batch*out_size*in_depth
            for i_h_out, i_height in zip(range(out_height), range(0, height_ef, stride)):
                i_height_size = i_batch_size + i_h_out*out_width*in_depth
                for i_w_dout, i_w, i_width in zip(range(out_width), range(0, in_depth*out_width, in_depth),
                                                  			range(0, width_ef, stride)):
                    md = matric_data_not_max_pos[i_height_size + i_w : i_height_size + i_w + in_depth, : ]
                    din = din_data[i_batch, i_height : i_height + filter_size, i_width : i_width + filter_size, :]
                    dout = dout_data[i_batch, i_h_out, i_w_dout, :]
                    for i in range(filter_size):
                        for j in range(filter_size):
                            din[i, j, :] = dout[:]
                            din[i, j, :][md[:, i*filter_size + j]] = 0                          
        return din_data
    
    @staticmethod
    def FC_layer(in_data, weights, biases, out_depth, last, activation='ReLU'):
        '''
        in_data.shape = [batch, in_height, in_width, in_depth]      
        weights.shape = [filter_size*filter_size*in_depth, out_depth]        
        biases.shape = [1, out_depth]
        last=1 if the FC is the last one        
                
        out_data.shape = [batch,out_height,out_width,out_depth] 
        the data for calu gradient: matric_data, filter_data
        '''        
        (batch, in_height, in_width, in_depth) = in_data.shape              
        matric_data = np.zeros( (batch, in_height*in_width*in_depth) )
        for i_batch in range(batch):
            matric_data[i_batch] = in_data[i_batch].ravel()                
        filter_data = np.dot(matric_data, weights) + biases        
        if not last: #the last layer not need RELU
            filter_data = CnnBlockInterface.activation(filter_data, activation)

        out_data = np.zeros((batch, 1, 1, out_depth))
        for i_batch in range(batch):
            out_data[i_batch] = filter_data[i_batch]
        
        return (matric_data, filter_data, out_data)

    @staticmethod
    def dFC_layer(dout_data, matric_data, filter_data, weights, maps_shape, last, activation='ReLU'):
        '''
        inputs: dout_data, matric_data, filter_data
        matric_data, filter_data are data produced in the forward
        outputs: (dweight, dbias, din_data)
        '''
        (in_height, in_width, in_depth) = maps_shape       
        (batch, out_height, out_width, out_depth) = dout_data.shape
        
        dfilter_data = np.zeros_like(filter_data)

        for i_batch in range(batch):
            dfilter_data[i_batch] = dout_data[i_batch].ravel()        
        #backprop the ReLU non-linearity
        if not last:
            dfilter_data = CnnBlockInterface.dactivation(dfilter_data, filter_data, activation)

        #backprop the dot product filter_data = np.dot(matric_data, weights) + biases
        dweight = np.dot(matric_data.T, dfilter_data)
        dbias = np.sum(dfilter_data, axis=0, keepdims=True)
        dmatric_data = np.dot(dfilter_data, weights.T)
        
        #backprop the dmatric_data to din_data, just change the shape.
        din_data = np.zeros((batch, in_height, in_width, in_depth) )
        for i_batch in range(batch):
            din_data[i_batch] = dmatric_data[i_batch].reshape(in_height, in_width, -1)
            
        return (dweight, dbias, din_data)
        
    @staticmethod
    def softmax_layer(scores):
        """
        scores.shape = [batch,1,1,in_depth]
        probs.shape = [batch,1,1,in_depth]
        """ 
        scores -= np.max(scores, axis=3, keepdims=True)
        exp_scores = np.exp(scores)+10**(-8) # 数值计算更稳定
        exp_scores_sum = np.sum(exp_scores, axis=3, keepdims=True)
        probs = exp_scores/exp_scores_sum
        return probs
    
    @staticmethod       
    def data_loss(probs, labels):
        """
        labels is array of integers specifying correct class
        probs.shape = [batch,1,1,in_depth]
        """
        probs_correct = probs[range(probs.shape[0]), :, :, labels]
        logprobs_correct = -np.log(probs_correct)
        data_loss = np.sum(logprobs_correct)/labels.shape[0]    
        return data_loss

    @staticmethod
    def evaluate_dscores(probs, labels):
        '''
        probs.shape = [batch,1,1,in_depth]
        labels is array of integers specifying correct class
        dscores.shape = [batch,1,1,in_depth]
        '''
        dscores = probs.copy()
        dscores[range(probs.shape[0]), :, :, labels] -= 1
        dscores /= labels.shape[0]
        return dscores   

    @staticmethod
    def param_init(out_depth, in_depth, filter_size2):
        '''
        filter_size2 = filter_size*filter_size
        weights.shape = [filter_size2*in_depth, out_depth]
        '''                     
        std = np.sqrt(2)/np.sqrt(filter_size2*in_depth)
        weights = std * np.random.randn(filter_size2*in_depth, out_depth)
        biases = np.zeros((1, out_depth))
        return (weights, biases)
            
          

5、训练方法

cnn_train_interface.py

            
              import numpy as np
import matplotlib.pyplot as plt

class CnnTrainInterface(object):
    '''      
    decay the learning rate every epoch using an exponential rate of lr_decay  
    support learning rate and regularization random search 
    also support train and test from checkpoint
    '''
    def __shuffle_data(self):
        shuffle_no = list(range(self.num_train_samples))
        np.random.shuffle(shuffle_no)
        self.train_labels = self.train_labels[shuffle_no]
        self.train_data = self.train_data[shuffle_no]

        shuffle_no = list(range(self.num_val_samples))
        np.random.shuffle(shuffle_no)
        self.val_labels = self.val_labels[shuffle_no]
        self.val_data = self.val_data[shuffle_no]  
    
    def __train(self, epoch_more=20, lr=10**(-4), reg=10**(-5), batch=64, lr_decay=0.8, mu=0.9, 
                optimizer='Nesterov', regulation='L2', activation='ReLU'):               
        # 可视化数据损失、训练集和验证集准确率
        plt.close()
        fig=plt.figure('')
        ax=fig.add_subplot(3,1,1)
        ax.grid(True)
        ax2=fig.add_subplot(3,1,2)
        ax2.grid(True)
        ax3=fig.add_subplot(3,1,3)
        ax3.grid(True)
        plt.xlabel( 'log10(lr)=' + str(round((np.log10(lr)),2)) + ' ' +  'log10(reg)=' + str(round((np.log10(reg)),2)), fontsize=14)
        plt.ylabel('                                        update_ratio     accuracy       log10(data loss)', fontsize=14)  

        epoch = 0   
        val_no = 0
        per_epoch_time = self.num_train_samples//batch
        while epoch < epoch_more:
            losses = 0
            self.__shuffle_data()       
            for i in range(0, self.num_train_samples, batch):
                batch_data = self.train_data[i:i+batch,:]                
                labels = self.train_labels[i:i+batch]
                (data_loss, reg_loss) = self.forward(batch_data, labels, reg, regulation, activation)   
                losses += data_loss + reg_loss
                self.backpropagation(labels, reg, regulation, activation)
                self.params_update(lr, per_epoch_time*epoch + i+1, mu, optimizer)
                update_ratio = self.update_ratio[0][0]
                
                if i % (batch*20) == 0:
                    ax.scatter(i/self.num_train_samples+epoch, np.log10(data_loss), c='b',marker='.')                    
                    train_accuracy = self.predict(batch_data, labels, activation)                    
                    batch_data_val = self.val_data[val_no:val_no+batch,:]                
                    labels_val = self.val_labels[val_no:val_no+batch]
                    val_accuracy = self.predict(batch_data_val, labels_val, activation)                    
                    val_no += batch
                    if val_no >= self.num_val_samples - batch:
                        val_no = 0
                    ax2.scatter(i/self.num_train_samples+epoch, (train_accuracy), c='r',marker='*')
                    ax2.scatter(i/self.num_train_samples+epoch, (val_accuracy), c='b',marker='.')
                    
                    ax3.scatter(i/self.num_train_samples+epoch, np.log10(update_ratio), c='r',marker='.')
                    plt.pause(0.000001)
            epoch += 1              
        
            plt.savefig('checkpoint_' + '(loss_' + str(round(np.log10(losses/per_epoch_time),2)) +
                                 ')_(epoch_' + str(round(epoch,2)) + ')_' + '_[(lr reg)_' + '(' + str(round((np.log10(lr)),2)) +
                                 ' ' + str(round((np.log10(reg)),2)) + ')]' + '_' + 
                 ' ' + optimizer + ' '+ regulation + ' ' + activation + '.png')
    
            self.context[0] = lr
            self.save_checkpoint('checkpoint_' + '(loss_' + str(round(np.log10(losses/per_epoch_time),2)) +
                                 ')_(epoch_' + str(round(epoch,2)) + ')_' + '_[(lr reg)_' + '(' + str(round((np.log10(lr)),2)) +
                                 ' ' + str(round((np.log10(reg)),2)) + ')]' + '_' + 
                 ' ' + optimizer + ' '+ regulation + ' ' + activation + '.npy')
    
            lr *= lr_decay #decayed every epoch using an exponential rate

        self.test(batch, activation)            
    
    def __methods_check(self, optimizer, regulation, activation):        
        self.check_optimizer(optimizer)
        self.check_regulation(regulation)
        self.check_activation(activation)       
               
    @staticmethod
    def __gen_lr_reg(lr=[0, -6], reg=[-3, -6], num_try=10):
        minlr = min(lr)
        maxlr = max(lr)        
        randn = np.random.rand(num_try*2)
        lr_array = 10**(minlr + (maxlr-minlr)*randn[0: num_try])
             
        minreg = min(reg)
        maxreg = max(reg)
        reg_array = 10**(minreg + (maxreg-minreg)*randn[num_try: 2*num_try])       
        lr_regs =  zip(lr_array, reg_array)
        return lr_regs
    
    def train_random_search(self, lr=[-1, -5], reg=[-1, -5], num_try=10, epoch_more=1,batch=64, lr_decay=0.8, mu=0.9, 
                            optimizer='Nesterov', regulation='L2', activation='ReLU'):        
        self.__methods_check(optimizer, regulation, activation)          
        self.featuremap_shape()    
        lr_regs = self.__gen_lr_reg(lr, reg, num_try)       
        for lr_reg in lr_regs:
            try:
                self.init_params()
                self.context = [*lr_reg, batch, lr_decay, mu, optimizer, regulation, activation]
                self.__train(epoch_more, *lr_reg, batch, lr_decay, mu, optimizer, regulation, activation)
            except KeyboardInterrupt:
                pass   
        
    def train_from_checkpoint(self, epoch_more=10, checkpoint_fname=''):                              
        self.load_checkpoint(checkpoint_fname) 
        [lr, reg, batch, lr_decay, mu, optimizer, regulation, activation] = self.context
        lr = np.double(lr)
        reg = np.double(reg)
        batch = np.int(batch)
        lr_decay = np.double(lr_decay)
        mu = np.double(mu)
        self.__train(epoch_more, lr, reg, batch, lr_decay, mu, optimizer, regulation, activation)
        
    def test_from_checkpoint(self, checkpoint_fname):    
        self.load_test_data()
        self.load_checkpoint(checkpoint_fname) 
        [lr, reg, batch, lr_decay, mu, optimizer, regulation, activation] = self.context        
        batch = np.int(batch)      
        accuracys = np.zeros(shape=(self.test_labels.shape[0],))
        for i in range(0, self.test_labels.shape[0], batch):
            batch_data = self.test_data[i:i+batch,:]
            label = self.test_labels[i:i+batch]
            accuracys[i:i+batch] = self.predict(batch_data, label, activation)
            
        accuracy = np.mean(accuracys)            
        print('the test accuracy: %.5f' % accuracy)
        return accuracy
        
    def test(self, batch, activation):   
        self.load_test_data()
        accuracys = np.zeros(shape=(self.test_labels.shape[0],))
        for i in range(0, self.test_labels.shape[0], batch):
            batch_data = self.test_data[i:i+batch,:]
            label = self.test_labels[i:i+batch]
            accuracys[i:i+batch] = self.predict(batch_data, label, activation)
            
        accuracy = np.mean(accuracys)            
        print('the test accuracy: %.5f' % accuracy)
        return accuracy
            
          

6、VGG网络结构

vgg_net.py

            
              import numpy as np
import re

class VGGNet(object):
    '''
    only support the VGG like cnn struct
    struct = ['conv_16_5_2_2'] + ['conv_32']*2 + ['pool'] + ['conv_64']*3 + ['pool'] + ['FC_128']
    conv_16_5_2_2 = conv_featureMapNum_[_filterSize=3_stride=1_padding=1]
    the last TWO layers always are: FC, softmax
    all pool layer always have filterSize=2 stride=2    
    surport save/load the checkpoint
    '''           
    def __init__(self, struct=[]):
        if len(struct) == 0:
            print('you are using linearity model!')
        self.__struct_parse(struct)
        self.__struct = struct
        self.__struct += ['FC', 'softmax']
    
    def __struct_parse(self, struct):
        layers = []
        for layer in struct:
            convfull = re.match('^conv_(\d{1,3})_(\d{1})_(\d{1})_(\d{1})$', layer)
            convdefault = re.match('^conv_(\d{1,3})$', layer)
            pool = re.match('^pool$', layer)
            fc = re.match('^FC_(\d{1,4})$', layer)
            if convfull:
                layers.append(( int(convfull.group(1)), int(convfull.group(2)), 
                               int(convfull.group(3)), int(convfull.group(4)), 'conv'))
            elif convdefault:
                layers.append(( int(convdefault.group(1)),3,1,1,'conv'))
            elif pool:
                layers.append( (layers[-1][0], 'pool') )
            elif fc:
                layers.append( (int(fc.group(1)), 'FC') )
            else:
                raise ValueError('the layer must like conv_16_5_2_2 or conv_16 or pool or FC_64')
                       
        layers.append(('', 'Last_FC'))
        self.__layers_params = layers
        
    def featuremap_shape(self):        
        maps_shape = []
        in_map_shape = (self.im_height, self.im_width, self.im_dims)
        maps_shape.append(in_map_shape)
        for layer in self.__layers_params:
            if layer[-1] == 'Last_FC':
                break
            elif layer[-1] == 'FC':
                in_map_shape = (1, 1, layer[0])                              
            elif layer[-1] == 'conv':
                (out_depth, filter_size, stride, padding, not_used) = layer
                out_height = (in_map_shape[0] - filter_size + 2*padding)//stride + 1
                out_width = (in_map_shape[1] - filter_size + 2*padding)//stride + 1
                in_map_shape = (out_height, out_width, out_depth)
                if out_height < filter_size or out_width < filter_size:
                    raise ValueError('the cnn struct is not compatible with the image size!\n')
            elif layer[-1] == 'pool':
                filter_size = 2
                stride = 2
                out_height = (in_map_shape[0] - filter_size)//stride + 1
                out_width = (in_map_shape[1] - filter_size)//stride + 1
                in_map_shape = (out_height, out_width, layer[0])
                if out_height < filter_size or out_width < filter_size:
                    raise ValueError('the cnn struct is not compatible with the image size!\n')   
            else:
                pass
            maps_shape.append(in_map_shape)       
        self.maps_shape = maps_shape
        
    def init_params(self):
        self.__weights = []
        self.__biases = []
        in_depth = self.im_dims
        out_depth = in_depth
        for layer_param, map_shape in zip(self.__layers_params, self.maps_shape):
            weight = np.array([])
            bias = np.array([])
            if layer_param[-1] == 'Last_FC':
                in_depth = out_depth
                out_depth = self.num_class
                (weight, bias) = self.param_init(out_depth, in_depth, map_shape[0]*map_shape[1])
            elif layer_param[-1] == 'FC':
                out_depth = layer_param[0]
                in_depth = map_shape[2]
                (weight, bias) = self.param_init(out_depth, in_depth, map_shape[0]*map_shape[1])
            elif layer_param[-1] == 'conv':
                filter_size = layer_param[1]
                out_depth = layer_param[0]
                (weight, bias) = self.param_init(out_depth, in_depth, filter_size*filter_size)
            elif layer_param[-1] == 'pool': # pool has no params
                pass
            else: 
                pass
            in_depth = out_depth
            self.__weights.append(weight)
            self.__biases.append(bias)
        #softmax layer: no params        
        #for backprop
        self.__vweights = []
        self.__vbiases = []
        self.__cache_biases = []
        self.__cache_weights = []
        for weight, bias in zip(self.__weights, self.__biases):
            self.__vweights.append(np.zeros_like(weight))
            self.__vbiases.append(np.zeros_like(bias))
            self.__cache_weights.append(np.zeros_like(weight))
            self.__cache_biases.append(np.zeros_like(bias))
            
    def reg_loss(self, reg=10**(-5), regulation='L2'):
        reg_loss = 0
        for weight in self.__weights:
            if weight.size != 0:
                reg_loss += self.norm_reg(weight, reg, regulation)
        return reg_loss
    
    def forward(self, batch_data, labels, reg=10**(-5), regulation='L2', activation='ReLU'):
        self.__matric_data = []
        self.__filter_data = []
        self.__matric_data_max_pos = []

        in_maps = batch_data                
        for layer_param, weight, bias in zip(self.__layers_params, self.__weights, self.__biases):
            matric_data = np.array([])
            filter_data = np.array([])
            matric_data_max_pos = np.array([])
            if layer_param[-1] == 'Last_FC': #last FC layer, no non linearity
                (matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, self.num_class, 1, activation)
            elif layer_param[-1] == 'FC':               
                (matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, layer_param[0], 0, activation)
            elif layer_param[-1] == 'conv':
                (matric_data, filter_data, out_maps) = self.conv_layer(in_maps, weight, bias, layer_param[0:-1], activation)               
            elif layer_param[-1] == 'pool':
                (out_maps, matric_data_max_pos)  = self.pooling_layer(in_maps)
            else:
                pass
            in_maps = out_maps

            self.__matric_data.append(matric_data)
            self.__filter_data.append(filter_data)
            self.__matric_data_max_pos.append(matric_data_max_pos)

        self.__probs = self.softmax_layer(out_maps)
        data_loss = self.data_loss(self.__probs, labels)
        reg_loss = self.reg_loss(reg, regulation)
        return (data_loss, reg_loss)
    
    def predict(self, batch_data, labels, activation='ReLU'):        
        in_maps = batch_data                
        for layer_param, weight, bias in zip(self.__layers_params, self.__weights, self.__biases):           
            if layer_param[-1] == 'Last_FC': #last FC layer, no non linearity
                (matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, self.num_class, 1, activation)
            elif layer_param[-1] == 'FC':               
                (matric_data, filter_data, out_maps) = self.FC_layer(in_maps, weight, bias, layer_param[0], 0, activation)
            elif layer_param[-1] == 'conv':
                (matric_data, filter_data, out_maps) = self.conv_layer(in_maps, weight, bias, layer_param[0:-1], activation)             
            elif layer_param[-1] == 'pool':
                (out_maps, matric_data_max_pos)  = self.pooling_layer(in_maps)
            else:
                pass
            in_maps = out_maps
        predicted_class = np.argmax(out_maps, axis=3)
        accuracy = predicted_class.ravel() == labels        
        return np.mean(accuracy)

    
    def dweight_reg(self, reg=10**(-5), regulation='L2'):
        for i in range(len(self.__weights)):
            weight = self.__weights[i]
            if weight.size != 0:
                self.__dweights[-1-i] += self.dnorm_reg(weight, reg, regulation)
            
    def backpropagation(self, labels, reg=10**(-5), regulation='L2', activation='ReLU'):
        dscores = self.evaluate_dscores(self.__probs, labels)
        dout_maps = dscores
        self.__dweights = []
        self.__dbiases = []
        for (layer_param, maps_shape, weight,
             matric_data, filter_data, matric_data_max_pos) in zip(reversed(self.__layers_params),
            reversed(self.maps_shape), reversed(self.__weights),
            reversed(self.__matric_data), reversed(self.__filter_data), reversed(self.__matric_data_max_pos) ):
            if layer_param[-1] == 'Last_FC':
                (dweight, dbias, din_maps) = self.dFC_layer(dout_maps, matric_data, filter_data,
                                                               weight, maps_shape, 1, activation)
            elif layer_param[-1] == 'FC':
                (dweight, dbias, din_maps) = self.dFC_layer(dout_maps, matric_data, filter_data,
                                                               weight, maps_shape, 0, activation)
            elif layer_param[-1] == 'conv':   
                (dweight, dbias, din_maps) = self.dconv_layer(dout_maps, matric_data, filter_data,
                                                               weight, maps_shape, layer_param[1:-1], activation)            
            elif layer_param[-1] == 'pool':  
                dweight = np.array([])
                dbias = np.array([])
                din_maps = self.dpooling_layer(dout_maps, matric_data_max_pos, maps_shape)
            else:
                pass
            dout_maps = din_maps
            self.__dweights.append(dweight)
            self.__dbiases.append(dbias)
        self.__dbatch_data = din_maps #grad of input image batch
        self.dweight_reg(reg, regulation)  

    def params_update(self, lr=10**(-4), t=1, mu=0.9, optimizer='Nesterov'):
        self.update_ratio = []
        if optimizer == 'adam':
            for i in range(len(self.__weights)):
                weight = self.__weights[i]
                bias = self.__biases[i]
                dweight = self.__dweights[-1-i]
                dbias = self.__dbiases[-1-i]
                v_weight = self.__vweights[i]
                v_bias = self.__vbiases[i]
                cache_weight = self.__cache_weights[i]
                cache_bias = self.__cache_biases[i]
                if weight.size != 0:
                    update_ratio_w = self.adam(lr, weight, v_weight, cache_weight, dweight, t, mu)
                    update_ratio_b = self.adam(lr, bias, v_bias, cache_bias, dbias, t, mu)
                    self.update_ratio.append((update_ratio_w,update_ratio_b))
                    
        if optimizer == 'Nesterov':
            for i in range(len(self.__weights)):
                weight = self.__weights[i]
                bias = self.__biases[i]
                dweight = self.__dweights[-1-i]
                dbias = self.__dbiases[-1-i]
                v_weight = self.__vweights[i]
                v_bias = self.__vbiases[i]
                if weight.size != 0:
                    update_ratio_w = self.nesterov_momentumGD(lr, weight, v_weight, dweight, mu)
                    update_ratio_b = self.nesterov_momentumGD(lr, bias, v_bias, dbias, mu)
                    self.update_ratio.append((update_ratio_w,update_ratio_b))

    def save_checkpoint(self, fname):   
        with open(fname, 'wb') as f:
            np.save(f, np.array([3,1,4,1,5,9,2,8,8])) # magic number
            np.save(f, np.array( self.__struct) )
            np.save(f, np.array([self.num_class, self.im_dims, self.im_height, self.im_width]) )
            np.save(f, np.array(self.__layers_params))
            np.save(f, np.array(self.maps_shape))
            np.save(f, np.array(self.context))
            for array in self.__weights:
                np.save(f, array)
            for array in self.__biases:
                np.save(f, array)
            for array in self.__vweights:
                np.save(f, array)
            for array in self.__vbiases:
                np.save(f, array)
            for array in self.__cache_weights:
                np.save(f, array)
            for array in self.__cache_biases:
                np.save(f, array)
 
    def load_checkpoint(self, fname):
        with open(fname, 'rb') as f:
            magic_number = np.load(f)
            if not all(magic_number == np.array([3,1,4,1,5,9,2,8,8])):  # magic number
                raise ValueError('the file format is wrong!\n')
            self.__struct = np.load(f)
            print('\n\nthe net struct is: \n', self.__struct)
            self.num_class, self.im_dims, self.im_height, self.im_width = np.load(f)                
            self.__layers_params = np.load(f)
            self.maps_shape = np.load(f)
            self.context = np.load(f)
            self.__weights=[]
            self.__biases=[]
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__weights.append(array)
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__biases.append(array)
            self.__vweights=[]
            self.__vbiases=[]
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__vweights.append(array)
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__vbiases.append(array)                
            self.__cache_weights=[]
            self.__cache_biases=[]
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__cache_weights.append(array)
            for i in range(len(self.__layers_params)):
                array = np.load(f)
                self.__cache_biases.append(array)                
            print('the struct hyper parameters:\n', self.__layers_params)
            
          

7、MNIST数据集

http://yann.lecun.com/exdb/mnist/下载到MNIST目录下

MNIST_interface.py

            
              import numpy as np
import gzip, struct

class MNISTInterface(object):
    ''' 
    load the mnist dataset
    and shuffle split the train set into train and validation set
    the ratio of train and validation may be 7:3 
    '''
    #
    def load_train_data(self, num_ratio):
        (imgs, labels) = MNISTInterface.get_mnist_train()       
        #data preprocess
        imgs = imgs/255  # normalize to [0 1]    
        # split the data into train and val data subset and shuffle
        self.num_samples = labels.size
        if isinstance(num_ratio, int):
            self.num_train_samples = num_ratio                
        else:
            self.num_train_samples = int(self.num_samples*num_ratio)
        self.num_val_samples = self.num_samples - self.num_train_samples        
        shuffle_no = list(range(self.num_samples))
        np.random.shuffle(shuffle_no)
        imgs = imgs[shuffle_no]
        labels = labels[shuffle_no]
        self.train_data = imgs[0:self.num_train_samples]
        self.train_labels = labels[0:self.num_train_samples]
        self.val_data = imgs[self.num_train_samples::]
        self.val_labels = labels[self.num_train_samples::]
        
        self.__set_data_pro()
        
    def load_test_data(self):
        (imgs, labels) = MNISTInterface.get_mnist_test()        
        #data preprocess
        imgs = imgs/255  # normalize to [0 1]   
        self.test_data = imgs
        self.test_labels = labels
        self.__set_data_pro()
        
    def __set_data_pro(self, num_class=10, im_height=28, im_width=28, im_dims=1):        
        self.num_class = num_class
        self.im_height = im_height
        self.im_width = im_width
        self.im_dims = im_dims
    
    @staticmethod
    def __read(image, label):
        mnist_dir = 'MNIST/'
        with gzip.open(mnist_dir + label) as flbl:
            magic, num = struct.unpack(">II", flbl.read(8))
            label = np.fromstring(flbl.read(), dtype=np.uint8)
        with gzip.open(mnist_dir + image, 'rb') as fimg:
            magic, num, rows, cols = struct.unpack(">IIII", fimg.read(16))
            image = np.fromstring(fimg.read(), dtype=np.uint8).reshape(len(label), rows, cols)
        return (image, label)
    
    @staticmethod
    def get_mnist_train():
        train_img, train_label = MNISTInterface.__read('train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz')   
        train_img = train_img.reshape((*train_img.shape,1))
        return (train_img, train_label)
    
    @staticmethod
    def get_mnist_test():
        test_img, test_label = MNISTInterface.__read('t10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz')      
        test_img = test_img.reshape((*test_img.shape,1))   
        return (test_img, test_label)
            
          

8、梯度检测

vgg_grad_check.py

            
              import numpy as np

from vgg_net import VGGNet
from cnn_block_interface import CnnBlockInterface
from regulation_interface import RegulationInterface

class VGGTest(VGGNet, CnnBlockInterface, RegulationInterface):
    def set_data_pro(self, num_class=4, im_height=32, im_width=32, im_dims=3):        
        self.num_class = num_class
        self.im_height = im_height
        self.im_width = im_width
        self.im_dims = im_dims 
        
    def gen_random_data(self):
            self.num_samples = self.num_class*20
            self.data = np.random.randn(self.num_samples, self.im_height, self.im_width, self.im_dims)        
            self.labels = np.random.randint(self.num_class, size=self.num_samples)
        
    def check_gradient(self, check_weight_or_bias=1, step=10**(-5), reg=10**(-1), regulation='L1', activation='ELU'):          
    #    check_weight_or_bias: 1 for weight, 0 for bias   
        self.set_data_pro()
        self.gen_random_data()     
        self.featuremap_shape()
        self.init_params()            
        for layer in range(len(self.maps_shape)):
            if check_weight_or_bias:
                weight = self._VGGNet__weights[layer]
                if weight.size == 0:
                    continue
                else:
                    row = np.random.randint(weight.shape[0])
                    col = np.random.randint(weight.shape[1])
                    param = weight[row][col]                    
            else:
                bias = self._VGGNet__biases[layer]
                if bias.size == 0:
                    continue
                else:
                    row = np.random.randint(bias.shape[1])
                    param = bias[0][row]                
            
            (data_loss, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
            self.backpropagation(self.labels, reg, regulation, activation)              
            if check_weight_or_bias:
                danalytic = self._VGGNet__dweights[-1-layer][row][col]
            else:
                danalytic = self._VGGNet__dbiases[-1-layer][0][row]
        
            if check_weight_or_bias:
                self._VGGNet__weights[layer][row][col] = param - step
            else:
                self._VGGNet__biases[layer][0][row] = param - step    
            (data_loss1, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
            loss1 = data_loss1 + reg_loss
            
            if check_weight_or_bias:
                self._VGGNet__weights[layer][row][col] = param + step
            else:
                self._VGGNet__biases[layer][0][row] = param + step
            (data_loss2, reg_loss) = self.forward(self.data, self.labels, reg, regulation, activation)
            loss2 = data_loss2 + reg_loss
            dnumeric = (loss2 - loss1)/(2*step)
            
            print(layer, data_loss1, data_loss2)    
            error_relative = np.abs(danalytic - dnumeric)/np.maximum(danalytic, dnumeric)   
            print(danalytic, dnumeric, error_relative)  
            
if __name__ == '__main__':
    #网络结构    
    struct = ['conv_32_5_1_0'] + ['pool'] + ['conv_64'] + ['pool'] + ['conv_128']*2 + ['pool'] + ['conv_256'] + ['FC_100']
    vgg = VGGTest(struct) #创建网络实例
    vgg.check_gradient(check_weight_or_bias=1, step=10**(-5), reg=10**(-50), regulation='L1', activation='ReLU')
            
          

    采用上面的参数设置,梯度的相对误差在10**(-9)左右时,说明梯度计算正确

9、训练

vgg_test.py

            
              from vgg_net import VGGNet
from cnn_block_interface import CnnBlockInterface
from cnn_train_interface import CnnTrainInterface
from optimizer_interface import OptimizerInterface
from regulation_interface import RegulationInterface
from MNIST_interface import MNISTInterface

class VGGTest(MNISTInterface, VGGNet, CnnBlockInterface, CnnTrainInterface, OptimizerInterface, RegulationInterface):
    pass
   
if __name__ == '__main__':
       
#    struct = [] #linearity model
#    struct = ['FC_64'] # one hidden layer network
    struct = ['conv_8'] + ['pool'] + ['conv_12']*3 + ['pool']  + ['conv_36']*3  + ['pool'] + ['FC_64']
    vgg = VGGTest(struct)
    num_samples = 0.7
    vgg.load_train_data(num_samples)
    
    train = 1
    scratch = 1
    
    if train:
        if scratch:
            vgg.train_random_search(lr=[-2.0, -5.0], reg=[-3, -5], num_try=1, epoch_more=20, batch=64, lr_decay=1, mu=0.9, optimizer='adam', regulation='L2', activation='ReLU') # 超参数随机搜索
        else:
            vgg.train_from_checkpoint(epoch_more=2, checkpoint_fname='checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU.npy')

    else:       
        vgg.test_from_checkpoint('checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU.npy')
 
#%%
'''
98.73 checkpoint_(loss_-1.23)_(epoch_4)__[(lr reg)_(-3.0 -4.0)]_ adam L2 ELU

只使用随机40个样本,测试集准确率达63.68
400个达到90.05
0.5样本准确率97.15
0.25样本准确率97.49
0.1样本准确率96.34
0.05样本准确率92.93
0.01样本准确率83.25
'''
            
          

 

 

 

 


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论