import numpy as np
import mindspore.dataset as ds
import os
import cv2
import mindspore
import mindspore.nn as nn
from mindspore import Tensor, load_checkpoint, load_param_into_net
from mindspore.common.initializer import Normal
from mindspore import context
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.ops.operations import TensorAdd
from mindspore import Model # 承载网络结构
from mindspore.nn.metrics import Accuracy # 测试模型用

np.random.seed(58)


class BasicBlock(nn.Cell):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, pad_mode='pad',has_bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, pad_mode='pad', has_bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.add = TensorAdd()

    def construct(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.add(out, identity)
        out = self.relu(out)

        return out

class ResNet(nn.Cell):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, pad_mode='pad', has_bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1, pad_mode='valid')  # 修改这里
        self.flatten = nn.Flatten()
        self.fc = nn.Dense(512, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, has_bias=False),
                nn.BatchNorm2d(out_channels)
            ])

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))

        return nn.SequentialCell(layers)

    def construct(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x


class TrainDatasetGenerator:
    def __init__(self, file_path):
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __getitem__(self, index):
        data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
        label = self.img_names[index].split('_')[0]
        label = int(label)
        data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
        data = cv2.resize(data, (224, 224))
        data = data.transpose().astype(np.float32) / 255.
        return data, label

    def __len__(self):
        return len(self.img_names)

def load_model_from_ckpt():
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    # 创建ResNet模型
    network = ResNet(BasicBlock,[2,2,2,2])
    # 加载ckpt文件中的模型参数
    param_dict = load_checkpoint('D:/pythonProject7/ckpt/checkpoint_resnet_34-8_25.ckpt')
     #将模型参数加载到模型中
    load_param_into_net(network, param_dict)
    # 返回模型
    return network


def train_resnet():
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    train_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/train1')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
    valid_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/test1')
    ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
    network = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=100)
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.001, momentum=0.9)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10)
    config_ckpt_path = 'D:/pythonProject7/ckpt/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)


    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))


    face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt.xml')  # 加载检测器
    threshold = 0.95  # 设置阈值

    cap = cv2.VideoCapture(0)
    stop = False
    while not stop:
        success, img = cap.read()
        subjects = ['1', '2', '3', '4', '5', '6', '7', '8', '9','10', 'unknown']
        # 生成图像的副本,这样就能保留原始图像
        img1 = img.copy()
        # 检测人脸
        # 将测试图像转换为灰度图像,因为opencv人脸检测器需要灰度图像
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # 检测多尺度图像,返回值是一张脸部区域信息的列表(x,y,宽,高)
        rect = face_cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), 
                                             flags=cv2.CASCADE_SCALE_IMAGE)
        # 如果未检测到面部
        if len(rect) == 0:
            txt = 'no face!'
            cv2.putText(img1, txt, (10, 20), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
        if not rect is None:
            for (x, y, w, h) in rect:
                face = img[y:y + w, x:x + h].astype(np.float32)  # 数值转换
                face = cv2.resize(face, (100, 100))
                face = face.transpose().astype(np.float32) / 255.
                face = np.expand_dims(face, axis=0)  # 扩展维度,变成(batch_size, channels, height, width)
                face = Tensor(face)
                cv2.rectangle(img1, (x, y), (x + w, y + h), (0, 255, 0), 2)  # 画出矩形框
                output = network(face)
                predicted_class = np.argmax(output.asnumpy(), axis=1)
                if output.asnumpy()[0][predicted_class[0]] < threshold:
                    label = 'unknown'
                else:
                    label = subjects[predicted_class[0]]
                # label = subjects[predicted_class[0]]
                cv2.putText(img1, label, (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
        cv2.imshow('img', img1)
        if (cv2.waitKey(1) & 0xFF == ord('q')):  # 按下q程序结束
            stop = True
            cv2.destroyAllWindows()  # 释放窗口


if __name__ == '__main__':
    train_resnet()

代码说明:

  1. 导入必要的库:

    • numpy: 用于数值计算
    • mindspore.dataset: 用于加载和处理数据集
    • os: 用于文件操作
    • cv2: 用于图像处理
    • mindspore: MindSpore框架
    • mindspore.nn: 神经网络模块
    • mindspore.common.initializer: 用于参数初始化
    • mindspore.train.callback: 用于训练过程的回调函数
    • mindspore.ops.operations: 用于自定义操作
    • mindspore.train.Model: 用于构建模型
    • mindspore.nn.metrics: 用于评估模型性能
  2. 定义BasicBlock类:

    • 定义ResNet-34模型的基本块,包含卷积层、批归一化层、ReLU激活函数和残差连接
  3. 定义ResNet类:

    • 定义ResNet-34模型,包含卷积层、批归一化层、ReLU激活函数、最大池化层、多个BasicBlock层、平均池化层、扁平化层和全连接层
  4. 定义TrainDatasetGenerator类:

    • 用于生成训练和验证数据集
    • 读取图像文件,进行预处理(颜色空间转换、尺寸调整、归一化),并返回图像数据和标签
  5. 定义load_model_from_ckpt函数:

    • 从ckpt文件加载训练好的模型参数
  6. 定义train_resnet函数:

    • 构建训练和验证数据集
    • 构建ResNet模型
    • 定义损失函数、优化器和回调函数
    • 使用Model类训练模型
    • 评估模型性能
  7. 实时识别部分:

    • 加载训练好的模型
    • 使用摄像头捕捉图像
    • 使用OpenCV的人脸检测器识别图像中的人脸
    • 将识别到的人脸图像进行预处理并输入模型进行预测
    • 将预测结果显示在图像上

修改后的AvgPool层参数:

在代码中,我们将self.avgpool = nn.AvgPool2d(kernel_size=8, stride=1, pad_mode='valid')修改为self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1, pad_mode='valid'),这是因为输入特征图的大小为[4, 512, 7, 7],而kernel_size设置为8会导致输出特征图的尺寸为[4, 512, 0, 0],出现错误。

运行代码:

  1. 确保安装了MindSpore框架和其他依赖库
  2. 将代码中的文件路径修改为您的实际路径
  3. 运行代码,程序将开始训练ResNet模型,并使用摄像头进行实时人脸识别

注意:

  • 本代码仅供参考,您需要根据自己的需求进行调整和修改
  • 训练ResNet模型需要较长的训练时间
  • 实时识别速度受硬件性能影响
ResNet-34 人脸识别模型:基于MindSpore的训练和实时识别

原文地址: https://www.cveoy.top/t/topic/jrnO 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录