import numpy as np
import mindspore.dataset as ds
import os
import cv2
import mindspore
import mindspore.nn as nn
from mindspore import Tensor
from mindspore.common.initializer import Normal
from mindspore import context
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
from mindspore.ops.operations import TensorAdd
from scipy.integrate._ivp.radau import P
from mindspore import Model # 承载网络结构
from mindspore.nn.metrics import Accuracy # 测试模型用

np.random.seed(58)


class BasicBlock(nn.Cell):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, pad_mode='pad',has_bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, pad_mode='pad', has_bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.add = TensorAdd()

    def construct(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.add(out, identity)
        out = self.relu(out)

        return out

class ResNet(nn.Cell):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, pad_mode='pad', has_bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1)
        self.flatten = nn.Flatten()
        self.fc = nn.Dense(512, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, has_bias=False),
                nn.BatchNorm2d(out_channels)
            ])

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))

        return nn.SequentialCell(layers)

    def construct(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x


class TrainDatasetGenerator:
    def __init__(self, file_path):
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __getitem__(self, index):
        data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
        label = self.img_names[index].split('_')[0]
        label = int(label)
        data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
        data = cv2.resize(data, (224, 224))
        data = data.transpose().astype(np.float32) / 255.
        return data, label

    def __len__(self):
        return len(self.img_names)


def train_resnet():
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    train_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/train1')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
    valid_dataset_generator = TrainDatasetGenerator('D:/pythonProject7/test1')
    ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
    network = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=100)
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10)
    config_ckpt_path = 'D:/pythonProject7/ckpt/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)

    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    acc = model.eval(ds_valid)
    print('============== {} ============='.format(acc))

if __name__ == '__main__':
    train_resnet()


# 加载模型参数
model = mindspore.train.serialization.load_checkpoint('checkpoint_resnet-10_156.ckpt')
network = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=100)
model.set_train(False)
model.load_parameters(model)

# 获取实时视频流
cap = cv2.VideoCapture(0)

# 图像预处理函数
def preprocess(frame):
    frame = cv2.resize(frame, (224, 224))
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = frame.transpose().astype(np.float32) / 255.
    frame = np.expand_dims(frame, axis=0)
    return Tensor(frame)

# 实时人脸识别循环
while True:
    ret, frame = cap.read()
    if ret:
        input_data = preprocess(frame)
        output = model.predict(input_data)
        predict_label = np.argmax(output.asnumpy())
        cv2.putText(frame, str(predict_label), (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

解释:

  1. 模型训练: 首先,使用提供的代码训练 ResNet 模型。训练完成后,模型参数将保存到 'checkpoint_resnet-10_156.ckpt' 文件中。
  2. 加载模型: 使用 mindspore.train.serialization.load_checkpoint 函数加载保存的模型参数。
  3. 获取视频流: 使用 cv2.VideoCapture(0) 获取摄像头视频流。
  4. 预处理: 使用 preprocess 函数对每一帧图像进行预处理,包括调整大小、颜色空间转换和归一化。
  5. 预测: 使用加载的模型进行预测,获得预测结果。
  6. 结果展示: 将预测结果显示在图像上。

注意:

  • 需要安装 opencv-python 库。
  • 替换代码中的 'checkpoint_resnet-10_156.ckpt' 为实际保存模型的路径。
  • 根据需要调整 preprocess 函数中的预处理参数。
  • 如果你的摄像头设备号不是 0,请修改 cv2.VideoCapture(0) 中的 0 为你的摄像头设备号。

代码运行流程:

  1. 运行代码,将开始从摄像头获取视频流。
  2. 对于每一帧图像,代码将进行预处理,并使用加载的模型进行预测。
  3. 预测结果将显示在图像上。
  4. 按下 'q' 键退出程序。

可能存在的问题:

  • 由于代码中使用的模型是在 CPU 上训练的,因此在实时识别过程中,可能会存在性能问题。
  • 如果使用的是 GPU,可以修改代码中的 context.set_context(mode=context.GRAPH_MODE, device_target='CPU')context.set_context(mode=context.GRAPH_MODE, device_target='GPU') 以提高性能。
  • 如果使用GPU,需要确保您的MindSpore环境已正确配置并安装了所需的驱动程序和库。

希望这份示例代码能够帮助您实现实时人脸识别。

MindSpore 实时人脸识别:基于 ResNet 模型

原文地址: https://www.cveoy.top/t/topic/mS6O 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录