"""Build a variational autoencoder (VAE) regression model with TensorFlow."""
import random

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, LSTM, Bidirectional, Masking

# Keep the random seed fixed across experiments so the effect of
# hyperparameter changes can be interpreted without RNG noise.
seed = 99
random.seed(seed)
tf.random.set_seed(seed)
def create_model(timesteps, input_dim, intermediate_dim, batch_size, latent_dim, epochs, optimizer):
    """Create and compile a VAE-style regression model (encoder + regressor).

    Parameters
    ----------
    timesteps : int
        Number of time steps per input sequence.
    input_dim : int
        Number of features at each time step.
    intermediate_dim : int
        Hidden units of the bidirectional LSTM encoder.
    batch_size : int
        Kept for interface compatibility; not used when building the graph
        (batching is decided at ``fit`` time).
    latent_dim : int
        Dimensionality of the latent space.
    epochs : int
        Kept for interface compatibility; not used when building the graph.
    optimizer : keras optimizer instance or identifier
        Passed to ``compile``.

    Returns
    -------
    keras.Model
        A compiled model whose custom ``train_step``/``test_step`` minimise
        KL divergence (encoder regulariser) plus regression MSE.
    """
    # Re-seed here so repeated calls build identically initialised models,
    # keeping experiments comparable.
    seed = 99
    random.seed(seed)
    tf.random.set_seed(seed)

    class Sampling(keras.layers.Layer):
        """Reparameterisation trick: sample z from the encoder's (mu, sigma).

        NOTE: ``sigma`` is the *log-variance* head of the encoder, hence the
        ``exp(0.5 * sigma)`` standard-deviation term below.
        """

        def call(self, inputs):
            mu, sigma = inputs
            batch = tf.shape(mu)[0]
            dim = tf.shape(mu)[1]
            epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
            return mu + tf.exp(0.5 * sigma) * epsilon

    # ----------------------- Encoder -----------------------
    inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')
    # Masking layer: time steps whose features equal -99 are treated as
    # padding and skipped by the LSTM.
    mask = Masking(mask_value=-99.)(inputs)
    # Bidirectional LSTM encoding of the (masked) sequence.
    h = Bidirectional(LSTM(intermediate_dim))(mask)
    # VAE latent heads: mean and log-variance of z.
    mu = Dense(latent_dim)(h)
    sigma = Dense(latent_dim)(h)
    # Stochastic sampling of the latent vector z.
    z = Sampling()([mu, sigma])
    encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')
    # -------------------------------------------------------

    # ----------------------- Regressor ---------------------
    reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
    reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
    reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
    regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
    # -------------------------------------------------------

    # -------------------- Wrapper model --------------------
    class RVE(keras.Model):
        """Wrapper that trains encoder + regressor jointly with KL + MSE loss."""

        def __init__(self, encoder, regressor, **kwargs):
            super(RVE, self).__init__(**kwargs)
            self.encoder = encoder
            self.regressor = regressor
            self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
            self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
            self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")

        @property
        def metrics(self):
            # Listing the trackers here lets Keras reset them at epoch start.
            return [
                self.total_loss_tracker,
                self.kl_loss_tracker,
                self.reg_loss_tracker,
            ]

        def train_step(self, data):
            x, target_x = data
            with tf.GradientTape() as tape:
                # KL divergence between q(z|x) and the unit Gaussian prior
                # (sigma is the log-variance, matching Sampling above).
                z, mu, sigma = self.encoder(x)
                kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
                kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
                # Regression loss on the sampled latent vector.
                reg_prediction = self.regressor(z)
                reg_loss = tf.reduce_mean(
                    keras.losses.mse(target_x, reg_prediction)
                )
                total_loss = kl_loss + reg_loss
            grads = tape.gradient(total_loss, self.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
            self.total_loss_tracker.update_state(total_loss)
            self.kl_loss_tracker.update_state(kl_loss)
            self.reg_loss_tracker.update_state(reg_loss)
            return {
                "loss": self.total_loss_tracker.result(),
                "kl_loss": self.kl_loss_tracker.result(),
                "reg_loss": self.reg_loss_tracker.result(),
            }

        def test_step(self, data):
            x, target_x = data
            # Same losses as train_step, without gradient updates.
            z, mu, sigma = self.encoder(x)
            kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            reg_prediction = self.regressor(z)
            reg_loss = tf.reduce_mean(
                keras.losses.mse(target_x, reg_prediction)
            )
            total_loss = kl_loss + reg_loss
            return {
                "loss": total_loss,
                "kl_loss": kl_loss,
                "reg_loss": reg_loss,
            }
    # -------------------------------------------------------

    # Instantiate and compile the combined model.
    rve = RVE(encoder, regressor)
    rve.compile(optimizer=optimizer)
    return rve
# Original source: https://www.cveoy.top/t/topic/jNYN — copyright belongs to the author; do not repost or scrape.