Jupyter Notebook 中的 'out' 是什么意思? 注释解释
Jupyter Notebook 中的 'out' 是什么意思? 注释解释
这个代码示例展示了如何在 Jupyter Notebook 中使用 Python 为机器学习任务(剩余使用寿命 RUL 预测)准备数据,并为基于 TensorFlow/Keras 的模型训练定义回调函数和潜在空间可视化工具。
代码注释
import math
import numpy as np
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, TensorBoard, LambdaCallback
# --------------------------------------- DATA PRE-PROCESSING ---------------------------------------
def add_remaining_useful_life(df):
    """Return a copy of *df* with an added ``RUL`` (remaining useful life) column.

    For every row, ``RUL`` is the largest ``time_cycles`` value observed for
    that row's ``unit_nr`` minus the row's own ``time_cycles``.

    Args:
        df: DataFrame with at least ``unit_nr`` and ``time_cycles`` columns.

    Returns:
        A new DataFrame holding the original columns plus ``RUL``; the input
        frame is not modified.
    """
    # Last observed cycle per unit, as a one-column frame indexed by unit_nr.
    last_cycle = df.groupby(by="unit_nr")["time_cycles"].max().to_frame(name='max_cycle')
    # Attach each unit's last cycle to all of that unit's rows.
    merged = df.merge(last_cycle, left_on='unit_nr', right_index=True)
    merged["RUL"] = merged["max_cycle"] - merged["time_cycles"]
    # The helper column is only needed to compute RUL.
    return merged.drop("max_cycle", axis=1)
def add_operating_condition(df):
    """Return a copy of *df* with rounded settings and an ``op_cond`` label.

    ``setting_1`` is rounded to a whole number and ``setting_2`` to two
    decimals; the absolute value of each is stored. The three settings are
    then rendered as strings and joined with ``_`` so that the operating
    condition becomes a single categorical column.

    Args:
        df: DataFrame with ``setting_1``, ``setting_2`` and ``setting_3``.

    Returns:
        A new DataFrame with the adjusted settings and the ``op_cond`` column;
        the input frame is not modified.
    """
    out = df.copy()
    out['setting_1'] = out['setting_1'].round().abs()
    out['setting_2'] = out['setting_2'].round(decimals=2).abs()

    # String form of each setting; concatenated into one categorical key.
    as_text = out[['setting_1', 'setting_2', 'setting_3']].astype(str)
    out['op_cond'] = as_text['setting_1'] + '_' + \
        as_text['setting_2'] + '_' + \
        as_text['setting_3']
    return out
def condition_scaler(df_train, df_test, sensor_names):
    """Standardise sensor columns separately for each operating condition.

    For every distinct ``op_cond`` value in *df_train*, a ``StandardScaler``
    is fitted on the training rows of that condition and then applied to the
    matching rows of both frames.

    Both frames are modified in place (and also returned). Test rows whose
    condition never occurs in the training frame are left unscaled.

    Args:
        df_train: Training frame with an ``op_cond`` column and the sensors.
        df_test: Test frame with the same columns.
        sensor_names: List of sensor column names to scale.

    Returns:
        The tuple ``(df_train, df_test)``.
    """
    # Scaled values are floats; make the target columns float up front so the
    # row-wise assignments below never have to write floats into int columns.
    df_train[sensor_names] = df_train[sensor_names].astype(float)
    df_test[sensor_names] = df_test[sensor_names].astype(float)

    scaler = StandardScaler()
    for condition in df_train['op_cond'].unique():
        train_rows = df_train['op_cond'] == condition
        test_rows = df_test['op_cond'] == condition

        scaler.fit(df_train.loc[train_rows, sensor_names])
        df_train.loc[train_rows, sensor_names] = scaler.transform(
            df_train.loc[train_rows, sensor_names])

        # A condition seen in training may be absent from the test set; the
        # scaler rejects an empty selection, so skip it instead of crashing.
        if test_rows.any():
            df_test.loc[test_rows, sensor_names] = scaler.transform(
                df_test.loc[test_rows, sensor_names])
    return df_train, df_test
def exponential_smoothing(df, sensors, n_samples, alpha=0.4):
    """Smooth sensor columns per unit and drop each unit's first rows.

    Args:
        df: DataFrame with a ``unit_nr`` column and the columns in *sensors*.
        sensors: List of column names to smooth.
        n_samples: Number of leading rows to drop from every unit (to reduce
            the effect of filter lag); ``0`` keeps all rows.
        alpha: Smoothing factor passed to ``ewm``.

    Returns:
        A new DataFrame (the input is not modified) with smoothed sensor
        columns and the first *n_samples* rows of each unit removed. The
        original index labels of the remaining rows are preserved.
    """
    df = df.copy()

    # Exponentially weighted mean computed within each unit. ``transform``
    # returns a result aligned to the original index, so the assignment is
    # correct whatever the index looks like and does not depend on how a
    # particular pandas version labels the output of ``groupby.apply``.
    df[sensors] = df.groupby('unit_nr')[sensors].transform(
        lambda x: x.ewm(alpha=alpha).mean())

    # Position of each row within its unit (0, 1, 2, ...); keep the rows that
    # come after the first ``n_samples`` of every unit.
    keep = df.groupby('unit_nr').cumcount() >= n_samples
    return df[keep]
def gen_train_data(df, sequence_length, columns):
    """Yield every window of ``sequence_length`` consecutive rows of *df*.

    Windows slide forward one row at a time, so a frame with ``n`` rows
    produces ``n - sequence_length + 1`` windows (none when the frame is
    shorter than one window).

    Args:
        df: DataFrame to slice (callers pass the rows of a single unit).
        sequence_length: Number of rows per window.
        columns: List of column names to include.

    Yields:
        2-D arrays of shape ``(sequence_length, len(columns))``.
    """
    values = df[columns].values
    # Index of the last row at which a full window can still start.
    last_start = values.shape[0] - sequence_length
    for first in range(last_start + 1):
        yield values[first:first + sequence_length, :]
def gen_data_wrapper(df, sequence_length, columns, unit_nrs=np.array([])):
    """Build one float32 array holding the sliding windows of several units.

    Windows are generated per unit with ``gen_train_data`` so that no window
    spans two different units.

    Args:
        df: DataFrame with a ``unit_nr`` column and the requested *columns*.
        sequence_length: Number of rows per window.
        columns: List of column names to include in each window.
        unit_nrs: Units to include (array or list); when empty, every unit
            present in *df* is used.

    Returns:
        ``float32`` array of shape ``(n_windows, sequence_length,
        len(columns))``. Units with fewer than ``sequence_length`` rows
        contribute no windows; if no unit contributes any, the array has
        zero windows.
    """
    unit_nrs = np.asarray(unit_nrs)  # also accept plain lists
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    # Collect windows unit by unit into one flat list. A unit that is too
    # short simply adds nothing, instead of producing an empty entry that
    # cannot be joined with the 3-D blocks of the other units.
    windows = [
        window
        for unit_nr in unit_nrs
        for window in gen_train_data(df[df['unit_nr'] == unit_nr], sequence_length, columns)
    ]
    if not windows:
        return np.empty((0, sequence_length, len(columns)), dtype=np.float32)
    return np.stack(windows).astype(np.float32)
def gen_labels(df, sequence_length, label):
    """Return the label rows that correspond to each full window of *df*.

    The label of a window is taken from the window's last row, so the first
    ``sequence_length - 1`` rows (which never end a full window) are skipped.

    Args:
        df: DataFrame to read from (callers pass the rows of a single unit).
        sequence_length: Number of rows per window.
        label: List of label column names (a list, so the result is 2-D).

    Returns:
        2-D array with one row per window and one column per label.
    """
    targets = df[label].values
    # Row index at which the first complete window ends.
    first_labelled_row = sequence_length - 1
    return targets[first_labelled_row:, :]
def gen_label_wrapper(df, sequence_length, label, unit_nrs=np.array([])):
    """Build one float32 array holding the window labels of several units.

    Args:
        df: DataFrame with a ``unit_nr`` column and the *label* columns.
        sequence_length: Number of rows per window.
        label: List of label column names.
        unit_nrs: Array of units to include; when empty, every unit present
            in *df* is used.

    Returns:
        ``float32`` array with the per-unit results of ``gen_labels``
        concatenated in the order of *unit_nrs*.
    """
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    per_unit_labels = []
    for unit_nr in unit_nrs:
        unit_rows = df[df['unit_nr'] == unit_nr]
        per_unit_labels.append(gen_labels(unit_rows, sequence_length, label))

    return np.concatenate(per_unit_labels).astype(np.float32)
def gen_test_data(df, sequence_length, columns, mask_value):
    """Yield the single most recent window of *df*, padded if necessary.

    When *df* has fewer than ``sequence_length`` rows, the missing leading
    rows are filled with *mask_value* and the available rows are placed at
    the end of the window.

    Args:
        df: DataFrame to read from (callers pass the rows of a single unit).
        sequence_length: Number of rows in the window.
        columns: List of column names to include.
        mask_value: Fill value for padded rows.

    Yields:
        Exactly one 2-D array of shape ``(sequence_length, len(columns))``.
    """
    values = df[columns].values
    n_rows = values.shape[0]

    if n_rows < sequence_length:
        # Force a float buffer: with an integer ``mask_value`` the buffer
        # would otherwise be integer-typed and truncate the sensor readings
        # copied into it.
        window = np.full(shape=(sequence_length, len(columns)),
                         fill_value=mask_value, dtype=float)
        window[sequence_length - n_rows:, :] = values  # real data goes last
    else:
        # Only the last possible window is used for testing.
        window = values[n_rows - sequence_length:, :]

    yield window
def get_data(dataset, sensors, sequence_length, alpha, threshold, dir_path='./data/'):
    """Load a dataset and return windowed train / validation / test arrays.

    Reads the whitespace-separated files ``train_<dataset>.txt``,
    ``test_<dataset>.txt`` and ``RUL_<dataset>.txt`` from *dir_path*, then:
    computes a clipped RUL target for the training data, drops unused
    sensors, scales per operating condition, applies exponential smoothing,
    splits the training units 80/20 into train and validation groups, and
    cuts everything into windows of ``sequence_length`` rows.

    Args:
        dataset: Dataset identifier used in the file names.
        sensors: List of sensor column names (``s_1`` .. ``s_21``) to keep.
        sequence_length: Number of rows per window.
        alpha: Smoothing factor for ``exponential_smoothing``.
        threshold: Upper bound applied to the training RUL values.
        dir_path: Directory containing the files; it is concatenated directly
            with the file name, so it must end with a path separator.

    Returns:
        ``(x_train, y_train, x_val, y_val, x_test, y_test)`` where the first
        five are ``float32`` arrays and ``y_test`` is the
        ``RemainingUsefulLife`` column read from the RUL file.
    """
    # Files
    train_file = 'train_' + dataset + '.txt'
    test_file = 'test_' + dataset + '.txt'
    rul_file = 'RUL_' + dataset + '.txt'

    # Columns
    index_names = ['unit_nr', 'time_cycles']
    setting_names = ['setting_1', 'setting_2', 'setting_3']
    sensor_names = ['s_{}'.format(i + 1) for i in range(0, 21)]
    col_names = index_names + setting_names + sensor_names

    # Data loading
    train = pd.read_csv((dir_path + train_file), sep=r'\s+', header=None,
                        names=col_names)
    test = pd.read_csv((dir_path + test_file), sep=r'\s+', header=None,
                       names=col_names)
    y_test = pd.read_csv((dir_path + rul_file), sep=r'\s+', header=None,
                         names=['RemainingUsefulLife'])

    # Piece-wise target: RUL is capped at ``threshold``. Assign the clipped
    # column back explicitly; an in-place clip on a column selection is not
    # guaranteed to write through to the parent frame.
    train = add_remaining_useful_life(train)
    train['RUL'] = train['RUL'].clip(upper=threshold)

    # Sensors that were not requested are dropped.
    drop_sensors = [element for element in sensor_names if element not in sensors]

    # Scale with respect to the operating condition.
    X_train_pre = add_operating_condition(train.drop(drop_sensors, axis=1))
    X_test_pre = add_operating_condition(test.drop(drop_sensors, axis=1))
    X_train_pre, X_test_pre = condition_scaler(X_train_pre, X_test_pre, sensors)

    # Exponential smoothing (no leading samples are dropped).
    X_train_pre = exponential_smoothing(X_train_pre, sensors, 0, alpha)
    X_test_pre = exponential_smoothing(X_test_pre, sensors, 0, alpha)

    # Train/validation split by unit, so a unit's windows never end up on
    # both sides. ``n_splits=1`` means the splitter produces a single pair of
    # positional index arrays (0-based) into ``units``.
    units = X_train_pre['unit_nr'].unique()
    gss = GroupShuffleSplit(n_splits=1, train_size=0.80, random_state=42)
    train_idx, val_idx = next(gss.split(units, groups=units))
    train_unit = units[train_idx]
    val_unit = units[val_idx]

    x_train = gen_data_wrapper(X_train_pre, sequence_length, sensors, train_unit)
    y_train = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], train_unit)
    x_val = gen_data_wrapper(X_train_pre, sequence_length, sensors, val_unit)
    y_val = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], val_unit)

    # Test sequences: one (possibly padded) window per test unit; -99. is the
    # padding value for units shorter than one window.
    test_sequences = [
        sequence
        for unit_nr in X_test_pre['unit_nr'].unique()
        for sequence in gen_test_data(X_test_pre[X_test_pre['unit_nr'] == unit_nr],
                                      sequence_length, sensors, -99.)
    ]
    x_test = np.stack(test_sequences).astype(np.float32)

    return x_train, y_train, x_val, y_val, x_test, y_test['RemainingUsefulLife']
# ---------------------------------------------------------------------------------------------------
# --------------------------------------- TRAINING CALLBACKS ---------------------------------------
class save_latent_space_viz(Callback):
    """Keras callback that saves a latent-space plot when ``val_loss`` improves.

    At the end of each epoch the reported ``val_loss`` is compared with the
    best value seen so far in the current training run; on improvement the
    first layer of the model is passed to ``viz_latent_space`` as the
    encoder and the resulting figure is saved (not shown).
    """

    def __init__(self, model, data, target):
        """Store the model and the data used for the plots.

        Args:
            model: Model whose first layer (``model.layers[0]``) is the encoder.
            data: Input passed to the encoder for plotting.
            target: Values used to colour the plotted points.
        """
        super().__init__()
        # Attach the model through the base-class setter rather than assigning
        # ``self.model`` directly, which fails where ``model`` is a read-only
        # property.
        # NOTE(review): Keras is expected to re-attach the model being fitted
        # when training starts; presumably it is the same object - confirm.
        self.set_model(model)
        self.data = data
        self.target = target
        self.best_val_loss = float('inf')

    def on_train_begin(self, logs=None):
        """Reset the best validation loss at the start of a training run."""
        self.best_val_loss = float('inf')

    def on_epoch_end(self, epoch, logs=None):
        """Save a latent-space plot if this epoch improved ``val_loss``."""
        val_loss = (logs or {}).get('val_loss')
        if val_loss is None:
            # No validation metric reported for this epoch: nothing to compare.
            return
        if val_loss < self.best_val_loss:
            self.best_val_loss = val_loss
            encoder = self.model.layers[0]
            # save=True, show=False: write the image without opening a window.
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)
def get_callbacks(model, data, target):
    """Return the list of callbacks used for model training.

    The list contains, in order: early stopping on ``val_loss``, a checkpoint
    that keeps the weights of the best ``val_loss``, TensorBoard logging, and
    the latent-space plotting callback.

    Args:
        model: Model passed to ``save_latent_space_viz``.
        data: Input data passed to ``save_latent_space_viz``.
        target: Colouring values passed to ``save_latent_space_viz``.

    Returns:
        A list of four callback instances.
    """
    stop_early = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30)
    keep_best_weights = ModelCheckpoint(
        filepath='./checkpoints/checkpoint',
        monitor='val_loss',
        mode='min',
        verbose=1,
        save_best_only=True,
        save_weights_only=True,
    )
    tensorboard_logs = TensorBoard(log_dir='./logs')
    latent_snapshots = save_latent_space_viz(model, data, target)
    return [stop_early, keep_best_weights, tensorboard_logs, latent_snapshots]
def viz_latent_space(encoder, data, targets=None, epoch='Final', save=False, show=True):
    """Scatter-plot the first two latent dimensions produced by *encoder*.

    Args:
        encoder: Object whose ``predict(data)`` returns three values; the
            first is used as the latent coordinates ``z``.
        data: Input passed to ``encoder.predict``.
        targets: Optional values used to colour the points; when given and
            non-empty, a colorbar is added.
        epoch: Label appended to the saved file name.
        save: If true, write the figure to
            ``./images/latent_space_epoch<epoch>.png``.
        show: If true, display the figure.

    Returns:
        The latent coordinates ``z`` returned by the encoder.
    """
    z, _, _ = encoder.predict(data)
    has_targets = targets is not None and len(targets) > 0

    fig = plt.figure(figsize=(8, 10))
    if has_targets:
        plt.scatter(z[:, 0], z[:, 1], c=targets)
        # A colorbar only makes sense when the points carry colour values.
        plt.colorbar()
    else:
        plt.scatter(z[:, 0], z[:, 1])
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')

    # Save before showing: once the figure has been shown it may already be
    # released, and saving afterwards can produce an empty image.
    if save:
        fig.savefig('./images/latent_space_epoch'+str(epoch)+'.png')
    if show:
        plt.show()
    # This function may be called once per epoch; close the figure so that
    # open figures do not accumulate.
    plt.close(fig)
    return z
# ---------------------------------------------------------------------------------------------------
代码说明:
- 代码中包含两个数据预处理函数:add_remaining_useful_life() 和 add_operating_condition();此外,condition_scaler() 按操作条件进行缩放,exponential_smoothing() 进行指数平滑。
- 代码还包括几个数据生成函数,如 gen_data_wrapper() 和 gen_label_wrapper()。
- get_data() 函数是主要函数,它读取数据,执行预处理,并生成训练、验证和测试数据。
- get_callbacks() 函数用于生成 Keras 模型训练的回调列表。
- save_latent_space_viz() 类是一个自定义回调,它在每个 epoch 结束时检查验证损失,只有当验证损失有所改善时才保存潜在空间的可视化。
- viz_latent_space() 函数是 save_latent_space_viz() 的辅助函数,用于可视化潜在空间。
Jupyter Notebook 中的 'out':
在 Jupyter Notebook 中,'Out' 代表单元格的输出结果。当代码单元格的最后一个表达式有返回值时,Notebook 会在该单元格下方以 Out[n] 的形式显示这个值,并把它保存在名为 Out 的字典中(也可以通过 _ 或 _n 访问)。注意:print() 打印的内容属于标准输出,不会被保存到 Out 中。例如,执行 1 + 1 的单元格会显示 Out[n]: 2;而执行 print(2) 的单元格只会显示 2,不会产生 Out[n]。
总结
这段代码展示了机器学习流程中的数据准备与训练辅助部分,涵盖了数据预处理、序列数据生成、训练回调和潜在空间可视化;模型的构建、训练与评估不在这段代码之内。理解代码中的注释,可以帮助你更深入地了解代码的逻辑和功能。
提示:
- 如果你想在 Jupyter Notebook 中运行代码,需要先安装必要的库,如 TensorFlow、Keras、numpy 和 pandas。
- 可以使用 `!pip install <library_name>` 命令安装库。
- 确保你已经准备好了用于训练和测试的机器学习数据集。
原文地址: https://www.cveoy.top/t/topic/jPcI 著作权归作者所有。请勿转载和采集!