首先,我们需要导入需要用到的库:numpy、pandas、matplotlib、sklearn等。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout, RepeatVector, TimeDistributed
from tensorflow.keras.callbacks import EarlyStopping

接下来,读取训练集和测试集的数据,并进行预处理。

# 读取数据
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')

# 处理日期
train_df['date'] = pd.to_datetime(train_df['date'], format='%Y%m%d')
test_df['date'] = pd.to_datetime(test_df['date'], format='%Y%m%d')

# 将日期和时间合并成新的时间特征
train_df['time'] = (train_df['date'] - pd.Timestamp('1970-01-01')) // pd.Timedelta('1s') + train_df['time']
test_df['time'] = (test_df['date'] - pd.Timestamp('1970-01-01')) // pd.Timedelta('1s') + test_df['time']

# 将时间特征进行归一化处理
scaler = MinMaxScaler()
train_df[['time', 'close']] = scaler.fit_transform(train_df[['time', 'close']])
test_df[['time']] = scaler.transform(test_df[['time']])

接着,我们需要对训练集的数据进行划分,将其分为训练集和验证集。

# 将数据划分为训练集和验证集
train_data = train_df[['time', 'close']].values
X_train, X_val, y_train, y_val = train_test_split(train_data[:, 0], train_data[:, 1], test_size=0.2, shuffle=True)

# 将数据转换为LSTM模型可用的形式
def create_dataset(X, y, time_steps=1):
    Xs, ys = [], []
    for i in range(len(X) - time_steps):
        Xs.append(X[i:(i + time_steps)])
        ys.append(y[i + time_steps])
    return np.array(Xs), np.array(ys)

TIME_STEPS = 48
X_train, y_train = create_dataset(X_train, y_train, TIME_STEPS)
X_val, y_val = create_dataset(X_val, y_val, TIME_STEPS)

# 将数据转换为3D的形式
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_val = X_val.reshape((X_val.shape[0], X_val.shape[1], 1))

然后,我们可以建立LSTM模型。

# 建立LSTM模型
model = Sequential([
    LSTM(128, input_shape=(TIME_STEPS, 1)),
    Dropout(0.2),
    RepeatVector(TIME_STEPS),
    LSTM(128, return_sequences=True),
    Dropout(0.2),
    TimeDistributed(Dense(1))
])

model.compile(loss='mse', optimizer='adam', metrics=['mae'])

在训练模型之前,我们需要设置一个EarlyStopping的回调函数,以便在模型训练过程中监控模型的性能并在性能停止提升时停止训练。

# 设置EarlyStopping的回调函数
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

接下来,我们可以开始训练模型了。

# 训练模型
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=64,
    callbacks=[early_stopping],
    verbose=1,
    shuffle=False
)

在模型训练完成后,我们可以使用模型对测试集进行预测。

# 使用模型对测试集进行预测
test_data = test_df[test_df['id'] == 1][['time']].values
test_data = test_data.reshape((test_data.shape[0], TIME_STEPS, 1))
y_pred = model.predict(test_data)

# 将预测结果进行反归一化处理
y_pred = scaler.inverse_transform(y_pred.reshape(-1, 1)).reshape(-1)

最后,我们可以将预测结果可视化,以便更好地了解模型的性能。

# 可视化预测结果
plt.plot(test_df[test_df['id'] == 1]['time'], test_df[test_df['id'] == 1]['close'], label='Actual')
plt.plot(test_df[test_df['id'] == 1]['time'][48:], y_pred, label='Predicted')
plt.legend()
plt.show()

完整代码如下

traincsv - 训练集文件某只股票近5个月开盘日每5分钟的数据935~1500即每天48个数据。包括日期使用数字作为代替时间收盘价三个特征。testcsv - 测试集文件以5分钟为间隔24个时间点为一个周期我们给定21个时间点股票的所有数据需要参赛者预测3个空白时间点的收盘价。注:给定的测试数据并非按照时间顺序只是通过随机抽取得到。使用python中的transformer进行股票收盘价预测

原文地址: https://www.cveoy.top/t/topic/fs1U 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录