ConvLSTM 模型:用于时间序列预测的深度学习模型
# ConvLSTM for time-series (temperature) forecasting — PyTorch.
#
# Fixes over the original blog code:
#   * `self.bias = True` ignored the `bias` constructor argument.
#   * hard-coded `.cuda()` calls made the model CPU-incompatible.
#   * dead lines rebuilding `c_cur` from `hidden_state[1]` removed (unused and
#     IndexError when num_layers == 1).
#   * loop variable `t` shadowed the sequence length; `h, w` were shadowed by states.
#   * training loop used the undefined names `outputs_tensor` / `device` and
#     treated the (outputs, states) tuple as a tensor.
#   * CrossEntropyLoss on continuous temperatures -> MSELoss, plus a 1x1 conv
#     head so the 64-channel hidden output is comparable with 1-channel labels.
#   * `batch_first=True` is now passed, matching the (b, t, c, h, w) data layout.
#   * O(rows) DataFrame boolean masks inside a 4-deep loop -> one lookup dict.
#   * script body guarded by `if __name__ == "__main__"` so the module imports cleanly.
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


class ConvLSTMCell(nn.Module):
    """A single ConvLSTM cell: an LSTM whose four gates are produced by one 2-D convolution."""

    def __init__(self, input_dim, hidden_dim, kernel_size, bias):
        """
        Args:
            input_dim:   channels of the input tensor.
            hidden_dim:  channels of the hidden/cell state.
            kernel_size: (height, width) of the gate convolution kernel.
            bias:        whether the gate convolution has a bias term.
        """
        super(ConvLSTMCell, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        # "same" padding so the spatial size of the state never changes
        self.padding = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias  # BUG FIX: was hard-coded to True, ignoring the argument
        # One convolution computes all four gates at once (4 * hidden_dim channels).
        # Device placement is left to the caller (model.to(device)) instead of .cuda().
        self.conv = nn.Conv2d(
            in_channels=self.input_dim + self.hidden_dim,
            out_channels=4 * self.hidden_dim,
            kernel_size=self.kernel_size,
            padding=self.padding,
            bias=self.bias,
        )

    def forward(self, input_tensor, cur_state):
        """Run one time step; returns (h_next, c_next)."""
        h_cur, c_cur = cur_state
        # Align the input with the state instead of forcing .cuda()/.float().
        input_tensor = input_tensor.to(device=h_cur.device, dtype=h_cur.dtype)
        # Concatenate input and previous hidden state along the channel axis.
        combined = torch.cat([input_tensor, h_cur], dim=1)
        combined_conv = self.conv(combined)
        # Split into input / forget / output / candidate gate pre-activations.
        cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)
        i = torch.sigmoid(cc_i)
        f = torch.sigmoid(cc_f)
        o = torch.sigmoid(cc_o)
        g = torch.tanh(cc_g)
        c_next = f * c_cur + i * g       # new cell state
        h_next = o * torch.tanh(c_next)  # new hidden state
        return h_next, c_next

    def init_hidden(self, batch_size, image_size):
        """Return zero-initialised (h, c) on the same device as the conv weights."""
        height, width = image_size
        zeros = torch.zeros(batch_size, self.hidden_dim, height, width,
                            device=self.conv.weight.device)
        return zeros, zeros.clone()


class ConvLSTM(nn.Module):
    """Multi-layer ConvLSTM.

    Input:  (b, t, c, h, w) if ``batch_first`` else (t, b, c, h, w).
    Output: ``(layer_output_list, last_state_list)`` where each layer output has
    shape (b, t, hidden_dim, h, w) and each state entry is ``[h, c]``. Only the
    last layer is returned unless ``return_all_layers`` is True.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=False, bias=True, return_all_layers=False):
        super(ConvLSTM, self).__init__()
        self._check_kernel_size_consistency(kernel_size)
        # Broadcast scalar kernel_size / hidden_dim to one entry per layer.
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cell_list = []
        for i in range(self.num_layers):
            # Layer 0 sees the raw input; deeper layers see the previous layer's output.
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
                                          hidden_dim=self.hidden_dim[i],
                                          kernel_size=self.kernel_size[i],
                                          bias=self.bias))
        self.cell_list = nn.ModuleList(cell_list)

    def forward(self, input_tensor, hidden_state=None):
        """Unroll all layers over the time dimension."""
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input_tensor = input_tensor.permute(1, 0, 2, 3, 4)

        batch, seq_len, _, height, width = input_tensor.size()
        if hidden_state is not None:
            # Stateful operation (caller-supplied states) is not implemented.
            raise NotImplementedError()
        hidden_state = self._init_hidden(batch_size=batch,
                                         image_size=(height, width))

        layer_output_list = []
        last_state_list = []
        cur_layer_input = input_tensor

        for layer_idx in range(self.num_layers):
            h, c = hidden_state[layer_idx]
            output_inner = []
            # `step` (not `t`) so the sequence length is not shadowed.
            for step in range(seq_len):
                h, c = self.cell_list[layer_idx](
                    input_tensor=cur_layer_input[:, step, :, :, :],
                    cur_state=[h, c])
                # (removed dead code that rebuilt c_cur from hidden_state[1] —
                #  it was never used and crashed when num_layers == 1)
                output_inner.append(h)

            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output  # feed this layer's outputs to the next

            layer_output_list.append(layer_output)
            last_state_list.append([h, c])

        if not self.return_all_layers:
            layer_output_list = layer_output_list[-1:]
            last_state_list = last_state_list[-1:]

        return layer_output_list, last_state_list

    def _init_hidden(self, batch_size, image_size):
        """Zero states for every layer, sized to the input's spatial dims."""
        return [cell.init_hidden(batch_size, image_size) for cell in self.cell_list]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        if not (isinstance(kernel_size, tuple) or
                (isinstance(kernel_size, list)
                 and all(isinstance(elem, tuple) for elem in kernel_size))):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        if not isinstance(param, list):
            param = [param] * num_layers
        return param


def main():
    """Load the temperature spreadsheet, build the (sample, time, 1, lon, depth)
    tensor and train the ConvLSTM with a 1x1 regression head."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # batch_first=True matches the (b, t, c, h, w) layout built below.
    model = ConvLSTM(input_dim=1, hidden_dim=[64, 64],
                     kernel_size=[(1, 55), (1, 55)], num_layers=2,
                     batch_first=True).to(device)
    # 1x1 conv head: 64 hidden channels -> 1 temperature channel, so predictions
    # are comparable with the labels (the original compared 64 channels against
    # 1-channel float labels via CrossEntropyLoss, which cannot work).
    head = nn.Conv2d(64, 1, kernel_size=1).to(device)

    criterion = nn.MSELoss()  # temperature is continuous -> regression loss
    optimizer = optim.SGD(list(model.parameters()) + list(head.parameters()),
                          lr=0.001, momentum=0.9)

    # ---- data -------------------------------------------------------------
    df = pd.read_excel(r'C:/Users/19738/Desktop/数据集/01.xlsx')
    df = df.sort_values(by=['日期', '经度', '深度'])

    time_col = df.iloc[:, 0]   # date column
    longitude = df.iloc[:, 1]  # longitude column
    depth = df.iloc[:, 2]      # depth column

    # NOTE(review): these sizes must match the actual spreadsheet contents.
    num_samples = 100
    num_lon = 2
    num_dep = 55
    num_time_steps = 11

    # One pass over the DataFrame builds a (date, lon, depth) -> temperature
    # dict, replacing an O(rows) boolean mask per grid cell. NOTE(review): the
    # original compared stringified values against the raw columns; raw values
    # are used consistently here — verify against the file's dtypes.
    temp_lookup = {
        (row['日期'], row['经度'], row['深度']): row['温度']
        for _, row in df.iterrows()
    }

    temp_data = np.zeros((num_samples, num_time_steps, 1, num_lon, num_dep))
    for i in range(num_samples):
        for j in range(num_lon):
            for k in range(num_dep):
                for step in range(num_time_steps):
                    key = (time_col[step], longitude[j], depth[k])
                    temp_data[i, step, 0, j, k] = temp_lookup[key]

    # float32 once here, instead of casting inside every cell forward.
    temp_data_tensor = torch.from_numpy(temp_data).float()

    # First 6 time steps are the input sequence, the remaining 5 the targets.
    train_data = temp_data_tensor[:, :6]
    train_label = temp_data_tensor[:, 6:]

    print(train_data.shape)
    print(train_label.shape)

    train_dataset = TensorDataset(train_data, train_label)
    train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

    # ---- training loop -----------------------------------------------------
    for epoch in range(10):
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            layer_outputs, _ = model(inputs)
            hidden_seq = layer_outputs[-1]  # (b, t, 64, h, w): last layer, all steps

            # Apply the 1x1 head per time step, then keep as many trailing
            # steps as there are label steps.
            b, t_steps, ch, hh, ww = hidden_seq.shape
            preds = head(hidden_seq.reshape(b * t_steps, ch, hh, ww))
            preds = preds.reshape(b, t_steps, 1, hh, ww)[:, -labels.size(1):]

            loss = criterion(preds, labels)
            loss.backward()
            optimizer.step()

            print(f'Epoch {epoch + 1}, Batch {batch_idx + 1}, Loss: {loss.item()}')


if __name__ == '__main__':
    main()
模型的 forward 返回一个二元组 (layer_output_list, last_state_list)。其中 layer_output_list[-1] 是最后一层的输出张量,形状为 (batch, time, hidden_dim, H, W),按时间维排列了每个时间步的输出;last_state_list 则保存各层最终的 (h, c) 状态。因此,如果想获取最后五个时间步(即未来五天)的预测结果,可以使用 layer_output_list[-1][:, -5:]。

代码解释

1. ConvLSTM 模型定义: 这部分代码定义了 ConvLSTM 模型,包括 ConvLSTMCell 和 ConvLSTM 两个类。ConvLSTMCell 类定义了单个 ConvLSTM 单元,它接收当前时间步的输入和前一个时间步的隐藏状态,并输出当前时间步的隐藏状态和细胞状态。ConvLSTM 类则包含多个 ConvLSTM 单元,并负责处理输入数据的序列,最终输出每个时间步的预测结果。

2. 数据预处理: 代码首先从 Excel 文件中读取数据,并根据时间、经度和深度信息对数据进行排序。然后,将数据转换为 numpy 数组,并构建一个新的数据张量,用于训练模型。

3. 训练模型: 训练模型部分使用 DataLoader 来加载数据,并使用 SGD 优化器来更新模型参数。代码中使用了循环来迭代训练数据,并在每次迭代后计算损失函数并更新模型参数。

4. 预测未来五天温度: 可以从 layer_output_list[-1] 中取最后五个时间步的输出,作为未来五天的预测结果。

需要注意的是: 代码中的数据大小、训练数据和标签的划分、训练参数等都需要根据实际情况进行调整。
原文地址: https://www.cveoy.top/t/topic/mQQs 著作权归作者所有。请勿转载和采集!