基于MindSpore的ResNet人脸识别模型训练和实时识别
from collections import defaultdict, Counter
import numpy as np import mindspore.dataset as ds import cv2 import mindspore.nn as nn import os from mindspore import context, ops, Tensor from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor from mindspore.train import Model from mindspore.nn.metrics import Accuracy np.random.seed(58)
class ResidualBlock(nn.Cell): expansion = 1 def init(self, in_channels, out_channels, stride=1, downsample=None): super(ResidualBlock, self).init() self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, pad_mode='same') self.bn1 = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU() self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, pad_mode='same') self.bn2 = nn.BatchNorm2d(out_channels) self.downsample = downsample self.stride = stride
def construct(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Cell): def init(self, block, layers, num_classes=34): super(ResNet, self).init() self.in_channels = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, pad_mode='valid')
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='valid')
self.layer1 = self.make_layer(block, 64, layers[0])
self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1, pad_mode='valid')
self.fc = nn.Dense(512 * block.expansion, num_classes)
def make_layer(self, block, out_channels, blocks, stride=1):
downsample = None
if (stride != 1) or (self.in_channels != out_channels * block.expansion):
downsample = nn.SequentialCell([
nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride),
nn.BatchNorm2d(out_channels * block.expansion)
])
layers = []
layers.append(block(self.in_channels, out_channels, stride, downsample))
self.in_channels = out_channels * block.expansion
for _ in range(1, blocks):
layers.append(block(self.in_channels, out_channels))
return nn.SequentialCell(layers)
def construct(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
x = self.fc(x)
return x
class TrainDatasetGenerator: def init(self, file_path): self.file_path = file_path self.img_names = os.listdir(file_path)
def __getitem__(self, index=0):
data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
label = int(self.img_names[index].split('-')[0])
data = cv2.resize(data,(100,100))
data = data.transpose().astype(np.float32) / 255.
# data = np.expand_dims(data, axis=0)
# data = Tensor(data)
# label = Tensor(label)
return data, label
def __len__(self):
return len(self.img_names)
def train_resnet(): context.set_context(mode=context.GRAPH_MODE, device_target='CPU') train_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/digital_mindspore/dataset') ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True) ds_train = ds_train.shuffle(buffer_size=10) ds_train = ds_train.batch(batch_size=4, drop_remainder=True) #valid_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/test1') #ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True) #ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True) network = ResNet(ResidualBlock,[2,2,2,2]) net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9) time_cb = TimeMonitor(data_size=ds_train.get_dataset_size()) config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10) config_ckpt_path = 'D:/pythonproject2/ckpt/' ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)
model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])
cap = cv2.VideoCapture(0)
stop = False
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt.xml') # 加载检测器
while not stop:
success, img = cap.read()
subjects = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17',
'18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33']
# 生成图像的副本,这样就能保留原始图像
img1 = img.copy()
# 检测人脸
# 将测试图像转换为灰度图像,因为opencv人脸检测器需要灰度图像
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 检测多尺度图像,返回值是一张脸部区域信息的列表(x,y,宽,高)
rect = face_cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE)
# 如果未检测到面部
if len(rect) == 0:
txt = 'no face!'
cv2.putText(img1, txt, (10, 20), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
if not rect is None:
for (x, y, w, h) in rect:
face = gray[y:y + w, x:x + h].astype(np.float32) # 数值转换
face = cv2.resize(face, (100, 100))
cv2.rectangle(img1, (x, y), (x + w, y + h), (0, 255, 0), 2) # 画出矩形框
k = cv2.waitKey(100) # 每0.1秒读取一次键盘
min_d = 1000000000000
c = -1
for f in ds_train.create_dict_iterator(output_numpy=True): # i,f是训练数据的信息,遍历训练数据
d = ((face - f['data']) ** 2).sum() # 计算test和训练图片的欧氏距离
if d < min_d:
min_d = d
label1 = f['label']
label_counter = Counter(label1)
most_common_label = label_counter.most_common(1)[0][0]
c = most_common_label
if min_d < 200000000000:
cv2.putText(img1, subjects[c], (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
else:
label = 'unknown'
cv2.putText(img1, label, (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
cv2.imshow('img', img1)
cv2.waitKey(1)
if (c & 0xFF == ord('q')): # 按下q程序结束
stop = True
cv2.destroyAllWindows() # 释放窗口
if name == 'main': train_resnet()
摄像头画面非常卡内容:可能是因为计算机性能不足,导致实时处理摄像头画面的速度跟不上。建议优化算法或者升级计算机硬件。
原文地址: https://www.cveoy.top/t/topic/mVe2 著作权归作者所有。请勿转载和采集!