ResNet-50 模型实现:使用 MindSpore 构建图像分类模型
ResNet-50 模型实现
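本篇代码演示了如何使用 MindSpore 实现 ResNet 系列图像分类模型,包括定义网络结构、加载预训练模型、构建数据集、训练和测试模型等步骤。注意:文中的 ResBlock 是由两个 3×3 卷积组成的基础残差块,配合 [3, 4, 6, 3] 的层数配置,得到的实际上是 ResNet-34 风格的结构;标准的 ResNet-50 使用的是 Bottleneck(1×1 → 3×3 → 1×1)残差块。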
定义 ResNet-50 网络结构
import mindspore.nn as nn
import mindspore.ops as ops
class ConvBlock(nn.Cell):
    """Convolution followed by batch normalization and ReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Size of the convolution kernel.
        stride: Stride of the convolution.
        padding: Explicit padding added around the input.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super(ConvBlock, self).__init__()
        # nn.Conv2d's fifth positional parameter is `pad_mode` (a string),
        # not `padding`, so both are passed by keyword. An explicit padding
        # value is only honoured with pad_mode='pad'.
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            pad_mode='pad',
            padding=padding,
            has_bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def construct(self, x):
        """Apply conv -> batch norm -> ReLU to `x` and return the result."""
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x
class ResBlock(nn.Cell):
    """Basic residual block: two 3x3 ConvBlocks plus a shortcut connection.

    Both ConvBlocks end in a ReLU, so the main branch is already activated
    when it is added to the shortcut; a final ReLU follows the addition.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by each convolution.
        stride: Stride of the first convolution (and of the default
            projection shortcut).
        downsample: Optional cell applied to the shortcut branch. When
            omitted, a 1x1 conv + batch norm projection is created only if
            the stride or channel count changes; otherwise the shortcut is
            the identity.
    """

    # Ratio between the block's output channels and `out_channels`;
    # ResNet._make_layer reads this attribute from the block class.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResBlock, self).__init__()
        self.conv1 = ConvBlock(in_channels, out_channels, stride=stride)
        self.conv2 = ConvBlock(out_channels, out_channels, kernel_size=3, stride=1, padding=1)

        shape_changes = stride != 1 or in_channels != out_channels * self.expansion
        if downsample is None and shape_changes:
            # Without a projection the addition below would see mismatched shapes.
            downsample = nn.SequentialCell([
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride, has_bias=False),
                nn.BatchNorm2d(out_channels * self.expansion),
            ])
        self.downsample = downsample
        self.relu = nn.ReLU()

    def construct(self, x):
        """Return relu(main_branch(x) + shortcut(x))."""
        identity = x
        x = self.conv1(x)
        x = self.conv2(x)
        if self.downsample is not None:
            identity = self.downsample(identity)
        x = x + identity
        x = self.relu(x)
        return x
class ResNet(nn.Cell):
    """ResNet backbone with a dense classification head.

    Args:
        block: Residual block class. It must expose an `expansion` class
            attribute and accept `(in_channels, out_channels, stride,
            downsample)` as constructor arguments.
        layers: Sequence of four ints, the number of blocks in each stage.
        num_classes: Size of the output of the final dense layer.
    """

    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channels = 64
        self.conv1 = ConvBlock(3, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        # A 7x7 window reduces the final map to 1x1 only when that map is 7x7
        # (e.g. 224x224 inputs, given the five stride-2 reductions above).
        self.avgpool = nn.AvgPool2d(7, 1)
        # NOTE(review): whether the positional 0.4 means keep- or
        # drop-probability depends on the MindSpore version — TODO confirm.
        self.dropout = nn.Dropout(0.4)
        # Keep the flattened width in one place so the reshape and the dense
        # layer stay in agreement for blocks whose expansion is not 1.
        self.feature_dim = 512 * block.expansion
        self.reshape = ops.Reshape()
        self.fc = nn.Dense(self.feature_dim, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage made of `blocks` residual blocks.

        Only the first block may change resolution or channel count; it gets
        a 1x1 conv + batch norm projection on its shortcut when it does.
        """
        stage_channels = out_channels * block.expansion
        downsample = None
        if stride != 1 or self.in_channels != stage_channels:
            downsample = nn.SequentialCell([
                nn.Conv2d(self.in_channels, stage_channels, kernel_size=1, stride=stride, has_bias=False),
                nn.BatchNorm2d(stage_channels),
            ])
        layers = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = stage_channels
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.SequentialCell(layers)

    def construct(self, x):
        """Run the stem, the four stages and the classification head on `x`."""
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = self.dropout(x)
        x = self.reshape(x, (-1, self.feature_dim))
        x = self.fc(x)
        return x
def resnet50(num_classes=1000):
    """Build a ResNet with stage depths [3, 4, 6, 3] from ResBlock.

    Args:
        num_classes: Size of the classifier output. Defaults to 1000.

    Returns:
        The constructed ResNet cell.

    Note:
        ResBlock is a two-convolution basic block, so this configuration has
        the 34-layer layout; the name is kept for existing callers.
    """
    return ResNet(ResBlock, [3, 4, 6, 3], num_classes=num_classes)
加载预训练模型
from mindspore import load_checkpoint, load_param_into_net
# Path of the checkpoint file that holds the pretrained parameters.
pretrained_model = 'resnet50.ckpt'
# Build the network with the default settings of resnet50().
net = resnet50()
# Read the checkpoint file into a parameter dictionary.
param_dict = load_checkpoint(pretrained_model)
# Copy the checkpoint parameters into the network.
# NOTE(review): presumably parameters are matched by name, so a checkpoint
# produced from a different architecture may leave layers uninitialised —
# verify the return value of this call.
load_param_into_net(net, param_dict)
构建数据集
import os
import numpy as np
from mindspore import Tensor
from PIL import Image
from mindspore.dataset import vision
def load_dataset(data_path):
    """Load an image-folder dataset fully into memory.

    `data_path` must contain one sub-directory per class whose name is the
    integer class label; every file inside is read as an image.

    Args:
        data_path: Root directory of the dataset.

    Returns:
        A tuple `(images, labels)` of Tensors. Images are float32, resized to
        224x224 and laid out channel-first; labels are the integer directory
        names.

    Raises:
        ValueError: If a class directory name is not an integer.
    """
    images = []
    labels = []
    # os.listdir order is arbitrary; sort so the sample order is reproducible.
    for subdir in sorted(os.listdir(data_path)):
        subpath = os.path.join(data_path, subdir)
        if not os.path.isdir(subpath):
            # Stray files next to the class folders are not classes.
            continue
        for filename in sorted(os.listdir(subpath)):
            imgpath = os.path.join(subpath, filename)
            # The context manager closes the file handle; convert('RGB')
            # guarantees three channels so the transpose below is valid for
            # grayscale, palette and RGBA images too.
            with Image.open(imgpath) as img:
                resized = img.convert('RGB').resize((224, 224))
                pixels = np.array(resized).astype(np.float32)
            images.append(pixels.transpose((2, 0, 1)))  # HWC -> CHW
            labels.append(int(subdir))
    images = np.array(images)
    labels = np.array(labels)
    return Tensor(images), Tensor(labels)
# ImageFolderDataset is defined in mindspore.dataset, not in the vision
# transforms module, so the dataset package itself is imported here.
import mindspore.dataset as ds

train_path = 'train/'
test_path = 'test/'


def _build_pipeline(path, shuffle, drop_remainder):
    """Create a batched, normalized image pipeline from an image folder.

    Args:
        path: Root directory with one sub-directory per class.
        shuffle: Whether to shuffle the samples.
        drop_remainder: Whether to drop the last incomplete batch.

    Returns:
        The batched dataset (batch size 32) with channel-first float images.
    """
    dataset = ds.ImageFolderDataset(path, num_parallel_workers=8, shuffle=shuffle, decode=True)
    transforms = [
        # Images must share one size before they can be batched.
        vision.Resize((224, 224)),
        # The mean/std below are expressed on a [0, 1] scale, so bring the
        # 0-255 pixel values into that range first.
        vision.Rescale(1.0 / 255.0, 0.0),
        vision.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        vision.HWC2CHW(),
    ]
    dataset = dataset.map(operations=transforms, input_columns='image')
    return dataset.batch(batch_size=32, drop_remainder=drop_remainder)


train_data = _build_pipeline(train_path, shuffle=True, drop_remainder=True)
# Keep the last partial batch so every test sample is evaluated.
test_data = _build_pipeline(test_path, shuffle=False, drop_remainder=False)
训练和测试模型
from mindspore import Model
from mindspore.nn.metrics import Accuracy
from mindspore.train.callback import LossMonitor
# The default reduction is 'none', which yields one loss value per sample;
# average over the batch so the optimizer sees a scalar mean loss.
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
lr = 0.01
momentum = 0.9
net_opt = nn.Momentum(net.trainable_params(), lr, momentum)
model = Model(net, net_loss, net_opt, metrics={'accuracy': Accuracy()})
# Print the loss once per epoch (every `dataset size` steps).
loss_cb = LossMonitor(per_print_times=train_data.get_dataset_size())
# Train the model for 3 epochs.
model.train(3, train_data, callbacks=[loss_cb])
# Model.eval returns the computed metrics; keep and report them instead of
# discarding the result.
eval_result = model.eval(test_data)
print(eval_result)
原文地址: https://www.cveoy.top/t/topic/mJeK 著作权归作者所有。请勿转载和采集!