这段代码实现了一个机器学习模型,它使用变分自编码器 (VAE) 和回归模型来预测发动机的剩余使用寿命 (RUL)。

首先,代码使用 utils 模块中的 get_data 函数获取数据集,并设置了一些网络参数,包括:

  • dataset: 数据集名称,这里为 'FD003'。
  • sensors: 使用的传感器,这里为 's_3', 's_4', 's_7', 's_11', 's_12'。
  • sequence_length: 序列长度,这里为 30。
  • alpha: 平滑强度,这里为 0.1。
  • threshold: 最大 RUL,这里为 125。

接着定义了一个 Sampling 类,它从均值和方差中采样出一个向量,用于 VAE 的潜在空间采样。

然后代码定义了三个模型:

  • 编码器 (Encoder):将输入序列编码为潜在空间向量 (z),该向量包含了输入序列的主要信息。
  • 回归器 (Regressor):接收潜在空间向量 (z) 并预测 RUL。
  • 解码器 (Decoder):接收潜在空间向量 (z) 并重建原始输入序列,用于 VAE 的重构损失计算。

最后,代码将三个模型组合成一个 RVE (Residual Variance Estimator) 模型,并进行训练和测试,评估模型性能。

以下是代码的主要部分以及注释:

import utils
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Dense, Lambda, LSTM, RepeatVector, Bidirectional, Masking, Dropout

dataset = 'FD003'
# 使用的传感器
sensors = ['s_3', 's_4', 's_7', 's_11', 's_12']
# 序列长度
sequence_length = 30
# 平滑强度
alpha = 0.1
# 最大 RUL
threshold = 125

x_train, y_train, x_val, y_val, x_test, y_test = utils.get_data(dataset, sensors, sequence_length, alpha, threshold)

# 设置网络参数
timesteps = x_train.shape[1]
input_dim = x_train.shape[2]
intermediate_dim = 300
batch_size = 128
latent_dim = 2
epochs = 10
# 用于掩盖序列中不足 30 个周期的值
masking_value = -99.

class Sampling(keras.layers.Layer):
  '''从 (z_mean, sigma) 中采样 z,即表示发动机轨迹的向量。'''
  def call(self, inputs):
    mu, sigma = inputs
    batch = tf.shape(mu)[0]
    dim = tf.shape(mu)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return mu + tf.exp(0.5 * sigma) * epsilon

# ----------------------- 编码器 -----------------------
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')

mask = Masking(mask_value=masking_value)(inputs)

# LSTM 编码
h = Bidirectional(LSTM(intermediate_dim))(mask)

# VAE Z 层
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)

z = Sampling()([mu, sigma])

# 实例化编码器模型
encoder = keras.Model(inputs, [mu, sigma, z], name='encoder')
print(encoder.summary())
# -------------------------------------------------------
# ----------------------- 回归器 --------------------
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
# 实例化分类器模型
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
print(regressor.summary())
# -------------------------------------------------------
# ----------------------- 解码器 --------------------
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
h_decoded = RepeatVector(timesteps)(latent_inputs)
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded)
# 解码层
outputs = LSTM(input_dim, return_sequences=True)(h_decoded)

# 实例化解码器模型
decoder = keras.Model(latent_inputs, outputs, name='decoder')
print(decoder.summary())
# -------------------------------------------------------
class RVE(keras.Model):
    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        super(RVE, self).__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.total_loss_tracker = keras.metrics.Mean(name='total_loss')
        self.kl_loss_tracker = keras.metrics.Mean(name='kl_loss')
        self.reg_loss_tracker = keras.metrics.Mean(name='reg_loss')
        self.decoder = decoder
        if self.decoder != None:
          self.reconstruction_loss_tracker = keras.metrics.Mean(name='reconstruction_loss')
        

    @property
    def metrics(self):
      if self.decoder != None:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
            self.reconstruction_loss_tracker
        ]
      else:
        return [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]

    def train_step(self, data):
        x, target_x = data
        with tf.GradientTape() as tape:
            # KL 损失
            mu, sigma, z = self.encoder(x)
            kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # 回归器
            reg_prediction = self.regressor(z)
            reg_loss = tf.reduce_mean(
                keras.losses.mse(target_x, reg_prediction)
            )
            # 重构
            if self.decoder != None:
              reconstruction = self.decoder(z)
              reconstruction_loss = tf.reduce_mean(
                  keras.losses.mse(x, reconstruction)
              )
              total_loss = kl_loss + reg_loss + reconstruction_loss
              self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            else:
              total_loss = kl_loss + reg_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        return {
            'loss': self.total_loss_tracker.result(),
            'kl_loss': self.kl_loss_tracker.result(),
            'reg_loss': self.reg_loss_tracker.result(),
        }


    def test_step(self, data):
        x, target_x = data

        # KL 损失
        mu, sigma, z = self.encoder(x)
        kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        # 回归器
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(
            keras.losses.mse(target_x, reg_prediction)
        )
        # 重构
        if self.decoder != None:
          reconstruction = self.decoder(z)
          reconstruction_loss = tf.reduce_mean(
              keras.losses.mse(x, reconstruction)
          )

          total_loss = kl_loss + reg_loss + reconstruction_loss
        else:
          total_loss = kl_loss + reg_loss

        return {
            'loss': total_loss,
            'kl_loss': kl_loss,
            'reg_loss': reg_loss,
        }
rve = RVE(encoder, regressor)
lr_finder = utils.LRFinder(rve)

rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0000001))

# 使用 batch 大小为 512,训练 5 个 epoch,学习率从 0.0000001 指数增长到 0.1
# 训练模型
lr_finder.find(x_train, y_train, start_lr=0.000001, end_lr=0.1, batch_size=batch_size, epochs=5)

# 绘制损失曲线,忽略开始的 20 个 batch 和最后的 5 个 batch
lr_finder.plot_loss(n_skip_beginning=20, n_skip_end=5)
rve = RVE(encoder, regressor)
rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001))
rve.fit(x_train, y_train, epochs=epochs, validation_data=(x_val, y_val), batch_size=batch_size)
train_mu = utils.viz_latent_space(rve.encoder, np.concatenate((x_train, x_val)), np.concatenate((y_train, y_val)))
test_mu = utils.viz_latent_space(rve.encoder, x_test, y_test.clip(upper=threshold))
# 评估
y_hat_train = rve.regressor.predict(train_mu)
y_hat_test = rve.regressor.predict(test_mu)

utils.evaluate(np.concatenate((y_train, y_val)), y_hat_train, 'train')
utils.evaluate(y_test, y_hat_test, 'test')

代码中用到的主要技术:

  • 变分自编码器 (VAE):是一种生成模型,能够学习数据的潜在表示,并生成与训练数据类似的新数据。
  • 回归模型: 用于预测连续数值目标,例如 RUL。
  • LSTM: 一种循环神经网络,用于处理时序数据。
  • Masking: 用于处理序列中缺失的值。
  • Bidirectional LSTM: 用于双向处理序列数据。
  • RepeatVector: 用于将一个向量复制成多个相同向量。
  • Keras: 一种深度学习库,用于构建和训练神经网络。
  • TensorFlow: 一种机器学习平台,提供计算图和自动微分等功能。

代码解释:

  1. 导入库: 导入必要的库,包括 utils 模块用于加载数据集和评估模型。
  2. 加载数据集: 从 utils 模块中加载数据集,并设置模型参数。
  3. 定义 Sampling: 该类用于从均值和方差中采样潜在空间向量。
  4. 定义编码器: 编码器将输入序列编码为潜在空间向量。
  5. 定义回归器: 回归器接收潜在空间向量,并预测 RUL。
  6. 定义解码器: 解码器接收潜在空间向量,并重建原始输入序列。
  7. 定义 RVE: RVE 类将编码器、回归器和解码器组合在一起,并定义了训练和测试步骤。
  8. 训练模型: 使用 fit 方法训练模型,并使用 evaluate 方法评估模型性能。

总结:

代码实现了一个基于变分自编码器和回归模型的发动机剩余使用寿命预测模型。代码提供了详细的注释,帮助读者理解模型的原理和代码实现细节。该模型可以用于预测发动机的剩余使用寿命,并帮助维护人员及时进行维修,提高设备的使用效率和安全性。


原文地址: https://www.cveoy.top/t/topic/jPeT 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录