import utils
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Dense, Lambda, LSTM, RepeatVector, Bidirectional, Masking, Dropout

# 数据集
dataset = 'FD003'
# 使用的传感器
sensors = ['s_3', 's_4', 's_7', 's_11', 's_12']
# 窗口长度
sequence_length = 30
# 平滑强度
alpha = 0.1
# 最大剩余寿命
threshold = 125

# 获取数据
x_train, y_train, x_val, y_val, x_test, y_test = utils.get_data(dataset, sensors, sequence_length, alpha, threshold)

# 模型参数
timesteps = x_train.shape[1]
input_dim = x_train.shape[2]
intermediate_dim = 300
batch_size = 128
latent_dim = 2
epochs = 10
# 掩蔽值
masking_value = -99.

# 采样类
class Sampling(keras.layers.Layer):
  '''从潜在空间中采样'''
  def call(self, inputs):
    mu, sigma = inputs
    batch = tf.shape(mu)[0]
    dim = tf.shape(mu)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return mu + tf.exp(0.5 * sigma) * epsilon

# ----------------------- 编码器 -----------------------
# 输入层
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')

# 掩蔽层
mask = Masking(mask_value=masking_value)(inputs)

# 双向LSTM层
h = Bidirectional(LSTM(intermediate_dim))(mask) 

# VAE Z层
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)

z = Sampling()([mu, sigma])

# 实例化编码器模型
encoder = keras.Model(inputs, [mu, sigma, z], name='encoder')
print(encoder.summary())
# -------------------------------------------------------

# ----------------------- 回归器 --------------------
# 输入层
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
# 中间层
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
# 输出层
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)

# 实例化回归模型
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
print(regressor.summary())
# -------------------------------------------------------

# ----------------------- 解码器 --------------------
# 输入层
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
# 重复向量层
h_decoded = RepeatVector(timesteps)(latent_inputs)
# 双向LSTM层
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded) 
# 解码层
outputs = LSTM(input_dim, return_sequences=True)(h_decoded) 

# 实例化解码器模型
decoder = keras.Model(latent_inputs, outputs, name='decoder')
print(decoder.summary())
# -------------------------------------------------------

# RVE类
class RVE(keras.Model):
    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        super(RVE, self).__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        self.decoder = decoder
        if self.decoder!=None:
          self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        

    @property
    def metrics(self):
      if self.decoder!=None:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
            self.reconstruction_loss_tracker
        ]
      else:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]

    def train_step(self, data):
        x, target_x = data
        with tf.GradientTape() as tape:
            # KL loss
            mu, sigma, z = self.encoder(x)
            kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # 回归器
            reg_prediction = self.regressor(z)
            reg_loss = tf.reduce_mean(
                keras.losses.mse(target_x, reg_prediction)
            )
            # 重构
            if self.decoder!=None:
              reconstruction = self.decoder(z)
              reconstruction_loss = tf.reduce_mean(
                  keras.losses.mse(x, reconstruction)
              )
              total_loss = kl_loss + reg_loss + reconstruction_loss
              self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            else:
              total_loss = kl_loss + reg_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }


    def test_step(self, data):
        x, target_x = data

        # KL loss
        mu, sigma, z = self.encoder(x)
        kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        # 回归器
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(
            keras.losses.mse(target_x, reg_prediction)
        )
        # 重构
        if self.decoder!=None:
          reconstruction = self.decoder(z)
          reconstruction_loss = tf.reduce_mean(
              keras.losses.mse(x, reconstruction)
          )

          total_loss = kl_loss + reg_loss + reconstruction_loss
        else:
          total_loss = kl_loss + reg_loss

        return {
            "loss": total_loss,
            "kl_loss": kl_loss,
            "reg_loss": reg_loss,
        }

rve = RVE(encoder, regressor)
# LRFinder类
lr_finder = utils.LRFinder(rve)

rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0000001))

# 寻找最佳学习率
# 训练5个epoch,学习率从0.000001指数增长到0.1
lr_finder.find(x_train, y_train, start_lr=0.000001, end_lr=0.1, batch_size=batch_size, epochs=5)

# 绘制损失曲线
lr_finder.plot_loss(n_skip_beginning=20, n_skip_end=5)

# 实例化RVE模型
rve = RVE(encoder, regressor)
# 编译模型,学习率设置为0.001
rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001))
# 训练模型
rve.fit(x_train, y_train, epochs=epochs, validation_data=(x_val, y_val), batch_size=batch_size)

# 绘制潜在空间可视化图像
train_mu = utils.viz_latent_space(rve.encoder, np.concatenate((x_train, x_val)), np.concatenate((y_train, y_val)))
test_mu = utils.viz_latent_space(rve.encoder, x_test, y_test.clip(upper=threshold))

# 预测
y_hat_train = rve.regressor.predict(train_mu)
y_hat_test = rve.regressor.predict(test_mu)

# 评估模型性能
utils.evaluate(np.concatenate((y_train, y_val)), y_hat_train, 'train')
utils.evaluate(y_test, y_hat_test, 'test')
基于变分自编码器和回归模型的故障预测模型代码解析

原文地址: https://www.cveoy.top/t/topic/jPe5 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录