ECG Classification with Full Convolutional Neural Network: A Comprehensive Guide
from sklearn.model_selection import train_test_split
from torch.nn.common_types import _size_1_t
import torch.optim as optim
from typing import Union
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import scipy.io as scio
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
classes_num = 17
########################################读数据##############################################
# 读取出来的data是字典格式
dataset_path = '../ECG_Dataset/ECG-17'
classes = ['NSR', 'APB', 'AFL', 'AFIB', 'SVTA', 'WPW', 'PVC', 'Bigeminy',
'Trigeminy', 'VT', 'IVR', 'VFL', 'Fusion', 'LBBBB', 'RBBBB', 'SDHB', 'PR']
len_classes = len(classes)
X = list()
y = list()
for root, dirs, files in os.walk(dataset_path, topdown=False):
for name in files:
data_train = scio.loadmat(
os.path.join(root, name)) # 取出字典里的value
# arr -> list
data_arr = data_train.get('val')
data_list = data_arr.tolist()
# append() 方法用于在列表的末尾追加元素,该方法的语法格式如下:
# listname.append(obj)
X.append(data_list[0]) # [[……]] -> [ ]
y.append(int(os.path.basename(root)[0:2]) - 1)# 标签0和1位,name -> num
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)
X_mean = torch.mean(X, dim=1, keepdim=True)
X_std = torch.std(X, dim=1, keepdim=True)
X = (X - X_mean) / X_std ##数据归一化,减均值除以方差
X = X.reshape((1000, 1, 3600)).to(device) # [1,3600]
y = y.reshape((1000)).to(device)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4)
batch_size = 64
lr = 0.002
########################################训练集##############################################
class TrainDatasets(Dataset):
def __init__(self, x_train, y_train):
self.len = x_train.size(0) # 取第0元素:长度
self.x_train = x_train
self.y_train = y_train
def __getitem__(self, index):
return self.x_train[index], self.y_train[index] # 返回对应样本即可
def __len__(self):
return self.len
class TestDatasets(Dataset):
def __init__(self, x_test, y_test):
self.len = x_test.size(0)
self.x_test = x_test
self.y_test = y_test
def __getitem__(self, index):
return self.x_test[index], self.y_test[index]
def __len__(self):
return self.len
########################################封装dataloader######################################
train_dataset = TrainDatasets(X_train, y_train)
test_dataset = TestDatasets(X_test, y_test)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
###########################################################################################
class FullConv1d(nn.Conv1d):
def __init__(self, in_channels: int, out_channels: int, kernel_size: _size_1_t, stride: _size_1_t = 1,
padding: Union[str, _size_1_t] = 0,
dilation: _size_1_t = 1,
groups: int = 1,
bias: bool = False,
padding_mode: str = 'zeros', ):
super().__init__(in_channels, out_channels, kernel_size, stride=stride, padding=padding, dilation=dilation,
groups=groups, bias=bias, padding_mode=padding_mode)
def forward(self, input):
w = self.weight
out = F.conv1d(input=input, weight=w, bias=None, stride=self.stride,
padding=self.padding, dilation=self.dilation, groups=self.groups)
return out
###########################################################################################
class Full_conv_pool(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, stride, padding, padding_value, pool_size,
pool_stride, ):
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.kernel_size = kernel_size
self.stride = stride
self.padding = padding
self.bn = nn.BatchNorm1d(out_channels)
# pad
self.pad = nn.ConstantPad1d(padding=padding, value=padding_value)
# 无bias,一维卷积
self.conv = FullConv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=0,
bias=False)
# MaxPool
self.pool = nn.MaxPool1d(kernel_size=pool_size, stride=pool_stride)
# ACT
self.relu = nn.ReLU()
self.prelu = nn.PReLU()
self.htanh = nn.Hardtanh()
# self.tanh = nn.Tanh()
def forward(self, I):
I = self.pad(I)
I = self.conv(I)
I = self.pool(I)
I = self.prelu(I)
I = self.bn(I)
return I
###########################################################################################
class Full_ECG(nn.Module):
def __init__(self, device):
super(Full_ECG, self).__init__()
self.name = 'Bin_ECG'
self.device = device
self.classifier = nn.Sequential(
# input_channels, output_channels, kernel_size, stride, padding, pad_value, pool_size, pool_stride
Full_conv_pool(1, 8, 9, 3, 5, 1,
5, 2),
Full_conv_pool(8, 16, 9, 1, 5, 1,
5, 2),
Full_conv_pool(16, 32, 9, 1, 5, 1,
5, 2),
Full_conv_pool(32, 32, 9, 1, 5, 1,
5, 2),
Full_conv_pool(32, classes_num, 9, 1, 5, 1,
5, 2)
)
self.dropout = nn.Dropout(p=0.8) # 防止过拟合
def forward(self, batch_data):
batch_data = batch_data.clone().detach().requires_grad_(True).to(self.device)
batch_data = self.classifier(batch_data)
batch_data = self.dropout(batch_data)
batch_data = batch_data.mean(dim=2) # 去掉一个维度
return batch_data
model = Full_ECG(device=device).to(device)
#####################################损失函数:交叉熵#########################################
loss_fn = nn.CrossEntropyLoss().to(device)
########################################优化器##############################################
optimizer = optim.Adam(model.parameters(), lr=lr)
print(device)
####################################模型参数汇总#######################################
from torchinfo import summary
summary(model=model,
input_size=(batch_size, 1, 3600), # make sure this is "input_size", not "input_shape"
col_names=["input_size", "output_size", "num_params", "trainable"],
col_width=20,
row_settings=["var_names"])
epochs = 200
seed = 110
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
results = {"train_loss": [],
"train_acc": [],
"test_loss": [],
"test_acc": []
}
best_test_acc = 0.0
best_train_acc = 0.0
best_test_epoch = 0
best_train_epoch = 0
# Make sure model on target device
model.to(device)
#################################学习率梯度衰减#################################
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=0.05,
last_epoch=-1)
# # lr每1个epoch衰减0.02
# lr_scheduler=torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer,epochs,1,0.02)
from matplotlib import pyplot as plt
# 添加余弦退火到训练循环中
train_losses = []
train_acces = []
eval_losses = []
eval_acces = []
for epoch in range(epochs):
##################################开始训练########################################
train_acc = 0
train_loss= 0
model.train() # 将模型改为训练模式
correct, total = 0, 0
for X, y in train_loader:
X, y = X.to(device), y.to(device)
# 前向传播,得到损失
y_pred = model(X)
loss = loss_fn(y_pred, y)
# 反向传播,将上一次的梯度清0,反向传播,并且step更新相应的参数
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 记录误差
train_loss += loss.item()
# 计算分类的准确率
_, predicted = torch.max(y_pred.data, dim=1) # 取出预测的最大值
correct += (predicted == y).sum().cpu().item() # 判断预测是否正确
total += len(y)
train_loss = train_loss / len(train_loader)
train_acc = correct/total
train_losses.append(train_loss / len(train_loader))
train_acces.append(correct / total)
##########################每进行一次迭代,就去测试一次##################################
model.eval()
test_loss, test_acc = 0, 0
correct, total = 0, 0
# with torch.inference_mode():
with torch.no_grad():
for (X, y) in test_loader:
X, y = X.to(device), y.to(device)
test_pred_logits = model(X)
loss = loss_fn(test_pred_logits, y)
test_loss += loss.item()
_, predicted = torch.max(test_pred_logits.data, dim=1) # 输出概率最大的标签
total += len(y)
correct += (predicted == y).sum().cpu().item() # 判断是否预测准确
test_loss = test_loss / len(test_loader)
test_acc = correct/total
eval_losses.append(test_loss / len(train_loader))
eval_acces.append(correct / total)
# 更新学习率
lr_scheduler.step()
if test_acc > best_test_acc:
best_test_acc = test_acc
best_test_epoch = epoch + 1
acc_str = "{:.2f}".format(best_test_acc * 100) + '%'
if train_acc > best_train_acc:
best_train_acc = train_acc
best_train_epoch = epoch + 1
acc_str = "{:.2f}".format(best_test_acc * 100) + '%'
print(
f"Epoch: {epoch + 1} | "
f"train_loss: {train_loss:.4f} | "
f"train_acc: {train_acc:.4f} | "
f"test_loss: {test_loss:.4f} | "
f"test_acc: {test_acc:.4f} | "
f"lr: {lr_scheduler.get_last_lr()[0]:.6f}"
)
results["train_loss"].append(train_loss)
results["train_acc"].append(train_acc)
results["test_loss"].append(test_loss)
results["test_acc"].append(test_acc)
##################################
# weightOperation.WeightBinarize()
# print(model.state_dict())
print('best_test_acc: ', "{:.2f}".format(best_test_acc * 100) + '%', ' epoch: ', best_test_epoch)
print('best_train_acc: ', "{:.2f}".format(best_train_acc * 100) + '%', ' epoch: ', best_train_epoch)
print("-" * 50 + "\n")
plt.plot(np.arange(len(train_losses)), train_losses,label = "train loss")
plt.plot(np.arange(len(train_acces)), train_acces,label = "train acc")
plt.plot(np.arange(len(eval_losses)), eval_losses,label = "test loss")
plt.plot(np.arange(len(eval_acces)), eval_acces,label = "test acc")
plt.legend()
plt.xlabel('epoches')
plt.title('Model loss & accuracy')
plt.show()
## How to validate model generalization performance using cross-validation
Cross-validation is a commonly used method to evaluate the generalization performance of a model, effectively avoiding overfitting and underfitting. In cross-validation, the dataset is divided into several equal-sized subsets. Each time, one subset is used as the test set and the remaining subsets are used as the training set. The process of training and testing the model is repeated multiple times. The average of the final test results is the generalization performance of the model.
Specifically, you can perform cross-validation according to the following steps:
1. Divide the dataset into training and test sets according to a certain proportion.
2. Divide the training set into K subsets according to a certain proportion.
3. For each subset i, use it as the validation set and the remaining subsets as the training set to train the model and test it on the validation set, obtaining the test results of the model on this validation set.
4. Repeat step 3 until all subsets have been used as validation sets, obtaining the test results of the model on K validation sets.
5. Calculate the average test result of the model on K validation sets as the generalization performance of the model.
It should be noted that cross-validation requires more computational resources and time because it requires training and testing the model multiple times. In practical applications, you can choose the number of cross-validation times and the size of the subsets according to specific circumstances, as well as whether to use stratified sampling or other methods to further improve the effectiveness of cross-validation.
原文地址: https://www.cveoy.top/t/topic/mmvo 著作权归作者所有。请勿转载和采集!