# Define input shape for the encoder: one sequence of `timesteps` steps with
# `input_dim` features per step (the batch axis is implicit).
inputs = Input(shape=(timesteps, input_dim), name='encoder_input')

# Masking layer to handle variable-length sequences: timesteps whose features
# all equal `masking_value` are flagged so the recurrent layer skips them.
mask = Masking(mask_value=masking_value)(inputs)

# Bidirectional LSTM encoding. `return_sequences` is left at its default, so
# `h` is one summary vector per sequence rather than one vector per timestep.
h = Bidirectional(LSTM(intermediate_dim))(mask)

# VAE Z layer: two linear heads of width `latent_dim` on top of `h`.
# `sigma` is consumed as a log-variance by the KL term in RVAE (it appears
# there as `exp(sigma)`), despite its name.
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)
# NOTE(review): `Sampling` is defined outside this snippet; presumably it draws
# z from the distribution parameterised by (mu, sigma) -- confirm.
z = Sampling()([mu, sigma])

# Instantiate the encoder model; it returns the sample and both parameters.
encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')
# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
encoder.summary()
# -------------------------------------------------------

# ----------------------- Regressor --------------------
# Feed-forward head that maps a latent vector of size `latent_dim` to a single
# scalar prediction.
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
# One output unit with no activation, i.e. an unbounded regression value.
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
# Instantiate the regressor model:
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
regressor.summary()
# -------------------------------------------------------

# Optional decoder, currently disabled by wrapping it in a bare string literal
# (the statement below is a no-op). To enable the reconstruction branch,
# remove the surrounding triple quotes and pass `decoder` as the third
# argument of RVAE.
''' uncomment if needed
# ----------------------- Decoder --------------------
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
h_decoded = RepeatVector(timesteps)(latent_inputs)
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded) 
# decoded layer
outputs = LSTM(input_dim, return_sequences=True)(h_decoded) 
# Instantiate the decoder model:
decoder = keras.Model(latent_inputs, outputs, name='decoder')
print(decoder.summary())
# -------------------------------------------------------
'''

# -------------------- Wrapper model --------------------
class RVAE(keras.Model):
    """Variational autoencoder with a regression head on the latent sample.

    The model wraps three sub-models and trains them jointly with a custom
    step:

    * ``encoder``   -- called as ``encoder(x)``; must return ``(z, mu, sigma)``.
    * ``regressor`` -- called as ``regressor(z)``; its output is compared with
      the target by mean squared error.
    * ``decoder``   -- optional; when given, ``decoder(z)`` is compared with
      the input ``x`` by mean squared error.

    The optimised quantity is ``kl_loss + reg_loss`` plus
    ``reconstruction_loss`` when a decoder is present. The three terms are
    summed without weighting.
    """

    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        """Store the sub-models and create one running-mean tracker per loss.

        Args:
            encoder: model returning ``(z, mu, sigma)`` for a batch ``x``.
            regressor: model mapping ``z`` to the regression prediction.
            decoder: optional model mapping ``z`` back to the input space.
                ``None`` disables the reconstruction term.
            **kwargs: forwarded unchanged to ``keras.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        # The reconstruction tracker only exists when there is a decoder.
        if self.decoder is not None:
            self.reconstruction_loss_tracker = keras.metrics.Mean(
                name="reconstruction_loss"
            )

    @property
    def metrics(self):
        """Return the loss trackers so Keras can reset them automatically."""
        trackers = [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]
        if self.decoder is not None:
            trackers.append(self.reconstruction_loss_tracker)
        return trackers

    def _rvae_losses(self, x, target_x):
        """Run the sub-models on one batch and return the individual losses.

        Returns:
            ``(total_loss, kl_loss, reg_loss, reconstruction_loss)`` where
            ``reconstruction_loss`` is ``None`` when there is no decoder.
        """
        # The encoder's third output is used as a log-variance by the formula
        # below (it is exponentiated to obtain the variance term).
        z, mu, log_var = self.encoder(x)
        kl_loss = -0.5 * (1 + log_var - tf.square(mu) - tf.exp(log_var))
        # Sum over axis 1 of the encoder output, then average over the batch.
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        # NOTE(review): the regressor ends in Dense(1); if `target_x` arrives
        # with shape (batch,) instead of (batch, 1) the difference inside mse
        # would broadcast -- confirm the target shape fed by the caller.
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(keras.losses.mse(target_x, reg_prediction))

        total_loss = kl_loss + reg_loss
        reconstruction_loss = None
        if self.decoder is not None:
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                keras.losses.mse(x, reconstruction)
            )
            total_loss = total_loss + reconstruction_loss
        return total_loss, kl_loss, reg_loss, reconstruction_loss

    def _track_losses(self, total_loss, kl_loss, reg_loss, reconstruction_loss):
        """Feed one batch of losses to the trackers and return the log dict.

        The reported values are the trackers' running means. The
        ``reconstruction_loss`` entry is present only when a decoder is used.
        """
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        logs = {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }
        if reconstruction_loss is not None:
            self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            # Previously tracked but never reported; expose it in the logs.
            logs["reconstruction_loss"] = (
                self.reconstruction_loss_tracker.result()
            )
        return logs

    def train_step(self, data):
        """Run one optimisation step on a batch ``(x, target_x)``.

        Returns:
            dict of running-mean losses keyed ``loss``, ``kl_loss``,
            ``reg_loss`` and, with a decoder, ``reconstruction_loss``.
        """
        x, target_x = data
        # Only the forward pass needs to be recorded on the tape.
        with tf.GradientTape() as tape:
            losses = self._rvae_losses(x, target_x)
        total_loss = losses[0]
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self._track_losses(*losses)

    def test_step(self, data):
        """Evaluate one batch ``(x, target_x)`` without updating weights.

        The losses go through the same trackers as in training, so the
        reported values are averaged over the batches seen since the last
        metric reset instead of reflecting a single batch.

        Returns:
            dict with the same keys as ``train_step``.
        """
        x, target_x = data
        return self._track_losses(*self._rvae_losses(x, target_x))
# -------------------------------------------------------

# Build the wrapper without a decoder, so RVAE adds no reconstruction term.
vae = RVAE(encoder, regressor)
# No `loss` argument is passed: RVAE.train_step computes its own losses.
vae.compile(optimizer=optimizer)

# NOTE(review): `return` is only valid inside a function; this snippet looks
# like a function body whose `def` line and indentation are not visible here
# -- confirm against the enclosing definition.
return vae

每一行的代码注释内容:

# Define input shape for the encoder
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')

# Masking layer to handle variable-length sequences
mask = Masking(mask_value=masking_value)(inputs)

# Bidirectional LSTM encoding
h = Bidirectional(LSTM(intermediate_dim))(mask)

# VAE Z layer
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)
z = Sampling()([mu, sigma])

# Instantiate the encoder model
encoder = keras.Model(inputs, [z, mu, sigma], name='encoder')

# Define input shape for the regressor
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')

# Intermediate dense layer for regression
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)

# Output layer for regression
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)

# Instantiate the regressor model
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')

# Wrapper model to combine encoder and regressor

class RVAE(keras.Model):
    """Variational autoencoder with a regression head on the latent sample.

    ``encoder(x)`` must return ``(z, mu, sigma)``; ``regressor(z)`` is scored
    against the target by mean squared error; the optional ``decoder(z)`` is
    scored against the input ``x`` by mean squared error. The optimised
    quantity is the unweighted sum of the KL, regression and (when a decoder
    is given) reconstruction losses.
    """

    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        """Store the sub-models and create one running-mean tracker per loss.

        Args:
            encoder: model returning ``(z, mu, sigma)`` for a batch ``x``.
            regressor: model mapping ``z`` to the regression prediction.
            decoder: optional model mapping ``z`` back to the input space.
                ``None`` disables the reconstruction term.
            **kwargs: forwarded unchanged to ``keras.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        # The reconstruction tracker only exists when there is a decoder.
        if self.decoder is not None:
            self.reconstruction_loss_tracker = keras.metrics.Mean(
                name="reconstruction_loss"
            )

    @property
    def metrics(self):
        """Return the loss trackers so Keras can reset them automatically."""
        trackers = [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]
        if self.decoder is not None:
            trackers.append(self.reconstruction_loss_tracker)
        return trackers

    def _rvae_losses(self, x, target_x):
        """Run the sub-models on one batch and return the individual losses.

        Returns:
            ``(total_loss, kl_loss, reg_loss, reconstruction_loss)`` where
            ``reconstruction_loss`` is ``None`` when there is no decoder.
        """
        # KL loss: the encoder's third output is used as a log-variance by
        # the formula below (it is exponentiated to obtain the variance term).
        z, mu, log_var = self.encoder(x)
        kl_loss = -0.5 * (1 + log_var - tf.square(mu) - tf.exp(log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        # Regressor
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(keras.losses.mse(target_x, reg_prediction))

        # Reconstruction (only when a decoder was supplied)
        total_loss = kl_loss + reg_loss
        reconstruction_loss = None
        if self.decoder is not None:
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                keras.losses.mse(x, reconstruction)
            )
            total_loss = total_loss + reconstruction_loss
        return total_loss, kl_loss, reg_loss, reconstruction_loss

    def _track_losses(self, total_loss, kl_loss, reg_loss, reconstruction_loss):
        """Feed one batch of losses to the trackers and return the log dict.

        The reported values are the trackers' running means. The
        ``reconstruction_loss`` entry is present only when a decoder is used.
        """
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        logs = {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }
        if reconstruction_loss is not None:
            self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            logs["reconstruction_loss"] = (
                self.reconstruction_loss_tracker.result()
            )
        return logs

    # Define custom training step
    def train_step(self, data):
        """Run one optimisation step on a batch ``(x, target_x)``.

        Returns:
            dict of running-mean losses keyed ``loss``, ``kl_loss``,
            ``reg_loss`` and, with a decoder, ``reconstruction_loss``.
        """
        x, target_x = data
        with tf.GradientTape() as tape:
            losses = self._rvae_losses(x, target_x)

        # Compute gradients and update weights
        total_loss = losses[0]
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Update loss trackers and return the dictionary of metrics
        return self._track_losses(*losses)

    # Define custom testing step
    def test_step(self, data):
        """Evaluate one batch ``(x, target_x)`` without updating weights.

        The losses go through the same trackers as in training, so the
        reported values are averaged over the batches seen since the last
        metric reset instead of reflecting a single batch.

        Returns:
            dict with the same keys as ``train_step``.
        """
        x, target_x = data
        return self._track_losses(*self._rvae_losses(x, target_x))

# Instantiate the VAE model
vae = RVAE(encoder, regressor)
vae.compile(optimizer=optimizer)

# Return the VAE model
return vae

Time Series Regression with Variational Autoencoder (VAE) in Keras

原文地址: https://www.cveoy.top/t/topic/jNqY 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录