# Imports assumed for the code below (tf.keras is assumed; adjust if using standalone Keras).
import math

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, r2_score
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import (Callback, EarlyStopping, LambdaCallback,
                                        ModelCheckpoint, TensorBoard)
from tensorflow.keras.models import load_model

# --------------------------------------- TRAINING CALLBACKS  ---------------------------------------
class save_latent_space_viz(Callback):
    # Callback that saves a visualization of the model's latent space whenever the
    # validation loss improves at the end of an epoch.
    def __init__(self, model, data, target):
        # Keep references to the model, the input data and the target values.
        self.model = model
        self.data = data
        self.target = target

    def on_train_begin(self, logs=None):
        # Initialize the best validation loss seen so far.
        self.best_val_loss = 100000

    def on_epoch_end(self, epoch, logs=None):
        # At the end of each epoch, save the latent-space plot if the current
        # validation loss is better than the best one seen so far.
        encoder = self.model.layers[0]  # get the encoder
        if logs.get('val_loss') < self.best_val_loss:
            self.best_val_loss = logs.get('val_loss')
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)
	
# Build the list of callbacks used during training.
def get_callbacks(model, data, target):
    model_callbacks = [
        # Stop training early once the validation loss stops improving.
        EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30),
        # Save the weights of the model with the lowest validation loss.
        ModelCheckpoint(filepath='./checkpoints/checkpoint', monitor='val_loss', mode='min',
                        verbose=1, save_best_only=True, save_weights_only=True),
        # Log training metrics for TensorBoard.
        TensorBoard(log_dir='./logs'),
        # Custom callback that saves a latent-space visualization on improvement.
        save_latent_space_viz(model, data, target)
    ]
    return model_callbacks

# Visualize the model's 2D latent space.
def viz_latent_space(encoder, data, targets=[], epoch='Final', save=False, show=True):
    z, _, _ = encoder.predict(data)  # encode the data; the first output is the latent representation
    plt.figure(figsize=(8, 10))
    if len(targets) > 0:
        plt.scatter(z[:, 0], z[:, 1], c=targets)  # color the points by their target value
    else:
        plt.scatter(z[:, 0], z[:, 1])  # otherwise plot the points without color coding
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    plt.colorbar()
    # Save before showing: after plt.show() the current figure may be cleared, which
    # would leave the saved image empty.
    if save:
        plt.savefig('./images/latent_space_epoch' + str(epoch) + '.png')
    if show:
        plt.show()
    return z
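
# Example usage of get_callbacks (a minimal sketch, not part of the original code): `vrae`,
# `x_train` and `y_train` are hypothetical names for a compiled Keras model whose first layer
# is the encoder, plus its training data and targets. It assumes the model is trained to
# predict y_train from x_train and that fit() reports 'val_loss' for the callbacks to monitor.
def example_train_with_callbacks(vrae, x_train, y_train):
    callbacks = get_callbacks(vrae, x_train, y_train)
    return vrae.fit(x_train, y_train,
                    validation_split=0.2,
                    epochs=200,
                    batch_size=64,
                    callbacks=callbacks)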
# ---------------------------------------------------------------------------------------------------

# ----------------------------------------- FIND OPTIMAL LR  ----------------------------------------
class LRFinder:
    """
    Learning-rate range test (cyclical LR) for finding a good learning rate.
    Code tailored from:
    https://towardsdatascience.com/estimating-optimal-learning-rate-for-a-deep-neural-network-ce32f2556ce0
    """

    def __init__(self, model):
        # Keep a reference to the model and initialize the learning-rate/loss tracking lists.
        self.model = model
        self.losses = []
        self.lrs = []
        self.best_loss = 1e9

    def on_batch_end(self, batch, logs):
        # Record the current learning rate and loss at the end of every batch.
        lr = K.get_value(self.model.optimizer.lr)
        self.lrs.append(lr)

        loss = logs['loss']
        self.losses.append(loss)

        # Stop training if the loss explodes or becomes NaN.
        if batch > 5 and (math.isnan(loss) or loss > self.best_loss * 4):
            self.model.stop_training = True
            return

        if loss < self.best_loss:
            self.best_loss = loss

        # Increase the learning rate multiplicatively for the next batch.
        lr *= self.lr_mult
        K.set_value(self.model.optimizer.lr, lr)

    def find(self, x_train, y_train, start_lr, end_lr, batch_size=64, epochs=1, **kw_fit):
        # Run the learning-rate range test on in-memory training data.
        N = x_train[0].shape[0] if isinstance(x_train, list) else x_train.shape[0]

        # Number of batches, and the per-batch multiplier so that the learning rate grows
        # exponentially from start_lr to end_lr: lr_mult ** num_batches == end_lr / start_lr.
        num_batches = epochs * N / batch_size
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(num_batches))
        # Save the model weights so they can be restored after the test.
        initial_weights = self.model.get_weights()

        # Remember the original learning rate.
        original_lr = K.get_value(self.model.optimizer.lr)

        # Set the starting learning rate.
        K.set_value(self.model.optimizer.lr, start_lr)

        callback = LambdaCallback(on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))

        # Run the sweep; the callback increases the learning rate after every batch.
        self.model.fit(x_train, y_train,
                       batch_size=batch_size, epochs=epochs,
                       callbacks=[callback],
                       **kw_fit)

        # Restore the original weights and learning rate.
        self.model.set_weights(initial_weights)
        K.set_value(self.model.optimizer.lr, original_lr)

    def find_generator(self, generator, start_lr, end_lr, epochs=1, steps_per_epoch=None, **kw_fit):
        # Run the learning-rate range test using a data generator.
        if steps_per_epoch is None:
            try:
                steps_per_epoch = len(generator)
            except (ValueError, NotImplementedError) as e:
                raise ValueError('`steps_per_epoch=None` is only valid for a'
                                 ' generator based on the '
                                 '`keras.utils.Sequence`'
                                 ' class. Please specify `steps_per_epoch` '
                                 'or use the `keras.utils.Sequence` class.') from e
        self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(epochs * steps_per_epoch))

        # Save the model weights so they can be restored after the test.
        initial_weights = self.model.get_weights()

        # Remember the original learning rate.
        original_lr = K.get_value(self.model.optimizer.lr)

        # Set the starting learning rate.
        K.set_value(self.model.optimizer.lr, start_lr)

        callback = LambdaCallback(on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))

        # Run the sweep. Note: fit_generator is deprecated in recent Keras versions,
        # where model.fit() accepts generators directly.
        self.model.fit_generator(generator=generator,
                                 epochs=epochs,
                                 steps_per_epoch=steps_per_epoch,
                                 callbacks=[callback],
                                 **kw_fit)

        # Restore the original weights and learning rate.
        self.model.set_weights(initial_weights)
        K.set_value(self.model.optimizer.lr, original_lr)

    def plot_loss(self, n_skip_beginning=10, n_skip_end=5, x_scale='log'):
        # Plot the loss as a function of the learning rate (log-scaled x-axis).
        plt.ylabel('loss')
        plt.xlabel('learning rate (log scale)')
        plt.plot(self.lrs[n_skip_beginning:-n_skip_end], self.losses[n_skip_beginning:-n_skip_end])
        plt.xscale(x_scale)
        plt.show()

    def plot_loss_change(self, sma=1, n_skip_beginning=10, n_skip_end=5, y_lim=(-0.01, 0.01)):
        # Plot the smoothed rate of change of the loss as a function of the learning rate.
        derivatives = self.get_derivatives(sma)[n_skip_beginning:-n_skip_end]
        lrs = self.lrs[n_skip_beginning:-n_skip_end]
        plt.ylabel('rate of loss change')
        plt.xlabel('learning rate (log scale)')
        plt.plot(lrs, derivatives)
        plt.xscale('log')
        plt.ylim(y_lim)
        plt.show()

    def get_derivatives(self, sma):
        # Smoothed loss derivative: change in loss over a window of `sma` batches.
        assert sma >= 1
        derivatives = [0] * sma
        for i in range(sma, len(self.lrs)):
            derivatives.append((self.losses[i] - self.losses[i - sma]) / sma)
        return derivatives

    def get_best_lr(self, sma, n_skip_beginning=10, n_skip_end=5):
        # Return the learning rate at which the smoothed loss decreases fastest.
        derivatives = self.get_derivatives(sma)
        best_der_idx = np.argmin(derivatives[n_skip_beginning:-n_skip_end])
        return self.lrs[n_skip_beginning:-n_skip_end][best_der_idx]
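
# Example usage of LRFinder (a minimal sketch, not part of the original code): `model`,
# `x_train` and `y_train` are hypothetical names for a compiled Keras model and its data;
# the LR bounds and smoothing window are just typical starting points.
def example_find_learning_rate(model, x_train, y_train):
    lr_finder = LRFinder(model)
    # Sweep the learning rate exponentially from 1e-6 to 1.0 over one epoch.
    lr_finder.find(x_train, y_train, start_lr=1e-6, end_lr=1.0, batch_size=64, epochs=1)
    lr_finder.plot_loss()  # loss vs. learning rate (log x-axis)
    # Pick the learning rate at which the smoothed loss decreases fastest.
    return lr_finder.get_best_lr(sma=20)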
# ---------------------------------------------------------------------------------------------------

# --------------------------------------------- RESULTS  --------------------------------------------
def get_model(path):
    # Load the saved model; compile=False because it is only used for inference.
    saved_VRAE_model = load_model(path, compile=False)

    # Return the encoder and regressor sub-models.
    return saved_VRAE_model.layers[1], saved_VRAE_model.layers[2]

def evaluate(y_true, y_hat, label='test'):
    # Report RMSE and R2 for the given predictions.
    mse = mean_squared_error(y_true, y_hat)  # mean squared error
    rmse = np.sqrt(mse)                      # root mean squared error
    variance = r2_score(y_true, y_hat)       # coefficient of determination (R2)
    print('{} set RMSE:{}, R2:{}'.format(label, rmse, variance))

def score(y_true, y_hat):
    # Asymmetric exponential score (lower is better): under-predictions (hat < true) are
    # penalized as exp(|error| / 10) - 1, over-predictions as exp(|error| / 13) - 1.
    res = 0
    for true, hat in zip(y_true, y_hat):
        subs = hat - true
        if subs < 0:
            res = res + np.exp(-subs / 10)[0] - 1
        else:
            res = res + np.exp(subs / 13)[0] - 1
    print('score: ', res)
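
# Worked example for score (a minimal sketch, not part of the original code): it assumes
# y_true and y_hat are arrays of shape (n, 1), which matches the `[0]` indexing above.
# For one sample with y_true = 10 and y_hat = 23, subs = 13 >= 0, so the contribution is
# exp(13 / 13) - 1 = e - 1 ≈ 1.718; an error of -13 would instead give exp(13 / 10) - 1 ≈ 2.67.
def example_score():
    score(np.array([[10.0]]), np.array([[23.0]]))  # prints: score:  1.718...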

def results(path, x_train, y_train, x_test, y_test):
    # Load the saved model and report its results.
    encoder, regressor = get_model(path)
    # Visualize the latent space of the training and test sets.
    train_mu = viz_latent_space(encoder, x_train, y_train)
    test_mu = viz_latent_space(encoder, x_test, y_test)
    # Predict the targets from the latent representations.
    y_hat_train = regressor.predict(train_mu)
    y_hat_test = regressor.predict(test_mu)

    evaluate(y_train, y_hat_train, 'train')  # training-set performance
    evaluate(y_test, y_hat_test, 'test')     # test-set performance
    score(y_test, y_hat_test)                # asymmetric score on the test set
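
# Example usage of results (a minimal sketch, not part of the original code): the model path
# is hypothetical and must point to a full saved model (written with model.save(...)),
# not the weights-only checkpoint produced by the ModelCheckpoint callback above.
def example_report_results(x_train, y_train, x_test, y_test):
    results('./saved_models/vrae', x_train, y_train, x_test, y_test)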

A walkthrough of the model-training callbacks and evaluation functions in a Jupyter Notebook

Original article: https://www.cveoy.top/t/topic/jPeA. All rights belong to the author; please do not repost or scrape.
