# Detailed walkthrough of the model-training callbacks and evaluation functions (Jupyter Notebook).
# --------------------------------------- TRAINING CALLBACKS ---------------------------------------
class save_latent_space_viz(Callback):
    """Keras callback that saves a latent-space scatter plot at the end of
    every epoch on which the validation loss reaches a new best value."""

    def __init__(self, model, data, target):
        # model: the model being trained; its first layer is assumed to be the encoder.
        # data/target: dataset used to produce and colour the latent-space plot.
        self.model = model
        self.data = data
        self.target = target

    def on_train_begin(self, logs=None):
        # Fixed: the original line was syntactically broken (`logs={'}):' `)
        # and used a mutable default. Also replaced the arbitrary constant
        # 100000 with +inf as the initial "best" loss.
        self.best_val_loss = float('inf')

    def on_epoch_end(self, epoch, logs=None):
        # Save the visualization only when the validation loss improves.
        encoder = self.model.layers[0]  # first layer is assumed to be the encoder
        val_loss = (logs or {}).get('val_loss')
        # Guard against a missing metric: comparing None with `<` would crash.
        if val_loss is not None and val_loss < self.best_val_loss:
            self.best_val_loss = val_loss
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)
# Build the list of Keras callbacks used during training.
def get_callbacks(model, data, target):
    """Return the training callbacks: early stopping, best-weights
    checkpointing, TensorBoard logging, and latent-space snapshots."""
    return [
        # Stop training once val_loss has not improved for 30 epochs.
        EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30),
        # Keep only the weights of the best model seen so far.
        ModelCheckpoint(filepath='./checkpoints/checkpoint', monitor='val_loss',
                        mode='min', verbose=1, save_best_only=True,
                        save_weights_only=True),
        # Write metrics for TensorBoard.
        TensorBoard(log_dir='./logs'),
        # Save a latent-space plot each time val_loss improves.
        save_latent_space_viz(model, data, target),
    ]
# Visualize the 2-D latent space produced by the encoder.
def viz_latent_space(encoder, data, targets=None, epoch='Final', save=False, show=True):
    """Project `data` through `encoder` and scatter-plot the first two
    latent dimensions.

    Parameters
    ----------
    encoder : model whose `predict(data)` returns a tuple; only the first
        element (the latent codes `z`) is used.
    data : inputs fed to the encoder.
    targets : optional per-sample values used to colour the points.
    epoch : label appended to the saved file name.
    save, show : whether to write the figure to ./images and/or display it.

    Returns
    -------
    The latent codes `z` (first output of the encoder).
    """
    # Avoid a mutable default argument; None behaves like "no targets".
    if targets is None:
        targets = []
    z, _, _ = encoder.predict(data)  # encoder is expected to return (z, ..., ...)
    plt.figure(figsize=(8, 10))
    if len(targets) > 0:
        plt.scatter(z[:, 0], z[:, 1], c=targets)
        # Fixed: a colorbar only makes sense when the points are colour-coded;
        # plt.colorbar() without a mappable raises an error in matplotlib.
        plt.colorbar()
    else:
        plt.scatter(z[:, 0], z[:, 1])
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    # Fixed: save before showing — plt.show() may clear the current figure,
    # which would make a subsequent savefig write an empty image.
    if save:
        plt.savefig('./images/latent_space_epoch' + str(epoch) + '.png')
    if show:
        plt.show()
    return z
# ---------------------------------------------------------------------------------------------------
# ----------------------------------------- FIND OPTIMAL LR ----------------------------------------
class LRFinder:
    """Learning-rate range test.

    Trains for a short run while exponentially increasing the learning rate
    from `start_lr` to `end_lr`, recording the loss after every batch. The
    loss-vs-lr curve then suggests a good learning rate.

    Cyclical LR, code tailored from:
    https://towardsdatascience.com/estimating-optimal-learning-rate-for-a-deep-neural-network-ce32f2556ce0
    """

    def __init__(self, model):
        # model: a compiled Keras model whose optimizer exposes an `lr` variable.
        self.model = model
        self.losses = []      # loss recorded after each batch
        self.lrs = []         # learning rate used for each batch
        self.best_loss = 1e9  # lowest loss seen so far (divergence reference)

    def on_batch_end(self, batch, logs):
        """Record the current lr/loss, abort on divergence, then raise the lr."""
        lr = K.get_value(self.model.optimizer.lr)
        self.lrs.append(lr)
        loss = logs['loss']
        self.losses.append(loss)
        # Stop once the loss explodes (NaN or 4x worse than the best seen),
        # skipping the first few noisy batches.
        if batch > 5 and (math.isnan(loss) or loss > self.best_loss * 4):
            self.model.stop_training = True
            return
        if loss < self.best_loss:
            self.best_loss = loss
        # Exponentially increase the learning rate for the next batch.
        lr *= self.lr_mult
        K.set_value(self.model.optimizer.lr, lr)

    def find(self, x_train, y_train, start_lr, end_lr, batch_size=64, epochs=1, **kw_fit):
        """Run the range test on in-memory training data."""
        N = x_train[0].shape[0] if isinstance(x_train, list) else x_train.shape[0]
        # Per-batch multiplier so lr travels from start_lr to end_lr over the run.
        num_batches = epochs * N / batch_size
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(num_batches))
        # Snapshot weights and lr so the test leaves the model untouched.
        initial_weights = self.model.get_weights()
        original_lr = K.get_value(self.model.optimizer.lr)
        K.set_value(self.model.optimizer.lr, start_lr)
        callback = LambdaCallback(on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))
        self.model.fit(x_train, y_train,
                       batch_size=batch_size, epochs=epochs,
                       callbacks=[callback],
                       **kw_fit)
        # Restore the model exactly as it was before the test.
        self.model.set_weights(initial_weights)
        K.set_value(self.model.optimizer.lr, original_lr)

    def find_generator(self, generator, start_lr, end_lr, epochs=1, steps_per_epoch=None, **kw_fit):
        """Run the range test on data produced by a generator/Sequence."""
        if steps_per_epoch is None:
            try:
                steps_per_epoch = len(generator)
            except (ValueError, NotImplementedError) as e:
                # Fixed: the original did `raise e('...')`, i.e. it *called* the
                # caught exception instance, which itself raises a TypeError
                # instead of surfacing the intended message.
                raise ValueError('`steps_per_epoch=None` is only valid for a'
                                 ' generator based on the '
                                 '`keras.utils.Sequence`'
                                 ' class. Please specify `steps_per_epoch` '
                                 'or use the `keras.utils.Sequence` class.') from e
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(epochs * steps_per_epoch))
        # Snapshot weights and lr so the test leaves the model untouched.
        initial_weights = self.model.get_weights()
        original_lr = K.get_value(self.model.optimizer.lr)
        K.set_value(self.model.optimizer.lr, start_lr)
        callback = LambdaCallback(on_batch_end=lambda batch,
                                  logs: self.on_batch_end(batch, logs))
        # NOTE(review): fit_generator is deprecated/removed in recent Keras
        # versions; if the installed version lacks it, switch to
        # `self.model.fit(generator, ...)` — confirm against the pinned Keras.
        self.model.fit_generator(generator=generator,
                                 epochs=epochs,
                                 steps_per_epoch=steps_per_epoch,
                                 callbacks=[callback],
                                 **kw_fit)
        self.model.set_weights(initial_weights)
        K.set_value(self.model.optimizer.lr, original_lr)

    def plot_loss(self, n_skip_beginning=10, n_skip_end=5, x_scale='log'):
        """Plot loss vs learning rate, trimming the noisy edges."""
        plt.ylabel('loss')
        plt.xlabel('learning rate (log scale)')
        plt.plot(self.lrs[n_skip_beginning:-n_skip_end], self.losses[n_skip_beginning:-n_skip_end])
        plt.xscale(x_scale)
        plt.show()

    def plot_loss_change(self, sma=1, n_skip_beginning=10, n_skip_end=5, y_lim=(-0.01, 0.01)):
        """Plot the rate of change of the loss vs learning rate."""
        derivatives = self.get_derivatives(sma)[n_skip_beginning:-n_skip_end]
        lrs = self.lrs[n_skip_beginning:-n_skip_end]
        plt.ylabel('rate of loss change')
        plt.xlabel('learning rate (log scale)')
        plt.plot(lrs, derivatives)
        plt.xscale('log')
        plt.ylim(y_lim)
        plt.show()

    def get_derivatives(self, sma):
        """Simple-moving-average slope: (loss[i] - loss[i-sma]) / sma.

        The first `sma` entries are zero-padded so indices align with `self.lrs`.
        """
        assert sma >= 1
        derivatives = [0] * sma
        for i in range(sma, len(self.lrs)):
            derivatives.append((self.losses[i] - self.losses[i - sma]) / sma)
        return derivatives

    def get_best_lr(self, sma, n_skip_beginning=10, n_skip_end=5):
        """Return the lr at which the loss decreases fastest (most negative slope)."""
        derivatives = self.get_derivatives(sma)
        best_der_idx = np.argmin(derivatives[n_skip_beginning:-n_skip_end])
        return self.lrs[n_skip_beginning:-n_skip_end][best_der_idx]
# ---------------------------------------------------------------------------------------------------
# --------------------------------------------- RESULTS --------------------------------------------
def get_model(path):
    """Load a saved VRAE model and return its (encoder, regressor) sub-models."""
    # compile=False: only the architecture and weights are needed for inference.
    vrae = load_model(path, compile=False)
    encoder, regressor = vrae.layers[1], vrae.layers[2]
    return encoder, regressor
def evaluate(y_true, y_hat, label='test'):
    """Print and return (RMSE, R2) for predictions `y_hat` against `y_true`.

    The original printed the metrics and returned None; returning them as
    well is backward-compatible and lets callers use them programmatically.
    """
    mse = mean_squared_error(y_true, y_hat)   # mean squared error
    rmse = np.sqrt(mse)                       # root mean squared error
    variance = r2_score(y_true, y_hat)        # coefficient of determination
    print('{} set RMSE:{}, R2:{}'.format(label, rmse, variance))
    return rmse, variance
def score(y_true, y_hat):
    """PHM08-style asymmetric score (lower is better): print and return it.

    Late predictions (hat > true) are penalised more heavily (exp decay 13)
    than early ones (exp decay 10).

    Fixes vs the original:
    - `np.exp(...)[0]` required each element to be a 1-element array and
      crashed on scalars; `np.ravel(...)[0]` accepts both.
    - The total is now returned as well as printed (backward-compatible).
    """
    res = 0.0
    for true, hat in zip(y_true, y_hat):
        # np.ravel turns scalars and 1-element arrays alike into a 1-D array.
        diff = float(np.ravel(hat - true)[0])
        if diff < 0:
            res += math.exp(-diff / 10) - 1
        else:
            res += math.exp(diff / 13) - 1
    print('score: ', res)
    return res
def results(path, x_train, y_train, x_test, y_test):
    """Load a saved model, visualise its latent space, and report metrics."""
    encoder, regressor = get_model(path)
    # Encode both splits and plot their latent representations.
    latent_train = viz_latent_space(encoder, x_train, y_train)
    latent_test = viz_latent_space(encoder, x_test, y_test)
    # Regress the targets from the latent codes and evaluate each split.
    pred_train = regressor.predict(latent_train)
    pred_test = regressor.predict(latent_test)
    evaluate(y_train, pred_train, 'train')
    evaluate(y_test, pred_test, 'test')
    score(y_test, pred_test)
# Source: https://www.cveoy.top/t/topic/jPeA — all rights reserved by the original author; do not repost or scrape.