ConvLSTM时空预测温度模型代码详解
这段代码使用ConvLSTM模型实现时空预测温度的功能。
模型结构定义
class ConvLSTMCell(nn.Module):
def __init__(self, input_dim, hidden_dim, kernel_size, bias):
super(ConvLSTMCell, self).__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.kernel_size = kernel_size
self.padding = kernel_size[0] // 2, kernel_size[1] // 2
self.bias = True
self.conv = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
out_channels=4 * self.hidden_dim,
kernel_size=self.kernel_size,
padding=self.padding,
bias=self.bias).cuda()
def forward(self, input_tensor, cur_state):
h_cur, c_cur = cur_state
h_cur= h_cur.cuda()
combined = torch.cat([input_tensor, h_cur], dim=1) # 将当前时刻的输入和前一时刻的隐藏状态在通道维度上拼接起来
combined_conv = self.conv(combined.float()) # 将拼接后的张量通过卷积层进行计算
cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)# 将卷积计算得到的结果按照hidden_dim进行分割,分别代表输入、遗忘、输出和细胞门
i = torch.sigmoid(cc_i) # 将分割后的结果进行sigmoid和tanh操作,得到当前时刻的输入门、遗忘门、输出门和细胞状态
f = torch.sigmoid(cc_f)
o = torch.sigmoid(cc_o)
g = torch.tanh(cc_g)
c_next = f * c_cur + i * g # 根据门和细胞状态,计算当前时刻的细胞状态和隐藏状态
h_next = o * torch.tanh(c_next)
return h_next, c_next # 返回当前时刻的隐藏状态和细胞状态
def init_hidden(self, batch_size, image_size):
# 定义LSTM模型的隐藏状态初始化函数,输入batch_size和图像大小,返回一个与输入维度相同的全0张量作为隐藏状态
height, width = image_size # 解包图像大小
return (torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device),
# 返回与输入维度相同的全0张量作为隐藏状态
torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device))
class ConvLSTM(nn.Module):
def __init__(self, input_dim,hidden_dim, kernel_size, num_layers,
batch_first=False, bias=True, return_all_layers=False):
super(ConvLSTM, self).__init__()
self._check_kernel_size_consistency(kernel_size)
# Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
if not len(kernel_size) == len(hidden_dim) == num_layers:
raise ValueError('Inconsistent list length.')
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.kernel_size = kernel_size
self.num_layers = num_layers
self.batch_first = batch_first
self.bias = bias
self.return_all_layers = return_all_layers
cell_list = []
for i in range(0, self.num_layers):
cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
hidden_dim=self.hidden_dim[i],
kernel_size=self.kernel_size[i],
bias=self.bias))
self.cell_list = nn.ModuleList(cell_list)
def forward(self, input_tensor, hidden_state=None):
if not self.batch_first:
# (t, b, c, h, w) -> (b, t, c, h, w)
input_tensor = input_tensor.permute(1, 0, 2, 3, 4)
b, t, _, h, w = input_tensor.size()
# Implement stateful ConvLSTM
if hidden_state is not None:
raise NotImplementedError()
else:
# Since the init is done in forward. Can send image size here
hidden_state = self._init_hidden(batch_size=b,
image_size=(h, w))
layer_output_list = []
last_state_list = []
seq_len = input_tensor.size(1)
cur_layer_input = input_tensor
for layer_idx in range(self.num_layers):
h, c = hidden_state[layer_idx]
output_inner = []
for t in range(seq_len):
h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, t, :, :, :], cur_state=[h, c])
c_cur =torch.zeros_like(hidden_state[1][0])
c_cur = c_cur[:, :1, :, :]
output_inner.append(h)
layer_output = torch.stack(output_inner, dim=1)
cur_layer_input = layer_output
layer_output_list.append(layer_output)
last_state_list.append([h, c])
if not self.return_all_layers:
layer_output_list = layer_output_list[-1:]
last_state_list = last_state_list[-1:]
return layer_output_list, last_state_list
def _init_hidden(self, batch_size, image_size):
init_states = []
for i in range(self.num_layers):
init_states.append(self.cell_list[i].init_hidden(batch_size, image_size))
return init_states
@staticmethod
def _check_kernel_size_consistency(kernel_size):
if not (isinstance(kernel_size, tuple) or
(isinstance(kernel_size, list) and all([isinstance(elem, tuple) for elem in kernel_size]))):
raise ValueError('`kernel_size` must be tuple or list of tuples')
@staticmethod
def _extend_for_multilayer(param, num_layers):
if not isinstance(param, list):
param = [param] * num_layers
return param
# 实例化对象
model = ConvLSTM(input_dim=1, hidden_dim=[64, 64], kernel_size=[(1, 55), (1, 55)], num_layers=2)
# 设置优化参数
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(list(model.parameters()), lr=0.001, momentum=0.9)
# 读取Excel文件
df = pd.read_excel(r'C:\Users\19738\Desktop\数据集\01.xlsx')
df = df.sort_values(by=['日期', '经度', '深度'])
# 将数据转换为numpy数组
data = df.values
time = df.iloc[:, 0] # 提取时间列
longitude = df.iloc[:, 1] # 提取经度列
depth = df.iloc[:, 2] # 提取深度列
# 假设您的数据张量大小为 (num_samples,num_time_steps,1, num_lon, num_dep, )
num_samples = 100
num_lon = 2
num_dep = 55
num_time_steps = 11
# 构建新的数据张量,初始化为 0
temp_data = np.zeros((num_samples, num_time_steps, 1, num_lon, num_dep))
for i in range(num_samples):
for j in range(num_lon):
for k in range(num_dep):
for t in range(num_time_steps):
# 根据时间、经度、深度信息计算对应的索引
date_str = time[t].strftime('%Y/%m/%d')
lon_str = str(longitude[j])
dep_str = str(depth[k])
index = (df['日期'] == date_str) & (df['经度'] == lon_str) & (df['深度'] == dep_str)
# 获取对应的温度值
temp_value = df.loc[index, '温度'].values[0]
# 填充温度值到数据张量中
temp_data[i, t, 0,j, k] = temp_value
temp_data_tensor = torch.from_numpy(temp_data)
# 定义训练数据和标签
train_data = temp_data_tensor[:, :6, :, :, :]
train_label = temp_data_tensor[:, 6:, :, :, :]
print(train_data.shape)
print(train_label.shape)
train_dataset = TensorDataset(train_data, train_label)
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)
# 定义训练循环
for epoch in range(10):
for i, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
model.to(device)
optimizer.zero_grad() # 梯度清零
outputs = model(inputs) # 前向传播
print(outputs)
loss = criterion(outputs_tensor, labels) # 计算损失函数
loss.backward() # 反向传播
optimizer.step() # 更新参数
# 输出损失
print(f'Epoch {epoch+1}, Batch {i+1}, Loss: {loss.item()}')
代码解释:
- 模型定义: 代码定义了两个类:
ConvLSTMCell和ConvLSTM。ConvLSTMCell是 ConvLSTM 的基本单元,包含了卷积操作和门控机制,ConvLSTM则将多个ConvLSTMCell堆叠起来,形成多层网络。 - 数据处理: 代码首先读取 Excel 文件,然后将数据转换为 NumPy 数组,并将其 reshape 成所需的张量形式。数据张量包含了时间、经度、深度和温度信息。
- 训练: 代码使用 PyTorch 的
DataLoader类将数据封装成批次,然后使用训练循环对模型进行训练。训练过程中,模型接收输入数据,进行前向传播,计算损失,并使用反向传播算法更新模型参数。 - 预测: 训练结束后,可以使用模型对新的数据进行预测。
总结:
这段代码实现了 ConvLSTM 模型,并使用它进行时空预测温度。代码包含了模型结构定义、数据处理、训练和预测等关键步骤。ConvLSTM 模型适用于需要进行时空序列数据预测的场景,例如气象预报、交通流量预测等。
注意:
这段代码仅供参考,实际应用中需要根据具体情况进行调整。例如,可以尝试不同的模型参数、数据预处理方法、损失函数等,以提高模型的性能。
原文地址: https://www.cveoy.top/t/topic/mQOq 著作权归作者所有。请勿转载和采集!