import numpy as np
import mindspore.dataset as ds
import cv2
import mindspore.nn as nn
import os
from mindspore import context, ops, Tensor
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
np.random.seed(58)


class ResidualBlock(nn.Cell):
    expansion = 1
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, pad_mode='same')
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, pad_mode='same')
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.stride = stride

    def construct(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)

        return out

class ResNet(nn.Cell):
    def __init__(self, block, layers, num_classes=34):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, pad_mode='valid')
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='valid')

        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1, pad_mode='valid')
        self.fc = nn.Dense(512 * block.expansion, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels * block.expansion):
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * block.expansion)
            ])
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.SequentialCell(layers)

    def construct(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
        x = self.fc(x)

        return x


class TrainDatasetGenerator:
    def __init__(self, file_path):
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __getitem__(self, index=0):
        data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
        label = int(self.img_names[index].split('-')[0])
        data = cv2.resize(data, (100, 100))
        data = data.transpose(2, 0, 1).astype(np.float32) / 255.  # HWC -> CHW, scale to [0, 1]
        # data = np.expand_dims(data, axis=0)
        # data = Tensor(data)
        # label = Tensor(label)
        return data, label

    def __len__(self):
        return len(self.img_names)


def train_resnet():
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    train_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/digital_mindspore/dataset')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
    #valid_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/test1')
    #ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    #ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
    network = ResNet(ResidualBlock, [2, 2, 2, 2])
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10,
                                 keep_checkpoint_max=10)
    config_ckpt_path = 'D:/pythonproject2/ckpt/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)


    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
    epoch_size = 10
    print('============== Starting Training =============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])

    cap = cv2.VideoCapture(0)
    stop = False
    face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt.xml')  # load the Haar face detector
    while not stop:
        success, img = cap.read()
        subjects = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17',
                    '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33']
        # Work on a copy so the original frame is preserved
        img1 = img.copy()
        # Detect faces
        # Convert the frame to grayscale, since the OpenCV face detector expects a grayscale image
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Multi-scale detection; returns a list of face regions as (x, y, width, height)
        rect = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30),
                                             flags=cv2.CASCADE_SCALE_IMAGE)
        # If no face was detected
        if len(rect) == 0:
            txt = 'no face!'
            cv2.putText(img1, txt, (10, 20), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
        else:
            for (x, y, w, h) in rect:
                face = gray[y:y + h, x:x + w].astype(np.float32)  # crop the face region (rows y..y+h, cols x..x+w) and convert to float
                face = cv2.resize(face, (100, 100))
                cv2.rectangle(img1, (x, y), (x + w, y + h), (0, 255, 0), 2)  # draw the bounding box
                k = cv2.waitKey(100)  # poll the keyboard every 0.1 s
                min_d = 1000000000000
                c = -1
                for f in ds_train.create_dict_iterator(output_numpy=True):  # iterate over the training batches
                    d = ((face - f['data']) ** 2).sum()  # squared Euclidean distance between the test face and the training batch
                    if d < min_d:
                        min_d = d
                        c = f['label']  # note: with batch_size=4 this is an array of labels, not a scalar
                if min_d < 10000000:
                    cv2.putText(img1, subjects[c], (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
                else:
                    label = 'unknown'
                    cv2.putText(img1, label, (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
                cv2.imshow('img', img1)
                cv2.waitKey(1)
                if c & 0xFF == ord('q'):  # press q to quit (note: this compares the label c, not the key code k)
                    stop = True
                    cv2.destroyAllWindows()  # close the display window
if __name__ == '__main__':
    train_resnet()

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
Example code to fix the error
This ValueError is raised when a NumPy array with more than one element is used in a boolean context; the test has to be reduced with any() or all(). In this script it most likely originates from the loop over ds_train: because the dataset is batched with batch_size=4, f['label'] is an array of four labels rather than a scalar, so c becomes an array and a later scalar comparison on c (for example the q-key check near the end of the loop) is ambiguous. The if statement can be changed to:

if (f['label'] == c).any():
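
Applied to the script above, a minimal sketch of a corrected matching step is shown below. It assumes the probe face has been preprocessed exactly like the training images (a CHW float array scaled to [0, 1]) and that ds_train keeps batch_size=4; dists and best_label are illustrative names that do not appear in the original code, and the quit check uses the waitKey result k instead of the label:

min_d = float('inf')
best_label = -1
for f in ds_train.create_dict_iterator(output_numpy=True):
    # per-sample squared distance over (channel, height, width) -> shape (batch,)
    dists = ((f['data'] - face) ** 2).sum(axis=(1, 2, 3))
    i = int(dists.argmin())
    if dists[i] < min_d:
        min_d = float(dists[i])
        best_label = int(f['label'][i])  # reduce the batched label array to a single scalar
if min_d < 10000000:
    cv2.putText(img1, subjects[best_label], (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
else:
    cv2.putText(img1, 'unknown', (x, y), cv2.FONT_HERSHEY_COMPLEX, 1, (128, 128, 0), 2)
if k & 0xFF == ord('q'):  # use the key code from cv2.waitKey, not the label, for the quit check
    stop = True
    cv2.destroyAllWindows()

Because best_label is always a plain Python int here, no boolean test ever sees a multi-element array, and the ValueError cannot occur.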
MindSpore ResNet Face Recognition Model Training and Real-Time Recognition
