import os
from collections import defaultdict, Counter

import cv2
import mindspore.dataset as ds
import mindspore.nn as nn
import numpy as np
from mindspore import context, ops, Tensor
from mindspore.nn.metrics import Accuracy
from mindspore.train import Model
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor

np.random.seed(58)

class ResidualBlock(nn.Cell):
    """Two-convolution residual block with an optional projection shortcut.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN. The input (or its
    ``downsample`` projection, when one is given) is added to that result and
    a final ReLU is applied.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; the second always uses 1.
        downsample: Optional cell applied to the input before the addition,
            or ``None`` to add the input unchanged.
    """

    # Multiplier from ``out_channels`` to the block's real output width;
    # ResNet reads it when sizing shortcuts and the classifier.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, pad_mode='same')
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, pad_mode='same')
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.stride = stride

    def construct(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # The shortcut must be projected whenever the main path changed the
        # tensor shape, otherwise the addition below would not line up.
        if self.downsample is not None:
            identity = self.downsample(x)
        out = out + identity
        out = self.relu(out)

        return out

class ResNet(nn.Cell):
    """ResNet-style classifier: stem, four residual stages, pooling, dense head.

    Args:
        block: Residual block class. It must expose an ``expansion`` attribute
            and accept ``(in_channels, out_channels, stride, downsample)``.
        layers: Sequence of four ints, the number of blocks in each stage.
        num_classes: Size of the output logits vector.
    """

    def __init__(self, block, layers, num_classes=34):
        super(ResNet, self).__init__()
        # Running channel count; make_layer updates it after every stage.
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, pad_mode='valid')
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='valid')

        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)

        # The dense head takes exactly 512 * expansion features, so this
        # pooling has to reduce the final feature map to 1x1.
        # NOTE(review): a fixed 3x3 window does that only for a 3x3 map,
        # which appears to be what 100x100 inputs produce - confirm before
        # feeding any other input size.
        self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1, pad_mode='valid')
        self.fc = nn.Dense(512 * block.expansion, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block uses ``stride``; it also receives a 1x1
        convolution + batch-norm shortcut when the stride or the channel
        count changes. ``self.in_channels`` is updated to the stage's output
        width as a side effect.

        Returns:
            An ``nn.SequentialCell`` holding the stage.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels * block.expansion):
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels * block.expansion,
                          kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * block.expansion)
            ])
        layers = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.SequentialCell(layers)

    def construct(self, x):
        """Map a batch of images to per-class logits."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        # Flatten everything except the batch dimension for the dense layer.
        x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
        x = self.fc(x)

        return x

class TrainDatasetGenerator:
    """Random-access source of ``(image, label)`` pairs read from a directory.

    Every entry of ``file_path`` is treated as one sample. The label is the
    integer that precedes the first ``'-'`` in the file name.

    Args:
        file_path: Directory that contains the image files.
    """

    def __init__(self, file_path):
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __getitem__(self, index=0):
        """Load sample ``index``.

        Returns:
            A ``(data, label)`` tuple: ``data`` is a float32 array of shape
            (3, 100, 100) scaled to [0, 1], ``label`` is an ``int``.

        Raises:
            ValueError: If the file cannot be decoded as an image, or the
                file name does not start with an integer label.
        """
        name = self.img_names[index]
        path = os.path.join(self.file_path, name)
        data = cv2.imread(path)
        if data is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError('cannot read image file: {}'.format(path))
        label = int(name.split('-')[0])
        data = cv2.resize(data, (100, 100))
        # HWC -> CHW. A bare transpose() reverses every axis and would also
        # swap image rows with columns.
        data = data.transpose(2, 0, 1).astype(np.float32) / 255.
        return data, label

    def __len__(self):
        """Return the number of directory entries, i.e. samples."""
        return len(self.img_names)

def _load_reference_batches(dataset):
    """Read every batch of ``dataset`` once as ``(images, majority_label)`` pairs."""
    references = []
    for batch in dataset.create_dict_iterator(output_numpy=True):
        labels = np.ravel(batch['label']).tolist()
        majority = Counter(labels).most_common(1)[0][0]
        references.append((batch['data'], int(majority)))
    return references


def _closest_batch_label(face, references):
    """Return ``(label, distance)`` of the reference batch nearest to ``face``.

    The distance is the summed squared difference between the face and the
    whole batch. ``(-1, 1000000000000)`` is returned when nothing is closer
    than that initial bound.
    """
    best_label = -1
    best_distance = 1000000000000
    for images, label in references:
        distance = ((face - images) ** 2).sum()
        if distance < best_distance:
            best_distance = distance
            best_label = label
    return best_label, best_distance


def _run_webcam_recognition(ds_train):
    """Show the webcam stream with detected faces labelled until 'q' is pressed."""
    subjects = [str(i) for i in range(34)]
    text_color = (128, 128, 0)

    # Load the Haar cascade face detector.
    face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt.xml')
    if face_cascade.empty():
        raise RuntimeError('failed to load haarcascade_frontalface_alt.xml')

    # Walking the dataset re-reads every image from disk, so do it once here
    # instead of once per face per frame.
    references = _load_reference_batches(ds_train)

    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            raise RuntimeError('cannot open camera 0')
        while True:
            success, img = cap.read()
            if not success:
                break
            # Draw on a copy so the captured frame stays untouched.
            img1 = img.copy()
            # The cascade detector works on grayscale images.
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # Each detection is an (x, y, width, height) rectangle.
            rect = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5,
                                                 minSize=(30, 30),
                                                 flags=cv2.CASCADE_SCALE_IMAGE)
            if len(rect) == 0:
                cv2.putText(img1, 'no face!', (10, 20), cv2.FONT_HERSHEY_COMPLEX, 1,
                            text_color, 2)
            for (x, y, w, h) in rect:
                # Rows span the height and columns span the width.
                face = gray[y:y + h, x:x + w].astype(np.float32)
                face = cv2.resize(face, (100, 100))
                cv2.rectangle(img1, (x, y), (x + w, y + h), (0, 255, 0), 2)
                # NOTE(review): the crop stays in the 0-255 range while the
                # training images are scaled to [0, 1]; confirm whether the
                # threshold below was tuned for that.
                label, distance = _closest_batch_label(face, references)
                if distance < 200000000000 and 0 <= label < len(subjects):
                    caption = subjects[label]
                else:
                    caption = 'unknown'
                cv2.putText(img1, caption, (x, y), cv2.FONT_HERSHEY_COMPLEX, 1,
                            text_color, 2)
            # Show every frame, including those without a face, and poll the
            # keyboard once per frame so 'q' always works.
            cv2.imshow('img', img1)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()


def train_resnet():
    """Train the ResNet on the face dataset, then run live webcam recognition.

    Training runs in graph mode on CPU for 10 epochs and writes checkpoints
    to ``D:/pythonproject2/ckpt/``. Afterwards the webcam loop labels each
    detected face with the majority label of the closest training batch.

    Raises:
        RuntimeError: If the face detector file or the camera cannot be opened.
    """
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    train_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/digital_mindspore/dataset')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
    #valid_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/test1')
    #ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    #ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
    network = ResNet(ResidualBlock, [2, 2, 2, 2])
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10)
    config_ckpt_path = 'D:/pythonproject2/ckpt/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path,
                                 config=config_ck)

    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    # NOTE(review): recognition compares raw pixels against training batches;
    # the network trained above is not consulted for the prediction.
    _run_webcam_recognition(ds_train)

# Run training and the webcam demo only when executed as a script.
if __name__ == '__main__':
    train_resnet()

# 摄像头画面非常卡
# 内容:可能是因为计算机性能不足,导致实时处理摄像头画面的速度跟不上。更常见的原因是每一帧都在重复执行耗时操作(例如反复遍历整个训练集、从磁盘重新读取图片)。建议先优化算法,把这类工作移到循环之外并缓存结果;如果仍然不够,再考虑升级计算机硬件。

# 基于MindSpore的ResNet人脸识别模型训练和实时识别
#
# 原文地址: https://www.cveoy.top/t/topic/mVe2 著作权归作者所有。请勿转载和采集!
#
# 免费AI点我,无需注册和登录