MindSpore ResNet 模型训练 - 数字识别
本代码使用 MindSpore 框架训练 ResNet 模型进行数字识别。代码包含数据预处理、模型定义、训练和评估等步骤。
代码结构:
- 导入库: 导入必要的库,包括 numpy、mindspore.dataset、cv2、mindspore.nn 等。
- 定义残差块: 定义 ResNet 模型中的残差块,它包含两个卷积层、两个 Batch Normalization 层和一个 ReLU 激活函数。
- 定义 ResNet 模型: 定义 ResNet 模型,它包含多个残差块,以及卷积层、池化层、全连接层等。
- 定义数据集生成器: 定义一个数据集生成器,用于读取图像数据并进行预处理。
- 训练模型: 定义训练函数,加载数据集、定义损失函数、优化器、回调函数等,并开始训练模型。
代码:
import numpy as np
import mindspore.dataset as ds
import cv2
import mindspore.nn as nn
import os
from mindspore import context, ops
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
# Fix the NumPy RNG seed so any NumPy-based randomness is reproducible.
np.random.seed(58)
class ResidualBlock(nn.Cell):
    """Basic two-convolution residual block (ResNet-18/34 style).

    Applies conv-bn-relu-conv-bn, adds the (optionally projected) input
    back onto the result, and finishes with a ReLU.
    """

    # Output-channel expansion factor; 1 for basic (non-bottleneck) blocks.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        # First 3x3 conv may change the spatial resolution via `stride`.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, pad_mode='same')
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        # Second 3x3 conv keeps resolution and channel count fixed.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, pad_mode='same')
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Optional projection on the skip path so shapes match for the add.
        self.downsample = downsample
        self.stride = stride

    def construct(self, x):
        """Forward pass: F(x) + shortcut(x), followed by ReLU."""
        shortcut = x if self.downsample is None else self.downsample(x)
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        residual += shortcut
        return self.relu(residual)
class ResNet(nn.Cell):
    """ResNet backbone for single-channel image classification.

    A 7x7 conv stem plus max-pool feeds four residual stages, followed by
    average pooling, flattening and a dense classification head.
    """

    def __init__(self, block, layers, num_classes=34):
        super(ResNet, self).__init__()
        # Channel count entering the next stage; mutated by make_layer.
        self.in_channels = 64
        # Stem: note the single input channel (grayscale images).
        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, pad_mode='valid')
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='valid')
        # Four residual stages; stages 2-4 halve resolution and double width.
        self.layer1 = self.make_layer(block, 64, layers[0])
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AvgPool2d(kernel_size=3, stride=1, pad_mode='valid')
        self.fc = nn.Dense(512 * block.expansion, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one residual stage containing `blocks` blocks.

        The first block may downsample and/or widen channels, in which case
        the skip path gets a 1x1 conv + batch-norm projection.
        """
        needs_projection = (stride != 1) or (self.in_channels != out_channels * block.expansion)
        downsample = None
        if needs_projection:
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * block.expansion)
            ])
        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels * block.expansion
        for _ in range(blocks - 1):
            stage.append(block(self.in_channels, out_channels))
        return nn.SequentialCell(stage)

    def construct(self, x):
        """Forward pass: stem -> 4 stages -> pool -> flatten -> logits."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        # Flatten everything but the batch dimension before the dense head.
        x = ops.Reshape()(x, (ops.Shape()(x)[0], -1))
        return self.fc(x)
class TrainDatasetGenerator:
    """Random-access generator yielding (image, label) pairs from a folder.

    Each file is one sample; the label is the first character of its
    filename (a digit), shifted to a 0-based class index.
    """

    def __init__(self, file_path):
        # Directory containing the image files; one sample per file.
        self.file_path = file_path
        self.img_names = os.listdir(file_path)

    def __call__(self, index=0):
        # Read as grayscale: the network stem is nn.Conv2d(1, 64, ...), so it
        # expects a single input channel, but cv2.imread's default mode would
        # yield 3 BGR channels (and the old data.transpose() also swapped H/W).
        path = os.path.join(self.file_path, self.img_names[index])
        data = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        # Filenames encode a 1-based class digit as the first character.
        label = int(self.img_names[index][0]) - 1
        data = cv2.resize(data, (100, 100))
        # Add an explicit channel axis -> (1, 100, 100) and scale to [0, 1].
        data = data[np.newaxis, :, :].astype(np.float32) / 255.
        # Scalar int32 label: after batching this gives shape (B,), which is
        # what SoftmaxCrossEntropyWithLogits(sparse=True) requires; the old
        # np.array([label]) batched to (B, 1) and broke the sparse loss.
        label = np.array(label, dtype=np.int32)
        return data, label

    def __len__(self):
        return len(self.img_names)
def train_resnet():
    """Train the ResNet digit classifier and evaluate it after each round.

    Runs three rounds of (10-epoch training + validation eval) on CPU in
    graph mode, checkpointing every 10 steps.

    NOTE(review): dataset and checkpoint paths are hard-coded Windows
    paths — adjust them for your environment. Switch `device_target`
    to 'GPU' to train on a GPU.
    """
    context.set_context(mode=context.GRAPH_MODE, device_target='CPU')

    # Training pipeline: shuffle the raw generator output, then batch.
    train_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/digital_mindspore/dataset')
    ds_train = ds.GeneratorDataset(train_dataset_generator, ['data', 'label'], shuffle=True)
    ds_train = ds_train.shuffle(buffer_size=10)
    ds_train = ds_train.batch(batch_size=4, drop_remainder=True)

    # Validation pipeline: batching only.
    valid_dataset_generator = TrainDatasetGenerator('D:/pythonproject2/test1')
    ds_valid = ds.GeneratorDataset(valid_dataset_generator, ['data', 'label'], shuffle=True)
    ds_valid = ds_valid.batch(batch_size=4, drop_remainder=True)

    # ResNet-18 layout: two basic blocks per stage.
    network = ResNet(ResidualBlock, [2, 2, 2, 2])
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    net_opt = nn.Momentum(network.trainable_params(), learning_rate=0.01, momentum=0.9)

    # Callbacks: per-epoch timing and periodic checkpoints.
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    config_ck = CheckpointConfig(save_checkpoint_steps=10,
                                 keep_checkpoint_max=10)
    config_ckpt_path = 'D:/pythonproject2/ckpt/'
    ckpoint_cb = ModelCheckpoint(prefix='checkpoint_lenet', directory=config_ckpt_path, config=config_ck)
    model = Model(network, net_loss, net_opt, metrics={'Accuracy': Accuracy()})

    # Three identical train/eval rounds; this loop replaces three verbatim
    # copy-pasted train/eval sections in the original code.
    epoch_size = 10
    for _ in range(3):
        print('============== Starting Training =============')
        model.train(epoch_size, ds_train, callbacks=[time_cb, ckpoint_cb, LossMonitor()])
        acc = model.eval(ds_valid)
        print('============== {} ============='.format(acc))
# Script entry point: only start training when run directly, not on import.
if __name__ == '__main__':
    train_resnet()
运行结果:
代码运行后,会输出训练过程中的损失值、准确率等信息。
注意:
- 代码中的数据路径需要根据实际情况进行修改。
- 代码中的超参数可以根据实际情况进行调整。
- 代码使用了 CPU 进行训练,如果需要使用 GPU 进行训练,需要修改代码中的 context.set_context 函数。
- 此代码仅供参考,读者可以根据自己的需求进行修改和完善。
原文地址: https://www.cveoy.top/t/topic/mRBX 著作权归作者所有。请勿转载和采集!