Jupyter Notebook 中的 'out' 是什么意思? 注释解释

这个代码示例展示了如何在 Jupyter Notebook 中使用 Python 为机器学习任务准备数据(读取、预处理并生成序列样本),并为 TensorFlow/Keras 模型的训练定义回调函数。

代码注释

import math
import numpy as np
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, TensorBoard, LambdaCallback


# --------------------------------------- DATA PRE-PROCESSING ---------------------------------------
def add_remaining_useful_life(df):
    """Return a new frame equal to ``df`` plus a ``RUL`` column.

    ``RUL`` (remaining useful life) is, for every row, the largest
    ``time_cycles`` value recorded for that row's ``unit_nr`` minus the
    row's own ``time_cycles``.  The input frame is left untouched.
    """
    # Last recorded cycle of every unit, indexed by unit number.
    last_cycle = df.groupby(by="unit_nr")["time_cycles"].max().to_frame(name='max_cycle')

    # Attach each unit's last cycle to all of its rows.
    with_last_cycle = df.merge(last_cycle, left_on='unit_nr', right_index=True)

    # Cycles left until the unit's final recorded cycle.
    with_last_cycle["RUL"] = with_last_cycle["max_cycle"] - with_last_cycle["time_cycles"]

    # The helper column is only needed for the subtraction above.
    return with_last_cycle.drop("max_cycle", axis=1)

def add_operating_condition(df):
    """Return a copy of ``df`` with a string ``op_cond`` column added.

    In the copy, ``setting_1`` is replaced by the absolute value of its
    rounding to the nearest integer and ``setting_2`` by the absolute value
    of its rounding to two decimals; ``setting_3`` is used unchanged.
    ``op_cond`` is the three settings rendered as text and joined with
    underscores.
    """
    with_cond = df.copy()

    with_cond['setting_1'] = with_cond['setting_1'].round().abs()
    with_cond['setting_2'] = with_cond['setting_2'].round(decimals=2).abs()

    # Joining the settings as text turns the operating condition into a
    # single categorical value.
    setting_text = [with_cond[name].astype(str)
                    for name in ('setting_1', 'setting_2', 'setting_3')]
    with_cond['op_cond'] = setting_text[0] + '_' + setting_text[1] + '_' + setting_text[2]

    return with_cond

def condition_scaler(df_train, df_test, sensor_names):
    """Standardise the sensor columns separately per operating condition.

    For every distinct ``op_cond`` value found in ``df_train`` a
    ``StandardScaler`` is fitted on the training rows of that condition and
    applied to the rows with the same condition in both frames.

    Both frames are modified in place and are also returned.  Test rows
    whose condition never occurs in ``df_train`` are left unscaled.

    Args:
        df_train: training frame with an ``op_cond`` column and the columns
            listed in ``sensor_names``.
        df_test: test frame with the same columns.
        sensor_names: names of the columns to scale.

    Returns:
        The tuple ``(df_train, df_test)``.
    """
    scaler = StandardScaler()
    for condition in df_train['op_cond'].unique():
        # Build each row selector once instead of on every access.
        train_rows = df_train['op_cond'] == condition
        test_rows = df_test['op_cond'] == condition

        scaler.fit(df_train.loc[train_rows, sensor_names])
        df_train.loc[train_rows, sensor_names] = scaler.transform(
            df_train.loc[train_rows, sensor_names])

        # A condition seen in training may be absent from the test data;
        # transform() rejects an empty selection, so skip it in that case.
        if test_rows.any():
            df_test.loc[test_rows, sensor_names] = scaler.transform(
                df_test.loc[test_rows, sensor_names])
    return df_train, df_test

def exponential_smoothing(df, sensors, n_samples, alpha=0.4):
    """Smooth the sensor columns per unit and trim each unit's first rows.

    Works on a copy of ``df``.  Within every ``unit_nr`` group the columns
    in ``sensors`` are replaced by their exponentially weighted mean
    (``ewm(alpha=alpha).mean()``); afterwards the first ``n_samples`` rows
    of every unit are dropped.
    """
    smoothed = df.copy()

    # Step 1: exponentially weighted mean, computed unit by unit.
    sensors_by_unit = smoothed.groupby('unit_nr')[sensors]
    ewm_means = sensors_by_unit.apply(lambda unit_rows: unit_rows.ewm(alpha=alpha).mean())
    smoothed[sensors] = ewm_means.reset_index(level=0, drop=True)

    # Step 2: drop the first 'n_samples' rows of each unit to reduce the
    # filter's start-up lag.
    def keep_after_warmup(data, samples):
        flags = np.ones_like(data)
        flags[0:samples] = 0
        return flags

    unit_ids_by_unit = smoothed.groupby('unit_nr')['unit_nr']
    keep_rows = unit_ids_by_unit.transform(keep_after_warmup, samples=n_samples).astype(bool)

    return smoothed[keep_rows]

def gen_train_data(df, sequence_length, columns):
    """Yield every window of ``sequence_length`` consecutive rows of ``df``.

    Only the given ``columns`` are used.  Windows advance one row at a time,
    from the first row to the last position where a full window still fits;
    nothing is yielded when ``df`` has fewer rows than ``sequence_length``.
    """
    values = df[columns].values
    n_rows = values.shape[0]

    # The last valid start is n_rows - sequence_length, hence the +1 on the
    # exclusive upper bound of the range.
    for first in range(n_rows - sequence_length + 1):
        yield values[first:first + sequence_length, :]
        
def gen_data_wrapper(df, sequence_length, columns, unit_nrs=np.array([])):
    """Stack the sliding windows of the selected units into one float32 array.

    Args:
        df: frame with a ``unit_nr`` column and the columns in ``columns``.
        sequence_length: number of rows per window.
        columns: names of the feature columns.
        unit_nrs: array of unit numbers to include; when empty, every unit
            present in ``df`` is used.

    Returns:
        A float32 array whose first axis runs over all windows of all
        selected units, followed by ``sequence_length`` and
        ``len(columns)``.  Units with fewer than ``sequence_length`` rows
        contribute no windows; when no unit contributes any, an empty array
        with that trailing shape is returned.
    """
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    windows_per_unit = (
        list(gen_train_data(df[df['unit_nr'] == unit_nr], sequence_length, columns))
        for unit_nr in unit_nrs
    )
    # A unit that is too short produces an empty list, which np.concatenate
    # cannot combine with the 3-D stacks of the other units - leave it out.
    non_empty = [windows for windows in windows_per_unit if windows]
    if not non_empty:
        return np.empty((0, sequence_length, len(columns)), dtype=np.float32)

    return np.concatenate(non_empty).astype(np.float32)

def gen_labels(df, sequence_length, label):
    """Return the label rows that line up with the windows of ``df``.

    ``label`` is a list of column names, so the selection is 2-D.  The
    first returned row is the one at position ``sequence_length - 1``:
    each window is labelled with the value of its own last row, not with
    the row that follows it.
    """
    label_values = df[label].values
    first_labelled_row = sequence_length - 1
    return label_values[first_labelled_row:label_values.shape[0], :]
  
def gen_label_wrapper(df, sequence_length, label, unit_nrs=np.array([])):
    """Concatenate the window labels of the selected units into one float32 array.

    When ``unit_nrs`` is empty, every unit present in ``df`` is used.  The
    labels of each unit are produced by ``gen_labels`` and joined in the
    order in which the units are listed.
    """
    if unit_nrs.size == 0:
        unit_nrs = df['unit_nr'].unique()

    labels_per_unit = []
    for unit_nr in unit_nrs:
        unit_rows = df[df['unit_nr'] == unit_nr]
        labels_per_unit.append(gen_labels(unit_rows, sequence_length, label))

    return np.concatenate(labels_per_unit).astype(np.float32)

def gen_test_data(df, sequence_length, columns, mask_value):
    """Yield the single, last window of ``sequence_length`` rows of ``df``.

    Only the given ``columns`` are used.  When ``df`` has fewer rows than
    ``sequence_length``, the window is padded at the front with
    ``mask_value`` so that the available rows sit at its end.
    """
    available_rows = df.shape[0]
    if available_rows < sequence_length:
        # Start from a fully masked window, then overwrite its tail with
        # the rows that actually exist.
        window_source = np.full(shape=(sequence_length, len(columns)), fill_value=mask_value)
        window_source[sequence_length - available_rows:, :] = df[columns].values
    else:
        window_source = df[columns].values

    # Only the last possible window is produced.
    end = window_source.shape[0]
    yield window_source[end - sequence_length:end, :]
        
	
def get_data(dataset, sensors, sequence_length, alpha, threshold):
    """Load one dataset from ``./data/`` and build train/validation/test arrays.

    Reads ``train_<dataset>.txt``, ``test_<dataset>.txt`` and
    ``RUL_<dataset>.txt`` (whitespace separated, no header), adds a clipped
    RUL target to the training data, scales the selected sensors per
    operating condition, smooths them exponentially and cuts the result
    into windows of ``sequence_length`` rows.

    Args:
        dataset: dataset identifier inserted into the file names.
        sensors: names of the sensor columns (``s_1`` .. ``s_21``) to keep.
        sequence_length: number of rows per window.
        alpha: smoothing factor passed to ``exponential_smoothing``.
        threshold: upper bound applied to the training RUL values.

    Returns:
        ``(x_train, y_train, x_val, y_val, x_test, y_test)``.  The ``x``/``y``
        train and validation arrays are float32; ``x_test`` holds one
        window per test unit (padded with ``-99.`` when the unit is shorter
        than ``sequence_length``); ``y_test`` is the
        ``RemainingUsefulLife`` column read from the RUL file.
    """
    # Files
    dir_path = './data/'
    train_file = 'train_' + dataset + '.txt'
    test_file = 'test_' + dataset + '.txt'

    # Columns
    index_names = ['unit_nr', 'time_cycles']
    setting_names = ['setting_1', 'setting_2', 'setting_3']
    sensor_names = ['s_{}'.format(i + 1) for i in range(0, 21)]
    col_names = index_names + setting_names + sensor_names

    # Read the raw data
    train = pd.read_csv((dir_path + train_file), sep=r'\s+', header=None,
                        names=col_names)
    test = pd.read_csv((dir_path + test_file), sep=r'\s+', header=None,
                       names=col_names)
    y_test = pd.read_csv((dir_path + 'RUL_' + dataset + '.txt'), sep=r'\s+', header=None,
                         names=['RemainingUsefulLife'])

    # Piecewise target: RUL capped at 'threshold'.  Assign the clipped
    # column back instead of clipping in place on the selected column,
    # which does not update the frame under pandas Copy-on-Write.
    train = add_remaining_useful_life(train)
    train['RUL'] = train['RUL'].clip(upper=threshold)

    # Sensors that were not requested are dropped
    drop_sensors = [element for element in sensor_names if element not in sensors]

    # Scale per operating condition
    X_train_pre = add_operating_condition(train.drop(drop_sensors, axis=1))
    X_test_pre = add_operating_condition(test.drop(drop_sensors, axis=1))
    X_train_pre, X_test_pre = condition_scaler(X_train_pre, X_test_pre, sensors)

    # Exponential smoothing (no leading rows are dropped: n_samples=0)
    X_train_pre = exponential_smoothing(X_train_pre, sensors, 0, alpha)
    X_test_pre = exponential_smoothing(X_test_pre, sensors, 0, alpha)

    # Train/validation split by unit, so that all rows of a unit end up on
    # the same side.  With n_splits=1 the splitter yields exactly one pair
    # of zero-based positions into 'units'.
    units = X_train_pre['unit_nr'].unique()
    gss = GroupShuffleSplit(n_splits=1, train_size=0.80, random_state=42)
    train_positions, val_positions = next(gss.split(units, groups=units))
    train_unit = units[train_positions]
    val_unit = units[val_positions]

    x_train = gen_data_wrapper(X_train_pre, sequence_length, sensors, train_unit)
    y_train = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], train_unit)

    x_val = gen_data_wrapper(X_train_pre, sequence_length, sensors, val_unit)
    y_val = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], val_unit)

    # One (last) window per test unit
    test_gen = (list(gen_test_data(X_test_pre[X_test_pre['unit_nr'] == unit_nr], sequence_length, sensors, -99.))
                for unit_nr in X_test_pre['unit_nr'].unique())
    x_test = np.concatenate(list(test_gen)).astype(np.float32)

    return x_train, y_train, x_val, y_val, x_test, y_test['RemainingUsefulLife']
# ---------------------------------------------------------------------------------------------------


# --------------------------------------- TRAINING CALLBACKS  ---------------------------------------
class save_latent_space_viz(Callback):
    """Keras callback that saves a latent-space plot when validation loss improves.

    At the end of every epoch the reported ``val_loss`` is compared with
    the best value seen so far in the current training run.  On
    improvement, ``viz_latent_space`` is called with the model's first
    layer as the encoder, saving (not showing) a plot for that epoch.

    Args:
        model: model whose first layer is used as the encoder.
        data: inputs passed to the encoder for plotting.
        target: values used to colour the plotted points.
    """

    def __init__(self, model, data, target):
        super().__init__()
        self.model = model
        self.data = data
        self.target = target
        # Defined here as well so on_epoch_end works even if it is called
        # before on_train_begin.
        self.best_val_loss = float('inf')

    def on_train_begin(self, logs=None):
        # Reset for every training run; infinity lets any first val_loss
        # count as an improvement, however large it is.
        self.best_val_loss = float('inf')

    def on_epoch_end(self, epoch, logs=None):
        # 'logs' may be None and 'val_loss' is missing when training runs
        # without validation data; there is nothing to compare in that case.
        val_loss = (logs or {}).get('val_loss')
        if val_loss is None:
            return

        if val_loss < self.best_val_loss:
            self.best_val_loss = val_loss
            encoder = self.model.layers[0]
            viz_latent_space(encoder, self.data, self.target, epoch, True, False)
	

def get_callbacks(model, data, target):
    """Build the list of Keras callbacks used during training.

    The list contains, in order: early stopping on ``val_loss`` (patience
    30), a best-only weights checkpoint written to
    ``./checkpoints/checkpoint``, TensorBoard logging to ``./logs`` and the
    latent-space plotting callback for ``model``, ``data`` and ``target``.
    """
    early_stopping = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30)
    best_weights_checkpoint = ModelCheckpoint(
        filepath='./checkpoints/checkpoint',
        monitor='val_loss',
        mode='min',
        verbose=1,
        save_best_only=True,
        save_weights_only=True,
    )
    tensorboard_logging = TensorBoard(log_dir='./logs')
    latent_space_plots = save_latent_space_viz(model, data, target)

    return [early_stopping, best_weights_checkpoint, tensorboard_logging, latent_space_plots]

def viz_latent_space(encoder, data, targets=None, epoch='Final', save=False, show=True):
    """Scatter-plot the first two latent dimensions produced by ``encoder``.

    Args:
        encoder: object whose ``predict(data)`` returns three values; the
            first one, ``z``, is plotted.
        data: inputs passed to ``encoder.predict``.
        targets: optional values used to colour the points; when missing
            or empty the points are drawn without a colour mapping.
        epoch: label inserted into the file name when saving.
        save: when true, write the plot to
            ``./images/latent_space_epoch<epoch>.png``.
        show: when true, display the plot.

    Returns:
        The latent representation ``z`` returned by the encoder.
    """
    z, _, _ = encoder.predict(data)
    fig = plt.figure(figsize=(8, 10))
    if targets is not None and len(targets) > 0:
        plt.scatter(z[:, 0], z[:, 1], c=targets)
    else:
        plt.scatter(z[:, 0], z[:, 1])
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    plt.colorbar()

    # Save before showing: once show() has returned, the figure may already
    # have been released, and saving afterwards can write an empty image.
    if save:
        plt.savefig('./images/latent_space_epoch' + str(epoch) + '.png')
    if show:
        plt.show()
    elif save:
        # Save-only calls (e.g. once per epoch from a callback) would
        # otherwise leave one open figure behind per call.
        plt.close(fig)
    return z
# ---------------------------------------------------------------------------------------------------

代码说明:

  • 代码中包含多个数据预处理函数,如 add_remaining_useful_life()、add_operating_condition()、condition_scaler() 和 exponential_smoothing()。
  • 代码还包括了几个数据生成函数,如 gen_train_data()、gen_data_wrapper()、gen_labels()、gen_label_wrapper() 和 gen_test_data()。
  • get_data() 函数是主要函数,它读取数据,执行预处理,并生成训练、验证和测试数据。
  • get_callbacks() 函数用于生成 Keras 模型训练的回调列表。
  • save_latent_space_viz 类是一个自定义回调,它在每个 epoch 结束时检查验证损失,如果验证损失有所改善,就保存潜在空间的可视化图像。
  • viz_latent_space() 函数是 save_latent_space_viz() 的辅助函数,用于可视化潜在空间。

Jupyter Notebook 中的 'out':

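在 Jupyter Notebook 中,'Out' 代表单元格的输出结果。当一个代码单元格的最后一个表达式有返回值时,Notebook 会在该单元格下方以 Out[n] 的形式显示这个值,并把它保存到名为 Out 的字典中(键为执行序号 n,也可以通过 _n 访问)。需要注意的是,print() 打印的内容属于标准输出,只会显示在单元格下方,并不会被存入 Out;例如,单元格只执行 print('hello') 时不会产生 Out[n],而单元格最后一行是表达式 1 + 1 时会产生 Out[n]: 2。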

总结

这段代码展示了机器学习流程中的数据准备部分,涵盖了数据读取、数据预处理、序列样本生成以及训练回调的定义;模型的构建、训练和评估不在这段代码之内。理解代码中的注释,可以帮助你更深入地了解代码的逻辑和功能。

提示:

  • 如果你想在 Jupyter Notebook 中运行代码,需要先安装必要的库,如 TensorFlow(已包含 Keras)、numpy、pandas、scikit-learn 和 matplotlib。
  • 可以使用 !pip install <library_name> 命令安装库。
  • 确保你已经准备好了用于训练和测试的机器学习数据集。
Jupyter Notebook 中的 'out' 是什么意思? 注释解释

原文地址: https://www.cveoy.top/t/topic/jPcI 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录