import numpy as np
import mindspore.dataset as ds
import cv2
import mindspore.nn as nn
import os
from mindspore import context, ops, Tensor
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
np.random.seed(58)


class ResidualBlock(nn.Cell):
    expansion = 1
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, pad_mode='same')
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, pad_mode='same')
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.stride = stride

    def construct(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)

        return out

class ResNet(nn.Cell):
    def __init__(self, block, layers, num_classes=34):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2,   pad_mode='valid')
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2,   pad_mode='valid')

        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1,   pad_mode='valid')
        self.fc = nn.Dense(512 * block.expansion, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels * block.expansion):
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * block.expansion)
            ])
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.SequentialCell(layers)

    def construct(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
        x = self.fc(x)

        return x


class TrainDatasetGenerator:
    def __init__(self, file_path):
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __getitem__(self, index=0):
        data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
        # 使用正则表达式提取标签,避免文件名格式问题
        import re
        match = re.match(r'(\d+)', self.img_names[index])
        if match:
            label = int(match.group(1))
        else:
            raise ValueError(f'无法从文件名 {self.img_names[index]} 中提取标签')
        data = cv2.resize(data, (100, 100))
        data = data.transpose().astype(np.float32) / 255.
        return data, label

    def __len__(self):
        return len(self.img_names)

def train_resnet():
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
    train_dataset_generator = TrainDatasetGenerator('E:/program/miniconda/envs/MindSpore/exam')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
    valid_dataset_generator = TrainDatasetGenerator('E:/program/miniconda/envs/MindSpore/test')
    ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
    network = ResNet(ResidualBlock, [2, 2, 2, 2])
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.001, momentum=0.9)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10, keep_checkpoint_max=10)
    config_ckpt_path = 'E:/program/miniconda/envs/MindSpore/ckpt1/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_resnet', directory=config_ckpt_path, config=config_ck)

    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})

    epoch_size = 10
    print('============== Starting Training ==============')
    # 训练模型
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()], dataset_sink_mode=True)

    acc = model.eval(ds_valid)
    print('============== {} =============='.format(acc))
    epoch_size = 10
    print('============== Starting Training ==============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()], dataset_sink_mode=True)

    acc = model.eval(ds_valid)
    print('============== {} =============='.format(acc))
    epoch_size = 10
    print('============== Starting Training ==============')
    model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()], dataset_sink_mode=True)

    acc = model.eval(ds_valid)
    print('============== {} =============='.format(acc))


if __name__ == '__main__':
    train_resnet()

基于MindSpore框架的ResNet图像分类实战教程

原文地址: https://www.cveoy.top/t/topic/jrHI 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录