首先,我们需要导入相关的库,包括PyTorch、NumPy、Pandas等:

import torch import torch.nn as nn import torch.optim as optim import numpy as np import pandas as pd

接着,我们需要定义Informer网络的模型结构。

Informer网络的结构比较复杂,主要包括编码器、解码器和自注意力机制等部分。我们可以使用PyTorch提供的模块来实现这些部分,具体代码如下:

定义编码器

class Encoder(nn.Module): def init(self, input_size, hidden_size, num_layers, dropout): super(Encoder, self).init() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.dropout = dropout

    # LSTM层
    self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)

def forward(self, x):
    output, hidden = self.rnn(x)
    return output, hidden

定义解码器

class Decoder(nn.Module): def init(self, input_size, hidden_size, output_size, num_layers, dropout): super(Decoder, self).init() self.input_size = input_size self.hidden_size = hidden_size self.output_size = output_size self.num_layers = num_layers self.dropout = dropout

    # LSTM层和全连接层
    self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
    self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x, hidden):
    output, hidden = self.rnn(x, hidden)
    output = self.fc(output)
    return output, hidden

定义多头注意力机制

class MultiheadAttention(nn.Module): def init(self, d_model, n_head, dropout): super(MultiheadAttention, self).init() self.d_model = d_model self.n_head = n_head

    # 三个全连接层和一个全连接层
    self.q_linear = nn.Linear(d_model, d_model)
    self.v_linear = nn.Linear(d_model, d_model)
    self.k_linear = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)
    self.fc = nn.Linear(d_model, d_model)

def forward(self, q, k, v, mask=None):
    bs, len_q, d_model = q.size()
    bs, len_k, d_model = k.size()
    bs, len_v, d_model = v.size()
    
    # 将q、k、v分别通过全连接层进行变换,并将结果分成n_head份
    q = self.q_linear(q).view(bs, len_q, self.n_head, int(d_model/self.n_head)).transpose(1, 2)
    k = self.k_linear(k).view(bs, len_k, self.n_head, int(d_model/self.n_head)).transpose(1, 2)
    v = self.v_linear(v).view(bs, len_v, self.n_head, int(d_model/self.n_head)).transpose(1, 2)
    
    # 计算注意力分数并进行softmax操作
    scores = torch.matmul(q, k.transpose(-2, -1)) / np.sqrt(self.d_model/self.n_head)
    if mask is not None:
        mask = mask.unsqueeze(1).repeat(1, self.n_head, 1, 1)
        scores = scores.masked_fill(mask == 0, -1e9)
    scores = nn.functional.softmax(scores, dim=-1)
    scores = self.dropout(scores)
    
    # 将分数和v相乘得到输出
    output = torch.matmul(scores, v).transpose(1, 2).contiguous().view(bs, len_q, -1)
    output = self.fc(output)
    return output

定义编码器层

class EncoderLayer(nn.Module): def init(self, d_model, n_head, pf_dim, dropout): super(EncoderLayer, self).init()

    # 多头注意力机制、LayerNorm和Dropout层
    self.self_attn = MultiheadAttention(d_model, n_head, dropout)
    self.norm1 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    
    # 全连接层和LayerNorm层
    self.fc = nn.Sequential(
        nn.Linear(d_model, pf_dim),
        nn.ReLU(),
        nn.Linear(pf_dim, d_model)
    )
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout2 = nn.Dropout(dropout)

def forward(self, src, src_mask=None):
    src2 = self.self_attn(src, src, src, src_mask)
    src = src + self.dropout1(src2)
    src = self.norm1(src)
    
    src2 = self.fc(src)
    src = src + self.dropout2(src2)
    src = self.norm2(src)
    return src

定义解码器层

class DecoderLayer(nn.Module): def init(self, d_model, n_head, pf_dim, dropout): super(DecoderLayer, self).init()

    # 多头注意力机制、LayerNorm和Dropout层
    self.self_attn = MultiheadAttention(d_model, n_head, dropout)
    self.norm1 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    
    # 多头注意力机制、LayerNorm和Dropout层
    self.src_attn = MultiheadAttention(d_model, n_head, dropout)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout2 = nn.Dropout(dropout)
    
    # 全连接层和LayerNorm层
    self.fc = nn.Sequential(
        nn.Linear(d_model, pf_dim),
        nn.ReLU(),
        nn.Linear(pf_dim, d_model)
    )
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout3 = nn.Dropout(dropout)

def forward(self, tgt, src, tgt_mask=None, src_mask=None):
    tgt2 = self.self_attn(tgt, tgt, tgt, tgt_mask)
    tgt = tgt + self.dropout1(tgt2)
    tgt = self.norm1(tgt)
    
    tgt2 = self.src_attn(tgt, src, src, src_mask)
    tgt = tgt + self.dropout2(tgt2)
    tgt = self.norm2(tgt)
    
    tgt2 = self.fc(tgt)
    tgt = tgt + self.dropout3(tgt2)
    tgt = self.norm3(tgt)
    return tgt

class PositionalEncoding(nn.Module): def init(self, d_model, dropout, max_len=5000): super(PositionalEncoding, self).init() self.dropout = nn.Dropout(dropout)

    pe = torch.zeros(max_len, d_model)
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    pe = pe.unsqueeze(0).transpose(0, 1)
    self.register_buffer('pe', pe)

def forward(self, x):
    x = x + self.pe[:x.size(0), :]
    return self.dropout(x)

class Informer(nn.Module): def init(self, input_size, output_size, enc_hid_dim, dec_hid_dim, n_enc_layers, n_dec_layers, n_heads, pf_dim, dropout): super(Informer, self).init()

    self.enc_hid_dim = enc_hid_dim
    self.dec_hid_dim = dec_hid_dim
    self.n_enc_layers = n_enc_layers
    self.n_dec_layers = n_dec_layers
    
    self.encoder = Encoder(input_size, enc_hid_dim, n_enc_layers, dropout)
    self.decoder = Decoder(output_size, dec_hid_dim, output_size, n_dec_layers, dropout)
    
    self.pos_encoder = PositionalEncoding(enc_hid_dim, dropout)
    self.pos_decoder = PositionalEncoding(dec_hid_dim, dropout)
    
    self.enc_layers = nn.ModuleList([EncoderLayer(enc_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_enc_layers)])
    self.dec_layers = nn.ModuleList([DecoderLayer(dec_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_dec_layers)])

def forward(self, src, tgt):
    src_mask = self._generate_mask(src)
    tgt_mask = self._generate_mask(tgt) & self._generate_tgt_subsequent_mask(tgt)
    src = self.pos_encoder(src)
    tgt = self.pos_decoder(tgt)
    
    enc_output, enc_hidden = self.encoder(src)
    for layer in self.enc_layers:
        enc_output = layer(enc_output, src_mask)
    
    dec_output, _ = self.decoder(tgt, enc_hidden)
    for layer in self.dec_layers:
        dec_output = layer(dec_output, enc_output, tgt_mask, src_mask)
    
    return dec_output[:, -1, :]

def _generate_mask(self, x):
    mask = (x != 0).unsqueeze(1).unsqueeze(2)
    return mask

def _generate_tgt_subsequent_mask(self, x):
    mask = torch.triu(torch.ones(x.size(1), x.size(1)), diagonal=1).unsqueeze(0).unsqueeze(1)
    return mask

接下来,我们需要准备数据集并进行数据预处理。

我们使用Pandas库来读取数据集,并将数据集分为训练集、验证集和测试集。同时,我们需要将数据进行归一化处理,将所有特征值缩放到0到1之间,以便模型更好地进行训练。

Read dataset

df = pd.read_csv('./data/柳林.csv')

Split dataset into train, valid and test sets

train_size = int(len(df) * 0.6) valid_size = int(len(df) * 0.2) test_size = len(df) - train_size - valid_size

train_df = df[:train_size] valid_df = df[train_size:train_size+valid_size] test_df = df[train_size+valid_size:]

Normalize dataset

train_mean = train_df.mean() train_std = train_df.std()

train_df = (train_df - train_mean) / train_std valid_df = (valid_df - train_mean) / train_std test_df = (test_df - train_mean) / train_std

Prepare data for model training

train_x = torch.tensor(train_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32) train_y = torch.tensor(train_df[['功率']].values, dtype=torch.float32)

valid_x = torch.tensor(valid_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32) valid_y = torch.tensor(valid_df[['功率']].values, dtype=torch.float32)

test_x = torch.tensor(test_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32) test_y = torch.tensor(test_df[['功率']].values, dtype=torch.float32)

最后,我们需要定义损失函数和优化器,并对模型进行训练。

Instantiate model, loss function and optimizer

model = Informer(input_size=4, output_size=1, enc_hid_dim=128, dec_hid_dim=64, n_enc_layers=3, n_dec_layers=2, n_heads=8, pf_dim=256, dropout=0.1) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters())

Train model

num_epochs = 100 batch_size = 64 num_batches = len(train_x) // batch_size

for epoch in range(num_epochs): train_loss = 0.0 valid_loss = 0.0

model.train()
for i in range(num_batches):
    optimizer.zero_grad()
    
    batch_x = train_x[i*batch_size:(i+1)*batch_size]
    batch_y = train_y[i*batch_size:(i+1)*batch_size]
    
    output = model(batch_x, batch_y)
    loss = criterion(output, batch_y)
    loss.backward()
    optimizer.step()
    
    train_loss += loss.item()

model.eval()
with torch.no_grad():
    output = model(valid_x, valid_y)
    valid_loss = criterion(output, valid_y).item()

print(f'Epoch {epoch+1}, train_loss: {train_loss/num_batches:.4f}, valid_loss: {valid_loss:.4f}')

Test model

model.eval() with torch.no_grad(): output = model(test_x, test_y) test_loss = criterion(output, test_y).item()

print(f'Test loss: {test_loss:.4f}')


原文地址: https://www.cveoy.top/t/topic/mJ6G 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录