用TensorFlow构建变分自编码器(VAE)实现回归和重建
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, LSTM, Bidirectional, Masking
# 设置随机种子, 保证实验结果可复现
seed = 99
random.seed(seed)
tf.random.set_seed(seed)
def create_model(timesteps, input_dim, intermediate_dim, batch_size, latent_dim, epochs, optimizer):
# 设置网络参数
timesteps = timesteps
input_dim = input_dim
intermediate_dim = intermediate_dim
batch_size = batch_size
latent_dim = latent_dim
epochs = epochs
if optimizer == 'adam':
optimizer = keras.optimizers.Adam(learning_rate=0.001)
else:
print('unimplemented optimizer')
exit(-1)
masking_value = -99.
class Sampling(keras.layers.Layer):
'''使用 (z_mean, sigma) 对 z 进行采样, z是编码引擎轨迹的向量。'''
def call(self, inputs):
mu, sigma = inputs
batch = tf.shape(mu)[0]
dim = tf.shape(mu)[1]
epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
return mu + tf.exp(0.5 * sigma) * epsilon
# ----------------------- 编码器 -----------------------
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')
# 对输入数据进行掩码处理
mask = Masking(mask_value=masking_value)(inputs)
# 使用双向LSTM进行编码
h = Bidirectional(LSTM(intermediate_dim))(mask)
# VAE Z 层
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)
# 从潜在空间中采样
z = Sampling()([mu, sigma])
# 实例化编码器模型
encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')
print(encoder.summary())
# -------------------------------------------------------
# ----------------------- 回归器 --------------------
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
# 实例化回归器模型
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
print(regressor.summary())
# -------------------------------------------------------
'''
# 如有需要, 取消注释
# ----------------------- 解码器 --------------------
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
h_decoded = RepeatVector(timesteps)(latent_inputs)
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded)
# 解码层
outputs = LSTM(input_dim, return_sequences=True)(h_decoded)
# 实例化解码器模型
decoder = keras.Model(latent_inputs, outputs, name='decoder')
print(decoder.summary())
# -------------------------------------------------------
'''
# -------------------- 包装模型 --------------------
class RVE(keras.Model):
def __init__(self, encoder, regressor, decoder=None, **kwargs):
super(RVE, self).__init__(**kwargs)
self.encoder = encoder
self.regressor = regressor
self.total_loss_tracker = keras.metrics.Mean(name='total_loss')
self.kl_loss_tracker = keras.metrics.Mean(name='kl_loss')
self.reg_loss_tracker = keras.metrics.Mean(name='reg_loss')
self.decoder = decoder
if self.decoder!=None:
self.reconstruction_loss_tracker = keras.metrics.Mean(name='reconstruction_loss')
@property
def metrics(self):
if self.decoder!=None:
return [
self.total_loss_tracker,
self.kl_loss_tracker,
self.reg_loss_tracker,
self.reconstruction_loss_tracker
]
else:
return [
self.total_loss_tracker,
self.kl_loss_tracker,
self.reg_loss_tracker,
]
def train_step(self, data):
x, target_x = data
with tf.GradientTape() as tape:
# 计算 KL 散度损失
z, mu, sigma = self.encoder(x)
kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
# 计算回归损失
reg_prediction = self.regressor(z)
reg_loss = tf.reduce_mean(
keras.losses.mse(target_x, reg_prediction)
)
# 计算重建损失 (如果使用了解码器)
if self.decoder!=None:
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(
keras.losses.mse(x, reconstruction)
)
total_loss = kl_loss + reg_loss + reconstruction_loss
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
else:
total_loss = kl_loss + reg_loss
# 计算梯度并更新模型参数
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# 更新损失指标
self.total_loss_tracker.update_state(total_loss)
self.kl_loss_tracker.update_state(kl_loss)
self.reg_loss_tracker.update_state(reg_loss)
# 返回损失值
return {
'loss': self.total_loss_tracker.result(),
'kl_loss': self.kl_loss_tracker.result(),
'reg_loss': self.reg_loss_tracker.result(),
}
def test_step(self, data):
x, target_x = data
# 计算 KL 散度损失
z, mu, sigma = self.encoder(x)
kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
# 计算回归损失
reg_prediction = self.regressor(z)
reg_loss = tf.reduce_mean(
keras.losses.mse(target_x, reg_prediction)
)
# 计算重建损失 (如果使用了解码器)
if self.decoder!=None:
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(
keras.losses.mse(x, reconstruction)
)
total_loss = kl_loss + reg_loss + reconstruction_loss
else:
total_loss = kl_loss + reg_loss
return {
'loss': total_loss,
'kl_loss': kl_loss,
'reg_loss': reg_loss,
}
# -------------------------------------------------------
# 创建并编译模型
rve = RVE(encoder, regressor)
rve.compile(optimizer=optimizer)
return rve
这段代码定义了一个名为 create_model 的函数, 该函数可以创建一个变分自编码器 (VAE) 模型, 用于回归预测和数据重建。
主要功能:
- 编码器: 使用双向 LSTM 对输入数据进行编码, 并将其映射到潜在空间中的一个点。
- 解码器 (可选): 使用 LSTM 将潜在空间中的点解码回原始输入数据的形状。
- 回归器: 使用编码器的输出 (潜在空间中的点) 来预测目标值。
- 损失函数: 使用 KL 散度损失、回归损失和重建损失 (如果使用了解码器) 来训练模型。
代码解析:
- 导入必要的库: 包括
tensorflow,keras以及一些用于构建模型的层。 - 设置随机种子: 确保实验结果可复现。
- 定义
create_model函数:- 设置网络参数, 包括时间步长、输入维度、中间维度、批次大小、潜在维度、训练轮数和优化器。
- 定义
Sampling层: 用于从潜在空间中采样。 - 定义编码器、回归器和解码器 (可选): 使用 Keras 函数式 API 构建模型。
- 定义
RVE类: 包装模型, 包含训练和测试步骤的逻辑。 - 创建并编译模型: 使用指定的优化器编译模型。
使用方法:
- 调用
create_model函数, 传入网络参数, 创建模型。 - 使用训练数据训练模型。
- 使用测试数据评估模型性能。
注意:
- 可以根据需要修改网络参数和模型结构。
- 可以选择使用或不使用解码器。如果只关注回归预测, 则可以不使用解码器。
- 可以使用不同的优化器和损失函数来训练模型。
原文地址: https://www.cveoy.top/t/topic/jPkm 著作权归作者所有。请勿转载和采集!