# Time Series Regression with a Variational Autoencoder (VAE) in Keras
# ----------------------- Encoder -----------------------
# Sequence input: (timesteps, input_dim) per sample.
enc_in = Input(shape=(timesteps, input_dim), name='encoder_input')
# Ignore padded timesteps whose value equals masking_value.
masked = Masking(mask_value=masking_value)(enc_in)
# Summarize the whole sequence with a bidirectional LSTM.
hidden = Bidirectional(LSTM(intermediate_dim))(masked)
# Latent Gaussian parameters and the reparameterized sample.
z_mean = Dense(latent_dim)(hidden)
z_log_var = Dense(latent_dim)(hidden)
z_sample = Sampling()([z_mean, z_log_var])
# Encoder model: x -> (z, mu, sigma).
encoder = keras.Model(enc_in, [z_sample, z_mean, z_log_var], name='encoder')
print(encoder.summary())
# -------------------------------------------------------
# ----------------------- Regressor --------------------
# Map a sampled latent vector to a single regression target.
latent_in = Input(shape=(latent_dim,), name='z_sampling_reg')
hidden_reg = Dense(200, activation='tanh')(latent_in)
prediction = Dense(1, name='reg_output')(hidden_reg)
# Regressor model: z -> scalar prediction.
regressor = keras.Model(latent_in, prediction, name='regressor')
print(regressor.summary())
# -------------------------------------------------------
# NOTE: optional decoder, deliberately disabled via a triple-quoted string.
# Remove the quotes to enable it and pass the resulting `decoder` into the
# RVAE constructor below to add a reconstruction loss term.
''' uncomment if needed
# ----------------------- Decoder --------------------
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
h_decoded = RepeatVector(timesteps)(latent_inputs)
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded)
# decoded layer
outputs = LSTM(input_dim, return_sequences=True)(h_decoded)
# Instantiate the decoder model:
decoder = keras.Model(latent_inputs, outputs, name='decoder')
print(decoder.summary())
# -------------------------------------------------------
'''
# -------------------- Wrapper model --------------------
class RVAE(keras.Model):
    """VAE-style wrapper combining an encoder with a latent-space regressor.

    Total loss = KL(q(z|x) || N(0, I)) + MSE(target, regressor(z))
    [+ MSE(x, decoder(z)) when a decoder is supplied].

    Args:
        encoder: model mapping x -> (z, mu, sigma). NOTE(review): the KL term
            below applies exp()/square() as if `sigma` were the log-variance,
            not a standard deviation — the name is historical; confirm the
            Sampling layer uses the same convention.
        regressor: model mapping z -> scalar prediction.
        decoder: optional model mapping z -> reconstruction of x; when given,
            a reconstruction loss term and its tracker are added.
    """

    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        if self.decoder is not None:
            self.reconstruction_loss_tracker = keras.metrics.Mean(
                name="reconstruction_loss"
            )

    @property
    def metrics(self):
        """Trackers listed here are reset by Keras at each epoch boundary."""
        trackers = [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]
        if self.decoder is not None:
            trackers.append(self.reconstruction_loss_tracker)
        return trackers

    def _compute_losses(self, x, target_x):
        """Return (total_loss, kl_loss, reg_loss, reconstruction_loss).

        `reconstruction_loss` is None when no decoder is attached. Shared by
        train_step and test_step so the objective cannot drift between them.
        """
        z, mu, sigma = self.encoder(x)
        # KL divergence to the standard normal prior, treating `sigma` as
        # log-variance: -0.5 * (1 + log_var - mu^2 - exp(log_var)).
        kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        # Regression loss on the sampled latent code.
        reg_loss = tf.reduce_mean(keras.losses.mse(target_x, self.regressor(z)))
        reconstruction_loss = None
        total_loss = kl_loss + reg_loss
        if self.decoder is not None:
            reconstruction_loss = tf.reduce_mean(
                keras.losses.mse(x, self.decoder(z))
            )
            total_loss = total_loss + reconstruction_loss
        return total_loss, kl_loss, reg_loss, reconstruction_loss

    def train_step(self, data):
        """One optimization step over a (x, target_x) batch."""
        x, target_x = data
        with tf.GradientTape() as tape:
            total_loss, kl_loss, reg_loss, reconstruction_loss = (
                self._compute_losses(x, target_x)
            )
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        results = {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }
        if reconstruction_loss is not None:
            self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            # Bug fix: the reconstruction term was tracked but never reported.
            results["reconstruction_loss"] = (
                self.reconstruction_loss_tracker.result()
            )
        return results

    def test_step(self, data):
        """Evaluation step: same objective as training, no weight update."""
        x, target_x = data
        total_loss, kl_loss, reg_loss, reconstruction_loss = (
            self._compute_losses(x, target_x)
        )
        results = {"loss": total_loss, "kl_loss": kl_loss, "reg_loss": reg_loss}
        if reconstruction_loss is not None:
            results["reconstruction_loss"] = reconstruction_loss
        return results
# -------------------------------------------------------
# Build the combined model (no decoder: pure KL + regression objective).
vae = RVAE(encoder, regressor)
# All losses are computed inside train_step/test_step, so compile takes
# only the optimizer.
# NOTE(review): `optimizer` and this `return` come from an enclosing
# function whose definition is not visible here — confirm in the full file.
vae.compile(optimizer=optimizer)
return vae
Per-line commentary on the code above: # Define input shape for the encoder — inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')
Masking layer to handle variable-length sequences
mask = Masking(mask_value=masking_value)(inputs)
Bidirectional LSTM encoding
h = Bidirectional(LSTM(intermediate_dim))(mask)
VAE Z layer
mu = Dense(latent_dim)(h) sigma = Dense(latent_dim)(h) z = Sampling()([mu, sigma])
Instantiate the encoder model
encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')
Define input shape for the regressor
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
Intermediate dense layer for regression
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
Output layer for regression
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
Instantiate the regressor model
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
Wrapper model to combine encoder and regressor
class RVAE(keras.Model): def __init__(self, encoder, regressor, decoder=None, **kwargs): super(RVAE, self).__init__(**kwargs) self.encoder = encoder self.regressor = regressor self.total_loss_tracker = keras.metrics.Mean(name="total_loss") self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss") self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss") self.decoder = decoder if self.decoder!=None: self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
# Define custom training step
def train_step(self, data):
x, target_x = data
with tf.GradientTape() as tape:
# KL loss
z, mu, sigma = self.encoder(x)
kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
# Regressor
reg_prediction = self.regressor(z)
reg_loss = tf.reduce_mean(keras.losses.mse(target_x, reg_prediction))
# Reconstruction
if self.decoder!=None:
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(keras.losses.mse(x, reconstruction))
total_loss = kl_loss + reg_loss + reconstruction_loss
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
else:
total_loss = kl_loss + reg_loss
# Compute gradients and update weights
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# Update loss trackers
self.total_loss_tracker.update_state(total_loss)
self.kl_loss_tracker.update_state(kl_loss)
self.reg_loss_tracker.update_state(reg_loss)
# Return dictionary of metrics
return {
"loss": self.total_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
"reg_loss": self.reg_loss_tracker.result(),
}
# Define custom testing step
def test_step(self, data):
x, target_x = data
# KL loss
z, mu, sigma = self.encoder(x)
kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
# Regressor
reg_prediction = self.regressor(z)
reg_loss = tf.reduce_mean(keras.losses.mse(target_x, reg_prediction))
# Reconstruction
if self.decoder!=None:
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(keras.losses.mse(x, reconstruction))
total_loss = kl_loss + reg_loss + reconstruction_loss
else:
total_loss = kl_loss + reg_loss
# Return dictionary of metrics
return {
"loss": total_loss,
"kl_loss": kl_loss,
"reg_loss": reg_loss,
}
Instantiate the VAE model
vae = RVAE(encoder, regressor) vae.compile(optimizer=optimizer)
Return the VAE model
return vae
Source: https://www.cveoy.top/t/topic/jNqY — copyright belongs to the author; do not reproduce or scrape.