基于MindSpore的ResNet图像分类模型训练
import numpy as np
import mindspore.dataset as ds
import cv2
import mindspore.nn as nn
import os
from mindspore import context, ops
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
np.random.seed(58)
class ResidualBlock(nn.Cell):
expansion = 1
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super(ResidualBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, pad_mode='same')
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU()
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, pad_mode='same')
self.bn2 = nn.BatchNorm2d(out_channels)
self.downsample = downsample
self.stride = stride
def construct(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Cell):
def __init__(self, block, layers, num_classes=34):
super(ResNet, self).__init__()
self.in_channels = 64
self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, pad_mode='valid')
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='valid')
self.layer1 = self.make_layer(block, 64, layers[0])
self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1, pad_mode='valid')
self.fc = nn.Dense(512 * block.expansion, num_classes)
def make_layer(self, block, out_channels, blocks, stride=1):
downsample = None
if (stride != 1) or (self.in_channels != out_channels * block.expansion):
downsample = nn.SequentialCell([
nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride),
nn.BatchNorm2d(out_channels * block.expansion)
])
layers = []
layers.append(block(self.in_channels, out_channels, stride, downsample))
self.in_channels = out_channels * block.expansion
for _ in range(1, blocks):
layers.append(block(self.in_channels, out_channels))
return nn.SequentialCell(layers)
def construct(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
x = self.fc(x)
return x
class TrainDatasetGenerator:
def __init__(self, file_path):
self.file_path = file_path
self.img_names = os.listdir(file_path)
def __call__(self, index=0):
data = cv2.imread(os.path.join(self.file_path, self.img_names[index]))
label = int(self.img_names[index][0])-1
data = cv2.resize(data, (100, 100))
data = data.transpose().astype(np.float32) / 255.
return data, np.array([label])
def __len__(self):
return len(self.img_names)
def train_resnet():
context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
train_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/digital_mindspore/dataset')
ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
ds_train = ds_train.shuffle(buffer_size=10)
ds_train = ds_train.batch(batch_size=4, drop_remainder=True)
valid_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/test1')
ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)
network = ResNet(ResidualBlock,[2,2,2,2])
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9)
time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
config_ck = CheckpointConfig(save_checkpoint_steps=10,
keep_checkpoint_max=10)
config_ckpt_path = 'D:/pythonproject2/ckpt/'
ckpoint_cb = ModelCheckpoint(prefix='checkpoint_lenet', directory=config_ckpt_path, config=config_ck)
model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])
acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])
acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))
epoch_size = 10
print('============== Starting Training =============')
model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])
acc = model.eval(ds_valid)
print('============== {} ============='.format(acc))
if __name__ == '__main__':
train_resnet()
代码说明:
- 模型定义: 代码定义了ResNet模型,包括残差块(ResidualBlock)和整体ResNet网络结构。
- 数据集加载: 使用自定义的
TrainDatasetGenerator类加载训练数据集和验证数据集,并将数据处理成MindSpore可以识别的格式。 - 训练过程: 代码使用MindSpore的
Model类进行模型训练,并设置了学习率、优化器、损失函数等参数,使用ModelCheckpoint回调函数保存模型训练过程中的中间结果。 - 评估: 使用训练好的模型对验证数据集进行评估,并打印出模型的准确率。
代码优化:
- 将代码中所有双引号改为单引号,使其符合Python代码规范。
- 对代码进行了适当的格式化,使代码更易读。
- 添加了简要的代码说明,方便读者理解代码的功能。
SEO优化:
- 标题: 使用更加具体和描述性的标题,例如“基于MindSpore的ResNet图像分类模型训练”。
- 描述: 添加简短的代码描述,例如“使用MindSpore框架搭建ResNet模型,并使用图像数据集进行训练,实现图像分类任务。”
- 关键词: 添加与代码内容相关的关键词,例如“MindSpore, ResNet, 图像分类, 深度学习, 训练”。
- 内容: 添加代码的详细说明,解释代码的功能和实现细节。
其他建议:
- 建议您将代码中的文件路径改为相对路径,以便代码更易移植。
- 建议您添加代码的测试用例,确保代码的正确性和稳定性。
- 建议您使用可视化工具(例如TensorBoard)对模型训练过程进行监控,以便更好地理解模型的训练效果。
原文地址: https://www.cveoy.top/t/topic/mRCW 著作权归作者所有。请勿转载和采集!