import numpy as np import mindspore.dataset as ds import os import cv2 import mindspore import mindspore.nn as nn from mindspore import Tensor from mindspore.common.initializer import Normal from mindspore import context from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor from mindspore.train import Model from mindspore.nn.metrics import Accuracy from mindspore.ops.operations import TensorAdd from scipy.integrate._ivp.radau import P from mindspore import Model # 承载网络结构 from mindspore.nn.metrics import Accuracy # 测试模型用

np.random.seed(58)

class BasicBlock(nn.Cell): def init(self, in_channels, out_channels, stride=1, downsample=None): super(BasicBlock, self).init() self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, pad_mode='pad',has_bias=False) self.bn1 = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, pad_mode='pad', has_bias=False) self.bn2 = nn.BatchNorm2d(out_channels) self.downsample = downsample self.add = TensorAdd()

def construct(self, x):
    identity = x

    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu(out)

    out = self.conv2(out)
    out = self.bn2(out)

    if self.downsample is not None:
        identity = self.downsample(x)

    out = self.add(out, identity)
    out = self.relu(out)

    return out

class ResNet(nn.Cell): def init(self, block, layers, num_classes=10): super(ResNet, self).init() self.in_channels = 64

    self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, pad_mode='pad', has_bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.relu = nn.ReLU()
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
    self.layer1 = self.make_layer(block, 64, layers[0])
    self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
    self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
    self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
    self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1)
    self.flatten = nn.Flatten()
    self.fc = nn.Dense(512, num_classes)

def make_layer(self, block, out_channels, blocks, stride=1):
    downsample = None
    if stride != 1 or self.in_channels != out_channels:
        downsample = nn.SequentialCell([
            nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, has_bias=False),
            nn.BatchNorm2d(out_channels)
        ])

    layers = []
    layers.append(block(self.in_channels, out_channels, stride, downsample))
    self.in_channels = out_channels
    for _ in range(1, blocks):
        layers.append(block(out_channels, out_channels))

    return nn.SequentialCell(layers)

def construct(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)

    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = self.avgpool(x)
    x = self.flatten(x)
    x = self.fc(x)

    return x

class TrainDatasetGenerator: def init(self, file_path): self.file_path = file_path self.img_names = os.listdir(file_path)

def __getitem__(self, index):
    data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
    label = self.img_names[index].split('_')[0]
    label = int(label)
    data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
    data = cv2.resize(data, (224, 224))
    data = data.transpose().astype(np.float32) / 255.
    return data, label

def __len__(self):
    return len(self.img_names)

def train_resnet(): context.set_context(mode=context.GRAPH_MODE, device_target='CPU') train_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/train1') ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True) ds_train = ds_train.shuffle(buffer_size=10) ds_train = ds_train.batch(batch_size=4, drop_remainder=True) valid_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/test1') ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True) ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True) network = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=100) net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.001, momentum=0.9) time_cb = TimeMonitor(data_size=ds_train.get_dataset_size()) config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10) config_ckpt_path = 'D:/pythonProject7/ckpt/' ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)

model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))

if name == 'main': train_resnet()

如何选出最佳模型参数文件ckpt以供后续人脸识别内容:可以使用MindSpore提供的ModelCheckpoint回调函数,每训练一定步数就会保存一次模型参数文件ckpt,可以根据训练过程中验证集的表现来选出最佳的模型参数文件ckpt。可以使用MindSpore提供的LoadCheckpoint函数来加载已经保存的模型参数文件ckpt,然后进行人脸识别任务。

基于MindSpore的ResNet人脸识别模型训练与评估

原文地址: https://www.cveoy.top/t/topic/jqxu 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录