# 基于变分自编码器和回归模型的故障预测模型代码解析
import utils
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Dense, Lambda, LSTM, RepeatVector, Bidirectional, Masking, Dropout
# ----------------------- Configuration -----------------------
# Dataset identifier and the sensor columns to use.
dataset = 'FD003'
sensors = ['s_3', 's_4', 's_7', 's_11', 's_12']
# Window length, smoothing strength and maximum remaining-life cap
# (labels translated from the original comments); all three are handed
# to utils.get_data below.
sequence_length, alpha, threshold = 30, 0.1, 125

# Load the train / validation / test splits.
(
    x_train, y_train,
    x_val, y_val,
    x_test, y_test,
) = utils.get_data(dataset, sensors, sequence_length, alpha, threshold)

# Model hyperparameters.
# The input shape is read from axes 1 and 2 of the training array
# (presumably (samples, timesteps, features) -- confirm against utils.get_data).
timesteps = x_train.shape[1]
input_dim = x_train.shape[2]
intermediate_dim, latent_dim = 300, 2
batch_size, epochs = 128, 10

# Value given to the Masking layer of the encoder.
masking_value = -99.0
# Sampling layer
class Sampling(keras.layers.Layer):
    """Sample a point from the latent space.

    ``call`` receives the pair ``[mu, sigma]`` and returns
    ``mu + exp(0.5 * sigma) * epsilon``, where ``epsilon`` is drawn with
    ``tf.keras.backend.random_normal`` using its default parameters.
    Because of the ``exp(0.5 * sigma)`` factor, ``sigma`` acts as a
    log-variance rather than a standard deviation.
    """

    def call(self, inputs):
        mean, log_var = inputs
        # Build the noise shape from the first two axes of the mean tensor.
        mean_shape = tf.shape(mean)
        noise = tf.keras.backend.random_normal(
            shape=(mean_shape[0], mean_shape[1])
        )
        scale = tf.exp(0.5 * log_var)
        return mean + scale * noise
# ----------------------- Encoder -----------------------
# Input layer: one window of shape (timesteps, input_dim).
inputs = Input(shape=(timesteps, input_dim,), name='encoder_input')
# Masking layer keyed on masking_value (Keras masks a time step when all of
# its features equal the mask value).
mask = Masking(mask_value=masking_value)(inputs)
# Bidirectional LSTM; without return_sequences it emits one vector per window.
h = Bidirectional(LSTM(intermediate_dim))(mask)
# VAE latent layer: two parallel Dense heads plus the sampled latent vector.
# The Sampling layer scales the noise by exp(0.5 * sigma), so `sigma` is
# used as a log-variance.
mu = Dense(latent_dim)(h)
sigma = Dense(latent_dim)(h)
z = Sampling()([mu, sigma])
# Instantiate the encoder model; it outputs [mu, sigma, z].
encoder = keras.Model(inputs, [mu, sigma, z], name='encoder')
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() would emit an extra "None" line.
encoder.summary()
# -------------------------------------------------------
# ----------------------- Regressor --------------------
# Input layer: a latent vector of size latent_dim.
reg_latent_inputs = Input(shape=(latent_dim,), name='z_sampling_reg')
# Hidden layer.
reg_intermediate = Dense(200, activation='tanh')(reg_latent_inputs)
# Output layer: a single unit with no activation (linear output).
reg_outputs = Dense(1, name='reg_output')(reg_intermediate)
# Instantiate the regressor model.
regressor = keras.Model(reg_latent_inputs, reg_outputs, name='regressor')
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() would emit an extra "None" line.
regressor.summary()
# -------------------------------------------------------
# ----------------------- Decoder --------------------
# NOTE: the RVE instances created later in this script are built without a
# decoder, so this model is defined but not trained here.
# Input layer: a latent vector of size latent_dim.
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
# Repeat the latent vector once per time step.
h_decoded = RepeatVector(timesteps)(latent_inputs)
# Bidirectional LSTM returning the full sequence.
h_decoded = Bidirectional(LSTM(intermediate_dim, return_sequences=True))(h_decoded)
# Decoding layer: an LSTM with input_dim units per time step, so the output
# has the same (timesteps, input_dim) shape as the encoder input.
outputs = LSTM(input_dim, return_sequences=True)(h_decoded)
# Instantiate the decoder model.
decoder = keras.Model(latent_inputs, outputs, name='decoder')
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() would emit an extra "None" line.
decoder.summary()
# -------------------------------------------------------
# RVE model
class RVE(keras.Model):
    """Encoder + regressor model with an optional decoder and custom steps.

    The total loss is the sum of:
      * a KL term computed from the encoder outputs ``mu`` and ``sigma``
        (``sigma`` is used as a log-variance),
      * the mean squared error between the targets and the regressor
        prediction made from the sampled latent vector ``z``,
      * when a decoder is supplied, the mean squared error between the
        input and its reconstruction from ``z``.

    Args:
        encoder: model returning ``[mu, sigma, z]`` for an input batch.
        regressor: model mapping ``z`` to a prediction.
        decoder: optional model mapping ``z`` back to the input space.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, encoder, regressor, decoder=None, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.regressor = regressor
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.reg_loss_tracker = keras.metrics.Mean(name="reg_loss")
        self.decoder = decoder
        if self.decoder is not None:
            self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")

    @property
    def metrics(self):
        """Return the loss trackers so Keras can reset them between runs."""
        trackers = [
            self.total_loss_tracker,
            self.kl_loss_tracker,
            self.reg_loss_tracker,
        ]
        if self.decoder is not None:
            trackers.append(self.reconstruction_loss_tracker)
        return trackers

    def _rve_losses(self, x, target_x):
        """Run the forward pass shared by train_step and test_step.

        Returns:
            ``(total_loss, kl_loss, reg_loss, reconstruction_loss)``;
            ``reconstruction_loss`` is ``None`` when there is no decoder.
        """
        # KL loss
        mu, sigma, z = self.encoder(x)
        kl_loss = -0.5 * (1 + sigma - tf.square(mu) - tf.exp(sigma))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        # Regressor
        # NOTE(review): assumes target_x has the same shape as the regressor
        # output; a differently shaped target could broadcast inside mse --
        # confirm against the data loader.
        reg_prediction = self.regressor(z)
        reg_loss = tf.reduce_mean(keras.losses.mse(target_x, reg_prediction))
        if self.decoder is None:
            return kl_loss + reg_loss, kl_loss, reg_loss, None
        # Reconstruction
        reconstruction = self.decoder(z)
        reconstruction_loss = tf.reduce_mean(keras.losses.mse(x, reconstruction))
        total_loss = kl_loss + reg_loss + reconstruction_loss
        return total_loss, kl_loss, reg_loss, reconstruction_loss

    def _rve_update_trackers(self, total_loss, kl_loss, reg_loss, reconstruction_loss):
        """Feed the batch losses to the trackers and return their running means."""
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.reg_loss_tracker.update_state(reg_loss)
        logs = {
            "loss": self.total_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "reg_loss": self.reg_loss_tracker.result(),
        }
        if reconstruction_loss is not None:
            self.reconstruction_loss_tracker.update_state(reconstruction_loss)
            # Report the reconstruction term too; it was tracked but never
            # returned before.
            logs["reconstruction_loss"] = self.reconstruction_loss_tracker.result()
        return logs

    def train_step(self, data):
        """Run one optimisation step on a ``(x, target_x)`` batch.

        Returns:
            Dict of running-mean losses for the current epoch.
        """
        x, target_x = data
        with tf.GradientTape() as tape:
            total_loss, kl_loss, reg_loss, reconstruction_loss = self._rve_losses(x, target_x)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self._rve_update_trackers(total_loss, kl_loss, reg_loss, reconstruction_loss)

    def test_step(self, data):
        """Evaluate one ``(x, target_x)`` batch without updating weights.

        The losses go through the same trackers as in training, so the
        reported validation values are means over all evaluated batches
        rather than the values of the last batch only.

        Returns:
            Dict of running-mean losses for the current evaluation run.
        """
        x, target_x = data
        total_loss, kl_loss, reg_loss, reconstruction_loss = self._rve_losses(x, target_x)
        return self._rve_update_trackers(total_loss, kl_loss, reg_loss, reconstruction_loss)
# ----------------------- Learning-rate search -----------------------
# The LR range test below trains the very same encoder/regressor objects that
# the final model uses. Snapshot their initial weights so the search does not
# leak into the real training run.
initial_encoder_weights = encoder.get_weights()
initial_regressor_weights = regressor.get_weights()

rve = RVE(encoder, regressor)
# LRFinder helper
lr_finder = utils.LRFinder(rve)
rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0000001))
# Search for a good learning rate: 5 epochs, sweeping the learning rate from
# 0.000001 up to 0.1 (exponentially, per the original comment).
lr_finder.find(x_train, y_train, start_lr=0.000001, end_lr=0.1, batch_size=batch_size, epochs=5)
# Plot the loss curve.
lr_finder.plot_loss(n_skip_beginning=20, n_skip_end=5)

# Restore the pre-search weights (harmless if LRFinder already restores them).
encoder.set_weights(initial_encoder_weights)
regressor.set_weights(initial_regressor_weights)

# ----------------------- Training -----------------------
# Instantiate the RVE model.
rve = RVE(encoder, regressor)
# Compile with a learning rate of 0.001.
rve.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001))
# Train the model.
rve.fit(x_train, y_train, epochs=epochs, validation_data=(x_val, y_val), batch_size=batch_size)

# ----------------------- Evaluation -----------------------
# Plot the latent space; the helper's return value is fed to the regressor below.
train_mu = utils.viz_latent_space(rve.encoder, np.concatenate((x_train, x_val)), np.concatenate((y_train, y_val)))
# NOTE(review): clip(upper=...) is the pandas signature, so y_test is
# presumably a pandas object -- confirm against utils.get_data.
test_mu = utils.viz_latent_space(rve.encoder, x_test, y_test.clip(upper=threshold))
# Predict
y_hat_train = rve.regressor.predict(train_mu)
y_hat_test = rve.regressor.predict(test_mu)
# Evaluate model performance.
utils.evaluate(np.concatenate((y_train, y_val)), y_hat_train, 'train')
utils.evaluate(y_test, y_hat_test, 'test')
# 原文地址: https://www.cveoy.top/t/topic/jPe5 著作权归作者所有。请勿转载和采集!