This article shows how to build an Informer network in Python to forecast photovoltaic (PV) power. The data comes from a CSV file whose first column is the timestamp and whose remaining columns are irradiance, temperature, humidity, wind speed, and power. We train and test the model and report evaluation metrics including R², MSE, MAPE, and MAE.

First, install the following Python libraries: pandas, numpy, torch (PyTorch), scikit-learn, and matplotlib.
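If any of them are missing, they can be installed with pip, for example:

pip install pandas numpy torch scikit-learn matplotlib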

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

Step 1: Load the data

First, read the CSV file and load it into a pandas DataFrame.

data = pd.read_csv('data.csv')

Step 2: Preprocess the data

Preprocessing is an important step before training a neural network. For this problem we apply the following steps:

  1. Convert the time column to pandas timestamps and set it as the DataFrame index.
  2. Convert all remaining columns to floating-point type.
  3. Standardize all columns with a StandardScaler.
# Convert the time column to timestamps and set it as the index
data['time'] = pd.to_datetime(data['time'])
data.set_index('time', inplace=True)

# Convert all columns to float
data = data.astype(float)

# Standardize all columns
scaler = StandardScaler()
data = pd.DataFrame(scaler.fit_transform(data), columns=data.columns, index=data.index)
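One caveat: fitting the scaler on the entire dataset lets statistics from the test period leak into the normalization. A minimal sketch of a more careful variant, which fits the scaler on the training rows only and then applies it everywhere, would be:

# Sketch: fit the scaler on the first 70% of the rows only, then apply it to all rows
train_size = int(0.7 * len(data))
scaler = StandardScaler().fit(data.iloc[:train_size])
data = pd.DataFrame(scaler.transform(data), columns=data.columns, index=data.index)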

Step 3: Build the datasets

We split the data into a training set and a test set. Here the first 70% of the data is used for training and the remaining 30% for testing.

# Split the data into training and test sets
train_size = int(0.7 * len(data))
train_data, test_data = data.iloc[:train_size], data.iloc[train_size:]

# Build sliding-window samples: each X is look_back consecutive rows, each Y is the next row
def create_dataset(dataset, look_back=1):
    X, Y = [], []
    for i in range(len(dataset)-look_back):
        X.append(dataset.iloc[i:i+look_back].values)
        Y.append(dataset.iloc[i+look_back].values)
    return np.array(X), np.array(Y)

look_back = 24  # use the previous 24 hours of data to predict the next step
train_X, train_Y = create_dataset(train_data, look_back)
test_X, test_Y = create_dataset(test_data, look_back)
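Each sample X is therefore a window of look_back consecutive rows over all feature columns, and the corresponding Y is the row that immediately follows. Assuming the CSV has the five feature columns listed at the start (irradiance, temperature, humidity, wind speed, power), a quick sanity check of the resulting shapes might look like:

# Expected shapes with 5 feature columns:
# train_X: (num_train_samples, 24, 5), train_Y: (num_train_samples, 5)
print(train_X.shape, train_Y.shape)
print(test_X.shape, test_Y.shape)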

Step 4: Build the model

We build the model with PyTorch. The Informer is a deep learning model for long-sequence time-series forecasting; it is a Transformer variant that can capture long-range dependencies in the sequence. Note that the implementation below is a simplified Informer-style encoder-decoder built from standard multi-head attention; it does not include the ProbSparse attention or self-attention distilling of the original Informer paper.

class Informer(nn.Module):
    def __init__(self, input_size, output_size, d_model=512, n_heads=8, e_layers=2, d_layers=1, d_ff=2048, dropout=0.1):
        super(Informer, self).__init__()
        
        self.input_proj = nn.Linear(input_size, d_model)  # project raw features into the model dimension
        self.encoder = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(e_layers)])
        self.decoder = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(d_layers)])
        self.fc = nn.Linear(d_model, output_size)
        
    def forward(self, x):
        # project the input features to d_model
        x = self.input_proj(x)
        
        # encoder
        encoder_outputs = []
        for layer in self.encoder:
            x = layer(x)
            encoder_outputs.append(x)
            
        # decoder: every decoder layer attends to the final encoder output
        for layer in self.decoder:
            x = layer(x, encoder_outputs[-1])
            
        # output: use the representation of the last time step
        x = self.fc(x[:, -1, :])
        return x

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        
    def forward(self, x):
        # self-attention
        residual = x
        x = self.norm1(x)
        x = self.self_attn(x, x, x)
        x = self.dropout1(x)
        x += residual
        
        # feed-forward
        residual = x
        x = self.norm2(x)
        x = self.feed_forward(x)
        x = self.dropout2(x)
        x += residual
        
        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.cross_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        
    def forward(self, x, encoder_output):
        # self-attention
        residual = x
        x = self.norm1(x)
        x = self.self_attn(x, x, x)
        x = self.dropout1(x)
        x += residual
        
        # cross-attention
        residual = x
        x = self.norm2(x)
        x = self.cross_attn(x, encoder_output, encoder_output)
        x = self.dropout2(x)
        x += residual
        
        # feed-forward
        residual = x
        x = self.norm3(x)
        x = self.feed_forward(x)
        x = self.dropout3(x)
        x += residual
        
        return x

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def forward(self, Q, K, V):
        Q = self.W_q(Q)
        K = self.W_k(K)
        V = self.W_v(V)
        
        Q = self.split_heads(Q)
        K = self.split_heads(K)
        V = self.split_heads(V)
        
        # scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-1, -2)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
        attn_weights = F.softmax(scores, dim=-1)
        x = torch.matmul(attn_weights, V)
        
        x = self.combine_heads(x)
        x = self.W_o(x)
        return x
    
    def split_heads(self, x):
        batch_size, seq_len, d_model = x.size()
        x = x.view(batch_size, seq_len, self.n_heads, self.head_dim)
        x = x.permute(0, 2, 1, 3)
        return x
    
    def combine_heads(self, x):
        batch_size, _, seq_len, d_model = x.size()
        x = x.permute(0, 2, 1, 3)
        x = x.reshape(batch_size, seq_len, self.n_heads*self.head_dim)
        return x

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout):
        super(FeedForward, self).__init__()
        
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

Step 5: Train the model

Here we train the model with the Adam optimizer and the mean absolute error (L1) loss for 100 epochs.

from torch.utils.data import DataLoader, TensorDataset

# Build the training data loader (cast to float32 to match the model weights)
train_dataset = TensorDataset(torch.from_numpy(train_X).float(), torch.from_numpy(train_Y).float())
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Create the model
input_size = train_X.shape[-1]
output_size = train_Y.shape[-1]
model = Informer(input_size, output_size)

# Define the optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.L1Loss()

# Train the model
for epoch in range(100):
    for X, Y in train_loader:
        optimizer.zero_grad()
        Y_pred = model(X)
        loss = criterion(Y_pred, Y)
        loss.backward()
        optimizer.step()
        
    print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}')
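The loop above runs on the CPU. If a CUDA GPU is available, training can be moved to it by putting the model and each batch on the device; a minimal sketch (if you use it, the test batches in Step 6 must be moved to the same device as well):

# Sketch: the same training loop, run on a GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

for epoch in range(100):
    for X, Y in train_loader:
        X, Y = X.to(device), Y.to(device)
        optimizer.zero_grad()
        loss = criterion(model(X), Y)
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}')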

Step 6: Test the model

Here we use the trained model to make predictions on the test set and compute the evaluation metrics.

# Build the test data loader (cast to float32 to match the model weights)
test_dataset = TensorDataset(torch.from_numpy(test_X).float(), torch.from_numpy(test_Y).float())
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Put the model in evaluation mode
model.eval()

# Run the model on the test set
Y_true, Y_pred = [], []
with torch.no_grad():
    for X, Y in test_loader:
        Y_true.append(Y.numpy())
        Y_pred.append(model(X).numpy())
        
Y_true = np.concatenate(Y_true)
Y_pred = np.concatenate(Y_pred)

# Compute the evaluation metrics
r2 = r2_score(Y_true, Y_pred)
mse = mean_squared_error(Y_true, Y_pred)
mae = mean_absolute_error(Y_true, Y_pred)
mape = np.mean(np.abs((Y_true - Y_pred) / (np.abs(Y_true) + 1e-8))) * 100  # epsilon avoids division by zero on standardized values

print(f'R2 score: {r2:.4f}')
print(f'MSE: {mse:.4f}')
print(f'MAE: {mae:.4f}')
print(f'MAPE: {mape:.4f}%')
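Because the metrics above are computed on the standardized values of all columns, MAPE in particular is hard to interpret. Assuming the power column is the last column of the CSV, the power forecast can also be evaluated on its original scale, for example:

# Sketch: evaluate only the power column on the original scale
# (assumes power is the last column of the CSV)
Y_true_power = scaler.inverse_transform(Y_true)[:, -1]
Y_pred_power = scaler.inverse_transform(Y_pred)[:, -1]
print(f'Power R2: {r2_score(Y_true_power, Y_pred_power):.4f}')
print(f'Power MAE: {mean_absolute_error(Y_true_power, Y_pred_power):.4f}')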

Step 7: Visualize the predictions

Finally, we can plot the predictions and compare them with the actual values.

# Transform the data back to the original scale
Y_true = scaler.inverse_transform(Y_true)
Y_pred = scaler.inverse_transform(Y_pred)

# Plot the predictions
plt.figure(figsize=(10, 5))
plt.plot(Y_true, label='Actual')
plt.plot(Y_pred, label='Predicted')
plt.legend()
plt.show()
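Since Y_true and Y_pred contain every feature column, the plot above overlays all of them. To focus on the forecast target, one can plot just the power column (again assuming it is the last column of the CSV):

# Sketch: plot only the power column (assumed to be the last column)
plt.figure(figsize=(10, 5))
plt.plot(Y_true[:, -1], label='Actual power')
plt.plot(Y_pred[:, -1], label='Predicted power')
plt.legend()
plt.show()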

This article has shown how to build an Informer-style network in Python for PV power forecasting, with a complete code example. You can adjust the parameters and data to your own situation to obtain more accurate predictions.
