The Meaning of Out in Jupyter Notebook and Code Annotations
This code defines a set of data pre-processing functions and model evaluation helpers, including:

- add_remaining_useful_life: computes the remaining useful life (RUL) of each row.
- add_operating_condition: combines the setting columns into a single categorical operating condition.
- condition_scaler: standardizes the sensor data separately for each operating condition.
- exponential_smoothing: applies exponential smoothing to the sensor data.
- gen_train_data: generates training data sequences.
- gen_data_wrapper: wrapper around gen_train_data that concatenates sequences across units.
- gen_labels: generates label sequences.
- gen_label_wrapper: wrapper around gen_labels that concatenates labels across units.
- gen_test_data: generates test data sequences.

It also defines get_data, which ties the preprocessing steps together, plus training callbacks for latent-space visualization, an LRFinder class for locating a suitable learning rate, and evaluation helpers (get_model, evaluate, score, results).
import math
import numpy as np
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, TensorBoard, LambdaCallback
# --------------------------------------- DATA PRE-PROCESSING ---------------------------------------
def add_remaining_useful_life(df):
# Get the total number of cycles for each unit
grouped_by_unit = df.groupby(by='unit_nr')
max_cycle = grouped_by_unit['time_cycles'].max()
# Merge the max cycle back into the original frame
result_frame = df.merge(max_cycle.to_frame(name='max_cycle'), left_on='unit_nr', right_index=True)
# Calculate remaining useful life for each row
remaining_useful_life = result_frame['max_cycle'] - result_frame['time_cycles']
result_frame['RUL'] = remaining_useful_life
# drop max_cycle as it's no longer needed
result_frame = result_frame.drop('max_cycle', axis=1)
return result_frame
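# A hedged sketch of add_remaining_useful_life on toy data (two units with
# 3 and 2 cycles; the column names match the loader further below):
_toy = pd.DataFrame({'unit_nr': [1, 1, 1, 2, 2], 'time_cycles': [1, 2, 3, 1, 2]})
print(add_remaining_useful_life(_toy)['RUL'].tolist())  # -> [2, 1, 0, 1, 0]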
def add_operating_condition(df):
df_op_cond = df.copy()
df_op_cond['setting_1'] = abs(df_op_cond['setting_1'].round())
df_op_cond['setting_2'] = abs(df_op_cond['setting_2'].round(decimals=2))
    # converting the settings to strings and concatenating them turns the operating condition into a categorical variable
df_op_cond['op_cond'] = df_op_cond['setting_1'].astype(str) + '_' + \
df_op_cond['setting_2'].astype(str) + '_' + \
df_op_cond['setting_3'].astype(str)
return df_op_cond
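# Hedged sketch: rounding the settings and joining them as strings yields a
# categorical key such as '20.0_0.7_100.0' (values fabricated for illustration):
_toy = pd.DataFrame({'setting_1': [20.003], 'setting_2': [0.7002], 'setting_3': [100.0]})
print(add_operating_condition(_toy)['op_cond'].iloc[0])  # -> '20.0_0.7_100.0'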
def condition_scaler(df_train, df_test, sensor_names):
# apply operating condition specific scaling
scaler = StandardScaler()
for condition in df_train['op_cond'].unique():
scaler.fit(df_train.loc[df_train['op_cond']==condition, sensor_names])
df_train.loc[df_train['op_cond']==condition, sensor_names] = scaler.transform(df_train.loc[df_train['op_cond']==condition, sensor_names])
df_test.loc[df_test['op_cond']==condition, sensor_names] = scaler.transform(df_test.loc[df_test['op_cond']==condition, sensor_names])
return df_train, df_test
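# Hedged sketch of per-condition scaling on fabricated data: within each
# operating condition the sensor ends up standardized (mean ~0, std ~1):
_toy = pd.DataFrame({'op_cond': ['a', 'a', 'a', 'b', 'b', 'b'],
                     's_2': [640., 642., 644., 1580., 1584., 1588.]})
_tr, _te = condition_scaler(_toy.copy(), _toy.copy(), ['s_2'])
print(_tr.groupby('op_cond')['s_2'].mean().round(6).tolist())  # -> ~[0.0, 0.0]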
def exponential_smoothing(df, sensors, n_samples, alpha=0.4):
df = df.copy()
# first, take the exponential weighted mean
df[sensors] = df.groupby('unit_nr')[sensors].apply(lambda x: x.ewm(alpha=alpha).mean()).reset_index(level=0, drop=True)
# second, drop first n_samples of each unit_nr to reduce filter delay
def create_mask(data, samples):
result = np.ones_like(data)
result[0:samples] = 0
return result
mask = df.groupby('unit_nr')['unit_nr'].transform(create_mask, samples=n_samples).astype(bool)
df = df[mask]
return df
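# Hedged sketch: smooth one noisy sensor for a single unit, then drop the
# first 2 rows to cut the filter's start-up transient (values fabricated):
_toy = pd.DataFrame({'unit_nr': [1]*5, 's_2': [10., 12., 11., 13., 12.]})
print(len(exponential_smoothing(_toy, ['s_2'], n_samples=2, alpha=0.4)))  # -> 3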
def gen_train_data(df, sequence_length, columns):
data = df[columns].values
num_elements = data.shape[0]
    # stop is exclusive, hence the -1/+1 offsets: each window spans exactly sequence_length rows
for start, stop in zip(range(0, num_elements-(sequence_length-1)), range(sequence_length, num_elements+1)):
yield data[start:stop, :]
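# Hedged sketch: a 5-row unit with sequence_length=3 yields 3 overlapping windows:
_toy = pd.DataFrame({'s_2': [1., 2., 3., 4., 5.]})
_windows = list(gen_train_data(_toy, 3, ['s_2']))
print(len(_windows), _windows[0].ravel())  # -> 3 [1. 2. 3.]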
def gen_data_wrapper(df, sequence_length, columns, unit_nrs=np.array([])):
if unit_nrs.size <= 0:
unit_nrs = df['unit_nr'].unique()
data_gen = (list(gen_train_data(df[df['unit_nr']==unit_nr], sequence_length, columns))
for unit_nr in unit_nrs)
data_array = np.concatenate(list(data_gen)).astype(np.float32)
return data_array
def gen_labels(df, sequence_length, label):
data_matrix = df[label].values
num_elements = data_matrix.shape[0]
    # -1 because the target is the RUL of the last row in each sequence, not the next row
return data_matrix[sequence_length-1:num_elements, :]
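# Hedged sketch: with sequence_length=3 the first label is the RUL of row
# index 2, i.e. each window is paired with the RUL of its last row:
_toy = pd.DataFrame({'RUL': [4, 3, 2, 1, 0]})
print(gen_labels(_toy, 3, ['RUL']).ravel())  # -> [2 1 0]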
def gen_label_wrapper(df, sequence_length, label, unit_nrs=np.array([])):
if unit_nrs.size <= 0:
unit_nrs = df['unit_nr'].unique()
label_gen = [gen_labels(df[df['unit_nr']==unit_nr], sequence_length, label)
for unit_nr in unit_nrs]
label_array = np.concatenate(label_gen).astype(np.float32)
return label_array
def gen_test_data(df, sequence_length, columns, mask_value):
if df.shape[0] < sequence_length:
data_matrix = np.full(shape=(sequence_length, len(columns)), fill_value=mask_value) # pad
idx = data_matrix.shape[0] - df.shape[0]
data_matrix[idx:,:] = df[columns].values # fill with available data
else:
data_matrix = df[columns].values
# specifically yield the last possible sequence
stop = data_matrix.shape[0]
start = stop - sequence_length
    yield data_matrix[start:stop, :]
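# Hedged sketch of the padding branch: a unit shorter than sequence_length
# yields a single sequence front-padded with the mask value:
_toy = pd.DataFrame({'s_2': [0.1, 0.2]})
print(next(gen_test_data(_toy, 4, ['s_2'], -99.)).ravel())  # -> [-99. -99. 0.1 0.2]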
def get_data(dataset, sensors, sequence_length, alpha, threshold):
# files
dir_path = './data/'
train_file = 'train_'+dataset+'.txt'
test_file = 'test_'+dataset+'.txt'
# columns
index_names = ['unit_nr', 'time_cycles']
setting_names = ['setting_1', 'setting_2', 'setting_3']
sensor_names = ['s_{}'.format(i+1) for i in range(0,21)]
col_names = index_names + setting_names + sensor_names
# data readout
train = pd.read_csv((dir_path+train_file), sep=r'\s+', header=None,
names=col_names)
test = pd.read_csv((dir_path+test_file), sep=r'\s+', header=None,
names=col_names)
y_test = pd.read_csv((dir_path+'RUL_'+dataset+'.txt'), sep=r'\s+', header=None,
names=['RemainingUsefulLife'])
# create RUL values according to the piece-wise target function
train = add_remaining_useful_life(train)
    train['RUL'] = train['RUL'].clip(upper=threshold)  # assign back instead of inplace: chained inplace ops are unreliable under pandas copy-on-write
# remove unused sensors
drop_sensors = [element for element in sensor_names if element not in sensors]
# scale with respect to the operating condition
X_train_pre = add_operating_condition(train.drop(drop_sensors, axis=1))
X_test_pre = add_operating_condition(test.drop(drop_sensors, axis=1))
X_train_pre, X_test_pre = condition_scaler(X_train_pre, X_test_pre, sensors)
    # exponential smoothing (n_samples=0: no start-up rows are dropped here)
    X_train_pre = exponential_smoothing(X_train_pre, sensors, 0, alpha)
    X_test_pre = exponential_smoothing(X_test_pre, sensors, 0, alpha)
# train-val split
gss = GroupShuffleSplit(n_splits=1, train_size=0.80, random_state=42)
    # gss.split runs exactly once (n_splits=1); in that single iteration
    # train_unit and val_unit are arrays covering all training/validation
    # units, not single values
for train_unit, val_unit in gss.split(X_train_pre['unit_nr'].unique(), groups=X_train_pre['unit_nr'].unique()):
        train_unit = X_train_pre['unit_nr'].unique()[train_unit]  # gss returns positional indices; map them back to unit numbers (unit_nr starts at 1)
val_unit = X_train_pre['unit_nr'].unique()[val_unit]
x_train = gen_data_wrapper(X_train_pre, sequence_length, sensors, train_unit)
y_train = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], train_unit)
x_val = gen_data_wrapper(X_train_pre, sequence_length, sensors, val_unit)
y_val = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], val_unit)
# create sequences for test
test_gen = (list(gen_test_data(X_test_pre[X_test_pre['unit_nr']==unit_nr], sequence_length, sensors, -99.))
for unit_nr in X_test_pre['unit_nr'].unique())
x_test = np.concatenate(list(test_gen)).astype(np.float32)
return x_train, y_train, x_val, y_val, x_test, y_test['RemainingUsefulLife']
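# Hedged end-to-end sketch: the dataset name, sensor subset, window length and
# RUL clipping threshold are illustrative choices, and the call assumes the raw
# files (./data/train_FD001.txt etc.) are present:
# x_train, y_train, x_val, y_val, x_test, y_test = get_data(
#     'FD001', ['s_3', 's_4', 's_7', 's_11', 's_12'],
#     sequence_length=30, alpha=0.1, threshold=125)
# print(x_train.shape)  # -> (n_sequences, 30, 5)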
# ---------------------------------------------------------------------------------------------------
# --------------------------------------- TRAINING CALLBACKS ---------------------------------------
class save_latent_space_viz(Callback):
def __init__(self, model, data, target):
self.model = model
self.data = data
self.target = target
    def on_train_begin(self, logs=None):
        self.best_val_loss = float('inf')  # any real validation loss will beat this
    def on_epoch_end(self, epoch, logs=None):
        encoder = self.model.layers[0]
        val_loss = (logs or {}).get('val_loss')
        if val_loss is not None and val_loss < self.best_val_loss:
            self.best_val_loss = val_loss
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)
def get_callbacks(model, data, target):
model_callbacks = [
EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30),
        ModelCheckpoint(filepath='./checkpoints/checkpoint', monitor='val_loss', mode='min', verbose=1, save_best_only=True, save_weights_only=True),
TensorBoard(log_dir='./logs'),
save_latent_space_viz(model, data, target)
]
return model_callbacks
def viz_latent_space(encoder, data, targets=[], epoch='Final', save=False, show=True):
    z, _, _ = encoder.predict(data)  # the encoder's first output is used as the latent coordinates
    plt.figure(figsize=(8, 10))
    if len(targets) > 0:
        plt.scatter(z[:, 0], z[:, 1], c=targets)
        plt.colorbar()  # a colorbar only makes sense when points are colored by the target
    else:
        plt.scatter(z[:, 0], z[:, 1])
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    if save:
        plt.savefig('./images/latent_space_epoch'+str(epoch)+'.png')  # save before show(), which may clear the figure
    if show:
        plt.show()
    return z
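# Hedged sketch of wiring the callbacks into Keras training; `autoencoder`
# and the data arrays are placeholders created elsewhere, so the call is
# shown commented out:
# autoencoder.fit(x_train, y_train, validation_data=(x_val, y_val),
#                 epochs=100, batch_size=64,
#                 callbacks=get_callbacks(autoencoder, x_train, y_train))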
# ---------------------------------------------------------------------------------------------------
# ----------------------------------------- FIND OPTIMAL LR ----------------------------------------
class LRFinder:
"""
Cyclical LR, code tailored from:
https://towardsdatascience.com/estimating-optimal-learning-rate-for-a-deep-neural-network-ce32f2556ce0
"""
def __init__(self, model):
self.model = model
self.losses = []
self.lrs = []
self.best_loss = 1e9
def on_batch_end(self, batch, logs):
# Log the learning rate
lr = K.get_value(self.model.optimizer.lr)
self.lrs.append(lr)
# Log the loss
loss = logs['loss']
self.losses.append(loss)
# Check whether the loss got too large or NaN
if batch > 5 and (math.isnan(loss) or loss > self.best_loss * 4):
self.model.stop_training = True
return
if loss < self.best_loss:
self.best_loss = loss
# Increase the learning rate for the next batch
lr *= self.lr_mult
K.set_value(self.model.optimizer.lr, lr)
def find(self, x_train, y_train, start_lr, end_lr, batch_size=64, epochs=1, **kw_fit):
# If x_train contains data for multiple inputs, use length of the first input.
# Assumption: the first element in the list is single input; NOT a list of inputs.
N = x_train[0].shape[0] if isinstance(x_train, list) else x_train.shape[0]
# Compute number of batches and LR multiplier
num_batches = epochs * N / batch_size
self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(num_batches))
        # Remember the initial weights so they can be restored after the sweep
initial_weights = self.model.get_weights()
# Remember the original learning rate
original_lr = K.get_value(self.model.optimizer.lr)
# Set the initial learning rate
K.set_value(self.model.optimizer.lr, start_lr)
callback = LambdaCallback(on_batch_end=lambda batch, logs: self.on_batch_end(batch, logs))
self.model.fit(x_train, y_train,
batch_size=batch_size, epochs=epochs,
callbacks=[callback],
**kw_fit)
# Restore the weights to the state before model fitting
self.model.set_weights(initial_weights)
# Restore the original learning rate
K.set_value(self.model.optimizer.lr, original_lr)
def find_generator(self, generator, start_lr, end_lr, epochs=1, steps_per_epoch=None, **kw_fit):
if steps_per_epoch is None:
try:
steps_per_epoch = len(generator)
            except (ValueError, NotImplementedError) as e:
                raise ValueError('`steps_per_epoch=None` is only valid for a'
                                 ' generator based on the'
                                 ' `keras.utils.Sequence` class. Please specify'
                                 ' `steps_per_epoch` or use the'
                                 ' `keras.utils.Sequence` class.') from e
self.lr_mult = (float(end_lr) / float(start_lr)) ** (float(1) / float(epochs * steps_per_epoch))
        # Remember the initial weights so they can be restored after the sweep
initial_weights = self.model.get_weights()
# Remember the original learning rate
original_lr = K.get_value(self.model.optimizer.lr)
# Set the initial learning rate
K.set_value(self.model.optimizer.lr, start_lr)
callback = LambdaCallback(on_batch_end=lambda batch,
logs: self.on_batch_end(batch, logs))
        self.model.fit(generator,  # Model.fit accepts generators in TF2; fit_generator is deprecated
                       epochs=epochs,
                       steps_per_epoch=steps_per_epoch,
                       callbacks=[callback],
                       **kw_fit)
# Restore the weights to the state before model fitting
self.model.set_weights(initial_weights)
# Restore the original learning rate
K.set_value(self.model.optimizer.lr, original_lr)
def plot_loss(self, n_skip_beginning=10, n_skip_end=5, x_scale='log'):
"""
Plots the loss.
Parameters:
n_skip_beginning - number of batches to skip on the left.
n_skip_end - number of batches to skip on the right.
"""
plt.ylabel('loss')
plt.xlabel('learning rate (log scale)')
plt.plot(self.lrs[n_skip_beginning:-n_skip_end], self.losses[n_skip_beginning:-n_skip_end])
plt.xscale(x_scale)
plt.show()
def plot_loss_change(self, sma=1, n_skip_beginning=10, n_skip_end=5, y_lim=(-0.01, 0.01)):
"""
Plots rate of change of the loss function.
Parameters:
sma - number of batches for simple moving average to smooth out the curve.
n_skip_beginning - number of batches to skip on the left.
n_skip_end - number of batches to skip on the right.
y_lim - limits for the y axis.
"""
derivatives = self.get_derivatives(sma)[n_skip_beginning:-n_skip_end]
lrs = self.lrs[n_skip_beginning:-n_skip_end]
plt.ylabel('rate of loss change')
plt.xlabel('learning rate (log scale)')
plt.plot(lrs, derivatives)
plt.xscale('log')
plt.ylim(y_lim)
plt.show()
def get_derivatives(self, sma):
assert sma >= 1
derivatives = [0] * sma
for i in range(sma, len(self.lrs)):
derivatives.append((self.losses[i] - self.losses[i - sma]) / sma)
return derivatives
def get_best_lr(self, sma, n_skip_beginning=10, n_skip_end=5):
derivatives = self.get_derivatives(sma)
best_der_idx = np.argmin(derivatives[n_skip_beginning:-n_skip_end])
return self.lrs[n_skip_beginning:-n_skip_end][best_der_idx]
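# Hedged usage sketch for LRFinder; `model` is a placeholder for a compiled
# Keras model, and the LR range/batch size are typical exploratory values,
# so the calls are shown commented out:
# lr_finder = LRFinder(model)
# lr_finder.find(x_train, y_train, start_lr=1e-6, end_lr=1e-1,
#                batch_size=64, epochs=1)
# lr_finder.plot_loss()
# print(lr_finder.get_best_lr(sma=5))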
# ---------------------------------------------------------------------------------------------------
# --------------------------------------------- RESULTS --------------------------------------------
def get_model(path):
saved_VRAE_model = load_model(path, compile=False)
    # return encoder, regressor: the saved model is assumed to nest them as its second and third layers
return saved_VRAE_model.layers[1], saved_VRAE_model.layers[2]
def evaluate(y_true, y_hat, label='test'):
mse = mean_squared_error(y_true, y_hat)
rmse = np.sqrt(mse)
variance = r2_score(y_true, y_hat)
print('{} set RMSE:{}, R2:{}'.format(label, rmse, variance))
def score(y_true, y_hat):
    # Asymmetric exponential score: early and late predictions are penalized
    # with different time constants, so errors in one direction cost more
    res = 0
    for true, hat in zip(y_true, y_hat):
        subs = hat - true
        if subs < 0:  # RUL under-estimated (early prediction)
            res = res + np.exp(-subs/10)[0]-1
        else:  # RUL over-estimated (late prediction)
            res = res + np.exp(subs/13)[0]-1
    print('score: ', res)
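# Hedged worked example: each row of y_true/y_hat is a length-1 array,
# matching the [0] indexing above (shapes as produced by model.predict):
_y_true = np.array([[100.], [100.]])
_y_hat = np.array([[90.], [110.]])
score(_y_true, _y_hat)  # exp(10/10)-1 + exp(10/13)-1, i.e. ~2.88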
def results(path, x_train, y_train, x_test, y_test):
# Get model
encoder, regressor = get_model(path)
# Latent space
train_mu = viz_latent_space(encoder, x_train, y_train)
test_mu = viz_latent_space(encoder, x_test, y_test)
# Evaluate
y_hat_train = regressor.predict(train_mu)
y_hat_test = regressor.predict(test_mu)
evaluate(y_train, y_hat_train, 'train')
evaluate(y_test, y_hat_test, 'test')
score(y_test, y_hat_test)