This code implements an Informer-style network for power forecasting: an LSTM encoder-decoder combined with Transformer-style multi-head self-attention, cross-attention, and positional encoding (class name: 'Informer'). It also preprocesses and standardizes the dataset, then trains and tests the model and reports the training, validation, and test losses. The steps are as follows:

  1. Define the model components: 'Encoder', 'Decoder', 'MultiheadAttention', 'EncoderLayer', 'DecoderLayer', 'PositionalEncoding', and 'Informer'.

  2. Read the dataset, split it into training, validation, and test sets, and standardize the data using statistics computed on the training set.

  3. Define the loss function and optimizer.

  4. Train the model and report the training and validation losses.

  5. Test the model and report the test loss.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd

# Next, we define the model structure of the Informer-style network.
# The network combines an LSTM encoder and decoder with multi-head self-attention, cross-attention, positional encoding, and feed-forward layers. We implement these components with PyTorch modules as follows:
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)

    def forward(self, x):
        output, hidden = self.rnn(x)
        return output, hidden

class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout):
        super(Decoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        output, hidden = self.rnn(x, hidden)
        output = self.fc(output)
        return output, hidden

class MultiheadAttention(nn.Module):
    def __init__(self, d_model, n_head, dropout):
        super(MultiheadAttention, self).__init__()
        self.d_model = d_model
        self.n_head = n_head

        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        # q, k, v: (batch, seq_len, d_model). The optional mask must broadcast
        # against the score tensor of shape (batch, n_head, len_q, len_k);
        # positions where mask == 0 are not attended to.
        bs, len_q, d_model = q.size()
        bs, len_k, d_model = k.size()
        bs, len_v, d_model = v.size()
        d_k = d_model // self.n_head

        q = self.q_linear(q).view(bs, len_q, self.n_head, d_k).transpose(1, 2)
        k = self.k_linear(k).view(bs, len_k, self.n_head, d_k).transpose(1, 2)
        v = self.v_linear(v).view(bs, len_v, self.n_head, d_k).transpose(1, 2)

        scores = torch.matmul(q, k.transpose(-2, -1)) / np.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = nn.functional.softmax(scores, dim=-1)
        scores = self.dropout(scores)

        output = torch.matmul(scores, v).transpose(1, 2).contiguous().view(bs, len_q, -1)
        output = self.fc(output)
        return output
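
# A quick, optional sanity check (not part of the original write-up): run the attention
# module on small random tensors with an assumed causal mask to confirm that the output
# keeps the (batch, len_q, d_model) shape. The sizes used here are arbitrary.
_attn = MultiheadAttention(d_model=16, n_head=4, dropout=0.1)
_q = torch.randn(2, 5, 16)
_causal = torch.tril(torch.ones(5, 5, dtype=torch.bool)).unsqueeze(0).unsqueeze(1)  # (1, 1, 5, 5)
assert _attn(_q, _q, _q, _causal).shape == (2, 5, 16)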

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_head, pf_dim, dropout):
        super(EncoderLayer, self).__init__()

        self.self_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        self.fc = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        src2 = self.self_attn(src, src, src, src_mask)
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.fc(src)
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_head, pf_dim, dropout):
        super(DecoderLayer, self).__init__()

        self.self_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        self.src_attn = MultiheadAttention(d_model, n_head, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

        self.fc = nn.Sequential(
            nn.Linear(d_model, pf_dim),
            nn.ReLU(),
            nn.Linear(pf_dim, d_model)
        )
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, src, tgt_mask=None, src_mask=None):
        tgt2 = self.self_attn(tgt, tgt, tgt, tgt_mask)
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)

        tgt2 = self.src_attn(tgt, src, src, src_mask)
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)

        tgt2 = self.fc(tgt)
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)
        return tgt

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_len, d_model) so it broadcasts over batch-first inputs
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model); add the encoding for the first seq_len positions
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class Informer(nn.Module):
    def __init__(self, input_size, output_size, enc_hid_dim, dec_hid_dim, n_enc_layers, n_dec_layers, n_heads, pf_dim, dropout):
        super(Informer, self).__init__()

        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.n_enc_layers = n_enc_layers
        self.n_dec_layers = n_dec_layers

        # LSTM encoder/decoder produce the hidden-dim sequences that the attention
        # layers operate on. The decoder's internal projection is kept at dec_hid_dim
        # so the attention layers see a consistent width.
        self.encoder = Encoder(input_size, enc_hid_dim, n_enc_layers, dropout)
        self.decoder = Decoder(output_size, dec_hid_dim, dec_hid_dim, n_dec_layers, dropout)

        # Positional encodings are applied to the LSTM outputs (enc_hid_dim /
        # dec_hid_dim features), not to the raw input features.
        self.pos_encoder = PositionalEncoding(enc_hid_dim, dropout)
        self.pos_decoder = PositionalEncoding(dec_hid_dim, dropout)

        self.enc_layers = nn.ModuleList([EncoderLayer(enc_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_enc_layers)])
        self.dec_layers = nn.ModuleList([DecoderLayer(dec_hid_dim, n_heads, pf_dim, dropout) for _ in range(n_dec_layers)])

        # Project encoder memory to the decoder width for cross-attention, and map
        # the final decoder states to the output dimension.
        self.enc_to_dec = nn.Linear(enc_hid_dim, dec_hid_dim)
        self.fc_out = nn.Linear(dec_hid_dim, output_size)

    def forward(self, src, tgt):
        # src: (batch, src_len, input_size), tgt: (batch, tgt_len, output_size)
        src_mask = self._generate_mask(src)
        # Combine the padding mask with the causal mask so the decoder can only
        # attend to current and earlier target positions.
        tgt_mask = self._generate_mask(tgt) & self._generate_tgt_subsequent_mask(tgt)

        enc_output, _ = self.encoder(src)
        enc_output = self.pos_encoder(enc_output)
        for layer in self.enc_layers:
            enc_output = layer(enc_output, src_mask)
        enc_output = self.enc_to_dec(enc_output)

        # The decoder LSTM starts from a zero state: the encoder and decoder differ
        # in depth and width, so the encoder hidden state is not reused directly.
        dec_output, _ = self.decoder(tgt, None)
        dec_output = self.pos_decoder(dec_output)
        for layer in self.dec_layers:
            dec_output = layer(dec_output, enc_output, tgt_mask, src_mask)

        # Return the prediction for the last time step only.
        return self.fc_out(dec_output)[:, -1, :]

    def _generate_mask(self, x):
        # Padding mask: time steps whose features are all zero are treated as padding.
        # Shape (batch, 1, 1, seq_len) broadcasts over heads and query positions.
        return (x.abs().sum(dim=-1) != 0).unsqueeze(1).unsqueeze(2)

    def _generate_tgt_subsequent_mask(self, x):
        # Causal mask: 1 on and below the diagonal, so each position may only attend
        # to itself and earlier positions. Shape (1, 1, seq_len, seq_len).
        seq_len = x.size(1)
        mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device))
        return mask.unsqueeze(0).unsqueeze(1)
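
# Optional smoke test (not in the original post): a dummy forward pass with assumed toy
# sizes, confirming that a (batch, src_len, 4) weather sequence and a (batch, tgt_len, 1)
# decoder input produce one prediction per sample.
_m = Informer(input_size=4, output_size=1, enc_hid_dim=32, dec_hid_dim=32,
              n_enc_layers=2, n_dec_layers=2, n_heads=4, pf_dim=64, dropout=0.1)
_src = torch.randn(8, 24, 4)   # 8 samples, 24 time steps, 4 weather features
_tgt = torch.randn(8, 12, 1)   # 8 samples, 12 target steps
assert _m(_src, _tgt).shape == (8, 1)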

# Next, we prepare the dataset and preprocess the data.
#
# We use pandas to read the dataset and split it into training, validation, and test sets. The features are then standardized with the mean and standard deviation of the training set, which helps the model train more stably.

# Read dataset
df = pd.read_csv('./data/柳林.csv')

# Split dataset into train, valid and test sets
train_size = int(len(df) * 0.6)
valid_size = int(len(df) * 0.2)
test_size = len(df) - train_size - valid_size

train_df = df[:train_size]
valid_df = df[train_size:train_size+valid_size]
test_df = df[train_size+valid_size:]

# Standardize dataset using training-set statistics
train_mean = train_df.mean()
train_std = train_df.std()

train_df = (train_df - train_mean) / train_std
valid_df = (valid_df - train_mean) / train_std
test_df = (test_df - train_mean) / train_std

# Prepare data for model training
train_x = torch.tensor(train_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
train_y = torch.tensor(train_df[['功率']].values, dtype=torch.float32)

valid_x = torch.tensor(valid_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
valid_y = torch.tensor(valid_df[['功率']].values, dtype=torch.float32)

test_x = torch.tensor(test_df[['辐照度', '温度', '湿度', '风速']].values, dtype=torch.float32)
test_y = torch.tensor(test_df[['功率']].values, dtype=torch.float32)
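
# The tensors above are single rows, so the training loop below feeds each sample as a
# length-1 sequence. For true multi-step sequence forecasting you would typically slice
# the series into sliding windows first. The helper below is a minimal sketch of that
# idea; the function name and the window lengths are assumptions, not part of the
# original script.
def make_windows(x, y, src_len=24, tgt_len=12):
    """Split aligned feature/target series into (src, tgt) windows.

    x: (T, num_features) feature tensor, y: (T, 1) target tensor.
    Returns src of shape (N, src_len, num_features) and tgt of shape (N, tgt_len, 1),
    where each target window immediately follows its source window.
    """
    srcs, tgts = [], []
    for start in range(len(x) - src_len - tgt_len + 1):
        srcs.append(x[start:start + src_len])
        tgts.append(y[start + src_len:start + src_len + tgt_len])
    return torch.stack(srcs), torch.stack(tgts)

# Hypothetical usage: train_src, train_tgt = make_windows(train_x, train_y)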

# Finally, we define the loss function and the optimizer, and train the model.
# Instantiate model, loss function and optimizer
model = Informer(input_size=4, output_size=1, enc_hid_dim=128, dec_hid_dim=64, n_enc_layers=3, n_dec_layers=2, n_heads=8, pf_dim=256, dropout=0.1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())

# Train model
num_epochs = 100
batch_size = 64
num_batches = len(train_x) // batch_size

for epoch in range(num_epochs):
    train_loss = 0.0
    valid_loss = 0.0

    model.train()
    for i in range(num_batches):
        optimizer.zero_grad()

        batch_x = train_x[i*batch_size:(i+1)*batch_size]
        batch_y = train_y[i*batch_size:(i+1)*batch_size]

        # Each sample is fed as a length-1 sequence; the decoder input here is the
        # standardized target for the same step (pure teacher forcing), so this loss
        # mainly verifies that the pipeline runs. For genuine forecasting, feed lagged
        # input/target windows instead (see the windowing sketch above).
        output = model(batch_x.unsqueeze(1), batch_y.unsqueeze(1))
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    model.eval()
    with torch.no_grad():
        output = model(valid_x.unsqueeze(1), valid_y.unsqueeze(1))
        valid_loss = criterion(output, valid_y).item()

    print(f'Epoch {epoch+1}, train_loss: {train_loss/num_batches:.4f}, valid_loss: {valid_loss:.4f}')

# Test model
model.eval()
with torch.no_grad():
    output = model(test_x.unsqueeze(1), test_y.unsqueeze(1))
    test_loss = criterion(output, test_y).item()

print(f'Test loss: {test_loss:.4f}')
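
# Because '功率' was standardized with the training-set statistics, the model's outputs
# are in standardized units. To report predictions in the original power units, invert
# the transform (a short sketch; the variable names below are illustrative).
power_mean = train_mean['功率']
power_std = train_std['功率']
pred_power = output * power_std + power_mean   # test-set predictions in original units
true_power = test_y * power_std + power_mean   # ground truth for comparison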
