import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, LSTM, Bidirectional, Masking

# 设置随机种子, 保证实验结果可复现
seed = 99
random.seed(seed)
tf.random.set_seed(seed)


def create_model(timesteps, input_dim, intermediate_dim, batch_size, latent_dim, epochs, optimizer):
    # 设置网络参数
    timesteps = timesteps
    input_dim = input_dim
    intermediate_dim = intermediate_dim
    batch_size = batch_size
    latent_dim = latent_dim
    epochs = epochs
    if optimizer == 'adam':
        optimizer = keras.optimizers.Adam(learning_rate=0.001)
    else:
        print('unimplemented optimizer')
        exit(-1)
    masking_value = -99.

    class Sampling(keras.layers.Layer):
        '''使用 (z_mean, sigma) 对 z 进行采样, z是编码引擎轨迹的向量。'''
        def call(self, inputs):
            mu, sigma = inputs
            batch = tf.shape(mu)[0]
            dim = tf.shape(mu)[1]
            epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
            return mu + tf.exp(0.5 * sigma) * epsilon
    
    # ----------------------- 编码器 -----------------------
    inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')

    # 对输入数据进行掩码处理
    mask = Masking(mask_value=masking_value)(inputs)
    
    # 使用双向LSTM进行编码
    h = Bidirectional(LSTM(intermediate_dim))(mask) 

    # VAE Z 层
    mu = Dense(latent_dim)(h)
    sigma = Dense(latent_dim)(h)

    # 从潜在空间中采样
    z = Sampling()([mu, sigma])

    # 实例化编码器模型
    encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')
    print(encoder.summary())
    # -------------------------------------------------------
    
    # ----------------------- 回归器 --------------------
    reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
    reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
    reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
    # 实例化回归器模型
    regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
    print(regressor.summary())
    # -------------------------------------------------------

    '''
    # 如有需要, 取消注释
    # ----------------------- 解码器 --------------------
    latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
    h_decoded = RepeatVector(timesteps)(latent_inputs)
    h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded) 
    # 解码层
    outputs = LSTM(input_dim, return_sequences=True)(h_decoded) 

    # 实例化解码器模型
    decoder = keras.Model(latent_inputs, outputs, name='decoder')
    print(decoder.summary())
    # -------------------------------------------------------
    '''
    
    # -------------------- 包装模型 --------------------
    class RVE(keras.Model):
        def __init__(self, encoder, regressor, decoder=None, **kwargs):
            super(RVE, self).__init__(**kwargs)
            self.encoder = encoder
            self.regressor = regressor
            self.total_loss_tracker = keras.metrics.Mean(name='total_loss')
            self.kl_loss_tracker = keras.metrics.Mean(name='kl_loss')
            self.reg_loss_tracker = keras.metrics.Mean(name='reg_loss')
            self.decoder = decoder
            if self.decoder!=None:
                self.reconstruction_loss_tracker = keras.metrics.Mean(name='reconstruction_loss')
            
        @property
        def metrics(self):
            if self.decoder!=None:
                return [
                    self.total_loss_tracker,
                    self.kl_loss_tracker,
                    self.reg_loss_tracker,
                    self.reconstruction_loss_tracker
                ]
            else:
                return [
                    self.total_loss_tracker,
                    self.kl_loss_tracker,
                    self.reg_loss_tracker,
                ]

        def train_step(self, data):
            x, target_x = data
            with tf.GradientTape() as tape:
                # 计算 KL 散度损失
                z, mu, sigma = self.encoder(x)
                kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
                kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
                # 计算回归损失
                reg_prediction = self.regressor(z)
                reg_loss = tf.reduce_mean(
                    keras.losses.mse(target_x, reg_prediction)
                )
                # 计算重建损失 (如果使用了解码器)
                if self.decoder!=None:
                    reconstruction = self.decoder(z)
                    reconstruction_loss = tf.reduce_mean(
                        keras.losses.mse(x, reconstruction)
                    )
                    total_loss = kl_loss + reg_loss + reconstruction_loss
                    self.reconstruction_loss_tracker.update_state(reconstruction_loss)
                else:
                    total_loss = kl_loss + reg_loss
            # 计算梯度并更新模型参数
            grads = tape.gradient(total_loss, self.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
            # 更新损失指标
            self.total_loss_tracker.update_state(total_loss)
            self.kl_loss_tracker.update_state(kl_loss)
            self.reg_loss_tracker.update_state(reg_loss)
            # 返回损失值
            return {
                'loss': self.total_loss_tracker.result(),
                'kl_loss': self.kl_loss_tracker.result(),
                'reg_loss': self.reg_loss_tracker.result(),
            }


        def test_step(self, data):
            x, target_x = data

            # 计算 KL 散度损失
            z, mu, sigma = self.encoder(x)
            kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # 计算回归损失
            reg_prediction = self.regressor(z)
            reg_loss = tf.reduce_mean(
                keras.losses.mse(target_x, reg_prediction)
            )
            # 计算重建损失 (如果使用了解码器)
            if self.decoder!=None:
                reconstruction = self.decoder(z)
                reconstruction_loss = tf.reduce_mean(
                    keras.losses.mse(x, reconstruction)
                )
                total_loss = kl_loss + reg_loss + reconstruction_loss
            else:
                total_loss = kl_loss + reg_loss

            return {
                'loss': total_loss,
                'kl_loss': kl_loss,
                'reg_loss': reg_loss,
            }
    # -------------------------------------------------------

    # 创建并编译模型
    rve = RVE(encoder, regressor)
    rve.compile(optimizer=optimizer)

    return rve

这段代码定义了一个名为 create_model 的函数, 该函数可以创建一个变分自编码器 (VAE) 模型, 用于回归预测和数据重建。

主要功能:

  • 编码器: 使用双向 LSTM 对输入数据进行编码, 并将其映射到潜在空间中的一个点。
  • 解码器 (可选): 使用 LSTM 将潜在空间中的点解码回原始输入数据的形状。
  • 回归器: 使用编码器的输出 (潜在空间中的点) 来预测目标值。
  • 损失函数: 使用 KL 散度损失、回归损失和重建损失 (如果使用了解码器) 来训练模型。

代码解析:

  1. 导入必要的库: 包括 tensorflow, keras 以及一些用于构建模型的层。
  2. 设置随机种子: 确保实验结果可复现。
  3. 定义 create_model 函数:
    • 设置网络参数, 包括时间步长、输入维度、中间维度、批次大小、潜在维度、训练轮数和优化器。
    • 定义 Sampling 层: 用于从潜在空间中采样。
    • 定义编码器、回归器和解码器 (可选): 使用 Keras 函数式 API 构建模型。
    • 定义 RVE 类: 包装模型, 包含训练和测试步骤的逻辑。
    • 创建并编译模型: 使用指定的优化器编译模型。

使用方法:

  1. 调用 create_model 函数, 传入网络参数, 创建模型。
  2. 使用训练数据训练模型。
  3. 使用测试数据评估模型性能。

注意:

  • 可以根据需要修改网络参数和模型结构。
  • 可以选择使用或不使用解码器。如果只关注回归预测, 则可以不使用解码器。
  • 可以使用不同的优化器和损失函数来训练模型。
用TensorFlow构建变分自编码器(VAE)实现回归和重建

原文地址: https://www.cveoy.top/t/topic/jPkm 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录