"""Informer network implementation: model architecture, data preprocessing, training and testing.

This code implements an Informer-style network, preprocesses and normalizes the
dataset, then trains and tests the model, reporting training and test losses.
Steps:

- Define the model components: 'Encoder', 'Decoder', 'MultiheadAttention',
  'EncoderLayer', 'DecoderLayer', 'PositionalEncoding' and 'Informer'.
- Read the dataset, split it into training, validation and test sets, and
  normalize the data.
- Define the loss function and the optimizer.
- Train the model and report the training and validation losses.
- Test the model and report the test loss.
"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
# Next, we define the model structure of the Informer network.
# The architecture is fairly involved — an encoder, a decoder and self-attention
# mechanisms — and each part is implemented with PyTorch modules below:
class Encoder(nn.Module):
    """Stacked-LSTM encoder.

    Feeds a batch-first input sequence through an LSTM and returns both the
    per-step outputs and the final (hidden, cell) state.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super(Encoder, self).__init__()
        # Retain the configuration for external inspection.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers,
                           batch_first=True, dropout=dropout)

    def forward(self, x):
        """Run the sequence through the LSTM.

        Returns:
            (outputs, state): per-step outputs of shape
            (batch, seq, hidden_size) and the final LSTM state tuple (h, c).
        """
        outputs, state = self.rnn(x)
        return outputs, state
class Decoder(nn.Module):
    """Stacked-LSTM decoder followed by a linear read-out layer."""

    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout):
        super(Decoder, self).__init__()
        # Retain the configuration for external inspection.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers,
                           batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        """Decode a batch-first sequence starting from ``hidden``.

        Returns the projected outputs of shape (batch, seq, output_size)
        together with the updated LSTM state tuple.
        """
        decoded, new_state = self.rnn(x, hidden)
        projected = self.fc(decoded)
        return projected, new_state
class MultiheadAttention(nn.Module):
    """Scaled dot-product multi-head attention with independent Q/K/V
    projections and a final output projection."""

    def __init__(self, d_model, n_head, dropout):
        super(MultiheadAttention, self).__init__()
        self.d_model = d_model
        self.n_head = n_head
        # Projection layers (creation order matters for reproducible init).
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend ``q`` over ``k``/``v``; positions where ``mask == 0`` are
        suppressed with a large negative score before the softmax."""
        batch, q_len, width = q.size()
        k_len = k.size(1)
        v_len = v.size(1)
        head_dim = int(width / self.n_head)

        def split_heads(proj, x, length):
            # (batch, len, d_model) -> (batch, n_head, len, head_dim)
            return proj(x).view(batch, length, self.n_head, head_dim).transpose(1, 2)

        query = split_heads(self.q_linear, q, q_len)
        key = split_heads(self.k_linear, k, k_len)
        value = split_heads(self.v_linear, v, v_len)

        # Scale by sqrt(head dim) to keep logits in a stable range.
        attn = torch.matmul(query, key.transpose(-2, -1)) / np.sqrt(self.d_model / self.n_head)
        if mask is not None:
            expanded = mask.unsqueeze(1).repeat(1, self.n_head, 1, 1)
            attn = attn.masked_fill(expanded == 0, -1e9)
        attn = self.dropout(nn.functional.softmax(attn, dim=-1))

        # Merge heads back into (batch, len_q, d_model) and project.
        merged = torch.matmul(attn, value).transpose(1, 2).contiguous().view(batch, q_len, -1)
        return self.fc(merged)
class EncoderLayer(nn.Module):
    """Post-norm transformer encoder block: self-attention followed by a
    position-wise feed-forward network, each wrapped in a residual
    connection and LayerNorm."""

    def __init__(self, d_model, n_head, pf_dim, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        # Two-layer position-wise feed-forward expansion.
        self.fc = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        """Apply the self-attention and feed-forward sublayers to ``src``."""
        attended = self.self_attn(src, src, src, src_mask)
        src = self.norm1(src + self.dropout1(attended))
        transformed = self.fc(src)
        return self.norm2(src + self.dropout2(transformed))
class DecoderLayer(nn.Module):
    """Post-norm transformer decoder block: masked self-attention,
    encoder-decoder (cross) attention, then a position-wise feed-forward
    network, each with a residual connection and LayerNorm."""

    def __init__(self, d_model, n_head, pf_dim, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.src_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)
        # Two-layer position-wise feed-forward expansion.
        self.fc = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, src, tgt_mask=None, src_mask=None):
        """Attend ``tgt`` over itself, then over the encoder output ``src``."""
        self_attended = self.self_attn(tgt, tgt, tgt, tgt_mask)
        tgt = self.norm1(tgt + self.dropout1(self_attended))
        cross_attended = self.src_attn(tgt, src, src, src_mask)
        tgt = self.norm2(tgt + self.dropout2(cross_attended))
        transformed = self.fc(tgt)
        return self.norm3(tgt + self.dropout3(transformed))
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding (sin on even indices, cos on odd).

    NOTE: the table is indexed by ``x.size(0)``, so this module assumes
    sequence-first input of shape (seq_len, batch, d_model).
    """

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        # Precompute the (max_len, d_model) encoding table once.
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        freqs = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freqs)
        table[:, 1::2] = torch.cos(positions * freqs)
        # Stored as (max_len, 1, d_model) so it broadcasts over the batch dim.
        self.register_buffer('pe', table.unsqueeze(0).transpose(0, 1))

    def forward(self, x):
        """Add the positional table to ``x`` and apply dropout."""
        return self.dropout(x + self.pe[:x.size(0), :])
class Informer(nn.Module):
    """Sequence model combining LSTM encoder/decoder front-ends with stacked
    transformer-style attention layers.

    NOTE(review): several shape assumptions in this class look mutually
    inconsistent and should be verified against real inputs before use:
      * ``pos_encoder`` is built with d_model=enc_hid_dim but applied to the
        raw ``src`` (feature size ``input_size``); the addition only
        broadcasts if input_size == enc_hid_dim — TODO confirm.
      * the encoder's final hidden state (size enc_hid_dim) is handed to the
        decoder LSTM, which was constructed with hidden size dec_hid_dim.
      * ``_generate_mask`` applied to a 3-D (batch, seq, feature) input
        yields a 5-D mask, while MultiheadAttention ``unsqueeze(1)`` +
        ``repeat(1, n_head, 1, 1)`` expects fewer dimensions.
    """

    def __init__(self, input_size, output_size, enc_hid_dim, dec_hid_dim, n_enc_layers, n_dec_layers, n_heads, pf_dim, dropout):
        super(Informer, self).__init__()
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.n_enc_layers = n_enc_layers
        self.n_dec_layers = n_dec_layers
        # LSTM front-ends for the source and target streams.
        self.encoder = Encoder(input_size, enc_hid_dim, n_enc_layers, dropout)
        self.decoder = Decoder(output_size, dec_hid_dim, output_size, n_dec_layers, dropout)
        self.pos_encoder = PositionalEncoding(enc_hid_dim, dropout)
        self.pos_decoder = PositionalEncoding(dec_hid_dim, dropout)
        # Stacked attention layers applied on top of the LSTM outputs.
        self.enc_layers = nn.ModuleList([EncoderLayer(enc_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_enc_layers)])
        self.dec_layers = nn.ModuleList([DecoderLayer(dec_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_dec_layers)])

    def forward(self, src, tgt):
        """Encode ``src``, decode ``tgt`` against it, and return only the
        last time step of the decoder output (single-step prediction)."""
        src_mask = self._generate_mask(src)
        # tgt_mask = self._generate_mask(tgt) & self._generate_tgt_subsequent_mask(tgt)
        # Combine the padding mask with the subsequent (look-ahead) mask.
        # NOTE(review): the subsequent mask puts 1s on *future* positions and
        # attention suppresses positions where mask == 0 — so this appears to
        # block the past and expose the future; confirm the intended
        # convention.
        tgt_mask = self._generate_mask(tgt).long() & self._generate_tgt_subsequent_mask(tgt).long()
        src = self.pos_encoder(src)
        tgt = self.pos_decoder(tgt)
        enc_output, enc_hidden = self.encoder(src)
        for layer in self.enc_layers:
            enc_output = layer(enc_output, src_mask)
        dec_output, _ = self.decoder(tgt, enc_hidden)
        for layer in self.dec_layers:
            dec_output = layer(dec_output, enc_output, tgt_mask, src_mask)
        return dec_output[:, -1, :]

    def _generate_mask(self, x):
        # Padding mask: True wherever the element is non-zero.
        # NOTE(review): two unsqueezes on a 3-D input produce a 5-D mask —
        # verify against MultiheadAttention's expectations.
        mask = (x != 0).unsqueeze(1).unsqueeze(2)
        return mask

    def _generate_tgt_subsequent_mask(self, x):
        # Upper-triangular matrix with 1s strictly above the diagonal
        # (i.e. future positions), shaped (1, 1, seq, seq).
        mask = torch.triu(torch.ones(x.size(1), x.size(1)), diagonal=1).unsqueeze(0).unsqueeze(1)
        return mask
# Next, prepare the dataset and preprocess it.
#
# We use pandas to read the dataset and split it chronologically into
# training, validation and test sets, then standardize every column using
# the training split's statistics so the model trains more stably.
# Read dataset
# NOTE(review): the CSV path and the Chinese column names below (irradiance,
# temperature, humidity, wind speed, power) are site-specific — confirm they
# match the deployed data files.
df = pd.read_csv('./data/柳林.csv')
# Split dataset into train, valid and test sets (60% / 20% / 20%, in order).
train_size = int(len(df) * 0.6)
valid_size = int(len(df) * 0.2)
test_size = len(df) - train_size - valid_size
train_df = df[:train_size]
valid_df = df[train_size:train_size+valid_size]
test_df = df[train_size+valid_size:]
# Normalize dataset: z-score standardization using training statistics only,
# which avoids leaking validation/test information into the scaler.
train_mean = train_df.mean()
train_std = train_df.std()
train_df = (train_df - train_mean) / train_std
valid_df = (valid_df - train_mean) / train_std
test_df = (test_df - train_mean) / train_std
# Prepare data for model training: feature columns (irradiance, temperature,
# humidity, wind speed) and the target column (power) as float32 tensors.
train_x = torch.tensor(train_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
train_y = torch.tensor(train_df[['功率']].values, dtype=torch.float32)
valid_x = torch.tensor(valid_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
valid_y = torch.tensor(valid_df[['功率']].values, dtype=torch.float32)
test_x = torch.tensor(test_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
test_y = torch.tensor(test_df[['功率']].values, dtype=torch.float32)
# Finally, define the loss function and the optimizer, then train the model.
# Instantiate model, loss function and optimizer
# NOTE(review): enc_hid_dim (128) != dec_hid_dim (64), yet Informer.forward
# passes the encoder's hidden state straight into the decoder LSTM — verify
# these sizes are actually compatible before relying on this configuration.
model = Informer(input_size=4, output_size=1, enc_hid_dim=128, dec_hid_dim=64, n_enc_layers=3, n_dec_layers=2, n_heads=8, pf_dim=256, dropout=0.1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())  # Adam with its default learning rate
# Train model
num_epochs = 100
batch_size = 64
num_batches = len(train_x) // batch_size  # the last partial batch is dropped
# Epoch loop: mini-batch training followed by a full-set validation pass.
for epoch in range(num_epochs):
    train_loss = 0.0
    valid_loss = 0.0
    model.train()
    for i in range(num_batches):
        optimizer.zero_grad()
        batch_x = train_x[i*batch_size:(i+1)*batch_size]
        batch_y = train_y[i*batch_size:(i+1)*batch_size]
        # NOTE(review): the target batch is fed to the model as the decoder
        # input, so the decoder can see the value it is asked to predict
        # (target leakage) — confirm this teacher-forcing scheme is intended.
        # Also, batch_x/batch_y are 2-D (batch, features) while the model's
        # LSTM/attention stack indexes a sequence dimension — verify the
        # expected input shapes.
        output = model(batch_x, batch_y)
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()  # accumulate per-batch loss for reporting
    model.eval()
    with torch.no_grad():
        # Validation also passes valid_y as the decoder input (same caveat).
        output = model(valid_x, valid_y)
        valid_loss = criterion(output, valid_y).item()
    print(f'Epoch {epoch+1}, train_loss: {train_loss/num_batches:.4f}, valid_loss: {valid_loss:.4f}')
# Test model: one full-set pass with gradients disabled.
model.eval()
with torch.no_grad():
    output = model(test_x, test_y)
    test_loss = criterion(output, test_y).item()
print(f'Test loss: {test_loss:.4f}')
# Original source: https://www.cveoy.top/t/topic/mKky — copyright belongs to the author; do not repost or scrape.