import numpy as np import mindspore.dataset as ds import os import cv2 import mindspore import mindspore.nn as nn from mindspore import Tensor from mindspore.common.initializer import Normal from mindspore import context from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor from mindspore.train import Model from mindspore.nn.metrics import Accuracy np.random.seed(58)

class LeNet5(nn.Cell): 'Lenet network

Args:
    num_class (int): Number of classes. Default: 10.
    num_channel (int): Number of channels. Default: 1.

Returns:
    Tensor, output tensor
Examples:
    >>> LeNet(num_class=10)

'
def __init__(self, num_class=10, num_channel=3, include_top=True):
    super(LeNet5, self).__init__()
    self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid')
    self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid')
    self.relu = nn.ReLU()
    self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
    self.include_top = include_top
    #如果include_top为True,则在模型的最后添加全连接层,
    #其中包括三个Dense层,分别是784->120、120->84、84->num_class
    # 每个Dense层采用正态分布初始化权重。同时,还定义了一个Flatten层,用于将输入数据压平。
    if self.include_top:#include_top是一个布尔值
        #如果include_top为True,则模型的顶部将包括全局平均池化层和分类器,用于进行分类
        #如果include_top为False,则模型的顶部将被删除,并且可以添加自定义的分类器。
        self.flatten = nn.Flatten()
        self.fc1 = nn.Dense(784, 120, weight_init=Normal(0.02))
        self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02))
        self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02))


def construct(self, x):
    x = self.conv1(x)
    x = self.relu(x)
    x = self.max_pool2d(x)
    x = self.conv2(x)
    x = self.relu(x)
    x = self.max_pool2d(x)
    if not self.include_top:
        return x
    x = self.flatten(x)
    x = self.relu(self.fc1(x))
    x = self.relu(self.fc2(x))
    x = self.fc3(x)
    return x

#生成训练数据集,从指定路径读取图像文件,并以元组形式返回图像数据和标签 class TrainDatasetGenerator: def init(self, file_path): self.file_path = file_path self.img_names = os.listdir(file_path)

def __getitem__(self, index):
    data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
    label = int(self.img_names[index][0])-1#图像对应的类别
    #label = np.array([label])
    data = data.transpose().astype(np.float32) / 255.
    #data = np.expand_dims(data, axis=0)
    #data = Tensor(data)
    #label = Tensor(label)
    return data, label

def __len__(self):
    return len(self.img_names)

def train_lenet(): context.set_context(mode=context.GRAPH_MODE, device_target='CPU') train_dataset_generator = TrainDatasetGenerator('D:/code/machine vision course/digit-mindspore/dataset') ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True) ds_train = ds_train.shuffle(buffer_size=10) ds_train = ds_train.batch(batch_size=4, drop_remainder=True) valid_dataset_generator = TrainDatasetGenerator('D:/code/machine vision course/digit-mindspore/test') ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True) ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)

# 新建模型,并加载预训练模型的权重
network = LeNet5(num_class=7, include_top=False)
network = load_pretrained_weights(network, 'lenet5.pth')

# 新增全连接层,并定义Flatten层
fc1 = nn.Dense(120, 84, weight_init=Normal(0.02))
fc2 = nn.Dense(84, 7, weight_init=Normal(0.02))
flatten = nn.Flatten()

# 定义新模型的前向传播过程
def construct(x):
    x = network(x)
    x = flatten(x)
    x = fc1(x)
    x = fc2(x)
    return x

# 定义损失函数、优化器和回调函数
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
net_opt = nn.Momentum(construct.trainable_params(), learning_rate=0.01, momentum=0.9)
time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
config_ck = CheckpointConfig(save_checkpoint_steps=10,
                             keep_checkpoint_max=10)
config_ckpt_path = 'D:/code/machine vision course/digit-mindspore/ckpt/'
ckpoint_cb = ModelCheckpoint(prefix='checkpoint_lenet', directory=config_ckpt_path, config=config_ck)

# 新建模型并训练
model = Model(construct, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

# 评估模型性能
acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))

加载预训练模型权重函数

def load_pretrained_weights(model, pretrained_path): # 加载PyTorch预训练模型的权重 pretrained_dict = torch.load(pretrained_path, map_location='cpu') # 将权重转换为MindSpore格式 ms_dict = {} for k, v in pretrained_dict.items(): ms_dict[k] = Tensor(mnp.array(v.numpy())) # 将权重赋值给新建模型的对应层 model.conv1.weight.set_data(ms_dict['conv1.weight']) model.conv1.bias.set_data(ms_dict['conv1.bias']) model.conv2.weight.set_data(ms_dict['conv2.weight']) model.conv2.bias.set_data(ms_dict['conv2.bias']) model.fc1.weight.set_data(ms_dict['fc1.weight'].transpose()) model.fc1.bias.set_data(ms_dict['fc1.bias'].reshape(-1,)) model.fc2.weight.set_data(ms_dict['fc2.weight'].transpose()) model.fc2.bias.set_data(ms_dict['fc2.bias'].reshape(-1,)) model.fc3.weight.set_data(ms_dict['fc3.weight'].transpose()) model.fc3.bias.set_data(ms_dict['fc3.bias'].reshape(-1,)) return model

if name == 'main': train_lenet()

LeNet5 模型训练与预训练权重加载 - MindSpore 实现

原文地址: https://www.cveoy.top/t/topic/mQEy 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录