基于粒子群优化的电力负荷预测模型
import os import random import numpy as np import pandas as pd
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import torch.utils.data as Data
from sklearn.model_selection import train_test_split from tqdm import tqdm
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
load dataset from a csv file
df = pd.read_csv('Electric_Power_Consumption.csv', delimiter=';', parse_dates={'dt': ['Date', 'Time']}, infer_datetime_format=True, na_values=['nan', '?'], index_col='dt')
fill missing values with the last known value
df.fillna(method='ffill', inplace=True)
resample to hourly data
df = df.resample('H').mean()
normalize the data
df = (df - df.mean()) / df.std()
split into training and testing datasets
train_df, test_df = train_test_split(df, test_size=0.2, shuffle=False)
convert data into sequences of length seq_len+timestep_pred
seq_len = 24 * 7 # one week timestep_pred = 24 # predict the next 24 hours train_data = [] for i in range(seq_len, len(train_df) - timestep_pred): train_data.append(train_df.iloc[i - seq_len:i].values.tolist())
train_data = np.array(train_data) train_labels = train_df.iloc[seq_len + timestep_pred:].values
test_data = [] for i in range(seq_len, len(test_df) - timestep_pred): test_data.append(test_df.iloc[i - seq_len:i].values.tolist())
test_data = np.array(test_data) test_labels = test_df.iloc[seq_len + timestep_pred:].values
print('training dataset shapes: train_data: %s and train_labels: %s' % (train_data.shape, train_labels.shape)) print('testing dataset shapes: test_data: %s and test_labels: %s' % (test_data.shape, test_labels.shape))
train_y_ts = torch.from_numpy(train_labels).float().to(device) train_X_ts = torch.from_numpy(train_data).float().to(device)
test_y_ts = torch.from_numpy(test_labels).float().to(device) test_X_ts = torch.from_numpy(test_data).float().to(device)
train_set = Data.TensorDataset(train_X_ts, train_y_ts) test_set = Data.TensorDataset(test_X_ts, test_y_ts)
num_clients = 10 num_selected = 10 num_rounds = 10 epochs = 10 batch_size = 64
traindata_split = torch.utils.data.random_split(train_set, [int(train_labels.shape[0] / num_clients)] * (num_clients - 1) + [ train_labels.shape[0] - int(train_labels.shape[0] / num_clients) * ( num_clients - 1)])
train_loader = [torch.utils.data.DataLoader(x, batch_size=batch_size, shuffle=True) for x in traindata_split] test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=True)
Define the LSTM model
class LSTM(nn.Module): def init(self, input_dim, hidden_dim, seq_len, num_layers=2): super(LSTM, self).init() self.input_dim = input_dim self.hidden_dim = hidden_dim self.seq_len = seq_len self.num_layers = num_layers
self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_dim * seq_len, 1)
def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
out, (hn, cn) = self.lstm(x, (h0, c0))
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
define particle
class Particle: def init(self, model): # self.position = model.get_params().detach().clone() self.position = [param.data.detach().clone() for param in model.parameters()] self.velocity = [torch.zeros_like(param) for param in self.position] self.best_position = [param.data.detach().clone() for param in self.position] self.best_loss = float('inf')
define PSO algorithm
class PSO: def init(self, model, criterion, lr=0.1, momentum=0.9, weight_decay=0.001): self.model = model self.criterion = criterion self.lr = lr self.momentum = momentum self.weight_decay = weight_decay self.particles = [Particle(model) for _ in range(num_selected)]
#print(model.state_dict()['fc.weight'].shape)
def step(self):
for i in range(num_selected):
# Update velocity
for j, param in enumerate(self.particles[i].position):
self.particles[i].velocity[j] = self.momentum * self.particles[i].velocity[j] \
+ 2 * torch.rand_like(param) * (
self.particles[i].best_position[j] - param) \
+ 2 * torch.rand_like(param) * (global_best_position[j] - param)
# Clamp velocity
self.particles[i].velocity[j].clamp_(-1, 1)
# Update position
self.particles[i].position[j] += self.particles[i].velocity[j] * self.lr
# Clamp position
self.particles[i].position[j].clamp_(-1, 1)
# Evaluate fitness of new position
for j, param in enumerate(self.particles[i].position):
if j == 0:
self.model.state_dict()[f'lstm.weight_ih_l0'] = param
elif j == 1:
self.model.state_dict()[f'lstm.weight_hh_l0'] = param
elif j == 2:
self.model.state_dict()[f'lstm.bias_ih_l0'] = param
elif j == 3:
self.model.state_dict()[f'lstm.bias_hh_l0'] = param
elif j == 4:
self.model.state_dict()[f'lstm.weight_ih_l1'] = param
elif j == 5:
self.model.state_dict()[f'lstm.weight_hh_l1'] = param
elif j == 6:
self.model.state_dict()[f'lstm.bias_ih_l1'] = param
elif j == 7:
self.model.state_dict()[f'lstm.bias_hh_l1'] = param
elif j == 8:
self.model.state_dict()[f'fc.weight'] = param
elif j == 9:
self.model.state_dict()[f'fc.bias'] = param
loss = self.evaluate_fitness()
# Update personal best
if loss < self.particles[i].best_loss:
self.particles[i].best_position = [param.data.detach().clone() for param in self.particles[i].position]
self.particles[i].best_loss = loss
def evaluate_fitness(self):
total_loss = 0.0
with torch.no_grad():
for data, target in test_loader:
output = self.model(data)
loss = self.criterion(output, target)
total_loss += loss.item() * len(data)
return total_loss / len(test_set)
def get_best(self):
best_particle = min(self.particles, key=lambda x: x.best_loss)
return best_particle.best_position, best_particle.best_loss
def evaluate(self, data_loader):
total_loss = 0.0
with torch.no_grad():
for data, target in data_loader:
output = self.model(data)
loss = self.criterion(output, target)
total_loss += loss.item() * len(data)
return total_loss / len(data_loader.dataset)
Train the model using PSO
input_dim = train_data.shape[2] hidden_dim = 128 model = LSTM(input_dim, hidden_dim, seq_len).to(device) criterion = nn.MSELoss() optimizer_pso = PSO(model=model, criterion=criterion) #print(model.state_dict()) global_best_loss = float('inf') global_best_position = None
for r in range(num_rounds): selected_particles = random.sample(optimizer_pso.particles, num_selected)
for i in range(num_selected):
# optimizer = optim.Adam(selected_particles[i].position, lr=0.01, weight_decay=0.1)
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.1)
# model.set_params(selected_particles[i].position)
model_param = iter(model.parameters())
for param in model_param:
with torch.no_grad():
for j, param in enumerate(selected_particles[i].position):
if j == 0:
model.state_dict()[f'lstm.weight_ih_l0'] = param
elif j == 1:
model.state_dict()[f'lstm.weight_hh_l0'] = param
elif j == 2:
model.state_dict()[f'lstm.bias_ih_l0'] = param
elif j == 3:
model.state_dict()[f'lstm.bias_hh_l0'] = param
elif j == 4:
model.state_dict()[f'lstm.weight_ih_l1'] = param
elif j == 5:
model.state_dict()[f'lstm.weight_hh_l1'] = param
elif j == 6:
model.state_dict()[f'lstm.bias_ih_l1'] = param
elif j == 7:
model.state_dict()[f'lstm.bias_hh_l1'] = param
elif j == 8:
model.state_dict()[f'fc.weight'] = param
elif j == 9:
model.state_dict()[f'fc.bias'] = param
#param.data.copy_(selected_particles[i].position)
train_loss = 0.0
for epoch in range(epochs):
for data, target in tqdm(train_loader[i], desc='Training round %d/%d client %d/%d' % (r + 1, num_rounds, i + 1, num_clients)):
if len(target.shape) == 2:
target = torch.unsqueeze(target, dim=2)
target = target.permute(0, 2, 1) # 将目标张量的形状转换为 [batch_size, seq_len, 1]
target = target[:, :, 0] # 将最后一维的长度为 1 的维度去掉
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
# Update personal best
for j, param in enumerate(selected_particles[i].position):
if j == 0:
param.data.copy_(model.state_dict()[f'lstm.weight_ih_l0'])
elif j == 1:
param.data.copy_(model.state_dict()[f'lstm.weight_hh_l0'])
elif j == 2:
param.data.copy_(model.state_dict()[f'lstm.bias_ih_l0'])
elif j == 3:
param.data.copy_(model.state_dict()[f'lstm.bias_hh_l0'])
elif j == 4:
param.data.copy_(model.state_dict()[f'lstm.weight_ih_l1'])
elif j == 5:
param.data.copy_(model.state_dict()[f'lstm.weight_hh_l1'])
elif j == 6:
param.data.copy_(model.state_dict()[f'lstm.bias_ih_l1'])
elif j == 7:
param.data.copy_(model.state_dict()[f'lstm.bias_hh_l1'])
elif j == 8:
param.data.copy_(model.state_dict()[f'fc.weight'])
elif j == 9:
param.data.copy_(model.state_dict()[f'fc.bias'])
selected_particles[i].best_position = [param.data.detach().clone() for param in
selected_particles[i].position]
selected_particles[i].best_loss = optimizer_pso.evaluate_fitness()
# Update global best
if selected_particles[i].best_loss < global_best_loss:
global_best_loss = selected_particles[i].best_loss
global_best_position = [param.data.detach().clone() for param in
selected_particles[i].position]
# Evaluate the model on training and testing datasets
# train_mse = optimizer_pso.evaluate(train_loader[i])
# test_mse = optimizer_pso.evaluate(test_loader)
# print('Round %d/%d, Client %d/%d, Train MSE: %.4f, Test MSE: %.4f' % (r + 1, num_rounds, i + 1, num_clients, train_mse, test_mse))
# Update particle swarm
optimizer_pso.step()
# Copy parameters from model to particle swarm
for j, param in enumerate(selected_particles[i].position):
if j == 0:
param.data.copy_(model.state_dict()[f'lstm.weight_ih_l0'])
elif j == 1:
param.data.copy_(model.state_dict()[f'lstm.weight_hh_l0'])
elif j == 2:
param.data.copy_(model.state_dict()[f'lstm.bias_ih_l0'])
elif j == 3:
param.data.copy_(model.state_dict()[f'lstm.bias_hh_l0'])
elif j == 4:
param.data.copy_(model.state_dict()[f'lstm.weight_ih_l1'])
elif j == 5:
param.data.copy_(model.state_dict()[f'lstm.weight_hh_l1'])
elif j == 6:
param.data.copy_(model.state_dict()[f'lstm.bias_ih_l1'])
elif j == 7:
param.data.copy_(model.state_dict()[f'lstm.bias_hh_l1'])
elif j == 8:
param.data.copy_(model.state_dict()[f'fc.weight'])
elif j == 9:
param.data.copy_(model.state_dict()[f'fc.bias'])
# Print final evaluation on training and testing datasets
train_mse = optimizer_pso.evaluate(train_loader[0])
test_mse = optimizer_pso.evaluate(test_loader)
# Print progress
print('Round %d/%d, Global best loss: %.4f' % (r + 1, num_rounds, global_best_loss))
print('Final evaluation, Train MSE: %.4f, Test MSE: %.4f' % (train_mse, test_mse))
原文地址: https://www.cveoy.top/t/topic/nQv3 著作权归作者所有。请勿转载和采集!